diff --git a/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs b/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs
index 9650d83..b227cdc 100644
--- a/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs
+++ b/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs
@@ -14,6 +14,18 @@ public sealed class RaftGroup
public string Cluster { get; set; } = string.Empty;
public string Preferred { get; set; } = string.Empty;
+ ///
+ /// Optional desired replica count for this group.
+ /// When set, ProcessAddPeer uses this to detect under-replication.
+ /// Go reference: jetstream_cluster.go:2284 sa.missingPeers() — len(Peers) < Config.Replicas.
+ ///
+ public int DesiredReplicas { get; set; }
+
+ ///
+ /// True when has been explicitly set (non-zero).
+ ///
+ public bool HasDesiredReplicas => DesiredReplicas > 0;
+
public int QuorumSize => (Peers.Count / 2) + 1;
public bool HasQuorum(int ackCount) => ackCount >= QuorumSize;
}
diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/PeerManagementTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/PeerManagementTests.cs
new file mode 100644
index 0000000..ce570f9
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Cluster/PeerManagementTests.cs
@@ -0,0 +1,433 @@
+// Go parity: golang/nats-server/server/jetstream_cluster.go:2290-2439
+// Covers: ProcessAddPeer, ProcessRemovePeer, RemovePeerFromStream, RemapStreamAssignment —
+// peer-driven stream reassignment in the JetStreamMetaGroup.
+using NATS.Server.JetStream.Cluster;
+using NATS.Server.JetStream.Models;
+
+namespace NATS.Server.Tests.JetStream.Cluster;
+
+///
+/// Tests for JetStreamMetaGroup peer management and stream reassignment.
+/// Go reference: jetstream_cluster.go:2290-2439 (processAddPeer, processRemovePeer,
+/// removePeerFromStreamLocked, remapStreamAssignment).
+///
+public class PeerManagementTests
+{
+ // ---------------------------------------------------------------
+ // ProcessAddPeer — peer registration
+ // Go reference: jetstream_cluster.go:2290 processAddPeer
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void ProcessAddPeer_registers_new_peer()
+ {
+ // Go reference: jetstream_cluster.go:2290 processAddPeer — peer is tracked
+ var meta = new JetStreamMetaGroup(3);
+
+ meta.ProcessAddPeer("peer-1");
+
+ meta.GetKnownPeers().ShouldContain("peer-1");
+ }
+
+ [Fact]
+ public void ProcessAddPeer_registers_multiple_peers_independently()
+ {
+ // Go reference: jetstream_cluster.go:2290 — each peer is independently tracked
+ var meta = new JetStreamMetaGroup(3);
+
+ meta.ProcessAddPeer("peer-1");
+ meta.ProcessAddPeer("peer-2");
+ meta.ProcessAddPeer("peer-3");
+
+ var known = meta.GetKnownPeers();
+ known.Count.ShouldBe(3);
+ known.ShouldContain("peer-1");
+ known.ShouldContain("peer-2");
+ known.ShouldContain("peer-3");
+ }
+
+ [Fact]
+ public void ProcessAddPeer_duplicate_add_is_idempotent()
+ {
+ // AddKnownPeer uses a HashSet so duplicates do not inflate the count.
+ var meta = new JetStreamMetaGroup(3);
+
+ meta.ProcessAddPeer("peer-1");
+ meta.ProcessAddPeer("peer-1");
+
+ meta.GetKnownPeers().Count.ShouldBe(1);
+ }
+
+ // ---------------------------------------------------------------
+ // ProcessAddPeer — under-replication detection
+ // Go reference: jetstream_cluster.go:2311-2339 missingPeers + peer append
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void ProcessAddPeer_triggers_rereplication_of_underreplicated_stream()
+ {
+ // Go reference: jetstream_cluster.go:2315 sa.missingPeers() — adds new peer to group
+ var meta = new JetStreamMetaGroup(3); // leader by default (selfIndex == leaderIndex == 1)
+
+ // Stream assigned with 2 peers but DesiredReplicas == 3 → under-replicated
+ var group = new RaftGroup
+ {
+ Name = "orders-rg",
+ Peers = ["peer-1", "peer-2"],
+ DesiredReplicas = 3,
+ };
+ var sa = new StreamAssignment { StreamName = "ORDERS", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ meta.ProcessAddPeer("peer-3");
+
+ var updated = meta.GetStreamAssignment("ORDERS")!;
+ updated.Group.Peers.ShouldContain("peer-3");
+ updated.Group.Peers.Count.ShouldBe(3);
+ }
+
+ [Fact]
+ public void ProcessAddPeer_does_not_add_peer_to_fully_replicated_stream()
+ {
+ // Go reference: jetstream_cluster.go:2315 missingPeers() returns false when at desired count
+ var meta = new JetStreamMetaGroup(3);
+
+ var group = new RaftGroup
+ {
+ Name = "events-rg",
+ Peers = ["peer-1", "peer-2", "peer-3"],
+ DesiredReplicas = 3,
+ };
+ var sa = new StreamAssignment { StreamName = "EVENTS", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ meta.ProcessAddPeer("peer-4");
+
+ var updated = meta.GetStreamAssignment("EVENTS")!;
+ updated.Group.Peers.Count.ShouldBe(3);
+ updated.Group.Peers.ShouldNotContain("peer-4");
+ }
+
+ [Fact]
+ public void ProcessAddPeer_does_not_add_peer_already_in_group()
+ {
+ // Peer already a member — should not be added twice.
+ var meta = new JetStreamMetaGroup(3);
+
+ var group = new RaftGroup
+ {
+ Name = "logs-rg",
+ Peers = ["peer-1"],
+ DesiredReplicas = 2,
+ };
+ var sa = new StreamAssignment { StreamName = "LOGS", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ meta.ProcessAddPeer("peer-1");
+
+ var updated = meta.GetStreamAssignment("LOGS")!;
+ updated.Group.Peers.Count.ShouldBe(1);
+ }
+
+ [Fact]
+ public void ProcessAddPeer_non_leader_does_not_modify_assignments()
+ {
+ // Go reference: jetstream_cluster.go:2301 — only leader triggers re-assignment
+ var meta = new JetStreamMetaGroup(3, selfIndex: 2); // not leader
+
+ var group = new RaftGroup
+ {
+ Name = "rg",
+ Peers = ["peer-1"],
+ DesiredReplicas = 3,
+ };
+ var sa = new StreamAssignment { StreamName = "S", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ meta.ProcessAddPeer("peer-2");
+
+ // Peer is registered but stream is not modified since not leader.
+ meta.GetKnownPeers().ShouldContain("peer-2");
+ meta.GetStreamAssignment("S")!.Group.Peers.Count.ShouldBe(1);
+ }
+
+ // ---------------------------------------------------------------
+ // ProcessRemovePeer — stream reassignment
+ // Go reference: jetstream_cluster.go:2342 processRemovePeer
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void ProcessRemovePeer_reassigns_streams_away_from_peer()
+ {
+ // Go reference: jetstream_cluster.go:2385-2392 — streams with removed peer get remapped
+ var meta = new JetStreamMetaGroup(3);
+
+ // Register three peers
+ meta.AddKnownPeer("peer-1");
+ meta.AddKnownPeer("peer-2");
+ meta.AddKnownPeer("peer-3");
+
+ var group = new RaftGroup
+ {
+ Name = "rg",
+ Peers = ["peer-1", "peer-2"],
+ };
+ var sa = new StreamAssignment { StreamName = "ORDERS", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ meta.ProcessRemovePeer("peer-1");
+
+ var updated = meta.GetStreamAssignment("ORDERS")!;
+ updated.Group.Peers.ShouldNotContain("peer-1");
+ }
+
+ [Fact]
+ public void ProcessRemovePeer_removes_peer_from_known_peers()
+ {
+ // Go reference: jetstream_cluster.go:2342 — peer is de-registered
+ var meta = new JetStreamMetaGroup(3);
+ meta.AddKnownPeer("peer-1");
+
+ meta.ProcessRemovePeer("peer-1");
+
+ meta.GetKnownPeers().ShouldNotContain("peer-1");
+ }
+
+ [Fact]
+ public void ProcessRemovePeer_unknown_peer_is_noop()
+ {
+ // Go reference: jetstream_cluster.go:2342 — no crash when peer not known
+ var meta = new JetStreamMetaGroup(3);
+ var group = new RaftGroup { Name = "rg", Peers = ["peer-2", "peer-3"] };
+ var sa = new StreamAssignment { StreamName = "S", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ // Should not throw
+ meta.ProcessRemovePeer("peer-99");
+
+ // Stream unaffected
+ meta.GetStreamAssignment("S")!.Group.Peers.ShouldContain("peer-2");
+ meta.GetStreamAssignment("S")!.Group.Peers.ShouldContain("peer-3");
+ }
+
+ [Fact]
+ public void ProcessRemovePeer_non_leader_only_deregisters_peer()
+ {
+ // Go reference: jetstream_cluster.go:2378 — non-leader skips re-assignment
+ var meta = new JetStreamMetaGroup(3, selfIndex: 2);
+ meta.AddKnownPeer("peer-1");
+ meta.AddKnownPeer("peer-2");
+
+ var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2"] };
+ var sa = new StreamAssignment { StreamName = "S", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ meta.ProcessRemovePeer("peer-1");
+
+ // Peer removed from known set
+ meta.GetKnownPeers().ShouldNotContain("peer-1");
+
+ // Stream assignments are NOT modified by a non-leader
+ meta.GetStreamAssignment("S")!.Group.Peers.ShouldContain("peer-1");
+ }
+
+ // ---------------------------------------------------------------
+ // RemovePeerFromStream
+ // Go reference: jetstream_cluster.go:2403 removePeerFromStreamLocked
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void RemovePeerFromStream_removes_peer_from_group()
+ {
+ // Go reference: jetstream_cluster.go:2404 — peer is removed from stream group
+ var meta = new JetStreamMetaGroup(3);
+ meta.AddKnownPeer("peer-1");
+ meta.AddKnownPeer("peer-2");
+ meta.AddKnownPeer("peer-3");
+
+ var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2", "peer-3"] };
+ var sa = new StreamAssignment { StreamName = "EVENTS", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ meta.RemovePeerFromStream("EVENTS", "peer-2");
+
+ var updated = meta.GetStreamAssignment("EVENTS")!;
+ updated.Group.Peers.ShouldNotContain("peer-2");
+ }
+
+ [Fact]
+ public void RemovePeerFromStream_returns_false_for_nonexistent_stream()
+ {
+ // RemovePeerFromStream silently returns false when stream not found.
+ var meta = new JetStreamMetaGroup(3);
+
+ var result = meta.RemovePeerFromStream("GHOST", "peer-1");
+
+ result.ShouldBeFalse();
+ }
+
+ [Fact]
+ public void RemovePeerFromStream_returns_false_when_peer_not_in_group()
+ {
+ // Peer not a member of the stream's group.
+ var meta = new JetStreamMetaGroup(3);
+
+ var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2"] };
+ var sa = new StreamAssignment { StreamName = "S", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ var result = meta.RemovePeerFromStream("S", "peer-99");
+
+ result.ShouldBeFalse();
+ }
+
+ [Fact]
+ public void RemovePeerFromStream_replaces_peer_when_replacement_available()
+ {
+ // Go reference: jetstream_cluster.go:7088-7094 — replacement peer picked from available pool
+ var meta = new JetStreamMetaGroup(3);
+ meta.AddKnownPeer("peer-1");
+ meta.AddKnownPeer("peer-2");
+ meta.AddKnownPeer("peer-3"); // replacement candidate
+
+ var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2"] };
+ var sa = new StreamAssignment { StreamName = "ORDERS", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ var result = meta.RemovePeerFromStream("ORDERS", "peer-1");
+
+ result.ShouldBeTrue();
+ var updated = meta.GetStreamAssignment("ORDERS")!;
+ updated.Group.Peers.ShouldNotContain("peer-1");
+ updated.Group.Peers.Count.ShouldBe(2);
+ updated.Group.Peers.ShouldContain("peer-3");
+ }
+
+ [Fact]
+ public void RemovePeerFromStream_shrinks_group_when_no_replacement_available()
+ {
+ // Go reference: jetstream_cluster.go:7102-7110 — R>1 bare removal fallback
+ var meta = new JetStreamMetaGroup(3);
+ // Only peer-1 and peer-2 are known; peer-1 is in the group; no replacement
+ meta.AddKnownPeer("peer-1");
+ meta.AddKnownPeer("peer-2");
+
+ var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2"] };
+ var sa = new StreamAssignment { StreamName = "LOGS", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ var result = meta.RemovePeerFromStream("LOGS", "peer-1");
+
+ // No replacement found → group shrinks
+ result.ShouldBeFalse();
+ var updated = meta.GetStreamAssignment("LOGS")!;
+ updated.Group.Peers.ShouldNotContain("peer-1");
+ updated.Group.Peers.Count.ShouldBe(1);
+ }
+
+ // ---------------------------------------------------------------
+ // RemapStreamAssignment
+ // Go reference: jetstream_cluster.go:7077 remapStreamAssignment
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void RemapStreamAssignment_selects_new_peers()
+ {
+ // Go reference: jetstream_cluster.go:7077 — retain existing minus removed, add candidate
+ var meta = new JetStreamMetaGroup(3);
+
+ var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2", "peer-3"] };
+ var sa = new StreamAssignment { StreamName = "EVENTS", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ var available = new List { "peer-1", "peer-2", "peer-3", "peer-4" };
+ var result = meta.RemapStreamAssignment(sa, available, removePeer: "peer-3");
+
+ result.ShouldBeTrue();
+ var updated = meta.GetStreamAssignment("EVENTS")!;
+ updated.Group.Peers.ShouldNotContain("peer-3");
+ updated.Group.Peers.Count.ShouldBe(3);
+ updated.Group.Peers.ShouldContain("peer-4");
+ }
+
+ [Fact]
+ public void RemapStreamAssignment_retains_existing_peers()
+ {
+ // Retained peers (not removed) remain in the new assignment.
+ var meta = new JetStreamMetaGroup(3);
+
+ var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2", "peer-3"] };
+ var sa = new StreamAssignment { StreamName = "S", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ var available = new List { "peer-1", "peer-2", "peer-3", "peer-4" };
+ meta.RemapStreamAssignment(sa, available, removePeer: "peer-1");
+
+ var updated = meta.GetStreamAssignment("S")!;
+ updated.Group.Peers.ShouldContain("peer-2");
+ updated.Group.Peers.ShouldContain("peer-3");
+ }
+
+ [Fact]
+ public void RemapStreamAssignment_returns_false_when_no_replacement()
+ {
+ // Go reference: jetstream_cluster.go:7098-7110 — no placement, R1 returns false
+ var meta = new JetStreamMetaGroup(3);
+
+ var group = new RaftGroup { Name = "rg", Peers = ["peer-1"] };
+ var sa = new StreamAssignment { StreamName = "R1", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ var available = new List { "peer-1" };
+ var result = meta.RemapStreamAssignment(sa, available, removePeer: "peer-1");
+
+ // Only peer-1 available and it is the removed one → nothing to add
+ result.ShouldBeFalse();
+ }
+
+ [Fact]
+ public void RemapStreamAssignment_empty_available_shrinks_group()
+ {
+ // When the available-peer list is empty, the group simply loses the removed peer.
+ var meta = new JetStreamMetaGroup(3);
+
+ var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2"] };
+ var sa = new StreamAssignment { StreamName = "S", Group = group };
+ meta.AddStreamAssignment(sa);
+
+ var result = meta.RemapStreamAssignment(sa, [], removePeer: "peer-1");
+
+ result.ShouldBeFalse();
+ meta.GetStreamAssignment("S")!.Group.Peers.ShouldNotContain("peer-1");
+ }
+
+ // ---------------------------------------------------------------
+ // AddKnownPeer / RemoveKnownPeer
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void AddKnownPeer_and_RemoveKnownPeer_are_consistent()
+ {
+ var meta = new JetStreamMetaGroup(3);
+
+ meta.AddKnownPeer("p1");
+ meta.AddKnownPeer("p2");
+ meta.RemoveKnownPeer("p1");
+
+ var known = meta.GetKnownPeers();
+ known.ShouldNotContain("p1");
+ known.ShouldContain("p2");
+ }
+
+ [Fact]
+ public void RemoveKnownPeer_unknown_peer_is_noop()
+ {
+ var meta = new JetStreamMetaGroup(3);
+ meta.AddKnownPeer("p1");
+
+ // Should not throw
+ meta.RemoveKnownPeer("p99");
+
+ meta.GetKnownPeers().Count.ShouldBe(1);
+ }
+}