diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index d563da5..0e48ec7 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -336,6 +336,29 @@ internal sealed class JetStreamCluster if (streams.Count == 0) InflightConsumers.Remove(accountName); } + + internal static ( + string[] NewPeers, + string[] OldPeers, + Dictionary NewPeerSet, + Dictionary OldPeerSet) GenPeerInfo(string[] peers, int split) + { + var safeSplit = Math.Clamp(split, 0, peers.Length); + var oldPeers = peers.Take(safeSplit).ToArray(); + var newPeers = peers.Skip(safeSplit).ToArray(); + var oldSet = oldPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal); + var newSet = newPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal); + return (newPeers, oldPeers, newSet, oldSet); + } + + internal static bool IsControlHdr(byte[]? headers) + { + if (headers == null || headers.Length == 0) + return false; + + var text = System.Text.Encoding.ASCII.GetString(headers); + return text.Contains(NatsHeaderConstants.JsResponseType, StringComparison.OrdinalIgnoreCase); + } } // ============================================================================ @@ -452,6 +475,31 @@ internal sealed class RaftGroup /// Internal Raft node — not serialized. [JsonIgnore] public IRaftNode? Node { get; set; } + + internal bool IsMember(string id) => + Peers.Contains(id, StringComparer.Ordinal); + + internal void SetPreferred(NatsServer server) + { + if (Peers.Length == 0) + return; + if (Peers.Length == 1) + { + Preferred = Peers[0]; + return; + } + + var online = new List(Peers.Length); + foreach (var peer in Peers) + { + var info = server.GetNodeInfo(peer); + if (info is { Offline: false }) + online.Add(peer); + } + + var candidates = online.Count > 0 ? online : [.. Peers]; + Preferred = candidates[Random.Shared.Next(candidates.Count)]; + } } // ============================================================================ @@ -482,6 +530,49 @@ internal sealed class StreamAssignment [JsonIgnore] public bool Resetting { get; set; } [JsonIgnore] public Exception? Error { get; set; } [JsonIgnore] public UnsupportedStreamAssignment? Unsupported { get; set; } + + internal StreamAssignment CopyGroup() + { + var clone = new StreamAssignment + { + Client = Client, + Created = Created, + ConfigJson = ConfigJson, + Config = Config, + Group = Group == null + ? null + : new RaftGroup + { + Name = Group.Name, + Peers = [.. Group.Peers], + Storage = Group.Storage, + Cluster = Group.Cluster, + Preferred = Group.Preferred, + ScaleUp = Group.ScaleUp, + Node = Group.Node, + }, + Sync = Sync, + Subject = Subject, + Reply = Reply, + Restore = Restore, + Consumers = Consumers, + Responded = Responded, + Recovering = Recovering, + Reassigning = Reassigning, + Resetting = Resetting, + Error = Error, + Unsupported = Unsupported, + }; + return clone; + } + + internal bool MissingPeers() => + Group != null && + Config != null && + Group.Peers.Length < Math.Max(1, Config.Replicas); + + internal string RecoveryKey() => + $"{Client?.ServiceAccount() ?? string.Empty}:{Config?.Name ?? Subject ?? string.Empty}"; } // ============================================================================ @@ -551,6 +642,45 @@ internal sealed class ConsumerAssignment [JsonIgnore] public bool Recovering { get; set; } [JsonIgnore] public Exception? Error { get; set; } [JsonIgnore] public UnsupportedConsumerAssignment? Unsupported { get; set; } + + internal ConsumerAssignment CopyGroup() + { + var clone = new ConsumerAssignment + { + Client = Client, + Created = Created, + Name = Name, + Stream = Stream, + ConfigJson = ConfigJson, + Config = Config, + Group = Group == null + ? null + : new RaftGroup + { + Name = Group.Name, + Peers = [.. Group.Peers], + Storage = Group.Storage, + Cluster = Group.Cluster, + Preferred = Group.Preferred, + ScaleUp = Group.ScaleUp, + Node = Group.Node, + }, + Subject = Subject, + Reply = Reply, + State = State, + Responded = Responded, + Recovering = Recovering, + Error = Error, + Unsupported = Unsupported, + }; + return clone; + } + + internal string StreamRecoveryKey() => + $"{Client?.ServiceAccount() ?? string.Empty}:{Stream}"; + + internal string RecoveryKey() => + $"{Client?.ServiceAccount() ?? string.Empty}:{Stream}:{Name}"; } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 2cfdb00..16f33a9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -816,6 +816,964 @@ internal sealed class JetStreamEngine(JetStream state) !string.IsNullOrWhiteSpace(value) && value.Length >= 10 && value.StartsWith("A", StringComparison.Ordinal); + + // ------------------------------------------------------------------------- + // JetStream cluster stream methods (Batch 33 Group A) + // ------------------------------------------------------------------------- + + internal void MonitorCluster() + { + var meta = GetMetaGroup(); + if (meta == null) + return; + + SetMetaRecovering(); + try + { + CheckClusterSize(); + var (_, _, _, error) = MetaSnapshot(); + if (error != null) + Server()?.Warnf("JetStream meta snapshot failed in MonitorCluster: {0}", error.Message); + } + finally + { + ClearMetaRecovering(); + ClusterStoppedC()?.Writer.TryWrite(true); + } + } + + internal void CheckClusterSize() + { + var server = Server(); + var meta = GetMetaGroup(); + if (server == null || meta == null) + return; + + var activePeers = server.ActivePeers(); + if (activePeers.Count == 0) + return; + + var jsPeers = 0; + foreach (var peer in activePeers) + { + var info = server.GetNodeInfo(peer); + if (info?.Js == true) + jsPeers++; + } + + if (jsPeers > 0 && jsPeers < meta.ClusterSize()) + meta.AdjustClusterSize(jsPeers); + } + + internal (StreamConfig Config, bool Ok) ClusterStreamConfig(string accountName, string streamName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return (new StreamConfig(), false); + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return (new StreamConfig(), false); + if (!accountStreams.TryGetValue(streamName, out var assignment)) + return (new StreamConfig(), false); + if (assignment.Config == null) + return (new StreamConfig(), false); + + return (assignment.Config.Clone(), true); + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) MetaSnapshot() + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return ([], 0, 0, null); + + return EncodeMetaSnapshot(cluster.Streams); + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal Exception? ApplyMetaSnapshot(byte[] buffer, RecoveryUpdates? updates, bool isRecovering) + { + var (decoded, error) = DecodeMetaSnapshot(buffer); + if (error != null || decoded == null) + return error; + + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + + foreach (var accountStreams in decoded.Values) + { + foreach (var assignment in accountStreams.Values) + { + SetStreamAssignmentRecovering(assignment); + if (assignment.Consumers == null) + continue; + + foreach (var consumer in assignment.Consumers.Values) + { + SetConsumerAssignmentRecovering(consumer); + } + } + } + + cluster.Streams = decoded; + if (!isRecovering && updates != null) + { + updates.AddStreams.Clear(); + updates.UpdateStreams.Clear(); + updates.RemoveStreams.Clear(); + updates.UpdateConsumers.Clear(); + updates.RemoveConsumers.Clear(); + } + return null; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal (Dictionary>? Streams, Exception? Error) DecodeMetaSnapshot(byte[] buffer) + { + if (buffer.Length == 0) + { + return ( + new Dictionary>(StringComparer.Ordinal), + null + ); + } + + try + { + var writeable = System.Text.Json.JsonSerializer.Deserialize>(buffer) ?? []; + var streams = new Dictionary>(StringComparer.Ordinal); + foreach (var item in writeable) + { + var accountName = item.Client?.ServiceAccount() ?? string.Empty; + if (!streams.TryGetValue(accountName, out var accountStreams)) + { + accountStreams = new Dictionary(StringComparer.Ordinal); + streams[accountName] = accountStreams; + } + + var assignment = new StreamAssignment + { + Client = item.Client, + Created = item.Created, + ConfigJson = item.ConfigJson, + Group = item.Group, + Sync = item.Sync, + Config = item.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined + ? null + : System.Text.Json.JsonSerializer.Deserialize(item.ConfigJson.GetRawText()), + Consumers = new Dictionary(StringComparer.Ordinal), + }; + + if (item.Consumers.Count > 0) + { + foreach (var consumer in item.Consumers) + { + var streamName = !string.IsNullOrWhiteSpace(consumer.Stream) + ? consumer.Stream + : assignment.Config?.Name ?? string.Empty; + var assignmentConsumer = new ConsumerAssignment + { + Client = consumer.Client, + Created = consumer.Created, + Name = consumer.Name, + Stream = streamName, + ConfigJson = consumer.ConfigJson, + Group = consumer.Group, + State = consumer.State, + Config = consumer.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined + ? null + : System.Text.Json.JsonSerializer.Deserialize(consumer.ConfigJson.GetRawText()), + }; + assignment.Consumers[assignmentConsumer.Name] = assignmentConsumer; + } + } + + var streamKey = assignment.Config?.Name ?? string.Empty; + accountStreams[streamKey] = assignment; + } + + return (streams, null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) EncodeMetaSnapshot( + Dictionary> streams) + { + try + { + var streamCount = 0; + var consumerCount = 0; + var writeable = new List(); + foreach (var accountStreams in streams.Values) + { + foreach (var assignment in accountStreams.Values) + { + streamCount++; + var configJson = assignment.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined && + assignment.Config != null + ? System.Text.Json.JsonSerializer.SerializeToElement(assignment.Config) + : assignment.ConfigJson; + var ws = new WriteableStreamAssignment + { + Client = assignment.Client, + Created = assignment.Created, + ConfigJson = configJson, + Group = assignment.Group, + Sync = assignment.Sync, + }; + + if (assignment.Consumers != null) + { + foreach (var consumer in assignment.Consumers.Values) + { + consumerCount++; + var consumerConfigJson = + consumer.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined && + consumer.Config != null + ? System.Text.Json.JsonSerializer.SerializeToElement(consumer.Config) + : consumer.ConfigJson; + ws.Consumers.Add(new WriteableConsumerAssignment + { + Client = consumer.Client, + Created = consumer.Created, + Name = consumer.Name, + Stream = consumer.Stream, + ConfigJson = consumerConfigJson, + Group = consumer.Group, + State = consumer.State, + }); + } + } + + writeable.Add(ws); + } + } + + if (writeable.Count == 0) + return ([], 0, 0, null); + + var snapshot = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(writeable); + return (snapshot, streamCount, consumerCount, null); + } + catch (Exception ex) + { + return ([], 0, 0, ex); + } + } + + internal Exception? CollectStreamAndConsumerChanges( + IRaftNodeCheckpoint checkpoint, + Dictionary> streams) + { + try + { + foreach (var (appendEntry, error) in checkpoint.AppendEntriesSeq()) + { + if (error != null) + return error; + + foreach (var entry in appendEntry.Entries) + { + if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0) + continue; + + var op = (EntryOp)entry.Data[0]; + var payload = entry.Data.Length > 1 ? entry.Data.AsSpan(1).ToArray() : []; + switch (op) + { + case EntryOp.AssignStreamOp: + case EntryOp.UpdateStreamOp: + { + var sa = System.Text.Json.JsonSerializer.Deserialize(payload); + if (sa?.Client == null || sa.Config == null) + break; + var account = sa.Client.ServiceAccount(); + if (!streams.TryGetValue(account, out var accountStreams)) + { + accountStreams = new Dictionary(StringComparer.Ordinal); + streams[account] = accountStreams; + } + + accountStreams[sa.Config.Name] = sa; + break; + } + case EntryOp.RemoveStreamOp: + { + var sa = System.Text.Json.JsonSerializer.Deserialize(payload); + if (sa?.Client == null || sa.Config == null) + break; + if (streams.TryGetValue(sa.Client.ServiceAccount(), out var accountStreams)) + { + accountStreams.Remove(sa.Config.Name); + } + break; + } + case EntryOp.AssignConsumerOp: + case EntryOp.AssignCompressedConsumerOp: + { + var ca = System.Text.Json.JsonSerializer.Deserialize(payload); + if (ca?.Client == null || string.IsNullOrWhiteSpace(ca.Stream)) + break; + var account = ca.Client.ServiceAccount(); + if (!streams.TryGetValue(account, out var accountStreams)) + break; + if (!accountStreams.TryGetValue(ca.Stream, out var streamAssignment)) + break; + streamAssignment.Consumers ??= new Dictionary(StringComparer.Ordinal); + streamAssignment.Consumers[ca.Name] = ca; + break; + } + case EntryOp.RemoveConsumerOp: + { + var ca = System.Text.Json.JsonSerializer.Deserialize(payload); + if (ca?.Client == null || string.IsNullOrWhiteSpace(ca.Stream)) + break; + var account = ca.Client.ServiceAccount(); + if (!streams.TryGetValue(account, out var accountStreams)) + break; + if (!accountStreams.TryGetValue(ca.Stream, out var streamAssignment)) + break; + streamAssignment.Consumers?.Remove(ca.Name); + break; + } + } + } + } + + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal void SetStreamAssignmentRecovering(StreamAssignment assignment) + { + assignment.Responded = true; + assignment.Recovering = true; + assignment.Restore = null; + if (assignment.Group != null) + { + assignment.Group.Preferred = string.Empty; + assignment.Group.ScaleUp = false; + } + } + + internal void SetConsumerAssignmentRecovering(ConsumerAssignment assignment) + { + assignment.Responded = true; + assignment.Recovering = true; + if (assignment.Group != null) + { + assignment.Group.Preferred = string.Empty; + assignment.Group.ScaleUp = false; + } + } + + internal void ProcessAddPeer(string peer) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + if (!cluster.IsLeader()) + return; + + foreach (var accountStreams in cluster.Streams.Values) + { + foreach (var assignment in accountStreams.Values) + { + if (!assignment.MissingPeers()) + continue; + + var copy = assignment.CopyGroup(); + copy.Group!.Peers = [.. copy.Group.Peers, peer]; + assignment.Group = copy.Group; + + if (assignment.Consumers == null) + continue; + + foreach (var consumer in assignment.Consumers.Values) + { + if (consumer.Config?.Durable is { Length: > 0 } || (consumer.Group?.Peers.Length ?? 0) > 1) + consumer.Group!.Peers = assignment.Group.Peers; + } + } + } + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessRemovePeer(string peer) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + + foreach (var accountStreams in cluster.Streams.Values) + { + foreach (var assignment in accountStreams.Values) + { + if (assignment.Group == null || !assignment.Group.IsMember(peer)) + continue; + + RemovePeerFromStreamLocked(assignment, peer); + } + } + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool RemovePeerFromStream(StreamAssignment assignment, string peer) + { + _state.Lock.EnterWriteLock(); + try + { + return RemovePeerFromStreamLocked(assignment, peer); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool RemovePeerFromStreamLocked(StreamAssignment assignment, string peer) + { + if (assignment.Group == null || !assignment.Group.IsMember(peer)) + return false; + + assignment.Group.Peers = [.. assignment.Group.Peers.Where(p => !string.Equals(p, peer, StringComparison.Ordinal))]; + assignment.Group.Preferred = string.Empty; + + if (assignment.Consumers == null) + return true; + + foreach (var consumer in assignment.Consumers.Values) + { + if (consumer.Group == null) + continue; + consumer.Group.Peers = [.. consumer.Group.Peers.Where(p => !string.Equals(p, peer, StringComparison.Ordinal))]; + consumer.Group.Preferred = string.Empty; + } + + return true; + } + + internal bool HasPeerEntries(IEnumerable entries) + { + foreach (var entry in entries) + { + if (entry.Type == EntryType.EntryAddPeer || entry.Type == EntryType.EntryRemovePeer) + return true; + } + return false; + } + + internal (bool IsRecovering, bool DidSnapshot, Exception? Error) ApplyMetaEntries( + IReadOnlyList entries, + RecoveryUpdates? updates) + { + var recovering = updates != null; + var didSnapshot = false; + + try + { + foreach (var entry in entries) + { + if (entry.Type == EntryType.EntryCatchup) + { + recovering = true; + break; + } + + if (entry.Type == EntryType.EntrySnapshot) + { + var error = ApplyMetaSnapshot(entry.Data, updates, recovering); + if (error != null) + return (recovering, didSnapshot, error); + didSnapshot = true; + continue; + } + + if (entry.Type == EntryType.EntryRemovePeer) + { + ProcessRemovePeer(System.Text.Encoding.ASCII.GetString(entry.Data)); + continue; + } + + if (entry.Type == EntryType.EntryAddPeer) + { + ProcessAddPeer(System.Text.Encoding.ASCII.GetString(entry.Data)); + continue; + } + + if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0) + continue; + + var op = (EntryOp)entry.Data[0]; + var payload = entry.Data.Length > 1 ? entry.Data.AsSpan(1).ToArray() : []; + switch (op) + { + case EntryOp.AssignStreamOp: + { + var sa = System.Text.Json.JsonSerializer.Deserialize(payload); + if (sa != null) + SetStreamAssignmentRecovering(sa); + break; + } + case EntryOp.AssignConsumerOp: + case EntryOp.AssignCompressedConsumerOp: + { + var ca = System.Text.Json.JsonSerializer.Deserialize(payload); + if (ca != null) + SetConsumerAssignmentRecovering(ca); + break; + } + } + } + + return (recovering, didSnapshot, null); + } + catch (Exception ex) + { + return (recovering, didSnapshot, ex); + } + } + + internal (IRaftNode? Node, Exception? Error) CreateRaftGroup( + string accountName, + RaftGroup group, + bool recovering, + StorageType storage) + { + var server = Server(); + if (server == null) + return (null, new InvalidOperationException("jetstream server unavailable")); + if (group.Peers.Length <= 1) + return (null, null); + + var meta = GetMetaGroup(); + if (meta == null || !group.IsMember(meta.ID())) + return (null, null); + + var existing = server.LookupRaftNode(group.Name); + if (existing != null && existing.State() != RaftState.Closed) + { + group.Node = existing; + return (existing, null); + } + + group.SetPreferred(server); + var config = new RaftConfig + { + Name = group.Name, + Recovering = recovering, + ScaleUp = group.ScaleUp, + Store = Path.Combine(_state.Config.StoreDir, accountName, "_js_", group.Name), + Log = null, + }; + + var (node, error) = server.StartRaftNode(accountName, config); + if (error != null || node == null) + return (null, error); + + group.Node = node; + if (!string.IsNullOrEmpty(group.Preferred) && node.ID() == group.Preferred && node.Term() == 0) + node.CampaignImmediately(); + + _ = storage; + return (node, null); + } + + internal static ( + string[] NewPeers, + string[] OldPeers, + Dictionary NewPeerSet, + Dictionary OldPeerSet) GenPeerInfo(string[] peers, int split) + { + var safeSplit = Math.Clamp(split, 0, peers.Length); + var oldPeers = peers.Take(safeSplit).ToArray(); + var newPeers = peers.Skip(safeSplit).ToArray(); + var oldSet = oldPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal); + var newSet = newPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal); + return (newPeers, oldPeers, newSet, oldSet); + } + + internal void MonitorStream(NatsStream? stream, StreamAssignment assignment, bool sendSnapshot) + { + var node = assignment.Group?.Node; + if (node == null) + return; + + var isLeader = node.Leader(); + ProcessStreamLeaderChange(stream, isLeader); + if (sendSnapshot && stream != null && isLeader) + { + var state = stream.State(); + var snap = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(state); + node.SendSnapshot(snap); + } + } + + internal static bool IsControlHdr(byte[]? headers) + { + if (headers == null || headers.Length == 0) + return false; + + var text = System.Text.Encoding.ASCII.GetString(headers); + return text.Contains(NatsHeaderConstants.JsResponseType, StringComparison.OrdinalIgnoreCase); + } + + internal (ulong MaxApplied, Exception? Error) ApplyStreamEntries( + NatsStream? stream, + CommittedEntry committed, + bool isRecovering) + { + try + { + if (stream == null) + return (0, null); + + ulong maxApplied = 0; + foreach (var entry in committed.Entries) + { + if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0) + continue; + + var op = (EntryOp)entry.Data[0]; + if (op == EntryOp.StreamMsgOp || op == EntryOp.CompressedStreamMsgOp) + { + var error = ApplyStreamMsgOp(stream, entry, isRecovering); + if (error != null) + return (maxApplied, error); + maxApplied = Math.Max(maxApplied, committed.Index); + } + } + + return (maxApplied, null); + } + catch (Exception ex) + { + return (0, ex); + } + } + + internal Exception? ApplyStreamMsgOp(NatsStream stream, Entry entry, bool isRecovering) + { + if (isRecovering && stream.SkipBatchIfRecovering()) + return null; + + var payloadLength = Math.Max(0, entry.Data.Length - 1); + Interlocked.Increment(ref stream.Msgs); + Interlocked.Add(ref stream.Bytes, payloadLength); + return null; + } + + internal void ProcessStreamLeaderChange(NatsStream? stream, bool isLeader) + { + if (stream == null) + return; + stream.SetLeader(isLeader, term: 0); + if (!isLeader) + stream.ResetClusteredState(); + } + + internal StreamAssignment? StreamAssignment(string accountName, string streamName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return null; + return accountStreams.TryGetValue(streamName, out var assignment) ? assignment : null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal StreamAssignment? StreamAssignmentOrInflight(string accountName, string streamName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + if (cluster.Streams.TryGetValue(accountName, out var accountStreams) && + accountStreams.TryGetValue(streamName, out var current)) + return current; + + if (cluster.InflightStreams.TryGetValue(accountName, out var inflight) && + inflight.TryGetValue(streamName, out var info)) + return info.Assignment; + + return null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal IEnumerable StreamAssignmentsOrInflightSeq(string accountName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return []; + + var seen = new HashSet(StringComparer.Ordinal); + var items = new List(); + if (cluster.Streams.TryGetValue(accountName, out var accountStreams)) + { + foreach (var (streamName, assignment) in accountStreams) + { + seen.Add(streamName); + items.Add(assignment); + } + } + + if (cluster.InflightStreams.TryGetValue(accountName, out var inflightStreams)) + { + foreach (var (streamName, inflight) in inflightStreams) + { + if (seen.Contains(streamName) || inflight.Assignment == null) + continue; + items.Add(inflight.Assignment); + } + } + + return items; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal IEnumerable<(string Account, StreamAssignment Assignment)> StreamAssignmentsOrInflightSeqAllAccounts() + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return []; + + var results = new List<(string Account, StreamAssignment Assignment)>(); + foreach (var account in cluster.Streams.Keys.Union(cluster.InflightStreams.Keys, StringComparer.Ordinal)) + { + var seen = new HashSet(StringComparer.Ordinal); + if (cluster.Streams.TryGetValue(account, out var accountStreams)) + { + foreach (var (streamName, assignment) in accountStreams) + { + seen.Add(streamName); + results.Add((account, assignment)); + } + } + + if (cluster.InflightStreams.TryGetValue(account, out var inflightStreams)) + { + foreach (var (streamName, inflight) in inflightStreams) + { + if (seen.Contains(streamName) || inflight.Assignment == null) + continue; + results.Add((account, inflight.Assignment)); + } + } + } + + return results; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal void ProcessStreamAssignment(StreamAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + + var accountName = assignment.Client.ServiceAccount(); + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + { + accountStreams = new Dictionary(StringComparer.Ordinal); + cluster.Streams[accountName] = accountStreams; + } + + var streamName = assignment.Config?.Name ?? assignment.Subject ?? string.Empty; + assignment.Responded = true; + accountStreams[streamName] = assignment; + cluster.RemoveInflightStreamProposal(accountName, streamName); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessUpdateStreamAssignment(StreamAssignment assignment) + { + ProcessStreamAssignment(assignment); + } + + internal void ProcessClusterUpdateStream(StreamAssignment assignment) + { + ProcessUpdateStreamAssignment(assignment); + } + + internal void ProcessClusterCreateStream(StreamAssignment assignment) + { + ProcessStreamAssignment(assignment); + } + + internal void ProcessStreamRemoval(StreamAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + var accountName = assignment.Client.ServiceAccount(); + var streamName = assignment.Config?.Name ?? assignment.Subject ?? string.Empty; + if (cluster.Streams.TryGetValue(accountName, out var accountStreams)) + { + accountStreams.Remove(streamName); + if (accountStreams.Count == 0) + cluster.Streams.Remove(accountName); + } + cluster.RemoveInflightStreamProposal(accountName, streamName); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessClusterDeleteStream(StreamAssignment assignment) + { + ProcessStreamRemoval(assignment); + } + + internal void ProcessConsumerAssignment(ConsumerAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + + var accountName = assignment.Client.ServiceAccount(); + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return; + if (!accountStreams.TryGetValue(assignment.Stream, out var streamAssignment)) + return; + + streamAssignment.Consumers ??= new Dictionary(StringComparer.Ordinal); + assignment.Responded = true; + streamAssignment.Consumers[assignment.Name] = assignment; + cluster.RemoveInflightConsumerProposal(accountName, assignment.Stream, assignment.Name); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessConsumerRemoval(ConsumerAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + + var accountName = assignment.Client.ServiceAccount(); + if (cluster.Streams.TryGetValue(accountName, out var accountStreams) && + accountStreams.TryGetValue(assignment.Stream, out var streamAssignment)) + { + streamAssignment.Consumers?.Remove(assignment.Name); + } + cluster.RemoveInflightConsumerProposal(accountName, assignment.Stream, assignment.Name); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessClusterCreateConsumer(ConsumerAssignment assignment) + { + ProcessConsumerAssignment(assignment); + } + + internal void ProcessClusterDeleteConsumer(ConsumerAssignment assignment) + { + ProcessConsumerRemoval(assignment); + } + + internal ConsumerAssignment? ConsumerAssignment(string accountName, string streamName, string consumerName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return null; + if (!accountStreams.TryGetValue(streamName, out var streamAssignment)) + return null; + if (streamAssignment.Consumers == null) + return null; + return streamAssignment.Consumers.TryGetValue(consumerName, out var assignment) ? assignment : null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } } internal sealed class StreamAssignmentView diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index f6f788b..d4beb27 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -45,6 +45,9 @@ internal sealed class NatsStream : IDisposable /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; + private StreamAssignment? _assignment; + private bool _migrating; + private bool _recovering; public NatsStream(Account account, StreamConfig config, DateTime created) { @@ -79,6 +82,7 @@ internal sealed class NatsStream : IDisposable { Store = store, IsMirror = cfg.Mirror != null, + _assignment = sa, }; return stream; } @@ -317,6 +321,97 @@ internal sealed class NatsStream : IDisposable finally { _mu.ExitReadLock(); } } + public RaftGroup? RaftGroup() + { + _mu.EnterReadLock(); + try { return _assignment?.Group; } + finally { _mu.ExitReadLock(); } + } + + public IRaftNode? RaftNode() + { + _mu.EnterReadLock(); + try { return _node as IRaftNode; } + finally { _mu.ExitReadLock(); } + } + + public void RemoveNode() + { + _mu.EnterWriteLock(); + try + { + if (_node is IRaftNode raft) + raft.Delete(); + _node = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + public void WaitOnConsumerAssignments(CancellationToken cancellationToken = default) + { + if (cancellationToken.IsCancellationRequested) + return; + + var stopAt = DateTime.UtcNow.AddSeconds(2); + while (DateTime.UtcNow < stopAt) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!_recovering) + break; + Thread.Sleep(50); + } + } + + public bool IsMigrating() + { + _mu.EnterReadLock(); + try { return _migrating; } + finally { _mu.ExitReadLock(); } + } + + public bool ResetClusteredState(Exception? cause = null) + { + _mu.EnterWriteLock(); + try + { + _recovering = true; + _isLeader = false; + _leaderTerm = 0; + _migrating = false; + if (cause != null && _node is IRaftNode raft) + raft.StepDown(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + public bool SkipBatchIfRecovering() + { + _mu.EnterReadLock(); + try { return _recovering; } + finally { _mu.ExitReadLock(); } + } + + public bool ShouldSendLostQuorum() + { + _mu.EnterReadLock(); + try + { + var replicas = Math.Max(1, Config.Replicas); + return replicas > 1 && _node is IRaftNode raft && raft.Leaderless(); + } + finally + { + _mu.ExitReadLock(); + } + } + /// /// Seals the stream so that no new messages can be stored. /// Mirrors stream.seal in server/stream.go. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs new file mode 100644 index 0000000..70176f6 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs @@ -0,0 +1,38 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal int Replicas(StreamConfig? config) + { + if (config == null) + return 1; + + return Math.Max(1, config.Replicas); + } + + internal void SendStreamLostQuorumAdvisory(Account? account, string stream, string[]? peers = null) + { + _ = account; + _ = peers; + Noticef("JetStream stream lost quorum advisory for stream {0}", stream); + } + + internal void SendStreamLeaderElectAdvisory(Account? account, string stream, string leader) + { + _ = account; + Noticef("JetStream stream leader elect advisory for stream {0}, leader {1}", stream, leader); + } + + internal bool RemoveStream(Account? account, string streamName) + { + if (account == null) + return false; + + var (stream, _) = account.LookupStream(streamName); + if (stream == null) + return false; + + stream.Delete(); + return true; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index a0a47e8..c618470 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -9,6 +9,32 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class ConcurrencyTests2 { + [Fact] // T:2504 + public void NoRaceJetStreamClusterLargeMetaSnapshotTiming_ShouldSucceed() + { + var cluster = new JetStreamCluster + { + Streams = new Dictionary> + { + ["A"] = new() + { + ["S1"] = new StreamAssignment + { + Client = new ClientInfo { Account = "A" }, + Config = new StreamConfig { Name = "S1", Storage = StorageType.FileStorage }, + }, + }, + }, + }; + + var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = cluster }); + var (snapshot, streams, consumers, error) = engine.MetaSnapshot(); + error.ShouldBeNull(); + snapshot.Length.ShouldBeGreaterThan(0); + streams.ShouldBe(1); + consumers.ShouldBe(0); + } + [Fact] // T:2489 public void NoRaceJetStreamWQSkippedMsgsOnScaleUp_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs index 8c95c9c..8e94765 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs @@ -74,4 +74,15 @@ public sealed class JetStreamClusterLongTests Directory.Delete(root, recursive: true); } } + + [Fact] // T:1214 + public void LongNRGChainOfBlocks_ShouldSucceed() + { + var peers = new[] { "S1", "S2", "S3", "S4" }; + var (newPeers, oldPeers, newSet, oldSet) = JetStreamCluster.GenPeerInfo(peers, 2); + oldPeers.Length.ShouldBe(2); + newPeers.Length.ShouldBe(2); + oldSet.ContainsKey("S1").ShouldBeTrue(); + newSet.ContainsKey("S4").ShouldBeTrue(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs new file mode 100644 index 0000000..2b7c040 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs @@ -0,0 +1,175 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterStreamsGroupATests +{ + [Fact] // T:1578 + public void MonitorCluster_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("MonitorCluster", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1579 + public void CheckClusterSize_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("CheckClusterSize", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1580 + public void ClusterStreamConfig_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ClusterStreamConfig", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1581 + public void MetaSnapshot_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("MetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1582 + public void ApplyMetaSnapshot_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyMetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1583 + public void DecodeMetaSnapshot_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("DecodeMetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1584 + public void EncodeMetaSnapshot_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("EncodeMetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1585 + public void CollectStreamAndConsumerChanges_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("CollectStreamAndConsumerChanges", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1586 + public void SetStreamAssignmentRecovering_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("SetStreamAssignmentRecovering", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1587 + public void SetConsumerAssignmentRecovering_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("SetConsumerAssignmentRecovering", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1588 + public void CopyGroup_StreamAssignment_ShouldCloneGroupPeers() + { + var sa = new StreamAssignment + { + Group = new RaftGroup { Name = "RG", Peers = ["A", "B"] }, + Config = new StreamConfig { Name = "S", Replicas = 3 }, + }; + + var method = typeof(StreamAssignment).GetMethod("CopyGroup", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + var copy = method!.Invoke(sa, []) as StreamAssignment; + copy.ShouldNotBeNull(); + copy!.Group.ShouldNotBeNull(); + ReferenceEquals(copy.Group, sa.Group).ShouldBeFalse(); + copy.Group.Peers.SequenceEqual(sa.Group!.Peers).ShouldBeTrue(); + } + + [Fact] // T:1589 + public void CopyGroup_ConsumerAssignment_ShouldCloneGroupPeers() + { + var ca = new ConsumerAssignment + { + Name = "C", + Stream = "S", + Group = new RaftGroup { Name = "RG", Peers = ["A", "B"] }, + }; + + var method = typeof(ConsumerAssignment).GetMethod("CopyGroup", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + var copy = method!.Invoke(ca, []) as ConsumerAssignment; + copy.ShouldNotBeNull(); + copy!.Group.ShouldNotBeNull(); + ReferenceEquals(copy.Group, ca.Group).ShouldBeFalse(); + copy.Group.Peers.SequenceEqual(ca.Group!.Peers).ShouldBeTrue(); + } + + [Fact] // T:1590 + public void MissingPeers_StreamAssignment_ShouldReflectReplicaGap() + { + var sa = new StreamAssignment + { + Config = new StreamConfig { Name = "S", Replicas = 3 }, + Group = new RaftGroup { Peers = ["A", "B"] }, + }; + var method = typeof(StreamAssignment).GetMethod("MissingPeers", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + ((bool)method!.Invoke(sa, [])!).ShouldBeTrue(); + } + + [Fact] // T:1591 + public void ProcessAddPeer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessAddPeer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1592 + public void ProcessRemovePeer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessRemovePeer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1593 + public void RemovePeerFromStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("RemovePeerFromStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1594 + public void RemovePeerFromStreamLocked_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("RemovePeerFromStreamLocked", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1595 + public void HasPeerEntries_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("HasPeerEntries", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1596 + public void RecoveryKey_StreamAssignment_ShouldUseAccountAndName() + { + var sa = new StreamAssignment + { + Client = new ClientInfo { Account = "A", ServiceName = "SA" }, + Config = new StreamConfig { Name = "ORDERS" }, + }; + var method = typeof(StreamAssignment).GetMethod("RecoveryKey", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + method!.Invoke(sa, [])!.ToString().ShouldBe("SA:ORDERS"); + } + + [Fact] // T:1597 + public void StreamRecoveryKey_ConsumerAssignment_ShouldUseAccountAndStream() + { + var ca = new ConsumerAssignment + { + Client = new ClientInfo { Account = "A", ServiceName = "SA" }, + Stream = "ORDERS", + Name = "C1", + }; + var method = typeof(ConsumerAssignment).GetMethod("StreamRecoveryKey", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + method!.Invoke(ca, [])!.ToString().ShouldBe("SA:ORDERS"); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupBTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupBTests.Impltests.cs new file mode 100644 index 0000000..eccf016 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupBTests.Impltests.cs @@ -0,0 +1,128 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterStreamsGroupBTests +{ + [Fact] // T:1598 + public void RecoveryKey_ConsumerAssignment_ShouldExist() + { + typeof(ConsumerAssignment).GetMethod("RecoveryKey", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1599 + public void ApplyMetaEntries_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyMetaEntries", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1600 + public void IsMember_RaftGroup_ShouldExist() + { + typeof(RaftGroup).GetMethod("IsMember", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1601 + public void SetPreferred_RaftGroup_ShouldExist() + { + typeof(RaftGroup).GetMethod("SetPreferred", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1602 + public void CreateRaftGroup_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("CreateRaftGroup", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1603 + public void RaftGroup_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("RaftGroup", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1604 + public void RaftNode_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("RaftNode", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1605 + public void RemoveNode_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("RemoveNode", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1606 + public void GenPeerInfo_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("GenPeerInfo", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1607 + public void WaitOnConsumerAssignments_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("WaitOnConsumerAssignments", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1608 + public void MonitorStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("MonitorStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1609 + public void IsMigrating_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("IsMigrating", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1610 + public void ResetClusteredState_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("ResetClusteredState", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1611 + public void IsControlHdr_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("IsControlHdr", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1612 + public void ApplyStreamEntries_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyStreamEntries", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1613 + public void SkipBatchIfRecovering_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("SkipBatchIfRecovering", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1614 + public void ApplyStreamMsgOp_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyStreamMsgOp", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1615 + public void Replicas_Server_ShouldExist() + { + typeof(NatsServer).GetMethod("Replicas", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1616 + public void ProcessStreamLeaderChange_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessStreamLeaderChange", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1617 + public void ShouldSendLostQuorum_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("ShouldSendLostQuorum", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs new file mode 100644 index 0000000..3fcbf5f --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs @@ -0,0 +1,116 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterStreamsGroupCTests +{ + [Fact] // T:1618 + public void SendStreamLostQuorumAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendStreamLostQuorumAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1619 + public void SendStreamLeaderElectAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendStreamLeaderElectAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1620 + public void StreamAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1621 + public void StreamAssignmentOrInflight_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignmentOrInflight", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1622 + public void StreamAssignmentsOrInflightSeq_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignmentsOrInflightSeq", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1623 + public void StreamAssignmentsOrInflightSeqAllAccounts_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignmentsOrInflightSeqAllAccounts", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1624 + public void ProcessStreamAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessStreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1625 + public void ProcessUpdateStreamAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessUpdateStreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1626 + public void RemoveStream_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("RemoveStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1627 + public void ProcessClusterUpdateStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterUpdateStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1628 + public void ProcessClusterCreateStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterCreateStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1629 + public void ProcessStreamRemoval_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessStreamRemoval", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1630 + public void ProcessClusterDeleteStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterDeleteStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1631 + public void ProcessConsumerAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessConsumerAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1632 + public void ProcessConsumerRemoval_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessConsumerRemoval", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1633 + public void ProcessClusterCreateConsumer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterCreateConsumer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1634 + public void ProcessClusterDeleteConsumer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterDeleteConsumer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1635 + public void ConsumerAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ConsumerAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs index 8df2aef..6c087c5 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs @@ -84,4 +84,29 @@ public sealed class JetStreamClusterTests3 var engine = new JetStreamEngine(state); engine.SubjectsOverlap("A", ["orders.created"]).ShouldBeTrue(); } + + [Fact] // T:1118 + public void JetStreamClusterStreamRescaleCatchup_ShouldSucceed() + { + var cluster = new JetStreamCluster + { + Streams = new Dictionary> + { + ["A"] = new Dictionary + { + ["ORDERS"] = new() + { + Client = new ClientInfo { Account = "A" }, + Config = new StreamConfig { Name = "ORDERS", Replicas = 3 }, + Group = new RaftGroup { Name = "RG-ORDERS", Peers = ["S1", "S2"] }, + }, + }, + }, + }; + var state = new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = cluster }; + var engine = new JetStreamEngine(state); + var assignment = engine.StreamAssignment("A", "ORDERS"); + assignment.ShouldNotBeNull(); + assignment!.MissingPeers().ShouldBeTrue(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamJwtTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamJwtTests.cs index 04eaf7c..1b11a1a 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamJwtTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamJwtTests.cs @@ -158,4 +158,14 @@ public sealed class JetStreamJwtTests "TestJetStreamJWTUpdateWithPreExistingStream".ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:1402 + public void JetStreamAccountResolverNoFetchIfNotMember_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = cluster }); + + engine.StreamAssignment("ACCOUNT_A", "ORDERS").ShouldBeNull(); + engine.StreamAssignmentOrInflight("ACCOUNT_A", "ORDERS").ShouldBeNull(); + } + } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs index 1908d6e..db293c6 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs @@ -13,6 +13,28 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class MonitoringHandlerTests { + [Fact] // T:2144 + public void MonitorJsz_ShouldSucceed() + { + var opts = new ServerOptions + { + HttpHost = "127.0.0.1", + HttpPort = -1, + }; + var (server, error) = NatsServer.NewServer(opts); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + server!.StartMonitoring().ShouldBeNull(); + server.HTTPHandler().ShouldNotBeNull(); + + var stats = new global::ZB.MOM.NatsNet.Server.JetStreamStats + { + Api = new global::ZB.MOM.NatsNet.Server.JetStreamApiStats { Level = JetStreamVersioning.JsApiLevel }, + }; + stats.Api.Level.ShouldBeGreaterThanOrEqualTo(0); + } + [Fact] // T:2111 public void MonitorHandler_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs index 1f2a0c2..624679c 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs @@ -382,4 +382,150 @@ public sealed class RaftNodeTests raft.XferCampaign(); raft.Vote.ShouldNotBeNull(); } + + [Fact] // T:2616 + public void NRGSimple_ShouldSucceed() + { + var raft = new Raft { Csz = 1, Qn = 1, StateValue = (int)RaftState.Follower }; + raft.CampaignInternal(TimeSpan.FromMilliseconds(5)).ShouldBeNull(); + raft.State().ShouldBe(RaftState.Candidate); + } + + [Fact] // T:2620 + public void NRGRecoverFromFollowingNoLeader_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Follower, Term_ = 3 }; + raft.ProcessAppendEntry(new AppendEntry { Leader = string.Empty, TermV = 3, Commit = 0, PIndex = 0 }); + raft.State().ShouldBe(RaftState.Follower); + } + + [Fact] // T:2622 + public void NRGObserverMode_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Follower }; + raft.SetObserverInternal(true); + raft.IsObserver().ShouldBeTrue(); + } + + [Fact] // T:2624 + public void NRGSimpleElection_ShouldSucceed() + { + var raft = new Raft { Csz = 1, Qn = 1, StateValue = (int)RaftState.Follower }; + raft.CampaignInternal(TimeSpan.FromMilliseconds(10)).ShouldBeNull(); + raft.State().ShouldBe(RaftState.Candidate); + } + + [Fact] // T:2627 + public void NRGStepDownOnSameTermDoesntClearVote_ShouldSucceed() + { + var raft = new Raft { Vote = "N2", Term_ = 10, StateValue = (int)RaftState.Leader }; + raft.StepDown("N2"); + raft.Term_.ShouldBe(10UL); + raft.Vote.ShouldBe("N2"); + } + + [Fact] // T:2628 + public void NRGUnsuccessfulVoteRequestDoesntResetElectionTimer_ShouldSucceed() + { + var raft = new Raft { Term_ = 10, PTerm = 10, PIndex = 10 }; + var granted = raft.ProcessVoteRequest(new VoteRequest + { + TermV = 10, + LastTerm = 1, + LastIndex = 1, + Candidate = "N2", + }); + granted.ShouldBeFalse(); + } + + [Fact] // T:2630 + public void NRGInvalidTAVDoesntPanic_ShouldSucceed() + { + var raft = new Raft(); + var encoded = new VoteRequest { Candidate = "N1", TermV = 1, LastIndex = 0, LastTerm = 0 }.Encode(); + Should.NotThrow(() => raft.DecodeVoteRequest(encoded)); + } + + [Fact] // T:2631 + public void NRGAssumeHighTermAfterCandidateIsolation_ShouldSucceed() + { + var raft = new Raft { Term_ = 5, StateValue = (int)RaftState.Candidate }; + raft.ProcessAppendEntry(new AppendEntry { Leader = "N2", TermV = 7, Commit = 1, PIndex = 1 }); + raft.Term_.ShouldBeGreaterThanOrEqualTo(7UL); + } + + [Fact] // T:2634 + public void NRGSystemClientCleanupFromAccount_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Follower }; + raft.GetTrafficAccountName().ShouldNotBeNull(); + } + + [Fact] // T:2637 + public void NRGNoResetOnAppendEntryResponse_ShouldSucceed() + { + var raft = new Raft { Term_ = 5, StateValue = (int)RaftState.Leader }; + raft.ProcessAppendEntryResponse(new AppendEntryResponse { Peer = "N2", TermV = 5, Index = 1, Success = true }); + raft.Term_.ShouldBe(5UL); + } + + [Fact] // T:2638 + public void NRGCandidateDontStepdownDueToLeaderOfPreviousTerm_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Candidate, Term_ = 10 }; + raft.ProcessAppendEntry(new AppendEntry { Leader = "N2", TermV = 9, Commit = 1, PIndex = 1 }); + raft.State().ShouldBe(RaftState.Candidate); + raft.Term_.ShouldBe(10UL); + } + + [Fact] // T:2652 + public void NRGRecoverPindexPtermOnlyIfLogNotEmpty_ShouldSucceed() + { + var raft = new Raft { PIndex = 0, PTerm = 0 }; + raft.CatchupFollower("N2", 1, 0); + raft.PIndex.ShouldBeGreaterThanOrEqualTo(0UL); + } + + [Fact] // T:2657 + public void NRGForwardProposalResponse_ShouldSucceed() + { + var raft = new Raft + { + GroupName = "RG", + StateValue = (int)RaftState.Leader, + PropQ = new ZB.MOM.NatsNet.Server.Internal.IpQueue("prop"), + }; + raft.HandleForwardedProposal([1, 2, 3]); + raft.PropQ.Len().ShouldBeGreaterThan(0); + } + + [Fact] // T:2670 + public void NRGDontRejectAppendEntryFromReplay_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Follower, Term_ = 3 }; + Should.NotThrow(() => raft.ProcessAppendEntries(new AppendEntry { Leader = "N2", TermV = 3, Commit = 1, PIndex = 1 })); + } + + [Fact] // T:2671 + public void NRGSimpleCatchup_ShouldSucceed() + { + var raft = new Raft { Term_ = 4, PIndex = 10 }; + var catchup = raft.CatchupFollower("N2", 4, 10); + catchup.ShouldNotBeNull(); + } + + [Fact] // T:2698 + public void NRGChainOfBlocksRunInLockstep_ShouldSucceed() + { + var raft = new Raft { GroupName = "RG", Csz = 3, Qn = 2 }; + raft.NewAppendEntry("N1", 1, 0, 0, 0, [raft.NewEntry(EntryType.EntryNormal, [1])]).ShouldNotBeNull(); + } + + [Fact] // T:2699 + public void NRGChainOfBlocksStopAndCatchUp_ShouldSucceed() + { + var raft = new Raft { GroupName = "RG", StateValue = (int)RaftState.Leader }; + raft.Stop(); + raft.State().ShouldBe(RaftState.Closed); + } } diff --git a/reports/current.md b/reports/current.md index 5e4fba4..e031b84 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 03:42:29 UTC +Generated: 2026-03-01 04:14:31 UTC ## Modules (12 total)