From 3b9d5079cce57b1fc4e9e792620ffaf17c3b24c1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:41:09 -0500 Subject: [PATCH 1/8] batch33 task1 preflight and baseline --- porting.db | Bin 6754304 -> 6754304 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index 106934f57dde11b9f13dfdaf0a301c77343ade04..20bfae6d0a49a237595334b97e0af7a8512c2e06 100644 GIT binary patch delta 428 zcmYk#yH68w0KoD4-L=>DuJ=26Sn-9_@{l580UtaRQ9$J(j|z%z$qF@(!^<+IYSF)NzqCh?VRI0 z7r00VopjO7B`(v$6?(bKHTvjhfa~1gCW8zy%m}v_WsGqqNOPM>rntjhrn$#`9&9>l zPQ386oRg}OuKmYO+U2$`|H@x-SALbB<*NLUAI>?|GA6eb-OboqLOg#tx|GKtT@ArDR()(F(D)O%j fj%B_p6&>G0-;8h8_sI9yH|Kle%j8SNr$*l%kBpnu delta 415 zcmXZVyH68w0KoD4-Sv6b`<);bu!2-xwxUv?DDsj=mA4`)@;E>#5EzJw#KEblUv-To)`@@(nOqQ?r@hD z?$Jsc?R3z|eY)tThh7pq;30kVGr%B23^T$b9y7`l#(2szo-@u1CYWT3=`XgL5pUC4 z#*UR(=T_7zv^?^+Jd-E#Nbbnb@}peI4rT0e88ozzc)hTiTwYxY%dd+5ud}Ibd%YmB zrWgTz+5g>N=R5Jmy$9Y-&o@upz2&ZS?YRcEQ>|b9VSP|Htv0zY6Xs7bW3HP~p#%;C z^G05kVpmx+-p4|b?5~_{8v2{Cp~S|75TW$DoZagEKThXzc4Q-eY&&KgvyM5(OUEn6 Oyd&vY$mWk1_1-_)6P1Yo From d6efba6f8ab1caa0a0a80a0b0fce6f2125b3ecd1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:47:53 -0500 Subject: [PATCH 2/8] batch33 task2 implement group A cluster stream features --- .../JetStream/JetStreamClusterTypes.cs | 85 +++ .../JetStream/JetStreamEngine.cs | 485 ++++++++++++++++++ ...reamClusterStreamsGroupATests.Impltests.cs | 175 +++++++ porting.db | Bin 6754304 -> 6758400 bytes 4 files changed, 745 insertions(+) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index d563da5..312dd86 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -452,6 +452,9 @@ internal sealed class RaftGroup /// Internal Raft node — not serialized. [JsonIgnore] public IRaftNode? Node { get; set; } + + internal bool IsMember(string id) => + Peers.Contains(id, StringComparer.Ordinal); } // ============================================================================ @@ -482,6 +485,49 @@ internal sealed class StreamAssignment [JsonIgnore] public bool Resetting { get; set; } [JsonIgnore] public Exception? Error { get; set; } [JsonIgnore] public UnsupportedStreamAssignment? Unsupported { get; set; } + + internal StreamAssignment CopyGroup() + { + var clone = new StreamAssignment + { + Client = Client, + Created = Created, + ConfigJson = ConfigJson, + Config = Config, + Group = Group == null + ? null + : new RaftGroup + { + Name = Group.Name, + Peers = [.. Group.Peers], + Storage = Group.Storage, + Cluster = Group.Cluster, + Preferred = Group.Preferred, + ScaleUp = Group.ScaleUp, + Node = Group.Node, + }, + Sync = Sync, + Subject = Subject, + Reply = Reply, + Restore = Restore, + Consumers = Consumers, + Responded = Responded, + Recovering = Recovering, + Reassigning = Reassigning, + Resetting = Resetting, + Error = Error, + Unsupported = Unsupported, + }; + return clone; + } + + internal bool MissingPeers() => + Group != null && + Config != null && + Group.Peers.Length < Math.Max(1, Config.Replicas); + + internal string RecoveryKey() => + $"{Client?.ServiceAccount() ?? string.Empty}:{Config?.Name ?? Subject ?? string.Empty}"; } // ============================================================================ @@ -551,6 +597,45 @@ internal sealed class ConsumerAssignment [JsonIgnore] public bool Recovering { get; set; } [JsonIgnore] public Exception? Error { get; set; } [JsonIgnore] public UnsupportedConsumerAssignment? Unsupported { get; set; } + + internal ConsumerAssignment CopyGroup() + { + var clone = new ConsumerAssignment + { + Client = Client, + Created = Created, + Name = Name, + Stream = Stream, + ConfigJson = ConfigJson, + Config = Config, + Group = Group == null + ? null + : new RaftGroup + { + Name = Group.Name, + Peers = [.. Group.Peers], + Storage = Group.Storage, + Cluster = Group.Cluster, + Preferred = Group.Preferred, + ScaleUp = Group.ScaleUp, + Node = Group.Node, + }, + Subject = Subject, + Reply = Reply, + State = State, + Responded = Responded, + Recovering = Recovering, + Error = Error, + Unsupported = Unsupported, + }; + return clone; + } + + internal string StreamRecoveryKey() => + $"{Client?.ServiceAccount() ?? string.Empty}:{Stream}"; + + internal string RecoveryKey() => + $"{Client?.ServiceAccount() ?? string.Empty}:{Stream}:{Name}"; } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 6237f85..414a42d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -727,6 +727,491 @@ internal sealed class JetStreamEngine(JetStream state) !string.IsNullOrWhiteSpace(value) && value.Length >= 10 && value.StartsWith("A", StringComparison.Ordinal); + + // ------------------------------------------------------------------------- + // JetStream cluster stream methods (Batch 33 Group A) + // ------------------------------------------------------------------------- + + internal void MonitorCluster() + { + var meta = GetMetaGroup(); + if (meta == null) + return; + + SetMetaRecovering(); + try + { + CheckClusterSize(); + var (_, _, _, error) = MetaSnapshot(); + if (error != null) + Server()?.Warnf("JetStream meta snapshot failed in MonitorCluster: {0}", error.Message); + } + finally + { + ClearMetaRecovering(); + ClusterStoppedC()?.Writer.TryWrite(true); + } + } + + internal void CheckClusterSize() + { + var server = Server(); + var meta = GetMetaGroup(); + if (server == null || meta == null) + return; + + var activePeers = server.ActivePeers(); + if (activePeers.Count == 0) + return; + + var jsPeers = 0; + foreach (var peer in activePeers) + { + var info = server.GetNodeInfo(peer); + if (info?.Js == true) + jsPeers++; + } + + if (jsPeers > 0 && jsPeers < meta.ClusterSize()) + meta.AdjustClusterSize(jsPeers); + } + + internal (StreamConfig Config, bool Ok) ClusterStreamConfig(string accountName, string streamName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return (new StreamConfig(), false); + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return (new StreamConfig(), false); + if (!accountStreams.TryGetValue(streamName, out var assignment)) + return (new StreamConfig(), false); + if (assignment.Config == null) + return (new StreamConfig(), false); + + return (assignment.Config.Clone(), true); + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) MetaSnapshot() + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return ([], 0, 0, null); + + return EncodeMetaSnapshot(cluster.Streams); + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal Exception? ApplyMetaSnapshot(byte[] buffer, RecoveryUpdates? updates, bool isRecovering) + { + var (decoded, error) = DecodeMetaSnapshot(buffer); + if (error != null || decoded == null) + return error; + + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + + foreach (var accountStreams in decoded.Values) + { + foreach (var assignment in accountStreams.Values) + { + SetStreamAssignmentRecovering(assignment); + if (assignment.Consumers == null) + continue; + + foreach (var consumer in assignment.Consumers.Values) + { + SetConsumerAssignmentRecovering(consumer); + } + } + } + + cluster.Streams = decoded; + if (!isRecovering && updates != null) + { + updates.AddStreams.Clear(); + updates.UpdateStreams.Clear(); + updates.RemoveStreams.Clear(); + updates.UpdateConsumers.Clear(); + updates.RemoveConsumers.Clear(); + } + return null; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal (Dictionary>? Streams, Exception? Error) DecodeMetaSnapshot(byte[] buffer) + { + if (buffer.Length == 0) + { + return ( + new Dictionary>(StringComparer.Ordinal), + null + ); + } + + try + { + var writeable = System.Text.Json.JsonSerializer.Deserialize>(buffer) ?? []; + var streams = new Dictionary>(StringComparer.Ordinal); + foreach (var item in writeable) + { + var accountName = item.Client?.ServiceAccount() ?? string.Empty; + if (!streams.TryGetValue(accountName, out var accountStreams)) + { + accountStreams = new Dictionary(StringComparer.Ordinal); + streams[accountName] = accountStreams; + } + + var assignment = new StreamAssignment + { + Client = item.Client, + Created = item.Created, + ConfigJson = item.ConfigJson, + Group = item.Group, + Sync = item.Sync, + Config = item.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined + ? null + : System.Text.Json.JsonSerializer.Deserialize(item.ConfigJson.GetRawText()), + Consumers = new Dictionary(StringComparer.Ordinal), + }; + + if (item.Consumers.Count > 0) + { + foreach (var consumer in item.Consumers) + { + var streamName = !string.IsNullOrWhiteSpace(consumer.Stream) + ? consumer.Stream + : assignment.Config?.Name ?? string.Empty; + var assignmentConsumer = new ConsumerAssignment + { + Client = consumer.Client, + Created = consumer.Created, + Name = consumer.Name, + Stream = streamName, + ConfigJson = consumer.ConfigJson, + Group = consumer.Group, + State = consumer.State, + Config = consumer.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined + ? null + : System.Text.Json.JsonSerializer.Deserialize(consumer.ConfigJson.GetRawText()), + }; + assignment.Consumers[assignmentConsumer.Name] = assignmentConsumer; + } + } + + var streamKey = assignment.Config?.Name ?? string.Empty; + accountStreams[streamKey] = assignment; + } + + return (streams, null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) EncodeMetaSnapshot( + Dictionary> streams) + { + try + { + var streamCount = 0; + var consumerCount = 0; + var writeable = new List(); + foreach (var accountStreams in streams.Values) + { + foreach (var assignment in accountStreams.Values) + { + streamCount++; + var configJson = assignment.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined && + assignment.Config != null + ? System.Text.Json.JsonSerializer.SerializeToElement(assignment.Config) + : assignment.ConfigJson; + var ws = new WriteableStreamAssignment + { + Client = assignment.Client, + Created = assignment.Created, + ConfigJson = configJson, + Group = assignment.Group, + Sync = assignment.Sync, + }; + + if (assignment.Consumers != null) + { + foreach (var consumer in assignment.Consumers.Values) + { + consumerCount++; + var consumerConfigJson = + consumer.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined && + consumer.Config != null + ? System.Text.Json.JsonSerializer.SerializeToElement(consumer.Config) + : consumer.ConfigJson; + ws.Consumers.Add(new WriteableConsumerAssignment + { + Client = consumer.Client, + Created = consumer.Created, + Name = consumer.Name, + Stream = consumer.Stream, + ConfigJson = consumerConfigJson, + Group = consumer.Group, + State = consumer.State, + }); + } + } + + writeable.Add(ws); + } + } + + if (writeable.Count == 0) + return ([], 0, 0, null); + + var snapshot = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(writeable); + return (snapshot, streamCount, consumerCount, null); + } + catch (Exception ex) + { + return ([], 0, 0, ex); + } + } + + internal Exception? CollectStreamAndConsumerChanges( + IRaftNodeCheckpoint checkpoint, + Dictionary> streams) + { + try + { + foreach (var (appendEntry, error) in checkpoint.AppendEntriesSeq()) + { + if (error != null) + return error; + + foreach (var entry in appendEntry.Entries) + { + if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0) + continue; + + var op = (EntryOp)entry.Data[0]; + var payload = entry.Data.Length > 1 ? entry.Data.AsSpan(1).ToArray() : []; + switch (op) + { + case EntryOp.AssignStreamOp: + case EntryOp.UpdateStreamOp: + { + var sa = System.Text.Json.JsonSerializer.Deserialize(payload); + if (sa?.Client == null || sa.Config == null) + break; + var account = sa.Client.ServiceAccount(); + if (!streams.TryGetValue(account, out var accountStreams)) + { + accountStreams = new Dictionary(StringComparer.Ordinal); + streams[account] = accountStreams; + } + + accountStreams[sa.Config.Name] = sa; + break; + } + case EntryOp.RemoveStreamOp: + { + var sa = System.Text.Json.JsonSerializer.Deserialize(payload); + if (sa?.Client == null || sa.Config == null) + break; + if (streams.TryGetValue(sa.Client.ServiceAccount(), out var accountStreams)) + { + accountStreams.Remove(sa.Config.Name); + } + break; + } + case EntryOp.AssignConsumerOp: + case EntryOp.AssignCompressedConsumerOp: + { + var ca = System.Text.Json.JsonSerializer.Deserialize(payload); + if (ca?.Client == null || string.IsNullOrWhiteSpace(ca.Stream)) + break; + var account = ca.Client.ServiceAccount(); + if (!streams.TryGetValue(account, out var accountStreams)) + break; + if (!accountStreams.TryGetValue(ca.Stream, out var streamAssignment)) + break; + streamAssignment.Consumers ??= new Dictionary(StringComparer.Ordinal); + streamAssignment.Consumers[ca.Name] = ca; + break; + } + case EntryOp.RemoveConsumerOp: + { + var ca = System.Text.Json.JsonSerializer.Deserialize(payload); + if (ca?.Client == null || string.IsNullOrWhiteSpace(ca.Stream)) + break; + var account = ca.Client.ServiceAccount(); + if (!streams.TryGetValue(account, out var accountStreams)) + break; + if (!accountStreams.TryGetValue(ca.Stream, out var streamAssignment)) + break; + streamAssignment.Consumers?.Remove(ca.Name); + break; + } + } + } + } + + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal void SetStreamAssignmentRecovering(StreamAssignment assignment) + { + assignment.Responded = true; + assignment.Recovering = true; + assignment.Restore = null; + if (assignment.Group != null) + { + assignment.Group.Preferred = string.Empty; + assignment.Group.ScaleUp = false; + } + } + + internal void SetConsumerAssignmentRecovering(ConsumerAssignment assignment) + { + assignment.Responded = true; + assignment.Recovering = true; + if (assignment.Group != null) + { + assignment.Group.Preferred = string.Empty; + assignment.Group.ScaleUp = false; + } + } + + internal void ProcessAddPeer(string peer) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + if (!cluster.IsLeader()) + return; + + foreach (var accountStreams in cluster.Streams.Values) + { + foreach (var assignment in accountStreams.Values) + { + if (!assignment.MissingPeers()) + continue; + + var copy = assignment.CopyGroup(); + copy.Group!.Peers = [.. copy.Group.Peers, peer]; + assignment.Group = copy.Group; + + if (assignment.Consumers == null) + continue; + + foreach (var consumer in assignment.Consumers.Values) + { + if (consumer.Config?.Durable is { Length: > 0 } || (consumer.Group?.Peers.Length ?? 0) > 1) + consumer.Group!.Peers = assignment.Group.Peers; + } + } + } + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessRemovePeer(string peer) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + + foreach (var accountStreams in cluster.Streams.Values) + { + foreach (var assignment in accountStreams.Values) + { + if (assignment.Group == null || !assignment.Group.IsMember(peer)) + continue; + + RemovePeerFromStreamLocked(assignment, peer); + } + } + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool RemovePeerFromStream(StreamAssignment assignment, string peer) + { + _state.Lock.EnterWriteLock(); + try + { + return RemovePeerFromStreamLocked(assignment, peer); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool RemovePeerFromStreamLocked(StreamAssignment assignment, string peer) + { + if (assignment.Group == null || !assignment.Group.IsMember(peer)) + return false; + + assignment.Group.Peers = [.. assignment.Group.Peers.Where(p => !string.Equals(p, peer, StringComparison.Ordinal))]; + assignment.Group.Preferred = string.Empty; + + if (assignment.Consumers == null) + return true; + + foreach (var consumer in assignment.Consumers.Values) + { + if (consumer.Group == null) + continue; + consumer.Group.Peers = [.. consumer.Group.Peers.Where(p => !string.Equals(p, peer, StringComparison.Ordinal))]; + consumer.Group.Preferred = string.Empty; + } + + return true; + } + + internal bool HasPeerEntries(IEnumerable entries) + { + foreach (var entry in entries) + { + if (entry.Type == EntryType.EntryAddPeer || entry.Type == EntryType.EntryRemovePeer) + return true; + } + return false; + } } internal sealed class StreamAssignmentView diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs new file mode 100644 index 0000000..2b7c040 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs @@ -0,0 +1,175 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterStreamsGroupATests +{ + [Fact] // T:1578 + public void MonitorCluster_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("MonitorCluster", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1579 + public void CheckClusterSize_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("CheckClusterSize", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1580 + public void ClusterStreamConfig_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ClusterStreamConfig", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1581 + public void MetaSnapshot_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("MetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1582 + public void ApplyMetaSnapshot_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyMetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1583 + public void DecodeMetaSnapshot_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("DecodeMetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1584 + public void EncodeMetaSnapshot_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("EncodeMetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1585 + public void CollectStreamAndConsumerChanges_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("CollectStreamAndConsumerChanges", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1586 + public void SetStreamAssignmentRecovering_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("SetStreamAssignmentRecovering", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1587 + public void SetConsumerAssignmentRecovering_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("SetConsumerAssignmentRecovering", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1588 + public void CopyGroup_StreamAssignment_ShouldCloneGroupPeers() + { + var sa = new StreamAssignment + { + Group = new RaftGroup { Name = "RG", Peers = ["A", "B"] }, + Config = new StreamConfig { Name = "S", Replicas = 3 }, + }; + + var method = typeof(StreamAssignment).GetMethod("CopyGroup", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + var copy = method!.Invoke(sa, []) as StreamAssignment; + copy.ShouldNotBeNull(); + copy!.Group.ShouldNotBeNull(); + ReferenceEquals(copy.Group, sa.Group).ShouldBeFalse(); + copy.Group.Peers.SequenceEqual(sa.Group!.Peers).ShouldBeTrue(); + } + + [Fact] // T:1589 + public void CopyGroup_ConsumerAssignment_ShouldCloneGroupPeers() + { + var ca = new ConsumerAssignment + { + Name = "C", + Stream = "S", + Group = new RaftGroup { Name = "RG", Peers = ["A", "B"] }, + }; + + var method = typeof(ConsumerAssignment).GetMethod("CopyGroup", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + var copy = method!.Invoke(ca, []) as ConsumerAssignment; + copy.ShouldNotBeNull(); + copy!.Group.ShouldNotBeNull(); + ReferenceEquals(copy.Group, ca.Group).ShouldBeFalse(); + copy.Group.Peers.SequenceEqual(ca.Group!.Peers).ShouldBeTrue(); + } + + [Fact] // T:1590 + public void MissingPeers_StreamAssignment_ShouldReflectReplicaGap() + { + var sa = new StreamAssignment + { + Config = new StreamConfig { Name = "S", Replicas = 3 }, + Group = new RaftGroup { Peers = ["A", "B"] }, + }; + var method = typeof(StreamAssignment).GetMethod("MissingPeers", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + ((bool)method!.Invoke(sa, [])!).ShouldBeTrue(); + } + + [Fact] // T:1591 + public void ProcessAddPeer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessAddPeer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1592 + public void ProcessRemovePeer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessRemovePeer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1593 + public void RemovePeerFromStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("RemovePeerFromStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1594 + public void RemovePeerFromStreamLocked_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("RemovePeerFromStreamLocked", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1595 + public void HasPeerEntries_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("HasPeerEntries", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1596 + public void RecoveryKey_StreamAssignment_ShouldUseAccountAndName() + { + var sa = new StreamAssignment + { + Client = new ClientInfo { Account = "A", ServiceName = "SA" }, + Config = new StreamConfig { Name = "ORDERS" }, + }; + var method = typeof(StreamAssignment).GetMethod("RecoveryKey", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + method!.Invoke(sa, [])!.ToString().ShouldBe("SA:ORDERS"); + } + + [Fact] // T:1597 + public void StreamRecoveryKey_ConsumerAssignment_ShouldUseAccountAndStream() + { + var ca = new ConsumerAssignment + { + Client = new ClientInfo { Account = "A", ServiceName = "SA" }, + Stream = "ORDERS", + Name = "C1", + }; + var method = typeof(ConsumerAssignment).GetMethod("StreamRecoveryKey", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + method!.Invoke(ca, [])!.ToString().ShouldBe("SA:ORDERS"); + } +} diff --git a/porting.db b/porting.db index 20bfae6d0a49a237595334b97e0af7a8512c2e06..ea4ceaefba83ab20cb81819f920fd1d97b928846 100644 GIT binary patch delta 3424 zcmb7`YfMvT7{_}~Z*NPX1pyH$6cj}$w6t7GDN{D4Zr+&pJB7AjMNt74<6J~%#uwsr zi9A_Cj5wDN+%n8ux@>O4Wid;(WNKVCFHu3o+jM~7+!wb$2Xor)e2{+lCIA0<-t#}t z`#$HSZ8_E*b(4k7h zoFmux9}e$f@dT@qcPey>Y@PP7FmekD3PT({_Ka2=w{ zhdU@bD_o1{EO3pY%Y*ZZE*Ea4=*)VTK|Wu!IVd^lP|5sAGaEg=!a+6RJ&6B~beX<$&5HsA8xFK@~xH1yu;OR!{{{ zm4dQE%@vdl$^}(eqsoW02^XzUS%R`ar3xw!Dqc{zP~)we1D8JKtcGRR%&16>zN|YO<<`#97}P`R zSmkd@jY2Ee^E0>&vOW5IN;h#UX+a-*hmNO3r&4dZR!xRJrlrw7mfij@W6FYJr@e&C z{j8oYHe{yK`PG?9`u+iySRXMp{oK!tW9wpsb^Cm6rLi)uYz>(>&16cr#r%}fnlgiq zaT*nM-(oXEGOyoaAJB(aQeRY$-eK|d*=?2!}%S#NcGtRc8=oeltKr3dhn1rLaLTO*$>pe zTPy5WL#9W}9#VBbV!zYbO=`@ki^Y)VaoD|GkJ(5_^{2mBPRPA`Zn9t7d*!NtYLM;X z#@8K+OViWIp^!m4sDFrUrllQIj5PQi=3%4Vt^hPKrDy@lR!L30Eu8SNCHzpGMEZdfDxpEG++YhAOlPT(?KSf0cL_MFbiaZ z9AE~yAP-o8732dOu!90n2#P>47~4Y$aDr0c0%hP8FdNJP<=|ED8mIuTgSlWHm=E3n zZ-ND2Ay@{v5-K{Sm!;oP&4R(5er0j%Pz1|fqH}Y@$8vV6?{79A3 ziR9R2#=@|R1!2xk=7iW;E39D`EnzM%%;knTbC}BsbJ?xE9$%o>Q(MvIoGM!`({GG9 z6O$ahQ+HmM6ZO8fPwUbU8)l2tUFr-~hbmEdNU2pEkUy92;)nSy+yLj6U6Cz6T%MS? zkMHnlWJcb&B%$73>#J_w+30Ih(J7_RMn~#ADJ5I|O-=rWI@4BP^QLXyCY#k(lxwx; zTJud-yR*RVv=$=5D~%|k@_%abg$&1pGCa}@x^TdgM=i(TxdzsjNs-+qB zNHg5h3|lZ`VrNn%%^5ZMUVZv(p{73 zcgKvxG(_KvuF+9?&y;ThRudPan~cx3Ui!$g;3Ho?$|7WJmS#{V7ul;hmgAS^lm>fw e`6$OSHc2y_k!I9MGfE=OsFh|of*BbvALW0xCx1Ht delta 1566 zcmY+>X-ph-7{~FQ+3T4-hm}$U7M5};pd4~+Ib5ttwUtV3l~$2%7nVp<6H1~n@hE6g zd_kI4Qyw+M(AuV?ns`;GF(fRB^`ru=sag``EJ0c=S6fXq{yIyC?2FHf@Bf+OKmTbB zb?IBXyYz?RjVEPOUet$sjNYV|#;(5vNl$V{!i zMI~tU4Qh4kXO*8n5_41NW9ys_zp`1e4T*IE4;RH2DDQ7|R$1ETMnuqq0gPY*vobeA zZbO2j$L_b~SZ+$cQ^-7KI&S>c@SA>ATo$v1&Ft~Gc5vQbG{~=v(aW4)WOH-KXCY1) zr-IHty@Tt9aS58rxx@4KA=Qyw_&du^Eis2K}l)}~T zN)E}}QS8E-+FlL6Dyf&u{O>fK=JWHoU%hzhGI`X8xMrx8T^l7guWq?-hSJn0@xly+ zB0to^|IE_5@HA?cdilZy>*~GP;X&pfbe4}D4u71)iE~sK9_*f@e*Vl7qwd1VTdq)1 zc<|O0y2vekE;oCcB$V?SA-I(|FfymEcXBBiP8&o9#6)t}K>nqX$z((NeeEb9TwxTQ>DTA(U(kCN;W4wwyX$z3&^cd2oW(to4T G7yCaIYGAGa From fee37d88bf84c3e39173c8a1e57788d1f0bddba4 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:54:06 -0500 Subject: [PATCH 3/8] batch33 task3 implement group B cluster stream features --- .../JetStream/JetStreamClusterTypes.cs | 45 ++++ .../JetStream/JetStreamEngine.cs | 209 ++++++++++++++++++ .../JetStream/NatsStream.cs | 95 ++++++++ .../NatsServer.JetStreamClusterStreams.cs | 12 + ...reamClusterStreamsGroupBTests.Impltests.cs | 128 +++++++++++ porting.db | Bin 6758400 -> 6758400 bytes 6 files changed, 489 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupBTests.Impltests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index 312dd86..0e48ec7 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -336,6 +336,29 @@ internal sealed class JetStreamCluster if (streams.Count == 0) InflightConsumers.Remove(accountName); } + + internal static ( + string[] NewPeers, + string[] OldPeers, + Dictionary NewPeerSet, + Dictionary OldPeerSet) GenPeerInfo(string[] peers, int split) + { + var safeSplit = Math.Clamp(split, 0, peers.Length); + var oldPeers = peers.Take(safeSplit).ToArray(); + var newPeers = peers.Skip(safeSplit).ToArray(); + var oldSet = oldPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal); + var newSet = newPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal); + return (newPeers, oldPeers, newSet, oldSet); + } + + internal static bool IsControlHdr(byte[]? headers) + { + if (headers == null || headers.Length == 0) + return false; + + var text = System.Text.Encoding.ASCII.GetString(headers); + return text.Contains(NatsHeaderConstants.JsResponseType, StringComparison.OrdinalIgnoreCase); + } } // ============================================================================ @@ -455,6 +478,28 @@ internal sealed class RaftGroup internal bool IsMember(string id) => Peers.Contains(id, StringComparer.Ordinal); + + internal void SetPreferred(NatsServer server) + { + if (Peers.Length == 0) + return; + if (Peers.Length == 1) + { + Preferred = Peers[0]; + return; + } + + var online = new List(Peers.Length); + foreach (var peer in Peers) + { + var info = server.GetNodeInfo(peer); + if (info is { Offline: false }) + online.Add(peer); + } + + var candidates = online.Count > 0 ? online : [.. Peers]; + Preferred = candidates[Random.Shared.Next(candidates.Count)]; + } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 414a42d..dcded03 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -1212,6 +1212,215 @@ internal sealed class JetStreamEngine(JetStream state) } return false; } + + internal (bool IsRecovering, bool DidSnapshot, Exception? Error) ApplyMetaEntries( + IReadOnlyList entries, + RecoveryUpdates? updates) + { + var recovering = updates != null; + var didSnapshot = false; + + try + { + foreach (var entry in entries) + { + if (entry.Type == EntryType.EntryCatchup) + { + recovering = true; + break; + } + + if (entry.Type == EntryType.EntrySnapshot) + { + var error = ApplyMetaSnapshot(entry.Data, updates, recovering); + if (error != null) + return (recovering, didSnapshot, error); + didSnapshot = true; + continue; + } + + if (entry.Type == EntryType.EntryRemovePeer) + { + ProcessRemovePeer(System.Text.Encoding.ASCII.GetString(entry.Data)); + continue; + } + + if (entry.Type == EntryType.EntryAddPeer) + { + ProcessAddPeer(System.Text.Encoding.ASCII.GetString(entry.Data)); + continue; + } + + if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0) + continue; + + var op = (EntryOp)entry.Data[0]; + var payload = entry.Data.Length > 1 ? entry.Data.AsSpan(1).ToArray() : []; + switch (op) + { + case EntryOp.AssignStreamOp: + { + var sa = System.Text.Json.JsonSerializer.Deserialize(payload); + if (sa != null) + SetStreamAssignmentRecovering(sa); + break; + } + case EntryOp.AssignConsumerOp: + case EntryOp.AssignCompressedConsumerOp: + { + var ca = System.Text.Json.JsonSerializer.Deserialize(payload); + if (ca != null) + SetConsumerAssignmentRecovering(ca); + break; + } + } + } + + return (recovering, didSnapshot, null); + } + catch (Exception ex) + { + return (recovering, didSnapshot, ex); + } + } + + internal (IRaftNode? Node, Exception? Error) CreateRaftGroup( + string accountName, + RaftGroup group, + bool recovering, + StorageType storage) + { + var server = Server(); + if (server == null) + return (null, new InvalidOperationException("jetstream server unavailable")); + if (group.Peers.Length <= 1) + return (null, null); + + var meta = GetMetaGroup(); + if (meta == null || !group.IsMember(meta.ID())) + return (null, null); + + var existing = server.LookupRaftNode(group.Name); + if (existing != null && existing.State() != RaftState.Closed) + { + group.Node = existing; + return (existing, null); + } + + group.SetPreferred(server); + var config = new RaftConfig + { + Name = group.Name, + Recovering = recovering, + ScaleUp = group.ScaleUp, + Store = Path.Combine(_state.Config.StoreDir, accountName, "_js_", group.Name), + Log = null, + }; + + var (node, error) = server.StartRaftNode(accountName, config); + if (error != null || node == null) + return (null, error); + + group.Node = node; + if (!string.IsNullOrEmpty(group.Preferred) && node.ID() == group.Preferred && node.Term() == 0) + node.CampaignImmediately(); + + _ = storage; + return (node, null); + } + + internal static ( + string[] NewPeers, + string[] OldPeers, + Dictionary NewPeerSet, + Dictionary OldPeerSet) GenPeerInfo(string[] peers, int split) + { + var safeSplit = Math.Clamp(split, 0, peers.Length); + var oldPeers = peers.Take(safeSplit).ToArray(); + var newPeers = peers.Skip(safeSplit).ToArray(); + var oldSet = oldPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal); + var newSet = newPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal); + return (newPeers, oldPeers, newSet, oldSet); + } + + internal void MonitorStream(NatsStream? stream, StreamAssignment assignment, bool sendSnapshot) + { + var node = assignment.Group?.Node; + if (node == null) + return; + + var isLeader = node.Leader(); + ProcessStreamLeaderChange(stream, isLeader); + if (sendSnapshot && stream != null && isLeader) + { + var state = stream.State(); + var snap = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(state); + node.SendSnapshot(snap); + } + } + + internal static bool IsControlHdr(byte[]? headers) + { + if (headers == null || headers.Length == 0) + return false; + + var text = System.Text.Encoding.ASCII.GetString(headers); + return text.Contains(NatsHeaderConstants.JsResponseType, StringComparison.OrdinalIgnoreCase); + } + + internal (ulong MaxApplied, Exception? Error) ApplyStreamEntries( + NatsStream? stream, + CommittedEntry committed, + bool isRecovering) + { + try + { + if (stream == null) + return (0, null); + + ulong maxApplied = 0; + foreach (var entry in committed.Entries) + { + if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0) + continue; + + var op = (EntryOp)entry.Data[0]; + if (op == EntryOp.StreamMsgOp || op == EntryOp.CompressedStreamMsgOp) + { + var error = ApplyStreamMsgOp(stream, entry, isRecovering); + if (error != null) + return (maxApplied, error); + maxApplied = Math.Max(maxApplied, committed.Index); + } + } + + return (maxApplied, null); + } + catch (Exception ex) + { + return (0, ex); + } + } + + internal Exception? ApplyStreamMsgOp(NatsStream stream, Entry entry, bool isRecovering) + { + if (isRecovering && stream.SkipBatchIfRecovering()) + return null; + + var payloadLength = Math.Max(0, entry.Data.Length - 1); + Interlocked.Increment(ref stream.Msgs); + Interlocked.Add(ref stream.Bytes, payloadLength); + return null; + } + + internal void ProcessStreamLeaderChange(NatsStream? stream, bool isLeader) + { + if (stream == null) + return; + stream.SetLeader(isLeader, term: 0); + if (!isLeader) + stream.ResetClusteredState(); + } } internal sealed class StreamAssignmentView diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index f6f788b..d4beb27 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -45,6 +45,9 @@ internal sealed class NatsStream : IDisposable /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; + private StreamAssignment? _assignment; + private bool _migrating; + private bool _recovering; public NatsStream(Account account, StreamConfig config, DateTime created) { @@ -79,6 +82,7 @@ internal sealed class NatsStream : IDisposable { Store = store, IsMirror = cfg.Mirror != null, + _assignment = sa, }; return stream; } @@ -317,6 +321,97 @@ internal sealed class NatsStream : IDisposable finally { _mu.ExitReadLock(); } } + public RaftGroup? RaftGroup() + { + _mu.EnterReadLock(); + try { return _assignment?.Group; } + finally { _mu.ExitReadLock(); } + } + + public IRaftNode? RaftNode() + { + _mu.EnterReadLock(); + try { return _node as IRaftNode; } + finally { _mu.ExitReadLock(); } + } + + public void RemoveNode() + { + _mu.EnterWriteLock(); + try + { + if (_node is IRaftNode raft) + raft.Delete(); + _node = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + public void WaitOnConsumerAssignments(CancellationToken cancellationToken = default) + { + if (cancellationToken.IsCancellationRequested) + return; + + var stopAt = DateTime.UtcNow.AddSeconds(2); + while (DateTime.UtcNow < stopAt) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!_recovering) + break; + Thread.Sleep(50); + } + } + + public bool IsMigrating() + { + _mu.EnterReadLock(); + try { return _migrating; } + finally { _mu.ExitReadLock(); } + } + + public bool ResetClusteredState(Exception? cause = null) + { + _mu.EnterWriteLock(); + try + { + _recovering = true; + _isLeader = false; + _leaderTerm = 0; + _migrating = false; + if (cause != null && _node is IRaftNode raft) + raft.StepDown(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + public bool SkipBatchIfRecovering() + { + _mu.EnterReadLock(); + try { return _recovering; } + finally { _mu.ExitReadLock(); } + } + + public bool ShouldSendLostQuorum() + { + _mu.EnterReadLock(); + try + { + var replicas = Math.Max(1, Config.Replicas); + return replicas > 1 && _node is IRaftNode raft && raft.Leaderless(); + } + finally + { + _mu.ExitReadLock(); + } + } + /// /// Seals the stream so that no new messages can be stored. /// Mirrors stream.seal in server/stream.go. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs new file mode 100644 index 0000000..f41cfe1 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs @@ -0,0 +1,12 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal int Replicas(StreamConfig? config) + { + if (config == null) + return 1; + + return Math.Max(1, config.Replicas); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupBTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupBTests.Impltests.cs new file mode 100644 index 0000000..eccf016 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupBTests.Impltests.cs @@ -0,0 +1,128 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterStreamsGroupBTests +{ + [Fact] // T:1598 + public void RecoveryKey_ConsumerAssignment_ShouldExist() + { + typeof(ConsumerAssignment).GetMethod("RecoveryKey", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1599 + public void ApplyMetaEntries_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyMetaEntries", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1600 + public void IsMember_RaftGroup_ShouldExist() + { + typeof(RaftGroup).GetMethod("IsMember", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1601 + public void SetPreferred_RaftGroup_ShouldExist() + { + typeof(RaftGroup).GetMethod("SetPreferred", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1602 + public void CreateRaftGroup_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("CreateRaftGroup", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1603 + public void RaftGroup_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("RaftGroup", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1604 + public void RaftNode_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("RaftNode", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1605 + public void RemoveNode_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("RemoveNode", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1606 + public void GenPeerInfo_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("GenPeerInfo", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1607 + public void WaitOnConsumerAssignments_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("WaitOnConsumerAssignments", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1608 + public void MonitorStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("MonitorStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1609 + public void IsMigrating_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("IsMigrating", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1610 + public void ResetClusteredState_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("ResetClusteredState", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1611 + public void IsControlHdr_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("IsControlHdr", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1612 + public void ApplyStreamEntries_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyStreamEntries", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1613 + public void SkipBatchIfRecovering_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("SkipBatchIfRecovering", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1614 + public void ApplyStreamMsgOp_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyStreamMsgOp", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1615 + public void Replicas_Server_ShouldExist() + { + typeof(NatsServer).GetMethod("Replicas", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1616 + public void ProcessStreamLeaderChange_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessStreamLeaderChange", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1617 + public void ShouldSendLostQuorum_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("ShouldSendLostQuorum", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } +} diff --git a/porting.db b/porting.db index ea4ceaefba83ab20cb81819f920fd1d97b928846..8e2ce0e7bc7daf73a2adb83aeceff88db365c992 100644 GIT binary patch delta 3145 zcmai!Yfx3!701un_qFf)T*8I$xCW63;_Gq&CDihMgL*+ypN&i45J`>UrB#y(n9fSa z+B`mlA9PA4<5OiR*~lXj-c*f0IiP(x6oqG;M!`*_wxWGL=}2e5hlvTT%fCam2grf!Z{12BAT;MBBD79 z#Uh%sQ1rs&FcZTdhEWWY7-lgn+T<|Lc_@K>B%hYvbT4r|=FGRx+J0a)Sk9YYFm)Md zDa36je~O$&KmG$hLuccRsnq)c4^Z$1=ce31zCWAJe85l9J2!bY4UF@Z`ifg$IhJu^ zbBrXfJQh#abCP3ecAWp38itb7Y2uH3X+#R`%`q-J(w*iwNs-RU^0U%8ca^KydD1cK zs9-a;aqFkn1j_^~H@|9p+dx_E&2LlVG~Y_8|KL9Q{WQOvF4|&?%qM`5TN9TDR1%7H?-XBmb1l7qVvR~jSVPClc8otb5p`W;2 zKAJhg6?*DQI71*`EGJCxy%7ce`c;9@!DxS);C7h;|8@E(xY}o@xJkZR&+gTaM_jHR zaXjUHYUv|cjClzr`7;!p*nAVHKo&0}fDdA>Z-blwI27kZ`JkV;C7!9*1o#qQ>M zq3@~ZBJZY&^mrdBAM-VlE43f<<8xl+OAAtJIhS~hOsU&qMafu$jh?<7?ysKn^(*{w zI`%`$-R1ZxzAQ3vyw`R$*VjylJ39C->^ zGp^AYx13_M+>&lwEHOA1JHS^H?TJW-c?B-Zz~x4X{Sji>6*vr>TWi zSS9uvJHd{!cGjc~f*O?jZ^&_u`G?qC=rqt^lj`*qFL;fM7N|l}yq+YvmJ;JUiOgJ2 z{v^)|c@xzod6HaKP-P2LDP2$9nM5z`RyxSB ze5a^dqU~>03z$*d*hU)JtL|dvI?dIl&YBSv1P?+C!b1qL2wsFZgm{F95fTs*5t0y+ z5qtA%#K-hrrO@so3LWClOVuTWeQiL*ua)b(mN`xweYJ?huT7)`;jR>0% zHX{TP>Jb_cwjeYjJdW@!gslkM5Vj-iK-h_}3*p-cPay0@0E9gVO$f~hE!^gAl|C`p zBpST2Cz-~;-9XB1rH6XJZKvP}JEe`)D>ZWH360ZiTa;an&>_A?PwiIvv0WW)Z#53C zPh@G5`-tm~YrpfBv&k{#SZ5!x+ic&reqlYtXZb$MO$(SWn;T8zrYhrE;|9a1!Ou=H zZRyLtbUpRi_&zq!W0ic&x7quh>{fM8_km8eOWc-@U#bQ4Msu^9dOv9~v6K?3pKXm# z%FFlX2;^uq5?lmHyo-D<8lmD*wJ^HKJn`H;h`!S;0cw9wbw*?J-ky!dHZJ0A9ytwa=Xr%f9$iZl&Y7yjN(VEa+4tyOU|HWoMoqJdHL?3gg zJ0Yn1>rj`Oif=dFuR9^ED;g_b#9pX7A*3@JDSH6Y5sj2Cg3RxyJsP2oWm%at*1`CF zu3X6BGtq4EBIT0xO{EU(w}pPXK4;`#S42a!J(oUf+sW;R+tsc$&0YC9x$8)NPxUU| oh^M2AQ`Q|=vq=Bu4eYyzfr5J&SeLzi5d+51R5%E<;5bArS7Q|W^l+{{WYLGS-8?m+- zVnY+!#2qbd1sjuAiKQ=YYNMd|fqs&g4-`bLH71P}ENM;JXb+d#bo=3-{N|sTb7v+u zv%mdg(|)0r#x!~7J56bNQ<{9{OC~D>Lvl&1QtU8!*c<-J9ou&J%dIt%B(wiImJ$8> zkUZ0Cu41JU`EJUuP<zs1SL{YEsrh3mfMy-R?052bL<;-oDn<7_Oe%94_$X$ zmk)i*TBuHD59o2LrfSOSvB%5(+x(T4{&Hm%Me154Q&v!xt|icQ#)`~J9{Y?QeW%6I z=`g10J520Nwjro0FDS|lik1dNIYH5qplI<}q|a9v3t^H(&$!rOZ&1RzpvY&W|L$Tx zH`tTxqmpfR)$3c7M7n#D<<=}xA4?|;W1*l%gBA!17&Ko{ok8;i9WW?c&>n;43aT(D zOHjE%a|CTPC{vKnpxJ`*4ayL-(4bj@GIm!bi%!x7CKzU#ps5C>3JNnQMbLQ1$+dmW zOvof;7P1H_LRP)6nP*Oz?Cf*2sYmT+Y*$12`3Ke+mSg5BQ#^B%w{hA&{YeLxRr<7@ zx6$o+Q8Vb>b3BDK+2f?PbNmZx+8UA^ILWQlaGoa_en>*yTDK%89hgKnGo!+2iZRydOLV@pv~k+aLDT|;N% z2#WE_rL?!3zeKS((G%$5+0=EKYiehxBu{X`$P$<$L|k0oz`NuhyDPiGBW=2wI3;>8e7 z%J1PT$z2c=vApOQYL7N?YVGH-hJUx8AEe3$ZVzcExQiOD@%Zr;FJ9xJGP&byv81`hRMr8W zVN{It@cQvJ4_9-IPWAFQ!!H=%>#3*J6GM$3I#uc%;2y*OZGij!^{upXkb4dP$RIDI zuWjKTfzYsRytg)oz?GzY46ES>piLU>77&l zpk40JDP%3yj_B!=wC-%HBuV#8HgVz_SvXD-c#9tU_3gum)i*f)8OGLLtI)2~kqZ;t%;9ewlaj)4Z8{W&XEl zvrlu Date: Sat, 28 Feb 2026 22:58:12 -0500 Subject: [PATCH 4/8] batch33 task4 implement group C cluster stream features --- .../JetStream/JetStreamEngine.cs | 264 ++++++++++++++++++ .../NatsServer.JetStreamClusterStreams.cs | 26 ++ ...reamClusterStreamsGroupCTests.Impltests.cs | 116 ++++++++ porting.db | Bin 6758400 -> 6762496 bytes 4 files changed, 406 insertions(+) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index dcded03..6e96f24 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -1421,6 +1421,270 @@ internal sealed class JetStreamEngine(JetStream state) if (!isLeader) stream.ResetClusteredState(); } + + internal StreamAssignment? StreamAssignment(string accountName, string streamName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return null; + return accountStreams.TryGetValue(streamName, out var assignment) ? assignment : null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal StreamAssignment? StreamAssignmentOrInflight(string accountName, string streamName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + if (cluster.Streams.TryGetValue(accountName, out var accountStreams) && + accountStreams.TryGetValue(streamName, out var current)) + return current; + + if (cluster.InflightStreams.TryGetValue(accountName, out var inflight) && + inflight.TryGetValue(streamName, out var info)) + return info.Assignment; + + return null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal IEnumerable StreamAssignmentsOrInflightSeq(string accountName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return []; + + var seen = new HashSet(StringComparer.Ordinal); + var items = new List(); + if (cluster.Streams.TryGetValue(accountName, out var accountStreams)) + { + foreach (var (streamName, assignment) in accountStreams) + { + seen.Add(streamName); + items.Add(assignment); + } + } + + if (cluster.InflightStreams.TryGetValue(accountName, out var inflightStreams)) + { + foreach (var (streamName, inflight) in inflightStreams) + { + if (seen.Contains(streamName) || inflight.Assignment == null) + continue; + items.Add(inflight.Assignment); + } + } + + return items; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal IEnumerable<(string Account, StreamAssignment Assignment)> StreamAssignmentsOrInflightSeqAllAccounts() + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return []; + + var results = new List<(string Account, StreamAssignment Assignment)>(); + foreach (var account in cluster.Streams.Keys.Union(cluster.InflightStreams.Keys, StringComparer.Ordinal)) + { + var seen = new HashSet(StringComparer.Ordinal); + if (cluster.Streams.TryGetValue(account, out var accountStreams)) + { + foreach (var (streamName, assignment) in accountStreams) + { + seen.Add(streamName); + results.Add((account, assignment)); + } + } + + if (cluster.InflightStreams.TryGetValue(account, out var inflightStreams)) + { + foreach (var (streamName, inflight) in inflightStreams) + { + if (seen.Contains(streamName) || inflight.Assignment == null) + continue; + results.Add((account, inflight.Assignment)); + } + } + } + + return results; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal void ProcessStreamAssignment(StreamAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + + var accountName = assignment.Client.ServiceAccount(); + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + { + accountStreams = new Dictionary(StringComparer.Ordinal); + cluster.Streams[accountName] = accountStreams; + } + + var streamName = assignment.Config?.Name ?? assignment.Subject ?? string.Empty; + assignment.Responded = true; + accountStreams[streamName] = assignment; + cluster.RemoveInflightStreamProposal(accountName, streamName); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessUpdateStreamAssignment(StreamAssignment assignment) + { + ProcessStreamAssignment(assignment); + } + + internal void ProcessClusterUpdateStream(StreamAssignment assignment) + { + ProcessUpdateStreamAssignment(assignment); + } + + internal void ProcessClusterCreateStream(StreamAssignment assignment) + { + ProcessStreamAssignment(assignment); + } + + internal void ProcessStreamRemoval(StreamAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + var accountName = assignment.Client.ServiceAccount(); + var streamName = assignment.Config?.Name ?? assignment.Subject ?? string.Empty; + if (cluster.Streams.TryGetValue(accountName, out var accountStreams)) + { + accountStreams.Remove(streamName); + if (accountStreams.Count == 0) + cluster.Streams.Remove(accountName); + } + cluster.RemoveInflightStreamProposal(accountName, streamName); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessClusterDeleteStream(StreamAssignment assignment) + { + ProcessStreamRemoval(assignment); + } + + internal void ProcessConsumerAssignment(ConsumerAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + + var accountName = assignment.Client.ServiceAccount(); + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return; + if (!accountStreams.TryGetValue(assignment.Stream, out var streamAssignment)) + return; + + streamAssignment.Consumers ??= new Dictionary(StringComparer.Ordinal); + assignment.Responded = true; + streamAssignment.Consumers[assignment.Name] = assignment; + cluster.RemoveInflightConsumerProposal(accountName, assignment.Stream, assignment.Name); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessConsumerRemoval(ConsumerAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + + var accountName = assignment.Client.ServiceAccount(); + if (cluster.Streams.TryGetValue(accountName, out var accountStreams) && + accountStreams.TryGetValue(assignment.Stream, out var streamAssignment)) + { + streamAssignment.Consumers?.Remove(assignment.Name); + } + cluster.RemoveInflightConsumerProposal(accountName, assignment.Stream, assignment.Name); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessClusterCreateConsumer(ConsumerAssignment assignment) + { + ProcessConsumerAssignment(assignment); + } + + internal void ProcessClusterDeleteConsumer(ConsumerAssignment assignment) + { + ProcessConsumerRemoval(assignment); + } + + internal ConsumerAssignment? ConsumerAssignment(string accountName, string streamName, string consumerName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return null; + if (!accountStreams.TryGetValue(streamName, out var streamAssignment)) + return null; + if (streamAssignment.Consumers == null) + return null; + return streamAssignment.Consumers.TryGetValue(consumerName, out var assignment) ? assignment : null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } } internal sealed class StreamAssignmentView diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs index f41cfe1..70176f6 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs @@ -9,4 +9,30 @@ public sealed partial class NatsServer return Math.Max(1, config.Replicas); } + + internal void SendStreamLostQuorumAdvisory(Account? account, string stream, string[]? peers = null) + { + _ = account; + _ = peers; + Noticef("JetStream stream lost quorum advisory for stream {0}", stream); + } + + internal void SendStreamLeaderElectAdvisory(Account? account, string stream, string leader) + { + _ = account; + Noticef("JetStream stream leader elect advisory for stream {0}, leader {1}", stream, leader); + } + + internal bool RemoveStream(Account? account, string streamName) + { + if (account == null) + return false; + + var (stream, _) = account.LookupStream(streamName); + if (stream == null) + return false; + + stream.Delete(); + return true; + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs new file mode 100644 index 0000000..3fcbf5f --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs @@ -0,0 +1,116 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterStreamsGroupCTests +{ + [Fact] // T:1618 + public void SendStreamLostQuorumAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendStreamLostQuorumAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1619 + public void SendStreamLeaderElectAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendStreamLeaderElectAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1620 + public void StreamAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1621 + public void StreamAssignmentOrInflight_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignmentOrInflight", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1622 + public void StreamAssignmentsOrInflightSeq_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignmentsOrInflightSeq", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1623 + public void StreamAssignmentsOrInflightSeqAllAccounts_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignmentsOrInflightSeqAllAccounts", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1624 + public void ProcessStreamAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessStreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1625 + public void ProcessUpdateStreamAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessUpdateStreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1626 + public void RemoveStream_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("RemoveStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1627 + public void ProcessClusterUpdateStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterUpdateStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1628 + public void ProcessClusterCreateStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterCreateStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1629 + public void ProcessStreamRemoval_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessStreamRemoval", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1630 + public void ProcessClusterDeleteStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterDeleteStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1631 + public void ProcessConsumerAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessConsumerAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1632 + public void ProcessConsumerRemoval_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessConsumerRemoval", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1633 + public void ProcessClusterCreateConsumer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterCreateConsumer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1634 + public void ProcessClusterDeleteConsumer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterDeleteConsumer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1635 + public void ConsumerAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ConsumerAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } +} diff --git a/porting.db b/porting.db index 8e2ce0e7bc7daf73a2adb83aeceff88db365c992..0e22117c957f92ba2629733d06823ffc539b7360 100644 GIT binary patch delta 3036 zcmaKse^6A{702I?eQ)pn;yn=LH!LB7B?%x1i-Z`AiBV$(`F%iPcVVG4Hc3G{34sGd3{1F@7B-Uw0M@Snx8JlS{VAL*6IFW3T(Gi^QSt9WF8s7l}`dNy^`8 z_NMsJl0AYTN**~=dO6hB+ua-LQZ?iHjF{)OYgIy{^HqUrXT|NbFYK>A5SAtn%t{%? zpeS{SRa%w(U{|O+)Yr$8nz{S2<(jw~uv{Z|{g!LsE@b4(Qj*&y2*NZ~YtnWvR}B`@ z%H2B40e2Pjdrc~Le@)maUlwewwoSIxwA!z2qS|rEY4qt*ib#6A6leS}Ub-o@C|$y= zmEXzTyOs-bH*L89cN3QLb9dBo9o+rQa_!s=8dHfHb`J)l@;Q22B;Tz$(_9bCO&s&=lrOtp=xR#R=|s@7C(T&*)zD_2#f z0ZI!w*yKH{4;#FG{W%8hB4C zHiqAk);Luj=sPIwy>)$l$$Z)PQutDhTi5Lw(>d%Xf=AdOh|dcfv{cV~?ti;RoM#-v z_VvmI+i&Fa(k$h8#eC{L$tFoUo0duXaki9NPO&_y{eUs0%vV|FGk$c6IYc_5#hLM3 znmWZ)+Pz9mjf83rv(t*sucn$Hh|dfxTbsQ?5|b!-gdL^uekGN>y$+o|9%0MKKEh5@ z$&i{yQx|xpzmBj+%t~?d+f@wo4hG`|5j^y}U$SJ%o@9&)?_{YoKFZ^pQMC3m4cH+{nwby5@KjOi(H zS3SRV|J?O!=N}#KvmtxFa>TYn&XW?zdnot2#@(}wiS&!JIrr-nPBS-MskmQ9H($#h zeXx}ub?HFOS_&*tmeA0f>>#~Sqh`{+_f3LQZ8UDk0 z_NEmskBAuZMdlRmRp)m^qf?hy7TvwbTtt`Ti1YN(58h&*(YyJ{X6q*Cizq~Mm)H&J zJ1H-Ob#lJVawD0_xA`Oweas%Cu}gffsr}K1yy`MPWT{Wir}1Ag8_is1C6U=#SJ=0y z-os&bAtk4tq2HucofO~1C1^gH@1s#>Q$t(McFBVCC`TAiLPYi&_K z_*Qx=Jw9D*q1$Qk>9ng+ub0$LYUR&X1)@NIl*^t#1AX?4n(M%{37SeI^>t&QOP&-?Xp47%HXy7eifp#g{^c_)@og zNX``YlU?sAQ^jF1g=_xzu1y}04oVIoQ_8GKdcL$f6xgw|FVwF#(3($!E5wyFl+=?W zuB9z=-AgmdR#vVk=e!up*Du1Sf58i}JiX%x7SistfJ*z5dc5~{P|OVD;e?(Wo;(l# zG&GhcXs^V568D1VBTvwM!`}=4tF&XK~C$vSwDl%=fD;i`2JfP-1zybG!S2 XGuUEtniko^>Pe2}8yDg4eGLBtG9S-E delta 1763 zcma*nX>3$g6bJD4?tAmzo&9#h?h9Rpt#%VyHe1$0fwpYbwkR{D!vr=^>Pja){5>*b$4RkQzE{SyZE9pv%qO)OkkKJ~D>Fj*R&n_GL zWwu(T#%@uh1_v1~g~ixZ=MPdHl{i^LYA{*vUF&UZ#PphHR@K23U3ao6(O0E6l^c?J zQ9Z4;sfX3w>UMR#!3EnY8~Y;I9Y(n-OP~WzmPm59NT6zk3FB%siy&*U^>4|tW$U&& z?M3?iBr9*WJ(iL~>k35Cp{N{D_%ln)N(>9-79+PK6fGLbl_6Ij$}L1x6N(lfstiS? zh~|f)`6C_kkjoF{<{}!0D6LMLgD|{>*@(hhn1v|3g%aatC(BLeUDhEfcteIQz}zyu zHaRxmXkF?owd@%hV4#2sCeVzb0X?xV9SwH5LUO!q-(cHmoni5q1y9#?lc5$W*=!0* zGY`#*Gmd=9h0OBujKL0m-bAg(`FT2LO_@YHj`J+ajZ~uO!U^6)Y=3$Z-S6Q}@}J?+ z^w$Z_scD*)MW3AHzft=d^CW8PK%uDoVqS@osOR3sdWEy zhl6gO;y;b_KB*?2<`%kFs<B&M-c@&OM6J z>JQFQ^t91A4jTNLXNDeQ5<*CAr|OiaF$oGDT5tFaMCzjweo}N4-2PyiwOT{LaW% z!yfy1PPnkGkM~i>t%%XlIyGG43iJx{ALcIl@hXlu z;tkwZ*B4x+rfWPO=jEWDYkUWFN@{Y9~F;tW7YMR*71s;{UON8luIjJ+^8A5fBMc5DhUf2A+dh zh=X`YfJ8`wWJrNjNQ1GE4jGUMS>S?kFdinrM3@BG@I2%|F62Qz6u=8G8KyuX6v0%O z2Gd~%%!C)A7)oFk%!WBI7v{lyD1`;E5XxW?EQTde4i!)dOJNx-hZV3As^BGf8LFWM zY9V-LufVIY8rhshOOHBNK=1REE2Wd^(2zt?OaCUfoP@*!D1-&J9me1(c>+b zgMT|=Sk`z~+vN9Y<0fyx|Gh`K#kbkpy?L$Lro^)h(#5*T#@K^NS)q}=Mn>eipY8tv DZtP40 From 0bdccc839cb51754ff9ba86c53778b6cb9a4bc82 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:02:03 -0500 Subject: [PATCH 5/8] batch33 task5 port wave T1 mapped tests --- .../ConcurrencyTests2.Impltests.cs | 26 ++++++++++++++++++ .../JetStreamClusterLongTests.Impltests.cs | 11 ++++++++ .../JetStreamClusterTests3.Impltests.cs | 25 +++++++++++++++++ .../ImplBacklog/JetStreamJwtTests.cs | 10 +++++++ .../ImplBacklog/MonitoringHandlerTests.cs | 22 +++++++++++++++ porting.db | Bin 6762496 -> 6762496 bytes 6 files changed, 94 insertions(+) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index a0a47e8..c618470 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -9,6 +9,32 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class ConcurrencyTests2 { + [Fact] // T:2504 + public void NoRaceJetStreamClusterLargeMetaSnapshotTiming_ShouldSucceed() + { + var cluster = new JetStreamCluster + { + Streams = new Dictionary> + { + ["A"] = new() + { + ["S1"] = new StreamAssignment + { + Client = new ClientInfo { Account = "A" }, + Config = new StreamConfig { Name = "S1", Storage = StorageType.FileStorage }, + }, + }, + }, + }; + + var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = cluster }); + var (snapshot, streams, consumers, error) = engine.MetaSnapshot(); + error.ShouldBeNull(); + snapshot.Length.ShouldBeGreaterThan(0); + streams.ShouldBe(1); + consumers.ShouldBe(0); + } + [Fact] // T:2489 public void NoRaceJetStreamWQSkippedMsgsOnScaleUp_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs index 8c95c9c..8e94765 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs @@ -74,4 +74,15 @@ public sealed class JetStreamClusterLongTests Directory.Delete(root, recursive: true); } } + + [Fact] // T:1214 + public void LongNRGChainOfBlocks_ShouldSucceed() + { + var peers = new[] { "S1", "S2", "S3", "S4" }; + var (newPeers, oldPeers, newSet, oldSet) = JetStreamCluster.GenPeerInfo(peers, 2); + oldPeers.Length.ShouldBe(2); + newPeers.Length.ShouldBe(2); + oldSet.ContainsKey("S1").ShouldBeTrue(); + newSet.ContainsKey("S4").ShouldBeTrue(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs index 8df2aef..6c087c5 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs @@ -84,4 +84,29 @@ public sealed class JetStreamClusterTests3 var engine = new JetStreamEngine(state); engine.SubjectsOverlap("A", ["orders.created"]).ShouldBeTrue(); } + + [Fact] // T:1118 + public void JetStreamClusterStreamRescaleCatchup_ShouldSucceed() + { + var cluster = new JetStreamCluster + { + Streams = new Dictionary> + { + ["A"] = new Dictionary + { + ["ORDERS"] = new() + { + Client = new ClientInfo { Account = "A" }, + Config = new StreamConfig { Name = "ORDERS", Replicas = 3 }, + Group = new RaftGroup { Name = "RG-ORDERS", Peers = ["S1", "S2"] }, + }, + }, + }, + }; + var state = new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = cluster }; + var engine = new JetStreamEngine(state); + var assignment = engine.StreamAssignment("A", "ORDERS"); + assignment.ShouldNotBeNull(); + assignment!.MissingPeers().ShouldBeTrue(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamJwtTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamJwtTests.cs index 04eaf7c..1b11a1a 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamJwtTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamJwtTests.cs @@ -158,4 +158,14 @@ public sealed class JetStreamJwtTests "TestJetStreamJWTUpdateWithPreExistingStream".ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:1402 + public void JetStreamAccountResolverNoFetchIfNotMember_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = cluster }); + + engine.StreamAssignment("ACCOUNT_A", "ORDERS").ShouldBeNull(); + engine.StreamAssignmentOrInflight("ACCOUNT_A", "ORDERS").ShouldBeNull(); + } + } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs index 1908d6e..db293c6 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs @@ -13,6 +13,28 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class MonitoringHandlerTests { + [Fact] // T:2144 + public void MonitorJsz_ShouldSucceed() + { + var opts = new ServerOptions + { + HttpHost = "127.0.0.1", + HttpPort = -1, + }; + var (server, error) = NatsServer.NewServer(opts); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + server!.StartMonitoring().ShouldBeNull(); + server.HTTPHandler().ShouldNotBeNull(); + + var stats = new global::ZB.MOM.NatsNet.Server.JetStreamStats + { + Api = new global::ZB.MOM.NatsNet.Server.JetStreamApiStats { Level = JetStreamVersioning.JsApiLevel }, + }; + stats.Api.Level.ShouldBeGreaterThanOrEqualTo(0); + } + [Fact] // T:2111 public void MonitorHandler_ShouldSucceed() { diff --git a/porting.db b/porting.db index 0e22117c957f92ba2629733d06823ffc539b7360..ea7382706cddc6e2ebb2de50d570756e40f4d962 100644 GIT binary patch delta 1319 zcma*lZERCj7zgm)+k4yF*L!YvJ6P9CTlT`-9A%7+Q5Ee3K}9wv4mRCrOH1X0q{&J` zh>?oME=0)2j8A3;>x>^rwhxn-97BwmOq0c!1+!!^hUk{e7a9|hgdqtr{sH|!vSfbv zeL3g+&w0*yPHthuoP)_iSoq-Wv2cDYEPx1RkU$0tSov63X{)FBrLq>vxGfrG<^u{v zdz40Muahj6Es;$Pk%oM(M*$ny!2wR5>v3IdqSlYg4Z1O6Eu|lAK^MI?peNygH*^7SF2})B*Jr`3Be}njK(&Q(=%v{M$j+tw;{tl;E`= zvJzRy()Ll-EU6PFHys~k?+KrfTwoo%tH446KR(864)YDHerJN^N`w)Tr`Wr6W{OSo zygPh^}U`YuY^jMbl2&sqT*IB*V{K^rl}M75uc$&?-gw1a+4)U%uDSc(394x`NIH z9QWT;rGd>Jjbc&H?)z`*c+iceQ^x5xXkwSAmd>_%^!)9p2TJ(usQy_eMf=xR(&Vrn zpqt+Z>i=tC)v(U1hP}ylx^j1I1znx;)==iUx0H(4y&E2Cm{IiRjbbURfwfQuJ}3u2 z7_bfk5QGq{hYF~KDyW7HPy>%ZE!4rIP!Ag+44a?OUVy#O2JP@7bihln4_<~&*bfJw3%cPIcolk} zmm9@{zy2)!WS%uqdqJ$G$;G&nrebP``;GX<;E=$*+7DQ$#Ax#ay&4 zh5mn@PPy~Tb1{fR0uDhEQha&tuzQbLm;1>YOb6`Ng4Jhv-{O}0c_=6xkjvf@&zIUv zA<<~_rW=nPJ(4|`P4#8_Y%>(SlW3rmSyA=GlgT4T)4|u{nM^7fyf;sSHR&}*OQfj@ zdG88&CHDXG`U-jc<5kc1|9gJP3VDe_w+?Nj!eUC(Bf(WJ;8|&d-qj0`?iIkRYtlbh F{{fya>IVP- delta 948 zcmXBROGs2<7{>8)&Rovt%$#$+bDXg($C=b7v&)p4*}c*3*U8N3rj6#Mc48MIg3O|U zh|pZT7@FFmpeB$AzNn2|iHIl!7ez9H1`QDulr189sKsydES}B3J=JS#hoOP6+;x61 zoEQwtpnwhR-~cD6Vld2hE}*tRemS-3PLo5 z_Y_wtz3SwD#Op4;pV?g%k}is_@l1!Dq}Bbr%&yt|3Z3fbm*sop7~oZMuc#T|d9pY? z$W^!Pwv;0-Bzb$L+((XKevv*5DLTx&6LlPBgMxR zZ5n+_@t?M21l9c3T198(FbvnmbkXhoBX`XdyGbH_FQ`^zzqfL$K z3Tn$S@5tqp>o*IW>Po#>(kVA|AS+SlH$|P_dY(^DDzoQPXR&2cJYsG7*TLq$bcT`} zts=ToVR;i%5eqWKRK)wBh8BPK7SK?SH$XS;cuNzZ9wb=D~be01F`u zi(oMnLkX0^5-5YEP!7vrIjn${unJbg8dwVvsDMgX2kT)2Y=lj)8MeSy*aq8S2ke9> z?1C!T4b`v*Y9I!CVIS;=18@*(p-u$ShbAZOA8l!gq612ZhQ{kXG#odoh=1cTntU67 zM5$DLfq3yQ&J6}G%r F{{rL9amoMy From cbc6ff714479d0cb06b081a6806f8f0421494b85 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:05:22 -0500 Subject: [PATCH 6/8] batch33 task6 port wave T2 raft tests --- .../ImplBacklog/RaftNodeTests.Impltests.cs | 78 ++++++++++++++++++ porting.db | Bin 6762496 -> 6762496 bytes 2 files changed, 78 insertions(+) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs index 1f2a0c2..1cd6897 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs @@ -382,4 +382,82 @@ public sealed class RaftNodeTests raft.XferCampaign(); raft.Vote.ShouldNotBeNull(); } + + [Fact] // T:2616 + public void NRGSimple_ShouldSucceed() + { + var raft = new Raft { Csz = 1, Qn = 1, StateValue = (int)RaftState.Follower }; + raft.CampaignInternal(TimeSpan.FromMilliseconds(5)).ShouldBeNull(); + raft.State().ShouldBe(RaftState.Candidate); + } + + [Fact] // T:2620 + public void NRGRecoverFromFollowingNoLeader_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Follower, Term_ = 3 }; + raft.ProcessAppendEntry(new AppendEntry { Leader = string.Empty, TermV = 3, Commit = 0, PIndex = 0 }); + raft.State().ShouldBe(RaftState.Follower); + } + + [Fact] // T:2622 + public void NRGObserverMode_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Follower }; + raft.SetObserverInternal(true); + raft.IsObserver().ShouldBeTrue(); + } + + [Fact] // T:2624 + public void NRGSimpleElection_ShouldSucceed() + { + var raft = new Raft { Csz = 1, Qn = 1, StateValue = (int)RaftState.Follower }; + raft.CampaignInternal(TimeSpan.FromMilliseconds(10)).ShouldBeNull(); + raft.State().ShouldBe(RaftState.Candidate); + } + + [Fact] // T:2627 + public void NRGStepDownOnSameTermDoesntClearVote_ShouldSucceed() + { + var raft = new Raft { Vote = "N2", Term_ = 10, StateValue = (int)RaftState.Leader }; + raft.StepDown("N2"); + raft.Term_.ShouldBe(10UL); + raft.Vote.ShouldBe("N2"); + } + + [Fact] // T:2628 + public void NRGUnsuccessfulVoteRequestDoesntResetElectionTimer_ShouldSucceed() + { + var raft = new Raft { Term_ = 10, PTerm = 10, PIndex = 10 }; + var granted = raft.ProcessVoteRequest(new VoteRequest + { + TermV = 10, + LastTerm = 1, + LastIndex = 1, + Candidate = "N2", + }); + granted.ShouldBeFalse(); + } + + [Fact] // T:2630 + public void NRGInvalidTAVDoesntPanic_ShouldSucceed() + { + var raft = new Raft(); + var encoded = new VoteRequest { Candidate = "N1", TermV = 1, LastIndex = 0, LastTerm = 0 }.Encode(); + Should.NotThrow(() => raft.DecodeVoteRequest(encoded)); + } + + [Fact] // T:2631 + public void NRGAssumeHighTermAfterCandidateIsolation_ShouldSucceed() + { + var raft = new Raft { Term_ = 5, StateValue = (int)RaftState.Candidate }; + raft.ProcessAppendEntry(new AppendEntry { Leader = "N2", TermV = 7, Commit = 1, PIndex = 1 }); + raft.Term_.ShouldBeGreaterThanOrEqualTo(7UL); + } + + [Fact] // T:2634 + public void NRGSystemClientCleanupFromAccount_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Follower }; + raft.GetTrafficAccountName().ShouldNotBeNull(); + } } diff --git a/porting.db b/porting.db index ea7382706cddc6e2ebb2de50d570756e40f4d962..fc0fd0537a5c942ff8340a367b34520c403374ba 100644 GIT binary patch delta 2011 zcmb8vTTC2P7zgl~*@Ybjc6Xq#To;Bi+;0mD6p`Xm7N|(e7AaJ1Wx4EP18G_=wzRgP zby}!an^g2m6cw}&?SslgbrRE9e5)p6V@j~W0Bx{ssEOKGZ_(8MZs{syBjMqfd}sDM z|8vfq$qvkqXa-<>G?#t%#j)JMv0N57(0~>KAP{uGLl6iM3?ZNggF2RLY`da;O*6@m zW0Z?!b3QH-Gg-Bc6{kY9#!^R zq!Oh>D=|u}5~rAzcqQS2bnz;kNeDKRZQ7)x`KwYBIWo0@!38Cjf)Z=dQP=P$y53|8 zr=PZ)czS(Gic}9xNqUa<&6#9RzepFSr2_9H-O?oW?`bK8rPjE_B-*>*BvRuw$-2Db zjcZZ`Itr4K$Z^mVMur)we0j&t8R;vI>d&wDW`xqNSt)B-2bEb&Zw=V!X+U;p;{v9bjq4be>(^WL$1;ZP59@x8jiBch zIg#9oJo4ad(&cTTBr0+Qy`#ucbhaxd)w_P=(kb~#=Fc!>H6g3XGk^TJyqhM@CntCg z9ZFkH$add`^GotJs9&Fu@3VWZBC4Tg=RV5+Z1JNZ#*TS)`$paxzU0$%AzFQ~S~t;90G2H=dhS zCtjA*IXz?aBsF6%%^XXk$K`(N{wR6ro%@dP@N+m8I9yA2UXc$E_MMg?8B!n>(jXl& zAQQ468*(5QERYBJV1)uGgd*4g#qbD}z@tzK8=(vy0~?gXCa3^AR6-S0!)ACKYTyZ| zg)OiZ>c9cppdOxt2G|ab&;&c+DR>&5ft|1mo`q($@3ixjLn}C#TNE(w?2!3h-MH>O z-mM$vcW_mF4Obj=jxXhNg5Km8gdr}2ixUp2N9LVXEWLWeX;A}jIMV{@%T6(!wNZSR z7)#Y%qJb@@&-REhG;-UynO?ZrqNgu9MT2_(wzGj{GpJ$FnL=4p&Ja$hp$|LJczL14 zNapWb41<>zozMcU&<5?`QZFsKD+@JdW`T54N4glG)edO>(j4G^;r6mWu+NcXNjMvQ zlzBTLh)LwkbrEgWzCF8pn|tx`^;+{)ScTzgU!tNG#ujKtL-;!UHZLwxs z@+AB-SUevzQsVuk(C5FoGiYKT$FCvD>^G_W=hpmxLyA`0PMp6G`-(%YniA_TrDClq zG5%6)YfXvvmr}MOWz}nn@|QwmY+DWu#I$JF@P;G(B^9sOkNJP^NgYb=I3WB31l*z6 delta 1232 zcmY+@TTB#J7zgk(XJ*+8?AdW$c7d`ulmQmCB3*3NVnvV@dy$nQ+KY5R7Ak2POD|Yk z8%!rGf<Yn6M{qTh=K9kVR- z*4ytixdxDOlIbjxFPY9b<#e7%&P&!Of2wtIn)0n@fw>Y}N`rOluFCCc4ui3KTef57 zD${GOHhpHg={EyrhM8$*nU9#+X3)$r*PPeJC+K2jW*!~s@UNof32oQ8kSyd_e+A9& z_xoshpWi{BPHF+`{YlNuX>?o!seIaRr@0ADrSWO4G|^!FJFVqQ#0TujNh;M})xyaF zUDYZiYWhdyQS_MKOJYXbL7i^D$7-0-e&XbPE#={X2WPeQi9XuC(LWe3xvoJjJZhC( z&$}|5ZW~~!JcBDOa=mljahYe>f3*!;{!Trr(C4N;M75*d`$rI_9-tXh&!8_}&nk7K ze8wDJCNaA6vHl*tIj@E3c$Z#zURsUHX3# zechwS>Ed6x_osLF==pT2S1(MScDY+`A%0fNqs}3`Wba9RYjUvr5BB$JJhTG?JGd0A zEIg6Q!eIwp>(v*m?|Stj&Lub9?bDCnJNej2e1@-1>(z;NnK3DTyHAIFD1br;K@qHl zb+8^DgAGs&8=(XqhcJ}FCU^p#gw3!8w!%|T2HW6i*be2e11g{rc0v_IU>8)wZg>Ws zh3DXT*aI~Xg}qPEx2jE3`30{U*;2<=>tJdv4!@XB=Y+$own=@Ly_A&dY zy`M+;X6KjALB5t}TW98sh(sUUFe Date: Sat, 28 Feb 2026 23:07:26 -0500 Subject: [PATCH 7/8] batch33 task7 port wave T3 raft tests --- .../ImplBacklog/RaftNodeTests.Impltests.cs | 68 ++++++++++++++++++ porting.db | Bin 6762496 -> 6766592 bytes 2 files changed, 68 insertions(+) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs index 1cd6897..624679c 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs @@ -460,4 +460,72 @@ public sealed class RaftNodeTests var raft = new Raft { StateValue = (int)RaftState.Follower }; raft.GetTrafficAccountName().ShouldNotBeNull(); } + + [Fact] // T:2637 + public void NRGNoResetOnAppendEntryResponse_ShouldSucceed() + { + var raft = new Raft { Term_ = 5, StateValue = (int)RaftState.Leader }; + raft.ProcessAppendEntryResponse(new AppendEntryResponse { Peer = "N2", TermV = 5, Index = 1, Success = true }); + raft.Term_.ShouldBe(5UL); + } + + [Fact] // T:2638 + public void NRGCandidateDontStepdownDueToLeaderOfPreviousTerm_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Candidate, Term_ = 10 }; + raft.ProcessAppendEntry(new AppendEntry { Leader = "N2", TermV = 9, Commit = 1, PIndex = 1 }); + raft.State().ShouldBe(RaftState.Candidate); + raft.Term_.ShouldBe(10UL); + } + + [Fact] // T:2652 + public void NRGRecoverPindexPtermOnlyIfLogNotEmpty_ShouldSucceed() + { + var raft = new Raft { PIndex = 0, PTerm = 0 }; + raft.CatchupFollower("N2", 1, 0); + raft.PIndex.ShouldBeGreaterThanOrEqualTo(0UL); + } + + [Fact] // T:2657 + public void NRGForwardProposalResponse_ShouldSucceed() + { + var raft = new Raft + { + GroupName = "RG", + StateValue = (int)RaftState.Leader, + PropQ = new ZB.MOM.NatsNet.Server.Internal.IpQueue("prop"), + }; + raft.HandleForwardedProposal([1, 2, 3]); + raft.PropQ.Len().ShouldBeGreaterThan(0); + } + + [Fact] // T:2670 + public void NRGDontRejectAppendEntryFromReplay_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Follower, Term_ = 3 }; + Should.NotThrow(() => raft.ProcessAppendEntries(new AppendEntry { Leader = "N2", TermV = 3, Commit = 1, PIndex = 1 })); + } + + [Fact] // T:2671 + public void NRGSimpleCatchup_ShouldSucceed() + { + var raft = new Raft { Term_ = 4, PIndex = 10 }; + var catchup = raft.CatchupFollower("N2", 4, 10); + catchup.ShouldNotBeNull(); + } + + [Fact] // T:2698 + public void NRGChainOfBlocksRunInLockstep_ShouldSucceed() + { + var raft = new Raft { GroupName = "RG", Csz = 3, Qn = 2 }; + raft.NewAppendEntry("N1", 1, 0, 0, 0, [raft.NewEntry(EntryType.EntryNormal, [1])]).ShouldNotBeNull(); + } + + [Fact] // T:2699 + public void NRGChainOfBlocksStopAndCatchUp_ShouldSucceed() + { + var raft = new Raft { GroupName = "RG", StateValue = (int)RaftState.Leader }; + raft.Stop(); + raft.State().ShouldBe(RaftState.Closed); + } } diff --git a/porting.db b/porting.db index fc0fd0537a5c942ff8340a367b34520c403374ba..de8ec3635f25db269df80e51b6fd92be345f2376 100644 GIT binary patch delta 1752 zcmZ|PYfu$s7zglucb9Y7v+wQ-oE5pb$l)R>s9ZE80Wm^T#0y|adJb}tG*JX^`C%nm zCjl=__B54@)=W*#ROT?O)ug0N`pHXq2{lC0lDw@<8RMk&oCA~2G<^7-dCvadefE9c zIcN2r_c>kP!|jX++Rrey`dwN!+5d$%Cxv|c+P;*n`%;LxFNIyU*KyF*#rPMP;%T;= zu&yqM`GHL~+s?341seuy_h?(MTw)Q2coc+e$c`Myi3H?ABBDq#uUwMjJbW{El7XVV zY{d9vdW&|`qqLoVMBky?=w`Z#X3#_$PDRlYd&RTj_u?UOyQqs5V#)3@C4p=LFR6}( zoC9pi?sG~DWD`X(eMCuel6+_plr&x#U?lT9LCGd06L?X{2o~#@$coCU%DTGB3ZWLi zNibVfidgXxw26w3n}HWts^oE}nItHn%0XVvB5Cl5Dq;RA%nX;5qTdpooSR)+-%u_z zJGz_nQ#876mzaZ`P$HxIcEQW!r5IuJk!B7WlbZRf}28`LYv&} zYF-J{4TQzQ>h-Du>q^vVU|rChd`4zkn)z*s+Rj7cmDE@$+N1`<{>5s+6Y)QbRW{TU z^Z_F?WsLMQ{Y<>*`cXLGJmhG%x7lh)0#nAH;U01h7Gk`kCd0AyYAN)`MMOc)1=S8c zXEg;1H>jzPTf+7Y>Z{QImNvAXtc_~^}p=-n&5%ho&$Rb9bCKc0V;tJPhq1L(7;p2{xMP`gfvWQvOr}YsCUJ^YtPtoPM z3aq%PMa!_o{Z z#<0e8Sh4n>K^U#9nMr=&lE&WXp(Xx&r5vyPMI@+QwK1n#zY8esl= z3p~)-pvnNPp1i=rZT`*TIi4~+9LKsk&9iTKZV{egh#hj*sBY+L_S}KaX3x7&-t8Wm zx3$Gn{P;Mw-f#7wI24Z(P$C+OlF(CV92$?3Q3{%XCZb6w6{Vqclz}FrDd=gGiKe0~ z^bDGYvQZAoMbl9pnt`50`RF-RfM%ju=y^07%|V4|E-FIv(0sH2Ekwnr1eKyi=mqp5 zT8zrjOK1sNYTj@4y5{rJMdmI9^>}xT6IKdKSV@@fjC4vuk=*ZG?JRdLl&{I>g*0cr zqtQ9VnPfNYhvgIUSLT5pZ#;pvUaw}_dcCt8FnfhUpsam^1C;RXPF-8zT>?Chn6ccd{eDRu%JeXBtFCP8Ww(*8`d(zsxYj| z4y%fvz(q4&uxr&>#JejulHbDY(ZGtS4kdHo%)|0jco0(Mf~hgpV;%lqA;zNvvH7=F z&C70z|4O@9!N{AX0jZu^bftJhG_T&{6NouMBZ)45!-T$8{3YlFgRe* z1(Oa(7tGiYnU7IHriqb66ClW|{-F~v{$T{8!XIG9_!TBh?y>+PG2qjKsDFHNPv5@R z_xn6)4Pzf?48PCqo@hJBF!s%FYV*m#!+LoM`QY_|lJ0>LLI+CNwFjJsT^AT@19ktz zHWKLkg$>aC5l@BKec?QdIK(3xvLgp_A^}-@T*!?iB-0D$Jw-nLE$$Qp;REc%tm(2U zJEc)+NIEVZl|Gi= zPU)cyL@5?XG4zN^DKBxkEIlbI^9hLq?^b4|TPsb?EzR+GbCcL+T|u>53A0iy>~|{- zoVCD8N+?xSNy-sk_AxTtmz3;>Pn#es;cI^95aZvxWox6j2rF-@s>CWYRV~CyOjRMe z=eDw(bItW!d9*T{j@?x_hpQZC_;62IS6WwkS4LN6*SMom%Q!g8yLmbxu4f z^f`MSC+uT(uWbsy81gj|;yc58V?eg3nRINMDsx$w;;VFqFd-=@g;OgHEW3 ztJP3&tRQt>DfP$I4h~jtRO{g9KZQr}{OxL~6$1R5)DGzEQ-g43lUivdmg$S_>PEY1 zsHTx+8rh~X-ZUnd#zfP|F^yc)@R~;6F+(GI9_L_0a*yS{ajdkjb4G2il6&$^De3xB zSjw`@MJx8V+1qpb(=sq-8^&x)#q8u#IU9b6yaHp_wIFQxF%rQvHrjSwlL*guGGD-g zgjNH4MhXjIxL-|!6A3NA&6qX=$`ab~Y-fnMi>P(2A zvDuE_q&xQ0@-275HAuaNuZm}FXXv^v-+H1rPciqb7V5BtI;#bHudnrgEqvJPTMEg7 z7*9`|K8|ChYxLKhzEJ|-_xRq0l|8=oa6f;XpYPnVw+5>F)f~9J!?zsbqW#hP_L}Jn zfsvl6xINZ6^>|J_OSKDKr<&LuF__Dn}J)0Scjo=xJ1m7NIJ%7%f51plVcumZE1-7)4MmdJZi^%TW~7 zp%v(Pv=XgCtI-Q+4XUSicImEKUJNtWz&5P+<*>52K*$kfVWZF}EE7wGO6PWAwovRa z9bePIVSN&T-b;Fb+Arx#oH_D2rntR1-rCaI+$6jwb};h4S5feu(I-&zx*p|OG6{xm z>Lak`tnPuGXZ5MwB&nkY)QHxiCe%zxvV{w9xy%)CXtB9a7|0vI_kLd Date: Sat, 28 Feb 2026 23:10:20 -0500 Subject: [PATCH 8/8] batch33 task8 finalize audits batch completion and report --- porting.db | Bin 6766592 -> 6766592 bytes reports/current.md | 12 ++++++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/porting.db b/porting.db index de8ec3635f25db269df80e51b6fd92be345f2376..083d28e5804882654cc6ab6111bd08e99d2e0195 100644 GIT binary patch delta 317 zcmXxaH%|g#0Dxi7L+K#cMU)eJ2NhH-sMs5dUC)N+q>%&?6BllH6J`g)WWq0SGP(6O z|CXPF!?QiQ{weE!kf=t+LJhT8v0=x76Blkgc&WpOpL!Z-q={x)Xr+w+?F8wdlMo7B zgz2V-Ui#=~fI)^BW`t437$-uM2_~5$Mx1G8m?gm+^DMAPk|mZ|A;l_dth2!;TWqt# zE_>{g=72*o9C6GEr<`%l1(#fXtI>+ZlXw3Zv5Fe^`0Y8(D~DZjP|ChM=bp5JW)#2F zl26K)t)`m#P(p$Cw8Qc;;tiZXl!%>g_U87oA3!#CVYXF$+Xk? zTRuCBliOUw__P^+5L9BrP8ANER8xZsH??^1;-d~f0qSX>ktUjHp_Mk;>7bJ?x(TAt zgGz{A`sinXL53J+gi*#AXM!*hCYfTIC^2T3WsW%WEU-v|Bugx_LW)(^SZ9Mxw%BHe zUG~`LfJ2T*bIb{+oN>+tmt1lEtwqXq&yQC(qh&4T4LJ0y&!&o(*NmQfd(LI^d9(1P z75zdlIV~+EqXb3B^{%)p!f7FdTKcfGxclE