From e5f599f7701b59c21d41cf014670e12962e21743 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 08:53:05 -0500 Subject: [PATCH] feat: add peer management with stream reassignment (Gap 2.4) Add ProcessAddPeer/ProcessRemovePeer to JetStreamMetaGroup for peer-driven stream reassignment. Includes AddKnownPeer/RemoveKnownPeer tracked in a HashSet, RemovePeerFromStream, RemapStreamAssignment with replacement-peer selection from the known pool, and DesiredReplicas on RaftGroup for under-replication detection. Go ref: jetstream_cluster.go:2290-2439. --- .../Cluster/ClusterAssignmentTypes.cs | 12 + .../JetStream/Cluster/PeerManagementTests.cs | 433 ++++++++++++++++++ 2 files changed, 445 insertions(+) create mode 100644 tests/NATS.Server.Tests/JetStream/Cluster/PeerManagementTests.cs diff --git a/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs b/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs index 9650d83..b227cdc 100644 --- a/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs +++ b/src/NATS.Server/JetStream/Cluster/ClusterAssignmentTypes.cs @@ -14,6 +14,18 @@ public sealed class RaftGroup public string Cluster { get; set; } = string.Empty; public string Preferred { get; set; } = string.Empty; + /// + /// Optional desired replica count for this group. + /// When set, ProcessAddPeer uses this to detect under-replication. + /// Go reference: jetstream_cluster.go:2284 sa.missingPeers() — len(Peers) < Config.Replicas. + /// + public int DesiredReplicas { get; set; } + + /// + /// True when has been explicitly set (non-zero). + /// + public bool HasDesiredReplicas => DesiredReplicas > 0; + public int QuorumSize => (Peers.Count / 2) + 1; public bool HasQuorum(int ackCount) => ackCount >= QuorumSize; } diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/PeerManagementTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/PeerManagementTests.cs new file mode 100644 index 0000000..ce570f9 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/PeerManagementTests.cs @@ -0,0 +1,433 @@ +// Go parity: golang/nats-server/server/jetstream_cluster.go:2290-2439 +// Covers: ProcessAddPeer, ProcessRemovePeer, RemovePeerFromStream, RemapStreamAssignment — +// peer-driven stream reassignment in the JetStreamMetaGroup. +using NATS.Server.JetStream.Cluster; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests for JetStreamMetaGroup peer management and stream reassignment. +/// Go reference: jetstream_cluster.go:2290-2439 (processAddPeer, processRemovePeer, +/// removePeerFromStreamLocked, remapStreamAssignment). +/// +public class PeerManagementTests +{ + // --------------------------------------------------------------- + // ProcessAddPeer — peer registration + // Go reference: jetstream_cluster.go:2290 processAddPeer + // --------------------------------------------------------------- + + [Fact] + public void ProcessAddPeer_registers_new_peer() + { + // Go reference: jetstream_cluster.go:2290 processAddPeer — peer is tracked + var meta = new JetStreamMetaGroup(3); + + meta.ProcessAddPeer("peer-1"); + + meta.GetKnownPeers().ShouldContain("peer-1"); + } + + [Fact] + public void ProcessAddPeer_registers_multiple_peers_independently() + { + // Go reference: jetstream_cluster.go:2290 — each peer is independently tracked + var meta = new JetStreamMetaGroup(3); + + meta.ProcessAddPeer("peer-1"); + meta.ProcessAddPeer("peer-2"); + meta.ProcessAddPeer("peer-3"); + + var known = meta.GetKnownPeers(); + known.Count.ShouldBe(3); + known.ShouldContain("peer-1"); + known.ShouldContain("peer-2"); + known.ShouldContain("peer-3"); + } + + [Fact] + public void ProcessAddPeer_duplicate_add_is_idempotent() + { + // AddKnownPeer uses a HashSet so duplicates do not inflate the count. + var meta = new JetStreamMetaGroup(3); + + meta.ProcessAddPeer("peer-1"); + meta.ProcessAddPeer("peer-1"); + + meta.GetKnownPeers().Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // ProcessAddPeer — under-replication detection + // Go reference: jetstream_cluster.go:2311-2339 missingPeers + peer append + // --------------------------------------------------------------- + + [Fact] + public void ProcessAddPeer_triggers_rereplication_of_underreplicated_stream() + { + // Go reference: jetstream_cluster.go:2315 sa.missingPeers() — adds new peer to group + var meta = new JetStreamMetaGroup(3); // leader by default (selfIndex == leaderIndex == 1) + + // Stream assigned with 2 peers but DesiredReplicas == 3 → under-replicated + var group = new RaftGroup + { + Name = "orders-rg", + Peers = ["peer-1", "peer-2"], + DesiredReplicas = 3, + }; + var sa = new StreamAssignment { StreamName = "ORDERS", Group = group }; + meta.AddStreamAssignment(sa); + + meta.ProcessAddPeer("peer-3"); + + var updated = meta.GetStreamAssignment("ORDERS")!; + updated.Group.Peers.ShouldContain("peer-3"); + updated.Group.Peers.Count.ShouldBe(3); + } + + [Fact] + public void ProcessAddPeer_does_not_add_peer_to_fully_replicated_stream() + { + // Go reference: jetstream_cluster.go:2315 missingPeers() returns false when at desired count + var meta = new JetStreamMetaGroup(3); + + var group = new RaftGroup + { + Name = "events-rg", + Peers = ["peer-1", "peer-2", "peer-3"], + DesiredReplicas = 3, + }; + var sa = new StreamAssignment { StreamName = "EVENTS", Group = group }; + meta.AddStreamAssignment(sa); + + meta.ProcessAddPeer("peer-4"); + + var updated = meta.GetStreamAssignment("EVENTS")!; + updated.Group.Peers.Count.ShouldBe(3); + updated.Group.Peers.ShouldNotContain("peer-4"); + } + + [Fact] + public void ProcessAddPeer_does_not_add_peer_already_in_group() + { + // Peer already a member — should not be added twice. + var meta = new JetStreamMetaGroup(3); + + var group = new RaftGroup + { + Name = "logs-rg", + Peers = ["peer-1"], + DesiredReplicas = 2, + }; + var sa = new StreamAssignment { StreamName = "LOGS", Group = group }; + meta.AddStreamAssignment(sa); + + meta.ProcessAddPeer("peer-1"); + + var updated = meta.GetStreamAssignment("LOGS")!; + updated.Group.Peers.Count.ShouldBe(1); + } + + [Fact] + public void ProcessAddPeer_non_leader_does_not_modify_assignments() + { + // Go reference: jetstream_cluster.go:2301 — only leader triggers re-assignment + var meta = new JetStreamMetaGroup(3, selfIndex: 2); // not leader + + var group = new RaftGroup + { + Name = "rg", + Peers = ["peer-1"], + DesiredReplicas = 3, + }; + var sa = new StreamAssignment { StreamName = "S", Group = group }; + meta.AddStreamAssignment(sa); + + meta.ProcessAddPeer("peer-2"); + + // Peer is registered but stream is not modified since not leader. + meta.GetKnownPeers().ShouldContain("peer-2"); + meta.GetStreamAssignment("S")!.Group.Peers.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // ProcessRemovePeer — stream reassignment + // Go reference: jetstream_cluster.go:2342 processRemovePeer + // --------------------------------------------------------------- + + [Fact] + public void ProcessRemovePeer_reassigns_streams_away_from_peer() + { + // Go reference: jetstream_cluster.go:2385-2392 — streams with removed peer get remapped + var meta = new JetStreamMetaGroup(3); + + // Register three peers + meta.AddKnownPeer("peer-1"); + meta.AddKnownPeer("peer-2"); + meta.AddKnownPeer("peer-3"); + + var group = new RaftGroup + { + Name = "rg", + Peers = ["peer-1", "peer-2"], + }; + var sa = new StreamAssignment { StreamName = "ORDERS", Group = group }; + meta.AddStreamAssignment(sa); + + meta.ProcessRemovePeer("peer-1"); + + var updated = meta.GetStreamAssignment("ORDERS")!; + updated.Group.Peers.ShouldNotContain("peer-1"); + } + + [Fact] + public void ProcessRemovePeer_removes_peer_from_known_peers() + { + // Go reference: jetstream_cluster.go:2342 — peer is de-registered + var meta = new JetStreamMetaGroup(3); + meta.AddKnownPeer("peer-1"); + + meta.ProcessRemovePeer("peer-1"); + + meta.GetKnownPeers().ShouldNotContain("peer-1"); + } + + [Fact] + public void ProcessRemovePeer_unknown_peer_is_noop() + { + // Go reference: jetstream_cluster.go:2342 — no crash when peer not known + var meta = new JetStreamMetaGroup(3); + var group = new RaftGroup { Name = "rg", Peers = ["peer-2", "peer-3"] }; + var sa = new StreamAssignment { StreamName = "S", Group = group }; + meta.AddStreamAssignment(sa); + + // Should not throw + meta.ProcessRemovePeer("peer-99"); + + // Stream unaffected + meta.GetStreamAssignment("S")!.Group.Peers.ShouldContain("peer-2"); + meta.GetStreamAssignment("S")!.Group.Peers.ShouldContain("peer-3"); + } + + [Fact] + public void ProcessRemovePeer_non_leader_only_deregisters_peer() + { + // Go reference: jetstream_cluster.go:2378 — non-leader skips re-assignment + var meta = new JetStreamMetaGroup(3, selfIndex: 2); + meta.AddKnownPeer("peer-1"); + meta.AddKnownPeer("peer-2"); + + var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2"] }; + var sa = new StreamAssignment { StreamName = "S", Group = group }; + meta.AddStreamAssignment(sa); + + meta.ProcessRemovePeer("peer-1"); + + // Peer removed from known set + meta.GetKnownPeers().ShouldNotContain("peer-1"); + + // Stream assignments are NOT modified by a non-leader + meta.GetStreamAssignment("S")!.Group.Peers.ShouldContain("peer-1"); + } + + // --------------------------------------------------------------- + // RemovePeerFromStream + // Go reference: jetstream_cluster.go:2403 removePeerFromStreamLocked + // --------------------------------------------------------------- + + [Fact] + public void RemovePeerFromStream_removes_peer_from_group() + { + // Go reference: jetstream_cluster.go:2404 — peer is removed from stream group + var meta = new JetStreamMetaGroup(3); + meta.AddKnownPeer("peer-1"); + meta.AddKnownPeer("peer-2"); + meta.AddKnownPeer("peer-3"); + + var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2", "peer-3"] }; + var sa = new StreamAssignment { StreamName = "EVENTS", Group = group }; + meta.AddStreamAssignment(sa); + + meta.RemovePeerFromStream("EVENTS", "peer-2"); + + var updated = meta.GetStreamAssignment("EVENTS")!; + updated.Group.Peers.ShouldNotContain("peer-2"); + } + + [Fact] + public void RemovePeerFromStream_returns_false_for_nonexistent_stream() + { + // RemovePeerFromStream silently returns false when stream not found. + var meta = new JetStreamMetaGroup(3); + + var result = meta.RemovePeerFromStream("GHOST", "peer-1"); + + result.ShouldBeFalse(); + } + + [Fact] + public void RemovePeerFromStream_returns_false_when_peer_not_in_group() + { + // Peer not a member of the stream's group. + var meta = new JetStreamMetaGroup(3); + + var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2"] }; + var sa = new StreamAssignment { StreamName = "S", Group = group }; + meta.AddStreamAssignment(sa); + + var result = meta.RemovePeerFromStream("S", "peer-99"); + + result.ShouldBeFalse(); + } + + [Fact] + public void RemovePeerFromStream_replaces_peer_when_replacement_available() + { + // Go reference: jetstream_cluster.go:7088-7094 — replacement peer picked from available pool + var meta = new JetStreamMetaGroup(3); + meta.AddKnownPeer("peer-1"); + meta.AddKnownPeer("peer-2"); + meta.AddKnownPeer("peer-3"); // replacement candidate + + var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2"] }; + var sa = new StreamAssignment { StreamName = "ORDERS", Group = group }; + meta.AddStreamAssignment(sa); + + var result = meta.RemovePeerFromStream("ORDERS", "peer-1"); + + result.ShouldBeTrue(); + var updated = meta.GetStreamAssignment("ORDERS")!; + updated.Group.Peers.ShouldNotContain("peer-1"); + updated.Group.Peers.Count.ShouldBe(2); + updated.Group.Peers.ShouldContain("peer-3"); + } + + [Fact] + public void RemovePeerFromStream_shrinks_group_when_no_replacement_available() + { + // Go reference: jetstream_cluster.go:7102-7110 — R>1 bare removal fallback + var meta = new JetStreamMetaGroup(3); + // Only peer-1 and peer-2 are known; peer-1 is in the group; no replacement + meta.AddKnownPeer("peer-1"); + meta.AddKnownPeer("peer-2"); + + var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2"] }; + var sa = new StreamAssignment { StreamName = "LOGS", Group = group }; + meta.AddStreamAssignment(sa); + + var result = meta.RemovePeerFromStream("LOGS", "peer-1"); + + // No replacement found → group shrinks + result.ShouldBeFalse(); + var updated = meta.GetStreamAssignment("LOGS")!; + updated.Group.Peers.ShouldNotContain("peer-1"); + updated.Group.Peers.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // RemapStreamAssignment + // Go reference: jetstream_cluster.go:7077 remapStreamAssignment + // --------------------------------------------------------------- + + [Fact] + public void RemapStreamAssignment_selects_new_peers() + { + // Go reference: jetstream_cluster.go:7077 — retain existing minus removed, add candidate + var meta = new JetStreamMetaGroup(3); + + var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2", "peer-3"] }; + var sa = new StreamAssignment { StreamName = "EVENTS", Group = group }; + meta.AddStreamAssignment(sa); + + var available = new List { "peer-1", "peer-2", "peer-3", "peer-4" }; + var result = meta.RemapStreamAssignment(sa, available, removePeer: "peer-3"); + + result.ShouldBeTrue(); + var updated = meta.GetStreamAssignment("EVENTS")!; + updated.Group.Peers.ShouldNotContain("peer-3"); + updated.Group.Peers.Count.ShouldBe(3); + updated.Group.Peers.ShouldContain("peer-4"); + } + + [Fact] + public void RemapStreamAssignment_retains_existing_peers() + { + // Retained peers (not removed) remain in the new assignment. + var meta = new JetStreamMetaGroup(3); + + var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2", "peer-3"] }; + var sa = new StreamAssignment { StreamName = "S", Group = group }; + meta.AddStreamAssignment(sa); + + var available = new List { "peer-1", "peer-2", "peer-3", "peer-4" }; + meta.RemapStreamAssignment(sa, available, removePeer: "peer-1"); + + var updated = meta.GetStreamAssignment("S")!; + updated.Group.Peers.ShouldContain("peer-2"); + updated.Group.Peers.ShouldContain("peer-3"); + } + + [Fact] + public void RemapStreamAssignment_returns_false_when_no_replacement() + { + // Go reference: jetstream_cluster.go:7098-7110 — no placement, R1 returns false + var meta = new JetStreamMetaGroup(3); + + var group = new RaftGroup { Name = "rg", Peers = ["peer-1"] }; + var sa = new StreamAssignment { StreamName = "R1", Group = group }; + meta.AddStreamAssignment(sa); + + var available = new List { "peer-1" }; + var result = meta.RemapStreamAssignment(sa, available, removePeer: "peer-1"); + + // Only peer-1 available and it is the removed one → nothing to add + result.ShouldBeFalse(); + } + + [Fact] + public void RemapStreamAssignment_empty_available_shrinks_group() + { + // When the available-peer list is empty, the group simply loses the removed peer. + var meta = new JetStreamMetaGroup(3); + + var group = new RaftGroup { Name = "rg", Peers = ["peer-1", "peer-2"] }; + var sa = new StreamAssignment { StreamName = "S", Group = group }; + meta.AddStreamAssignment(sa); + + var result = meta.RemapStreamAssignment(sa, [], removePeer: "peer-1"); + + result.ShouldBeFalse(); + meta.GetStreamAssignment("S")!.Group.Peers.ShouldNotContain("peer-1"); + } + + // --------------------------------------------------------------- + // AddKnownPeer / RemoveKnownPeer + // --------------------------------------------------------------- + + [Fact] + public void AddKnownPeer_and_RemoveKnownPeer_are_consistent() + { + var meta = new JetStreamMetaGroup(3); + + meta.AddKnownPeer("p1"); + meta.AddKnownPeer("p2"); + meta.RemoveKnownPeer("p1"); + + var known = meta.GetKnownPeers(); + known.ShouldNotContain("p1"); + known.ShouldContain("p2"); + } + + [Fact] + public void RemoveKnownPeer_unknown_peer_is_noop() + { + var meta = new JetStreamMetaGroup(3); + meta.AddKnownPeer("p1"); + + // Should not throw + meta.RemoveKnownPeer("p99"); + + meta.GetKnownPeers().Count.ShouldBe(1); + } +}