diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs
index d563da5..312dd86 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs
@@ -452,6 +452,9 @@ internal sealed class RaftGroup
/// Internal Raft node — not serialized.
[JsonIgnore]
public IRaftNode? Node { get; set; }
+
+ internal bool IsMember(string id) =>
+ Peers.Contains(id, StringComparer.Ordinal);
}
// ============================================================================
@@ -482,6 +485,49 @@ internal sealed class StreamAssignment
[JsonIgnore] public bool Resetting { get; set; }
[JsonIgnore] public Exception? Error { get; set; }
[JsonIgnore] public UnsupportedStreamAssignment? Unsupported { get; set; }
+
+ internal StreamAssignment CopyGroup()
+ {
+ var clone = new StreamAssignment
+ {
+ Client = Client,
+ Created = Created,
+ ConfigJson = ConfigJson,
+ Config = Config,
+ Group = Group == null
+ ? null
+ : new RaftGroup
+ {
+ Name = Group.Name,
+ Peers = [.. Group.Peers],
+ Storage = Group.Storage,
+ Cluster = Group.Cluster,
+ Preferred = Group.Preferred,
+ ScaleUp = Group.ScaleUp,
+ Node = Group.Node,
+ },
+ Sync = Sync,
+ Subject = Subject,
+ Reply = Reply,
+ Restore = Restore,
+ Consumers = Consumers,
+ Responded = Responded,
+ Recovering = Recovering,
+ Reassigning = Reassigning,
+ Resetting = Resetting,
+ Error = Error,
+ Unsupported = Unsupported,
+ };
+ return clone;
+ }
+
+ internal bool MissingPeers() =>
+ Group != null &&
+ Config != null &&
+ Group.Peers.Length < Math.Max(1, Config.Replicas);
+
+ internal string RecoveryKey() =>
+ $"{Client?.ServiceAccount() ?? string.Empty}:{Config?.Name ?? Subject ?? string.Empty}";
}
// ============================================================================
@@ -551,6 +597,45 @@ internal sealed class ConsumerAssignment
[JsonIgnore] public bool Recovering { get; set; }
[JsonIgnore] public Exception? Error { get; set; }
[JsonIgnore] public UnsupportedConsumerAssignment? Unsupported { get; set; }
+
+ internal ConsumerAssignment CopyGroup()
+ {
+ var clone = new ConsumerAssignment
+ {
+ Client = Client,
+ Created = Created,
+ Name = Name,
+ Stream = Stream,
+ ConfigJson = ConfigJson,
+ Config = Config,
+ Group = Group == null
+ ? null
+ : new RaftGroup
+ {
+ Name = Group.Name,
+ Peers = [.. Group.Peers],
+ Storage = Group.Storage,
+ Cluster = Group.Cluster,
+ Preferred = Group.Preferred,
+ ScaleUp = Group.ScaleUp,
+ Node = Group.Node,
+ },
+ Subject = Subject,
+ Reply = Reply,
+ State = State,
+ Responded = Responded,
+ Recovering = Recovering,
+ Error = Error,
+ Unsupported = Unsupported,
+ };
+ return clone;
+ }
+
+ internal string StreamRecoveryKey() =>
+ $"{Client?.ServiceAccount() ?? string.Empty}:{Stream}";
+
+ internal string RecoveryKey() =>
+ $"{Client?.ServiceAccount() ?? string.Empty}:{Stream}:{Name}";
}
// ============================================================================
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs
index 6237f85..414a42d 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs
@@ -727,6 +727,491 @@ internal sealed class JetStreamEngine(JetStream state)
!string.IsNullOrWhiteSpace(value) &&
value.Length >= 10 &&
value.StartsWith("A", StringComparison.Ordinal);
+
+ // -------------------------------------------------------------------------
+ // JetStream cluster stream methods (Batch 33 Group A)
+ // -------------------------------------------------------------------------
+
+ internal void MonitorCluster()
+ {
+ var meta = GetMetaGroup();
+ if (meta == null)
+ return;
+
+ SetMetaRecovering();
+ try
+ {
+ CheckClusterSize();
+ var (_, _, _, error) = MetaSnapshot();
+ if (error != null)
+ Server()?.Warnf("JetStream meta snapshot failed in MonitorCluster: {0}", error.Message);
+ }
+ finally
+ {
+ ClearMetaRecovering();
+ ClusterStoppedC()?.Writer.TryWrite(true);
+ }
+ }
+
+ internal void CheckClusterSize()
+ {
+ var server = Server();
+ var meta = GetMetaGroup();
+ if (server == null || meta == null)
+ return;
+
+ var activePeers = server.ActivePeers();
+ if (activePeers.Count == 0)
+ return;
+
+ var jsPeers = 0;
+ foreach (var peer in activePeers)
+ {
+ var info = server.GetNodeInfo(peer);
+ if (info?.Js == true)
+ jsPeers++;
+ }
+
+ if (jsPeers > 0 && jsPeers < meta.ClusterSize())
+ meta.AdjustClusterSize(jsPeers);
+ }
+
+ internal (StreamConfig Config, bool Ok) ClusterStreamConfig(string accountName, string streamName)
+ {
+ _state.Lock.EnterReadLock();
+ try
+ {
+ if (_state.Cluster is not JetStreamCluster cluster)
+ return (new StreamConfig(), false);
+ if (!cluster.Streams.TryGetValue(accountName, out var accountStreams))
+ return (new StreamConfig(), false);
+ if (!accountStreams.TryGetValue(streamName, out var assignment))
+ return (new StreamConfig(), false);
+ if (assignment.Config == null)
+ return (new StreamConfig(), false);
+
+ return (assignment.Config.Clone(), true);
+ }
+ finally
+ {
+ _state.Lock.ExitReadLock();
+ }
+ }
+
+ internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) MetaSnapshot()
+ {
+ _state.Lock.EnterReadLock();
+ try
+ {
+ if (_state.Cluster is not JetStreamCluster cluster)
+ return ([], 0, 0, null);
+
+ return EncodeMetaSnapshot(cluster.Streams);
+ }
+ finally
+ {
+ _state.Lock.ExitReadLock();
+ }
+ }
+
+ internal Exception? ApplyMetaSnapshot(byte[] buffer, RecoveryUpdates? updates, bool isRecovering)
+ {
+ var (decoded, error) = DecodeMetaSnapshot(buffer);
+ if (error != null || decoded == null)
+ return error;
+
+ _state.Lock.EnterWriteLock();
+ try
+ {
+ if (_state.Cluster is not JetStreamCluster cluster)
+ return null;
+
+ foreach (var accountStreams in decoded.Values)
+ {
+ foreach (var assignment in accountStreams.Values)
+ {
+ SetStreamAssignmentRecovering(assignment);
+ if (assignment.Consumers == null)
+ continue;
+
+ foreach (var consumer in assignment.Consumers.Values)
+ {
+ SetConsumerAssignmentRecovering(consumer);
+ }
+ }
+ }
+
+ cluster.Streams = decoded;
+ if (!isRecovering && updates != null)
+ {
+ updates.AddStreams.Clear();
+ updates.UpdateStreams.Clear();
+ updates.RemoveStreams.Clear();
+ updates.UpdateConsumers.Clear();
+ updates.RemoveConsumers.Clear();
+ }
+ return null;
+ }
+ finally
+ {
+ _state.Lock.ExitWriteLock();
+ }
+ }
+
+ internal (Dictionary>? Streams, Exception? Error) DecodeMetaSnapshot(byte[] buffer)
+ {
+ if (buffer.Length == 0)
+ {
+ return (
+ new Dictionary>(StringComparer.Ordinal),
+ null
+ );
+ }
+
+ try
+ {
+ var writeable = System.Text.Json.JsonSerializer.Deserialize>(buffer) ?? [];
+ var streams = new Dictionary>(StringComparer.Ordinal);
+ foreach (var item in writeable)
+ {
+ var accountName = item.Client?.ServiceAccount() ?? string.Empty;
+ if (!streams.TryGetValue(accountName, out var accountStreams))
+ {
+ accountStreams = new Dictionary(StringComparer.Ordinal);
+ streams[accountName] = accountStreams;
+ }
+
+ var assignment = new StreamAssignment
+ {
+ Client = item.Client,
+ Created = item.Created,
+ ConfigJson = item.ConfigJson,
+ Group = item.Group,
+ Sync = item.Sync,
+ Config = item.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined
+ ? null
+ : System.Text.Json.JsonSerializer.Deserialize(item.ConfigJson.GetRawText()),
+ Consumers = new Dictionary(StringComparer.Ordinal),
+ };
+
+ if (item.Consumers.Count > 0)
+ {
+ foreach (var consumer in item.Consumers)
+ {
+ var streamName = !string.IsNullOrWhiteSpace(consumer.Stream)
+ ? consumer.Stream
+ : assignment.Config?.Name ?? string.Empty;
+ var assignmentConsumer = new ConsumerAssignment
+ {
+ Client = consumer.Client,
+ Created = consumer.Created,
+ Name = consumer.Name,
+ Stream = streamName,
+ ConfigJson = consumer.ConfigJson,
+ Group = consumer.Group,
+ State = consumer.State,
+ Config = consumer.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined
+ ? null
+ : System.Text.Json.JsonSerializer.Deserialize(consumer.ConfigJson.GetRawText()),
+ };
+ assignment.Consumers[assignmentConsumer.Name] = assignmentConsumer;
+ }
+ }
+
+ var streamKey = assignment.Config?.Name ?? string.Empty;
+ accountStreams[streamKey] = assignment;
+ }
+
+ return (streams, null);
+ }
+ catch (Exception ex)
+ {
+ return (null, ex);
+ }
+ }
+
+ internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) EncodeMetaSnapshot(
+ Dictionary> streams)
+ {
+ try
+ {
+ var streamCount = 0;
+ var consumerCount = 0;
+ var writeable = new List();
+ foreach (var accountStreams in streams.Values)
+ {
+ foreach (var assignment in accountStreams.Values)
+ {
+ streamCount++;
+ var configJson = assignment.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined &&
+ assignment.Config != null
+ ? System.Text.Json.JsonSerializer.SerializeToElement(assignment.Config)
+ : assignment.ConfigJson;
+ var ws = new WriteableStreamAssignment
+ {
+ Client = assignment.Client,
+ Created = assignment.Created,
+ ConfigJson = configJson,
+ Group = assignment.Group,
+ Sync = assignment.Sync,
+ };
+
+ if (assignment.Consumers != null)
+ {
+ foreach (var consumer in assignment.Consumers.Values)
+ {
+ consumerCount++;
+ var consumerConfigJson =
+ consumer.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined &&
+ consumer.Config != null
+ ? System.Text.Json.JsonSerializer.SerializeToElement(consumer.Config)
+ : consumer.ConfigJson;
+ ws.Consumers.Add(new WriteableConsumerAssignment
+ {
+ Client = consumer.Client,
+ Created = consumer.Created,
+ Name = consumer.Name,
+ Stream = consumer.Stream,
+ ConfigJson = consumerConfigJson,
+ Group = consumer.Group,
+ State = consumer.State,
+ });
+ }
+ }
+
+ writeable.Add(ws);
+ }
+ }
+
+ if (writeable.Count == 0)
+ return ([], 0, 0, null);
+
+ var snapshot = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(writeable);
+ return (snapshot, streamCount, consumerCount, null);
+ }
+ catch (Exception ex)
+ {
+ return ([], 0, 0, ex);
+ }
+ }
+
+ internal Exception? CollectStreamAndConsumerChanges(
+ IRaftNodeCheckpoint checkpoint,
+ Dictionary> streams)
+ {
+ try
+ {
+ foreach (var (appendEntry, error) in checkpoint.AppendEntriesSeq())
+ {
+ if (error != null)
+ return error;
+
+ foreach (var entry in appendEntry.Entries)
+ {
+ if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0)
+ continue;
+
+ var op = (EntryOp)entry.Data[0];
+ var payload = entry.Data.Length > 1 ? entry.Data.AsSpan(1).ToArray() : [];
+ switch (op)
+ {
+ case EntryOp.AssignStreamOp:
+ case EntryOp.UpdateStreamOp:
+ {
+ var sa = System.Text.Json.JsonSerializer.Deserialize(payload);
+ if (sa?.Client == null || sa.Config == null)
+ break;
+ var account = sa.Client.ServiceAccount();
+ if (!streams.TryGetValue(account, out var accountStreams))
+ {
+ accountStreams = new Dictionary(StringComparer.Ordinal);
+ streams[account] = accountStreams;
+ }
+
+ accountStreams[sa.Config.Name] = sa;
+ break;
+ }
+ case EntryOp.RemoveStreamOp:
+ {
+ var sa = System.Text.Json.JsonSerializer.Deserialize(payload);
+ if (sa?.Client == null || sa.Config == null)
+ break;
+ if (streams.TryGetValue(sa.Client.ServiceAccount(), out var accountStreams))
+ {
+ accountStreams.Remove(sa.Config.Name);
+ }
+ break;
+ }
+ case EntryOp.AssignConsumerOp:
+ case EntryOp.AssignCompressedConsumerOp:
+ {
+ var ca = System.Text.Json.JsonSerializer.Deserialize(payload);
+ if (ca?.Client == null || string.IsNullOrWhiteSpace(ca.Stream))
+ break;
+ var account = ca.Client.ServiceAccount();
+ if (!streams.TryGetValue(account, out var accountStreams))
+ break;
+ if (!accountStreams.TryGetValue(ca.Stream, out var streamAssignment))
+ break;
+ streamAssignment.Consumers ??= new Dictionary(StringComparer.Ordinal);
+ streamAssignment.Consumers[ca.Name] = ca;
+ break;
+ }
+ case EntryOp.RemoveConsumerOp:
+ {
+ var ca = System.Text.Json.JsonSerializer.Deserialize(payload);
+ if (ca?.Client == null || string.IsNullOrWhiteSpace(ca.Stream))
+ break;
+ var account = ca.Client.ServiceAccount();
+ if (!streams.TryGetValue(account, out var accountStreams))
+ break;
+ if (!accountStreams.TryGetValue(ca.Stream, out var streamAssignment))
+ break;
+ streamAssignment.Consumers?.Remove(ca.Name);
+ break;
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+ catch (Exception ex)
+ {
+ return ex;
+ }
+ }
+
+ internal void SetStreamAssignmentRecovering(StreamAssignment assignment)
+ {
+ assignment.Responded = true;
+ assignment.Recovering = true;
+ assignment.Restore = null;
+ if (assignment.Group != null)
+ {
+ assignment.Group.Preferred = string.Empty;
+ assignment.Group.ScaleUp = false;
+ }
+ }
+
+ internal void SetConsumerAssignmentRecovering(ConsumerAssignment assignment)
+ {
+ assignment.Responded = true;
+ assignment.Recovering = true;
+ if (assignment.Group != null)
+ {
+ assignment.Group.Preferred = string.Empty;
+ assignment.Group.ScaleUp = false;
+ }
+ }
+
+ internal void ProcessAddPeer(string peer)
+ {
+ _state.Lock.EnterWriteLock();
+ try
+ {
+ if (_state.Cluster is not JetStreamCluster cluster)
+ return;
+ if (!cluster.IsLeader())
+ return;
+
+ foreach (var accountStreams in cluster.Streams.Values)
+ {
+ foreach (var assignment in accountStreams.Values)
+ {
+ if (!assignment.MissingPeers())
+ continue;
+
+ var copy = assignment.CopyGroup();
+ copy.Group!.Peers = [.. copy.Group.Peers, peer];
+ assignment.Group = copy.Group;
+
+ if (assignment.Consumers == null)
+ continue;
+
+ foreach (var consumer in assignment.Consumers.Values)
+ {
+ if (consumer.Config?.Durable is { Length: > 0 } || (consumer.Group?.Peers.Length ?? 0) > 1)
+ consumer.Group!.Peers = assignment.Group.Peers;
+ }
+ }
+ }
+ }
+ finally
+ {
+ _state.Lock.ExitWriteLock();
+ }
+ }
+
+ internal void ProcessRemovePeer(string peer)
+ {
+ _state.Lock.EnterWriteLock();
+ try
+ {
+ if (_state.Cluster is not JetStreamCluster cluster)
+ return;
+
+ foreach (var accountStreams in cluster.Streams.Values)
+ {
+ foreach (var assignment in accountStreams.Values)
+ {
+ if (assignment.Group == null || !assignment.Group.IsMember(peer))
+ continue;
+
+ RemovePeerFromStreamLocked(assignment, peer);
+ }
+ }
+ }
+ finally
+ {
+ _state.Lock.ExitWriteLock();
+ }
+ }
+
+ internal bool RemovePeerFromStream(StreamAssignment assignment, string peer)
+ {
+ _state.Lock.EnterWriteLock();
+ try
+ {
+ return RemovePeerFromStreamLocked(assignment, peer);
+ }
+ finally
+ {
+ _state.Lock.ExitWriteLock();
+ }
+ }
+
+ internal bool RemovePeerFromStreamLocked(StreamAssignment assignment, string peer)
+ {
+ if (assignment.Group == null || !assignment.Group.IsMember(peer))
+ return false;
+
+ assignment.Group.Peers = [.. assignment.Group.Peers.Where(p => !string.Equals(p, peer, StringComparison.Ordinal))];
+ assignment.Group.Preferred = string.Empty;
+
+ if (assignment.Consumers == null)
+ return true;
+
+ foreach (var consumer in assignment.Consumers.Values)
+ {
+ if (consumer.Group == null)
+ continue;
+ consumer.Group.Peers = [.. consumer.Group.Peers.Where(p => !string.Equals(p, peer, StringComparison.Ordinal))];
+ consumer.Group.Preferred = string.Empty;
+ }
+
+ return true;
+ }
+
+ internal bool HasPeerEntries(IEnumerable entries)
+ {
+ foreach (var entry in entries)
+ {
+ if (entry.Type == EntryType.EntryAddPeer || entry.Type == EntryType.EntryRemovePeer)
+ return true;
+ }
+ return false;
+ }
}
internal sealed class StreamAssignmentView
diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs
new file mode 100644
index 0000000..2b7c040
--- /dev/null
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs
@@ -0,0 +1,175 @@
+using System.Reflection;
+using Shouldly;
+using ZB.MOM.NatsNet.Server;
+
+namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
+
+public sealed class JetStreamClusterStreamsGroupATests
+{
+ [Fact] // T:1578
+ public void MonitorCluster_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("MonitorCluster", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1579
+ public void CheckClusterSize_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("CheckClusterSize", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1580
+ public void ClusterStreamConfig_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("ClusterStreamConfig", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1581
+ public void MetaSnapshot_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("MetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1582
+ public void ApplyMetaSnapshot_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("ApplyMetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1583
+ public void DecodeMetaSnapshot_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("DecodeMetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1584
+ public void EncodeMetaSnapshot_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("EncodeMetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1585
+ public void CollectStreamAndConsumerChanges_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("CollectStreamAndConsumerChanges", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1586
+ public void SetStreamAssignmentRecovering_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("SetStreamAssignmentRecovering", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1587
+ public void SetConsumerAssignmentRecovering_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("SetConsumerAssignmentRecovering", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1588
+ public void CopyGroup_StreamAssignment_ShouldCloneGroupPeers()
+ {
+ var sa = new StreamAssignment
+ {
+ Group = new RaftGroup { Name = "RG", Peers = ["A", "B"] },
+ Config = new StreamConfig { Name = "S", Replicas = 3 },
+ };
+
+ var method = typeof(StreamAssignment).GetMethod("CopyGroup", BindingFlags.Instance | BindingFlags.NonPublic);
+ method.ShouldNotBeNull();
+ var copy = method!.Invoke(sa, []) as StreamAssignment;
+ copy.ShouldNotBeNull();
+ copy!.Group.ShouldNotBeNull();
+ ReferenceEquals(copy.Group, sa.Group).ShouldBeFalse();
+ copy.Group.Peers.SequenceEqual(sa.Group!.Peers).ShouldBeTrue();
+ }
+
+ [Fact] // T:1589
+ public void CopyGroup_ConsumerAssignment_ShouldCloneGroupPeers()
+ {
+ var ca = new ConsumerAssignment
+ {
+ Name = "C",
+ Stream = "S",
+ Group = new RaftGroup { Name = "RG", Peers = ["A", "B"] },
+ };
+
+ var method = typeof(ConsumerAssignment).GetMethod("CopyGroup", BindingFlags.Instance | BindingFlags.NonPublic);
+ method.ShouldNotBeNull();
+ var copy = method!.Invoke(ca, []) as ConsumerAssignment;
+ copy.ShouldNotBeNull();
+ copy!.Group.ShouldNotBeNull();
+ ReferenceEquals(copy.Group, ca.Group).ShouldBeFalse();
+ copy.Group.Peers.SequenceEqual(ca.Group!.Peers).ShouldBeTrue();
+ }
+
+ [Fact] // T:1590
+ public void MissingPeers_StreamAssignment_ShouldReflectReplicaGap()
+ {
+ var sa = new StreamAssignment
+ {
+ Config = new StreamConfig { Name = "S", Replicas = 3 },
+ Group = new RaftGroup { Peers = ["A", "B"] },
+ };
+ var method = typeof(StreamAssignment).GetMethod("MissingPeers", BindingFlags.Instance | BindingFlags.NonPublic);
+ method.ShouldNotBeNull();
+ ((bool)method!.Invoke(sa, [])!).ShouldBeTrue();
+ }
+
+ [Fact] // T:1591
+ public void ProcessAddPeer_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("ProcessAddPeer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1592
+ public void ProcessRemovePeer_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("ProcessRemovePeer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1593
+ public void RemovePeerFromStream_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("RemovePeerFromStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1594
+ public void RemovePeerFromStreamLocked_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("RemovePeerFromStreamLocked", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1595
+ public void HasPeerEntries_Method_ShouldExist()
+ {
+ typeof(JetStreamEngine).GetMethod("HasPeerEntries", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
+ }
+
+ [Fact] // T:1596
+ public void RecoveryKey_StreamAssignment_ShouldUseAccountAndName()
+ {
+ var sa = new StreamAssignment
+ {
+ Client = new ClientInfo { Account = "A", ServiceName = "SA" },
+ Config = new StreamConfig { Name = "ORDERS" },
+ };
+ var method = typeof(StreamAssignment).GetMethod("RecoveryKey", BindingFlags.Instance | BindingFlags.NonPublic);
+ method.ShouldNotBeNull();
+ method!.Invoke(sa, [])!.ToString().ShouldBe("SA:ORDERS");
+ }
+
+ [Fact] // T:1597
+ public void StreamRecoveryKey_ConsumerAssignment_ShouldUseAccountAndStream()
+ {
+ var ca = new ConsumerAssignment
+ {
+ Client = new ClientInfo { Account = "A", ServiceName = "SA" },
+ Stream = "ORDERS",
+ Name = "C1",
+ };
+ var method = typeof(ConsumerAssignment).GetMethod("StreamRecoveryKey", BindingFlags.Instance | BindingFlags.NonPublic);
+ method.ShouldNotBeNull();
+ method!.Invoke(ca, [])!.ToString().ShouldBe("SA:ORDERS");
+ }
+}
diff --git a/porting.db b/porting.db
index 20bfae6..ea4ceae 100644
Binary files a/porting.db and b/porting.db differ