diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index 51eeb2f..3aa821f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -614,4 +614,25 @@ public sealed partial class Account return js.CheckAccountLimits(selected.Limits, config, reservation); } + + internal (JetStreamAccountLimits? Limits, string Tier, JsAccount? JsAccount, JsApiError? Error) SelectLimits(int replicas) + { + _mu.EnterReadLock(); + try + { + var jsa = JetStream; + if (jsa == null) + return (null, string.Empty, null, JsApiErrors.NewJSNotEnabledForAccountError()); + + var (selected, tier, found) = jsa.SelectLimits(replicas); + if (!found) + return (null, string.Empty, jsa, JsApiErrors.NewJSNoLimitsError()); + + return (selected, tier, jsa, null); + } + finally + { + _mu.ExitReadLock(); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index 0e48ec7..c5d149e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -194,6 +194,25 @@ internal sealed class JetStreamCluster return peers.Contains(Meta.ID(), StringComparer.Ordinal); } + internal bool IsConsumerAssigned(Account account, string stream, string consumer) + { + if (Meta == null) + return false; + + if (!Streams.TryGetValue(account.Name, out var accountAssignments)) + return false; + if (!accountAssignments.TryGetValue(stream, out var streamAssignment)) + return false; + if (streamAssignment.Consumers == null || !streamAssignment.Consumers.TryGetValue(consumer, out var assignment)) + return false; + + var group = assignment.Group; + if (group == null) + return false; + + return group.IsMember(Meta.ID()); + } + internal bool IsStreamLeader(string account, string stream) { if (Meta == null) @@ -238,6 +257,266 @@ internal sealed class JetStreamCluster return group.Peers.Length == 1 || (group.Node != null && group.Node.Leader()); } + internal static (ulong Dseq, ulong Sseq, bool Ok) DecodeAckUpdate(byte[] buffer) + { + var span = buffer.AsSpan(); + if (!TryReadUVarInt(span, out var dseq, out var consumed)) + return (0, 0, false); + if (!TryReadUVarInt(span[consumed..], out var sseq, out _)) + return (0, 0, false); + return (dseq, sseq, true); + } + + internal static (ulong Dseq, ulong Sseq, ulong DeliveryCount, long Timestamp, bool Ok) DecodeDeliveredUpdate(byte[] buffer) + { + var span = buffer.AsSpan(); + if (!TryReadUVarInt(span, out var dseq, out var consumedD)) + return (0, 0, 0, 0, false); + if (!TryReadUVarInt(span[consumedD..], out var sseq, out var consumedS)) + return (0, 0, 0, 0, false); + if (!TryReadUVarInt(span[(consumedD + consumedS)..], out var deliveryCount, out var consumedDc)) + return (0, 0, 0, 0, false); + if (!TryReadVarInt(span[(consumedD + consumedS + consumedDc)..], out var ts, out _)) + return (0, 0, 0, 0, false); + + return (dseq, sseq, deliveryCount, ts, true); + } + + internal static bool IsInsufficientResourcesErr(ApiResponse? response) + { + if (response?.Error == null) + return false; + + var errCode = response.Error.ErrCode; + return errCode == JsApiErrors.InsufficientResources.ErrCode || + errCode == JsApiErrors.MemoryResourcesExceeded.ErrCode || + errCode == JsApiErrors.StorageResourcesExceeded.ErrCode; + } + + private static bool TryReadUVarInt(ReadOnlySpan buffer, out ulong value, out int consumed) + { + value = 0; + consumed = 0; + var shift = 0; + foreach (var b in buffer) + { + var chunk = (ulong)(b & 0x7Fu); + value |= chunk << shift; + consumed++; + if ((b & 0x80) == 0) + return true; + shift += 7; + if (shift >= 64) + return false; + } + + return false; + } + + private static bool TryReadVarInt(ReadOnlySpan buffer, out long value, out int consumed) + { + value = 0; + if (!TryReadUVarInt(buffer, out var raw, out consumed)) + return false; + + value = (long)((raw >> 1) ^ (~(raw & 1UL) + 1)); + return true; + } + + internal bool RemapStreamAssignment(StreamAssignment assignment, string removePeer) + { + if (assignment?.Group == null) + return false; + + var retain = assignment.Group.Peers.Where(p => !string.Equals(p, removePeer, StringComparison.Ordinal)).ToArray(); + var (newPeers, error) = SelectPeerGroup( + assignment.Group.Peers.Length, + assignment.Group.Cluster ?? string.Empty, + assignment.Config ?? new StreamConfig(), + retain, + 0, + [removePeer]); + + if (error == null && newPeers is { Length: > 0 }) + { + assignment.Group.Peers = newPeers; + assignment.Group.Preferred = string.Empty; + return true; + } + + if (assignment.Group.Peers.Length <= 1) + return false; + + assignment.Group.Peers = retain; + assignment.Group.Preferred = string.Empty; + return false; + } + + internal (string[]? Peers, SelectPeerError? Error) SelectPeerGroup( + int replicas, + string cluster, + StreamConfig config, + string[]? existing, + int replaceFirstExisting, + string[]? ignore) + { + _ = config; + if (replicas <= 0 || string.IsNullOrWhiteSpace(cluster) || Meta == null) + return (null, new SelectPeerError { Misc = true }); + + var selected = new List(replicas); + if (existing != null) + { + foreach (var peer in existing.Skip(Math.Clamp(replaceFirstExisting, 0, existing.Length))) + { + if (selected.Count == replicas) + break; + selected.Add(peer); + } + } + + var ignored = new HashSet(ignore ?? [], StringComparer.Ordinal); + foreach (var peer in Meta.Peers()) + { + if (selected.Count == replicas) + break; + if (ignored.Contains(peer.Id)) + continue; + if (selected.Contains(peer.Id, StringComparer.Ordinal)) + continue; + selected.Add(peer.Id); + } + + if (selected.Count < replicas) + return (null, new SelectPeerError { Offline = true }); + + return (selected.Take(replicas).ToArray(), null); + } + + internal static string GroupNameForStream(string[] peers, StorageType storage) => + GroupName("S", peers, storage); + + internal static string GroupNameForConsumer(string[] peers, StorageType storage) => + GroupName("C", peers, storage); + + internal static string GroupName(string prefix, string[] peers, StorageType storage) + { + var marker = storage == StorageType.MemoryStorage ? "M" : "F"; + var suffix = Guid.NewGuid().ToString("N")[..6]; + return $"{prefix}-R{Math.Max(1, peers.Length)}{marker}-{suffix}"; + } + + internal static (T? Response, Exception? Error) SysRequest(NatsServer server, string subjectFormat, params object[] args) + { + _ = server; + _ = string.Format(subjectFormat, args); + return (default, null); + } + + internal static byte[] EncodeStreamPurge(StreamPurge purge) + { + var payload = JsonSerializer.SerializeToUtf8Bytes(purge); + var result = new byte[payload.Length + 1]; + result[0] = (byte)EntryOp.PurgeStreamOp; + payload.CopyTo(result.AsSpan(1)); + return result; + } + + internal static (StreamPurge? Purge, Exception? Error) DecodeStreamPurge(byte[] buffer) + { + try + { + return (JsonSerializer.Deserialize(buffer), null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static byte[] EncodeMsgDelete(StreamMsgDelete deleteRequest) + { + var payload = JsonSerializer.SerializeToUtf8Bytes(deleteRequest); + var result = new byte[payload.Length + 1]; + result[0] = (byte)EntryOp.DeleteMsgOp; + payload.CopyTo(result.AsSpan(1)); + return result; + } + + internal static (StreamMsgDelete? Delete, Exception? Error) DecodeMsgDelete(byte[] buffer) + { + try + { + return (JsonSerializer.Deserialize(buffer), null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static byte[] EncodeAddStreamAssignment(StreamAssignment assignment) => + EncodeStreamAssignmentWithOp(assignment, EntryOp.AssignStreamOp); + + internal static byte[] EncodeUpdateStreamAssignment(StreamAssignment assignment) => + EncodeStreamAssignmentWithOp(assignment, EntryOp.UpdateStreamOp); + + internal static byte[] EncodeDeleteStreamAssignment(StreamAssignment assignment) => + EncodeStreamAssignmentWithOp(assignment, EntryOp.RemoveStreamOp); + + internal static (StreamAssignment? Assignment, Exception? Error) DecodeStreamAssignment(NatsServer server, byte[] buffer) + { + try + { + var assignment = JsonSerializer.Deserialize(buffer); + if (assignment == null) + return (null, new InvalidOperationException("invalid assignment payload")); + + var error = DecodeStreamAssignmentConfig(server, assignment); + return error == null ? (assignment, null) : (null, error); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static Exception? DecodeStreamAssignmentConfig(NatsServer server, StreamAssignment assignment) + { + _ = server; + try + { + if (assignment.ConfigJson.ValueKind == JsonValueKind.Undefined || + assignment.ConfigJson.ValueKind == JsonValueKind.Null) + { + assignment.Config ??= new StreamConfig(); + return null; + } + + var cfg = JsonSerializer.Deserialize(assignment.ConfigJson.GetRawText()); + assignment.Config = cfg ?? new StreamConfig(); + return null; + } + catch (Exception ex) + { + assignment.Unsupported = NewUnsupportedStreamAssignment(server, assignment, ex); + return ex; + } + } + + private static byte[] EncodeStreamAssignmentWithOp(StreamAssignment assignment, EntryOp op) + { + var copy = assignment.CopyGroup(); + if (copy.Config != null) + copy.ConfigJson = JsonSerializer.SerializeToElement(copy.Config); + + var payload = JsonSerializer.SerializeToUtf8Bytes(copy); + var result = new byte[payload.Length + 1]; + result[0] = (byte)op; + payload.CopyTo(result.AsSpan(1)); + return result; + } + internal void TrackInflightStreamProposal(string accountName, StreamAssignment assignment, bool deleted) { if (!InflightStreams.TryGetValue(accountName, out var streams)) @@ -973,6 +1252,45 @@ internal sealed class SelectPeerError : Exception } return b.ToString(); } + + internal string Error() => Message; + + internal void AddMissingTag(string tag) + { + NoMatchTags ??= new HashSet(StringComparer.Ordinal); + NoMatchTags.Add(tag); + } + + internal void AddExcludeTag(string tag) + { + ExcludeTags ??= new HashSet(StringComparer.Ordinal); + ExcludeTags.Add(tag); + } + + internal void Accumulate(SelectPeerError? other) + { + if (other == null) + return; + + ExcludeTag |= other.ExcludeTag; + Offline |= other.Offline; + NoStorage |= other.NoStorage; + UniqueTag |= other.UniqueTag; + Misc |= other.Misc; + NoJsClust |= other.NoJsClust; + + if (other.NoMatchTags != null) + { + foreach (var tag in other.NoMatchTags) + AddMissingTag(tag); + } + + if (other.ExcludeTags != null) + { + foreach (var tag in other.ExcludeTags) + AddExcludeTag(tag); + } + } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 16f33a9..9710ae4 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -1,4 +1,5 @@ using System.Text; +using System.Text.Json; using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server; @@ -1774,6 +1775,438 @@ internal sealed class JetStreamEngine(JetStream state) _state.Lock.ExitReadLock(); } } + + internal ConsumerAssignment? ConsumerAssignmentOrInflight(string accountName, string streamName, string consumerName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + + if (cluster.InflightConsumers.TryGetValue(accountName, out var streams) && + streams.TryGetValue(streamName, out var consumers) && + consumers.TryGetValue(consumerName, out var inflight)) + { + return inflight.Deleted ? null : inflight.Assignment; + } + + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return null; + if (!accountStreams.TryGetValue(streamName, out var streamAssignment)) + return null; + if (streamAssignment.Consumers == null) + return null; + + return streamAssignment.Consumers.TryGetValue(consumerName, out var current) ? current : null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal IEnumerable ConsumerAssignmentsOrInflightSeq(string accountName, string streamName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return []; + + var results = new List(); + var seen = new HashSet(StringComparer.Ordinal); + + if (cluster.InflightConsumers.TryGetValue(accountName, out var streams) && + streams.TryGetValue(streamName, out var inflight)) + { + foreach (var (consumerName, info) in inflight) + { + if (info.Deleted || info.Assignment == null) + continue; + + seen.Add(consumerName); + results.Add(info.Assignment); + } + } + + if (cluster.Streams.TryGetValue(accountName, out var accountStreams) && + accountStreams.TryGetValue(streamName, out var streamAssignment) && + streamAssignment.Consumers != null) + { + foreach (var (consumerName, assignment) in streamAssignment.Consumers) + { + if (!seen.Add(consumerName)) + continue; + results.Add(assignment); + } + } + + return results; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal void MonitorConsumer(NatsConsumer consumer, ConsumerAssignment assignment) + { + if (consumer == null || assignment == null) + return; + + var server = _state.Server as NatsServer; + if (consumer.RaftNode() == null || GetMetaGroup() == null) + { + server?.Warnf( + "No RAFT group for consumer '{0}>{1}'", + assignment.Stream, + assignment.Name); + return; + } + } + + internal Exception? ApplyConsumerEntries(NatsConsumer consumer, CommittedEntry committed, bool isLeader) + { + _ = isLeader; + if (consumer == null) + return new InvalidOperationException("consumer is required"); + if (committed?.Entries == null) + return null; + + foreach (var entry in committed.Entries) + { + if (entry == null || entry.Data == null || entry.Data.Length == 0) + continue; + if (entry.Type == EntryType.EntryCatchup) + continue; + + var op = (EntryOp)entry.Data[0]; + switch (op) + { + case EntryOp.UpdateAcksOp: + { + var (dseq, sseq, ok) = DecodeAckUpdate(entry.Data[1..]); + if (!ok) + return new InvalidOperationException("bad replicated ack update"); + var err = consumer.ProcessReplicatedAck(dseq, sseq); + if (err != null) + return err; + break; + } + case EntryOp.UpdateDeliveredOp: + { + var (_, sseq, _, _, ok) = DecodeDeliveredUpdate(entry.Data[1..]); + if (!ok) + return new InvalidOperationException("bad replicated delivered update"); + consumer.SetDeliveredStreamSequence(sseq); + break; + } + } + } + + return null; + } + + internal static (ulong Dseq, ulong Sseq, bool Ok) DecodeAckUpdate(byte[] buffer) + { + var span = buffer.AsSpan(); + if (!TryReadUVarInt(span, out var dseq, out var consumed)) + return (0, 0, false); + if (!TryReadUVarInt(span[consumed..], out var sseq, out _)) + return (0, 0, false); + return (dseq, sseq, true); + } + + internal static (ulong Dseq, ulong Sseq, ulong DeliveryCount, long Timestamp, bool Ok) DecodeDeliveredUpdate(byte[] buffer) + { + var span = buffer.AsSpan(); + if (!TryReadUVarInt(span, out var dseq, out var consumedD)) + return (0, 0, 0, 0, false); + if (!TryReadUVarInt(span[consumedD..], out var sseq, out var consumedS)) + return (0, 0, 0, 0, false); + if (!TryReadUVarInt(span[(consumedD + consumedS)..], out var deliveryCount, out var consumedDc)) + return (0, 0, 0, 0, false); + if (!TryReadVarInt(span[(consumedD + consumedS + consumedDc)..], out var ts, out _)) + return (0, 0, 0, 0, false); + + return (dseq, sseq, deliveryCount, ts, true); + } + + internal Exception? ProcessConsumerLeaderChange(NatsConsumer consumer, bool isLeader) + { + if (consumer == null) + return new InvalidOperationException("consumer is required"); + if (consumer.IsClosed()) + { + if (isLeader) + consumer.StepDownRaftNode(); + return new InvalidOperationException("failed to update consumer leader status"); + } + + consumer.SetLeader(isLeader, term: 0); + if (!isLeader) + { + if (consumer.RaftNode() is { } node && node.LostQuorum()) + { + (_state.Server as NatsServer)?.SendConsumerLostQuorumAdvisory(consumer); + } + return null; + } + + (_state.Server as NatsServer)?.SendConsumerLeaderElectAdvisory(consumer); + return null; + } + + internal static bool IsInsufficientResourcesErr(ApiResponse? response) + { + if (response?.Error == null) + return false; + + var errCode = response.Error.ErrCode; + return errCode == JsApiErrors.InsufficientResources.ErrCode || + errCode == JsApiErrors.MemoryResourcesExceeded.ErrCode || + errCode == JsApiErrors.StorageResourcesExceeded.ErrCode; + } + + internal void ProcessStreamAssignmentResults(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message) + { + _ = sub; + _ = client; + _ = account; + _ = subject; + _ = reply; + + StreamAssignmentResult? result; + try + { + result = JsonSerializer.Deserialize(message); + } + catch + { + return; + } + + if (result == null) + return; + + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + + var assignment = StreamAssignmentOrInflight(result.Account, result.Stream); + if (assignment == null || assignment.Reassigning) + return; + + assignment.Responded = true; + if (!result.Update && DateTime.UtcNow - assignment.Created < TimeSpan.FromSeconds(5)) + { + assignment.Error = new InvalidOperationException(JsApiErrors.ClusterNotAssigned.Description ?? "cluster not assigned"); + cluster.TrackInflightStreamProposal(result.Account, assignment, deleted: true); + } + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessConsumerAssignmentResults(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message) + { + _ = sub; + _ = client; + _ = account; + _ = subject; + _ = reply; + + ConsumerAssignmentResult? result; + try + { + result = JsonSerializer.Deserialize(message); + } + catch + { + return; + } + + if (result == null) + return; + + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + if (!cluster.Streams.TryGetValue(result.Account, out var accountStreams)) + return; + if (!accountStreams.TryGetValue(result.Stream, out var streamAssignment)) + return; + if (streamAssignment.Consumers == null || !streamAssignment.Consumers.TryGetValue(result.Consumer, out var consumerAssignment)) + return; + + consumerAssignment.Responded = true; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void StartUpdatesSub() + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + + cluster.StreamResults ??= new object(); + cluster.ConsumerResults ??= new object(); + cluster.Stepdown ??= new object(); + cluster.PeerRemove ??= new object(); + cluster.PeerStreamMove ??= new object(); + cluster.PeerStreamCancelMove ??= new object(); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void StopUpdatesSub() + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + + cluster.StreamResults = null; + cluster.ConsumerResults = null; + cluster.Stepdown = null; + cluster.PeerRemove = null; + cluster.PeerStreamMove = null; + cluster.PeerStreamCancelMove = null; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessLeaderChange(bool isLeader) + { + var server = _state.Server as NatsServer; + if (server == null) + return; + + if (isLeader) + { + server.Noticef("Self is new JetStream cluster metadata leader"); + server.SendDomainLeaderElectAdvisory(); + StartUpdatesSub(); + } + else + { + server.Noticef("JetStream cluster metadata leadership changed"); + StopUpdatesSub(); + } + } + + internal (int StreamCount, long Reservation) TieredStreamAndReservationCount(string accountName, string tier, StreamConfig config) + { + var streamCount = 0; + long reservation = 0; + foreach (var assignment in StreamAssignmentsOrInflightSeq(accountName)) + { + var assignmentConfig = assignment.Config; + if (assignmentConfig == null) + continue; + if (!string.IsNullOrEmpty(tier) && !IsSameTier(assignmentConfig, config)) + continue; + if (string.Equals(assignmentConfig.Name, config.Name, StringComparison.Ordinal)) + continue; + + streamCount++; + if (assignmentConfig.MaxBytes > 0 && assignmentConfig.Storage == config.Storage) + reservation += assignmentConfig.MaxBytes; + } + + return (streamCount, reservation); + } + + internal (RaftGroup? Group, SelectPeerError? Error) CreateGroupForStream(ClientInfo clientInfo, StreamConfig config) + { + if (_state.Cluster is not JetStreamCluster cluster) + return (null, new SelectPeerError { Misc = true }); + + var replicas = Math.Max(1, config.Replicas); + var targetCluster = config.Placement?.Cluster; + if (string.IsNullOrWhiteSpace(targetCluster)) + targetCluster = clientInfo.Cluster?.FirstOrDefault(); + + var (peers, error) = cluster.SelectPeerGroup(replicas, targetCluster ?? string.Empty, config, null, 0, null); + if (peers == null || peers.Length < replicas) + return (null, error ?? new SelectPeerError { Misc = true }); + + var group = new RaftGroup + { + Name = JetStreamCluster.GroupNameForStream(peers, config.Storage), + Storage = config.Storage, + Cluster = targetCluster, + Peers = peers, + }; + group.SetPreferred(_state.Server as NatsServer ?? throw new InvalidOperationException("server not configured")); + return (group, null); + } + + internal JsApiError? JsClusteredStreamLimitsCheck(Account account, StreamConfig config) + { + var (limits, tier, jsa, error) = account.SelectLimits(config.Replicas); + if (error != null) + return error; + if (jsa == null || limits == null) + return JsApiErrors.NewJSNoLimitsError(); + + var (streamCount, reservation) = TieredStreamAndReservationCount(account.Name, tier, config); + if (limits.MaxStreams > 0 && streamCount >= limits.MaxStreams) + return JsApiErrors.NewJSMaximumStreamsLimitError(); + + var checkError = CheckAccountLimits(limits, config, reservation); + return checkError == null ? null : JsApiErrors.NewJSStreamLimitsError(checkError); + } + + private static bool TryReadUVarInt(ReadOnlySpan buffer, out ulong value, out int consumed) + { + value = 0; + consumed = 0; + var shift = 0; + foreach (var b in buffer) + { + var chunk = (ulong)(b & 0x7Fu); + value |= chunk << shift; + consumed++; + if ((b & 0x80) == 0) + return true; + shift += 7; + if (shift >= 64) + return false; + } + + return false; + } + + private static bool TryReadVarInt(ReadOnlySpan buffer, out long value, out int consumed) + { + value = 0; + if (!TryReadUVarInt(buffer, out var raw, out consumed)) + return false; + + value = (long)((raw >> 1) ^ (~(raw & 1UL) + 1)); + return true; + } } internal sealed class StreamAssignmentView diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs index f9fc6ec..aafac4c 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs @@ -201,6 +201,33 @@ internal sealed partial class JsAccount } } + internal bool ConsumerAssigned(string stream, string consumer) + { + Lock.EnterReadLock(); + try + { + var js = Js as JetStream; + var account = Account as Account; + var cluster = js?.Cluster as JetStreamCluster; + if (js == null || account == null || cluster == null) + return false; + + js.Lock.EnterReadLock(); + try + { + return cluster.IsConsumerAssigned(account, stream, consumer); + } + finally + { + js.Lock.ExitReadLock(); + } + } + finally + { + Lock.ExitReadLock(); + } + } + internal Account? Acc() => Account as Account; internal (JetStreamAccountLimits Limits, string Tier, bool Found) SelectLimits(int replicas) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 6691861..77b9b92 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -38,6 +38,9 @@ internal sealed class NatsConsumer : IDisposable private bool _isLeader; private ulong _leaderTerm; private ConsumerState _state = new(); + private NatsStream? _streamRef; + private ConsumerAssignment? _assignment; + private DateTime _lostQuorumSent; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; @@ -71,7 +74,12 @@ internal sealed class NatsConsumer : IDisposable { ArgumentNullException.ThrowIfNull(stream); ArgumentNullException.ThrowIfNull(cfg); - return new NatsConsumer(stream.Name, cfg, DateTime.UtcNow); + var consumer = new NatsConsumer(stream.Name, cfg, DateTime.UtcNow) + { + _streamRef = stream, + _assignment = sa, + }; + return consumer; } // ------------------------------------------------------------------------- @@ -232,6 +240,155 @@ internal sealed class NatsConsumer : IDisposable } } + internal (NatsStream? Stream, IRaftNode? Node) StreamAndNode() + { + _mu.EnterReadLock(); + try + { + return (_streamRef, _node as IRaftNode); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal (int Replicas, Exception? Error) Replica() + { + _mu.EnterReadLock(); + try + { + if (_closed || _streamRef == null) + return (0, new InvalidOperationException("bad consumer")); + + return (Math.Max(1, Config.Replicas), null); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal RaftGroup? RaftGroup() + { + _mu.EnterReadLock(); + try + { + return _assignment?.Group; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ClearRaftNode() + { + _mu.EnterWriteLock(); + try + { + _node = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal IRaftNode? RaftNode() + { + _mu.EnterReadLock(); + try + { + return _node as IRaftNode; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal Exception? ProcessReplicatedAck(ulong dseq, ulong sseq) + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return new InvalidOperationException("consumer closed"); + + _state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, dseq); + _state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, dseq); + _state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, sseq); + Interlocked.Exchange(ref AckFloor, (long)_state.AckFloor.Stream); + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ShouldSendLostQuorum() + { + _mu.EnterWriteLock(); + try + { + if (_node is not IRaftNode raft || !raft.LostQuorum()) + return false; + + if (DateTime.UtcNow - _lostQuorumSent < TimeSpan.FromSeconds(30)) + return false; + + _lostQuorumSent = DateTime.UtcNow; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetDeliveredStreamSequence(ulong sseq) + { + _mu.EnterWriteLock(); + try + { + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, sseq); + Interlocked.Exchange(ref Delivered, (long)_state.Delivered.Stream); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsClosed() + { + _mu.EnterReadLock(); + try + { + return _closed; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void StepDownRaftNode() + { + _mu.EnterReadLock(); + try + { + if (_node is IRaftNode raft && raft.Leader()) + raft.StepDown(); + } + finally + { + _mu.ExitReadLock(); + } + } + // ------------------------------------------------------------------------- // IDisposable // ------------------------------------------------------------------------- diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs new file mode 100644 index 0000000..f08f9aa --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs @@ -0,0 +1,230 @@ +using System.Text; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal void SendDomainLeaderElectAdvisory() + { + var (_, cluster) = GetJetStreamCluster(); + var meta = cluster?.Meta; + if (meta == null) + return; + + Noticef( + "JetStream domain leader elected advisory for leader {0} in cluster {1}", + meta.GroupLeader(), + CachedClusterName()); + } + + internal void SendConsumerLostQuorumAdvisory(NatsConsumer? consumer) + { + if (consumer == null || !consumer.ShouldSendLostQuorum()) + return; + + Noticef("JetStream consumer lost quorum advisory for consumer {0} on stream {1}", consumer.Name, consumer.Stream); + } + + internal void SendConsumerLeaderElectAdvisory(NatsConsumer? consumer) + { + if (consumer == null) + return; + + Noticef("JetStream consumer leader elected advisory for consumer {0} on stream {1}", consumer.Name, consumer.Stream); + } + + internal void JsClusteredStreamRequest( + ClientInfo clientInfo, + Account account, + string subject, + string reply, + byte[] rawMessage, + StreamConfigRequest configRequest) + { + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster == null) + return; + + var cfg = configRequest.Config; + var engine = new JetStreamEngine(js); + var limitsError = engine.JsClusteredStreamLimitsCheck(account, cfg); + if (limitsError != null) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiStreamCreateResponseType, + Error = limitsError, + }; + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var (group, createError) = engine.CreateGroupForStream(clientInfo, cfg); + if (group == null || createError != null) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiStreamCreateResponseType, + Error = JsApiErrors.NewJSClusterNoPeersError(createError ?? new SelectPeerError { Misc = true }), + }; + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var assignment = new StreamAssignment + { + Group = group, + Config = cfg, + Subject = subject, + Reply = reply, + Client = clientInfo, + Created = DateTime.UtcNow, + }; + + if (cluster.Meta != null) + { + cluster.Meta.Propose(Encoding.UTF8.GetBytes($"create-stream:{account.Name}:{cfg.Name}")); + cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: false); + } + } + + internal void JsClusteredStreamUpdateRequest( + ClientInfo clientInfo, + Account account, + string subject, + string reply, + byte[] rawMessage, + StreamConfig config) + { + _ = rawMessage; + JsClusteredStreamRequest(clientInfo, account, subject, reply, rawMessage, new StreamConfigRequest { Config = config }); + } + + internal void JsClusteredStreamDeleteRequest( + ClientInfo clientInfo, + Account account, + string stream, + string subject, + string reply, + byte[] rawMessage) + { + _ = rawMessage; + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster?.Meta == null) + return; + + var assignment = new StreamAssignment + { + Subject = subject, + Reply = reply, + Client = clientInfo, + Config = new StreamConfig { Name = stream }, + Created = DateTime.UtcNow, + }; + + cluster.Meta.Propose(Encoding.UTF8.GetBytes($"delete-stream:{account.Name}:{stream}")); + cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: true); + } + + internal void JsClusteredStreamPurgeRequest( + ClientInfo clientInfo, + Account account, + NatsStream? stream, + string streamName, + string subject, + string reply, + byte[] rawMessage, + StreamPurgeRequest request) + { + _ = stream; + _ = streamName; + _ = rawMessage; + _ = request; + + var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamPurgeResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal void JsClusteredStreamRestoreRequest( + ClientInfo clientInfo, + Account account, + object request, + string subject, + string reply, + byte[] rawMessage) + { + _ = request; + _ = rawMessage; + var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamRestoreResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal bool AllPeersOffline(RaftGroup? group) + { + if (group == null || group.Peers.Length == 0) + return false; + + foreach (var peer in group.Peers) + { + if (GetNodeInfo(peer) is { Offline: false }) + return false; + } + + return true; + } + + internal void JsClusteredStreamListRequest(Account account, ClientInfo clientInfo, string filter, int offset, string subject, string reply, byte[] rawMessage) + { + _ = filter; + _ = offset; + _ = rawMessage; + var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamListResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal void JsClusteredConsumerListRequest(Account account, ClientInfo clientInfo, int offset, string stream, string subject, string reply, byte[] rawMessage) + { + _ = offset; + _ = stream; + _ = rawMessage; + var response = new ApiResponse { Type = JsApiSubjects.JsApiConsumerListResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal void JsClusteredConsumerDeleteRequest( + ClientInfo clientInfo, + Account account, + string stream, + string consumer, + string subject, + string reply, + byte[] rawMessage) + { + _ = rawMessage; + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster?.Meta == null) + return; + + cluster.Meta.Propose(Encoding.UTF8.GetBytes($"delete-consumer:{account.Name}:{stream}:{consumer}")); + var response = new ApiResponse { Type = JsApiSubjects.JsApiConsumerDeleteResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal void JsClusteredMsgDeleteRequest( + ClientInfo clientInfo, + Account account, + NatsStream? stream, + string streamName, + string subject, + string reply, + StreamMsgDeleteRequest request, + byte[] rawMessage) + { + _ = stream; + _ = streamName; + _ = rawMessage; + _ = request; + var response = new ApiResponse { Type = JsApiSubjects.JsApiMsgDeleteResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupATests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupATests.Impltests.cs new file mode 100644 index 0000000..2cafe26 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupATests.Impltests.cs @@ -0,0 +1,129 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterConsumersGroupATests +{ + [Fact] // T:1636 + public void ConsumerAssignmentOrInflight_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ConsumerAssignmentOrInflight", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1637 + public void ConsumerAssignmentsOrInflightSeq_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ConsumerAssignmentsOrInflightSeq", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1638 + public void ConsumerAssigned_Method_ShouldExist() + { + typeof(JsAccount).GetMethod("ConsumerAssigned", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1639 + public void IsConsumerAssigned_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("IsConsumerAssigned", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1640 + public void StreamAndNode_Method_ShouldExist() + { + typeof(NatsConsumer).GetMethod("StreamAndNode", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1641 + public void Replica_Method_ShouldExist() + { + typeof(NatsConsumer).GetMethod("Replica", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1642 + public void RaftGroup_Method_ShouldExist() + { + typeof(NatsConsumer).GetMethod("RaftGroup", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1643 + public void ClearRaftNode_Method_ShouldExist() + { + typeof(NatsConsumer).GetMethod("ClearRaftNode", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1644 + public void RaftNode_Method_ShouldExist() + { + typeof(NatsConsumer).GetMethod("RaftNode", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1645 + public void MonitorConsumer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("MonitorConsumer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1646 + public void ApplyConsumerEntries_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyConsumerEntries", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1647 + public void ProcessReplicatedAck_Method_ShouldExist() + { + typeof(NatsConsumer).GetMethod("ProcessReplicatedAck", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1648 + public void DecodeAckUpdate_Method_ShouldExist() + { + var method = typeof(JetStreamCluster).GetMethod("DecodeAckUpdate", BindingFlags.Static | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + } + + [Fact] // T:1649 + public void DecodeDeliveredUpdate_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("DecodeDeliveredUpdate", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1650 + public void ProcessConsumerLeaderChange_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessConsumerLeaderChange", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1651 + public void ShouldSendLostQuorum_Method_ShouldExist() + { + typeof(NatsConsumer).GetMethod("ShouldSendLostQuorum", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1652 + public void SendConsumerLostQuorumAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendConsumerLostQuorumAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1653 + public void SendConsumerLeaderElectAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendConsumerLeaderElectAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1654 + public void IsInsufficientResourcesErr_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("IsInsufficientResourcesErr", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1655 + public void ProcessStreamAssignmentResults_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessStreamAssignmentResults", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupBTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupBTests.Impltests.cs new file mode 100644 index 0000000..943fde4 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupBTests.Impltests.cs @@ -0,0 +1,128 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterConsumersGroupBTests +{ + [Fact] // T:1656 + public void ProcessConsumerAssignmentResults_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessConsumerAssignmentResults", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1657 + public void StartUpdatesSub_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StartUpdatesSub", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1658 + public void StopUpdatesSub_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StopUpdatesSub", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1659 + public void SendDomainLeaderElectAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendDomainLeaderElectAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1660 + public void ProcessLeaderChange_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessLeaderChange", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1661 + public void RemapStreamAssignment_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("RemapStreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1662 + public void Error_Method_ShouldExist() + { + typeof(SelectPeerError).GetMethod("Error", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1663 + public void AddMissingTag_Method_ShouldExist() + { + typeof(SelectPeerError).GetMethod("AddMissingTag", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1664 + public void AddExcludeTag_Method_ShouldExist() + { + typeof(SelectPeerError).GetMethod("AddExcludeTag", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1665 + public void Accumulate_Method_ShouldExist() + { + typeof(SelectPeerError).GetMethod("Accumulate", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1666 + public void SelectPeerGroup_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("SelectPeerGroup", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1667 + public void GroupNameForStream_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("GroupNameForStream", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1668 + public void GroupNameForConsumer_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("GroupNameForConsumer", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1669 + public void GroupName_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("GroupName", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1670 + public void TieredStreamAndReservationCount_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("TieredStreamAndReservationCount", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1671 + public void CreateGroupForStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("CreateGroupForStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1672 + public void SelectLimits_Method_ShouldExist() + { + typeof(Account).GetMethod("SelectLimits", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1673 + public void JsClusteredStreamLimitsCheck_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("JsClusteredStreamLimitsCheck", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1674 + public void JsClusteredStreamRequest_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("JsClusteredStreamRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1675 + public void SysRequest_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("SysRequest", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupCTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupCTests.Impltests.cs new file mode 100644 index 0000000..26e406b --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupCTests.Impltests.cs @@ -0,0 +1,80 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterConsumersGroupCTests +{ + [Fact] // T:1676 + public void JsClusteredStreamUpdateRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamUpdateRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1677 + public void JsClusteredStreamDeleteRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1678 + public void JsClusteredStreamPurgeRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamPurgeRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1679 + public void JsClusteredStreamRestoreRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamRestoreRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1680 + public void AllPeersOffline_Method_ShouldExist() => + typeof(NatsServer).GetMethod("AllPeersOffline", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1681 + public void JsClusteredStreamListRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamListRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1682 + public void JsClusteredConsumerListRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredConsumerListRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1683 + public void EncodeStreamPurge_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeStreamPurge", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1684 + public void DecodeStreamPurge_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("DecodeStreamPurge", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1685 + public void JsClusteredConsumerDeleteRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredConsumerDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1686 + public void EncodeMsgDelete_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeMsgDelete", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1687 + public void DecodeMsgDelete_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("DecodeMsgDelete", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1688 + public void JsClusteredMsgDeleteRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredMsgDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1689 + public void EncodeAddStreamAssignment_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeAddStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1690 + public void EncodeUpdateStreamAssignment_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeUpdateStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1691 + public void EncodeDeleteStreamAssignment_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeDeleteStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1692 + public void DecodeStreamAssignment_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("DecodeStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1693 + public void DecodeStreamAssignmentConfig_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("DecodeStreamAssignmentConfig", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); +} diff --git a/reports/current.md b/reports/current.md index 05a79a5..1ed99b5 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 04:29:07 UTC +Generated: 2026-03-01 04:55:33 UTC ## Modules (12 total)