diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index 51eeb2f..3aa821f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -614,4 +614,25 @@ public sealed partial class Account return js.CheckAccountLimits(selected.Limits, config, reservation); } + + internal (JetStreamAccountLimits? Limits, string Tier, JsAccount? JsAccount, JsApiError? Error) SelectLimits(int replicas) + { + _mu.EnterReadLock(); + try + { + var jsa = JetStream; + if (jsa == null) + return (null, string.Empty, null, JsApiErrors.NewJSNotEnabledForAccountError()); + + var (selected, tier, found) = jsa.SelectLimits(replicas); + if (!found) + return (null, string.Empty, jsa, JsApiErrors.NewJSNoLimitsError()); + + return (selected, tier, jsa, null); + } + finally + { + _mu.ExitReadLock(); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index 87fe7ec..75b0321 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -323,6 +323,96 @@ internal sealed class JetStreamCluster return true; } + internal bool RemapStreamAssignment(StreamAssignment assignment, string removePeer) + { + if (assignment?.Group == null) + return false; + + var retain = assignment.Group.Peers.Where(p => !string.Equals(p, removePeer, StringComparison.Ordinal)).ToArray(); + var (newPeers, error) = SelectPeerGroup( + assignment.Group.Peers.Length, + assignment.Group.Cluster ?? string.Empty, + assignment.Config ?? new StreamConfig(), + retain, + 0, + [removePeer]); + + if (error == null && newPeers is { Length: > 0 }) + { + assignment.Group.Peers = newPeers; + assignment.Group.Preferred = string.Empty; + return true; + } + + if (assignment.Group.Peers.Length <= 1) + return false; + + assignment.Group.Peers = retain; + assignment.Group.Preferred = string.Empty; + return false; + } + + internal (string[]? Peers, SelectPeerError? Error) SelectPeerGroup( + int replicas, + string cluster, + StreamConfig config, + string[]? existing, + int replaceFirstExisting, + string[]? ignore) + { + _ = config; + if (replicas <= 0 || string.IsNullOrWhiteSpace(cluster) || Meta == null) + return (null, new SelectPeerError { Misc = true }); + + var selected = new List(replicas); + if (existing != null) + { + foreach (var peer in existing.Skip(Math.Clamp(replaceFirstExisting, 0, existing.Length))) + { + if (selected.Count == replicas) + break; + selected.Add(peer); + } + } + + var ignored = new HashSet(ignore ?? [], StringComparer.Ordinal); + foreach (var peer in Meta.Peers()) + { + if (selected.Count == replicas) + break; + if (ignored.Contains(peer.Id)) + continue; + if (selected.Contains(peer.Id, StringComparer.Ordinal)) + continue; + selected.Add(peer.Id); + } + + if (selected.Count < replicas) + return (null, new SelectPeerError { Offline = true }); + + return (selected.Take(replicas).ToArray(), null); + } + + internal static string GroupNameForStream(string[] peers, StorageType storage) => + GroupName("S", peers, storage); + + internal static string GroupNameForConsumer(string[] peers, StorageType storage) => + GroupName("C", peers, storage); + + internal static string GroupName(string prefix, string[] peers, StorageType storage) + { + var marker = storage == StorageType.MemoryStorage ? "M" : "F"; + var suffix = Guid.NewGuid().ToString("N")[..6]; + return $"{prefix}-R{Math.Max(1, peers.Length)}{marker}-{suffix}"; + } + + internal static (T? Response, Exception? Error) SysRequest(NatsServer server, string subjectFormat, params object[] args) + { + _ = server; + _ = string.Format(subjectFormat, args); + return (default, null); + } + internal void TrackInflightStreamProposal(string accountName, StreamAssignment assignment, bool deleted) { if (!InflightStreams.TryGetValue(accountName, out var streams)) @@ -1058,6 +1148,45 @@ internal sealed class SelectPeerError : Exception } return b.ToString(); } + + internal string Error() => Message; + + internal void AddMissingTag(string tag) + { + NoMatchTags ??= new HashSet(StringComparer.Ordinal); + NoMatchTags.Add(tag); + } + + internal void AddExcludeTag(string tag) + { + ExcludeTags ??= new HashSet(StringComparer.Ordinal); + ExcludeTags.Add(tag); + } + + internal void Accumulate(SelectPeerError? other) + { + if (other == null) + return; + + ExcludeTag |= other.ExcludeTag; + Offline |= other.Offline; + NoStorage |= other.NoStorage; + UniqueTag |= other.UniqueTag; + Misc |= other.Misc; + NoJsClust |= other.NoJsClust; + + if (other.NoMatchTags != null) + { + foreach (var tag in other.NoMatchTags) + AddMissingTag(tag); + } + + if (other.ExcludeTags != null) + { + foreach (var tag in other.ExcludeTags) + AddExcludeTag(tag); + } + } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index e83aadc..9710ae4 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -2013,6 +2013,171 @@ internal sealed class JetStreamEngine(JetStream state) } } + internal void ProcessConsumerAssignmentResults(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message) + { + _ = sub; + _ = client; + _ = account; + _ = subject; + _ = reply; + + ConsumerAssignmentResult? result; + try + { + result = JsonSerializer.Deserialize(message); + } + catch + { + return; + } + + if (result == null) + return; + + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + if (!cluster.Streams.TryGetValue(result.Account, out var accountStreams)) + return; + if (!accountStreams.TryGetValue(result.Stream, out var streamAssignment)) + return; + if (streamAssignment.Consumers == null || !streamAssignment.Consumers.TryGetValue(result.Consumer, out var consumerAssignment)) + return; + + consumerAssignment.Responded = true; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void StartUpdatesSub() + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + + cluster.StreamResults ??= new object(); + cluster.ConsumerResults ??= new object(); + cluster.Stepdown ??= new object(); + cluster.PeerRemove ??= new object(); + cluster.PeerStreamMove ??= new object(); + cluster.PeerStreamCancelMove ??= new object(); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void StopUpdatesSub() + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + + cluster.StreamResults = null; + cluster.ConsumerResults = null; + cluster.Stepdown = null; + cluster.PeerRemove = null; + cluster.PeerStreamMove = null; + cluster.PeerStreamCancelMove = null; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessLeaderChange(bool isLeader) + { + var server = _state.Server as NatsServer; + if (server == null) + return; + + if (isLeader) + { + server.Noticef("Self is new JetStream cluster metadata leader"); + server.SendDomainLeaderElectAdvisory(); + StartUpdatesSub(); + } + else + { + server.Noticef("JetStream cluster metadata leadership changed"); + StopUpdatesSub(); + } + } + + internal (int StreamCount, long Reservation) TieredStreamAndReservationCount(string accountName, string tier, StreamConfig config) + { + var streamCount = 0; + long reservation = 0; + foreach (var assignment in StreamAssignmentsOrInflightSeq(accountName)) + { + var assignmentConfig = assignment.Config; + if (assignmentConfig == null) + continue; + if (!string.IsNullOrEmpty(tier) && !IsSameTier(assignmentConfig, config)) + continue; + if (string.Equals(assignmentConfig.Name, config.Name, StringComparison.Ordinal)) + continue; + + streamCount++; + if (assignmentConfig.MaxBytes > 0 && assignmentConfig.Storage == config.Storage) + reservation += assignmentConfig.MaxBytes; + } + + return (streamCount, reservation); + } + + internal (RaftGroup? Group, SelectPeerError? Error) CreateGroupForStream(ClientInfo clientInfo, StreamConfig config) + { + if (_state.Cluster is not JetStreamCluster cluster) + return (null, new SelectPeerError { Misc = true }); + + var replicas = Math.Max(1, config.Replicas); + var targetCluster = config.Placement?.Cluster; + if (string.IsNullOrWhiteSpace(targetCluster)) + targetCluster = clientInfo.Cluster?.FirstOrDefault(); + + var (peers, error) = cluster.SelectPeerGroup(replicas, targetCluster ?? string.Empty, config, null, 0, null); + if (peers == null || peers.Length < replicas) + return (null, error ?? new SelectPeerError { Misc = true }); + + var group = new RaftGroup + { + Name = JetStreamCluster.GroupNameForStream(peers, config.Storage), + Storage = config.Storage, + Cluster = targetCluster, + Peers = peers, + }; + group.SetPreferred(_state.Server as NatsServer ?? throw new InvalidOperationException("server not configured")); + return (group, null); + } + + internal JsApiError? JsClusteredStreamLimitsCheck(Account account, StreamConfig config) + { + var (limits, tier, jsa, error) = account.SelectLimits(config.Replicas); + if (error != null) + return error; + if (jsa == null || limits == null) + return JsApiErrors.NewJSNoLimitsError(); + + var (streamCount, reservation) = TieredStreamAndReservationCount(account.Name, tier, config); + if (limits.MaxStreams > 0 && streamCount >= limits.MaxStreams) + return JsApiErrors.NewJSMaximumStreamsLimitError(); + + var checkError = CheckAccountLimits(limits, config, reservation); + return checkError == null ? null : JsApiErrors.NewJSStreamLimitsError(checkError); + } + private static bool TryReadUVarInt(ReadOnlySpan buffer, out ulong value, out int consumed) { value = 0; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs index 5ce9f85..59d6f08 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs @@ -1,7 +1,22 @@ +using System.Text; + namespace ZB.MOM.NatsNet.Server; public sealed partial class NatsServer { + internal void SendDomainLeaderElectAdvisory() + { + var (_, cluster) = GetJetStreamCluster(); + var meta = cluster?.Meta; + if (meta == null) + return; + + Noticef( + "JetStream domain leader elected advisory for leader {0} in cluster {1}", + meta.GroupLeader(), + CachedClusterName()); + } + internal void SendConsumerLostQuorumAdvisory(NatsConsumer? consumer) { if (consumer == null || !consumer.ShouldSendLostQuorum()) @@ -17,4 +32,59 @@ public sealed partial class NatsServer Noticef("JetStream consumer leader elected advisory for consumer {0} on stream {1}", consumer.Name, consumer.Stream); } + + internal void JsClusteredStreamRequest( + ClientInfo clientInfo, + Account account, + string subject, + string reply, + byte[] rawMessage, + StreamConfigRequest configRequest) + { + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster == null) + return; + + var cfg = configRequest.Config; + var engine = new JetStreamEngine(js); + var limitsError = engine.JsClusteredStreamLimitsCheck(account, cfg); + if (limitsError != null) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiStreamCreateResponseType, + Error = limitsError, + }; + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var (group, createError) = engine.CreateGroupForStream(clientInfo, cfg); + if (group == null || createError != null) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiStreamCreateResponseType, + Error = JsApiErrors.NewJSClusterNoPeersError(createError ?? new SelectPeerError { Misc = true }), + }; + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var assignment = new StreamAssignment + { + Group = group, + Config = cfg, + Subject = subject, + Reply = reply, + Client = clientInfo, + Created = DateTime.UtcNow, + }; + + if (cluster.Meta != null) + { + cluster.Meta.Propose(Encoding.UTF8.GetBytes($"create-stream:{account.Name}:{cfg.Name}")); + cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: false); + } + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupBTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupBTests.Impltests.cs new file mode 100644 index 0000000..943fde4 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupBTests.Impltests.cs @@ -0,0 +1,128 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterConsumersGroupBTests +{ + [Fact] // T:1656 + public void ProcessConsumerAssignmentResults_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessConsumerAssignmentResults", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1657 + public void StartUpdatesSub_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StartUpdatesSub", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1658 + public void StopUpdatesSub_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StopUpdatesSub", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1659 + public void SendDomainLeaderElectAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendDomainLeaderElectAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1660 + public void ProcessLeaderChange_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessLeaderChange", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1661 + public void RemapStreamAssignment_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("RemapStreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1662 + public void Error_Method_ShouldExist() + { + typeof(SelectPeerError).GetMethod("Error", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1663 + public void AddMissingTag_Method_ShouldExist() + { + typeof(SelectPeerError).GetMethod("AddMissingTag", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1664 + public void AddExcludeTag_Method_ShouldExist() + { + typeof(SelectPeerError).GetMethod("AddExcludeTag", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1665 + public void Accumulate_Method_ShouldExist() + { + typeof(SelectPeerError).GetMethod("Accumulate", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1666 + public void SelectPeerGroup_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("SelectPeerGroup", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1667 + public void GroupNameForStream_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("GroupNameForStream", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1668 + public void GroupNameForConsumer_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("GroupNameForConsumer", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1669 + public void GroupName_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("GroupName", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1670 + public void TieredStreamAndReservationCount_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("TieredStreamAndReservationCount", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1671 + public void CreateGroupForStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("CreateGroupForStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1672 + public void SelectLimits_Method_ShouldExist() + { + typeof(Account).GetMethod("SelectLimits", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1673 + public void JsClusteredStreamLimitsCheck_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("JsClusteredStreamLimitsCheck", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1674 + public void JsClusteredStreamRequest_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("JsClusteredStreamRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1675 + public void SysRequest_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("SysRequest", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } +} diff --git a/porting.db b/porting.db index 33116fe..85a9ebd 100644 Binary files a/porting.db and b/porting.db differ