diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs index 946c2b1..e650b9b 100644 --- a/src/NATS.Server/Raft/RaftNode.cs +++ b/src/NATS.Server/Raft/RaftNode.cs @@ -23,6 +23,13 @@ public sealed class RaftNode : IDisposable // Go reference: raft.go:961-1019 (proposeAddPeer / proposeRemovePeer, single-change invariant) private long _membershipChangeIndex; + // Joint consensus (two-phase membership change) per Raft paper Section 4. + // During the joint phase both the old config (Cold) and new config (Cnew) are stored. + // A quorum decision requires majority from BOTH configurations simultaneously. + // Go reference: raft.go joint consensus / two-phase membership transitions. + private HashSet? _jointOldMembers; + private HashSet? _jointNewMembers; + // Pre-vote: Go NATS server does not implement pre-vote (RFC 5849 §9.6). Skipped for parity. public string Id { get; } @@ -54,6 +61,19 @@ public sealed class RaftNode : IDisposable // Go reference: raft.go:961-1019 single-change invariant. public bool MembershipChangeInProgress => Interlocked.Read(ref _membershipChangeIndex) > 0; + /// + /// True when this node is in the joint consensus phase (transitioning between + /// two membership configurations). + /// Go reference: raft.go joint consensus / two-phase membership transitions. + /// + public bool InJointConsensus => _jointNewMembers != null; + + /// + /// The new (Cnew) member set stored during a joint configuration transition, + /// or null when not in joint consensus. Exposed for testing. + /// + public IReadOnlyCollection? JointNewMembers => _jointNewMembers; + public RaftNode(string id, IRaftTransport? transport = null, string? persistDirectory = null) { Id = id; @@ -273,6 +293,62 @@ public sealed class RaftNode : IDisposable return entry.Index; } + // Joint consensus (Raft paper Section 4) — two-phase membership transitions. + // Go reference: raft.go joint consensus / two-phase membership transitions. + + /// + /// Enters the joint consensus phase with the given old and new configurations. + /// During this phase quorum decisions require majority from BOTH configurations. + /// The active member set is set to the union of Cold and Cnew so that entries + /// are replicated to all nodes that participate in either configuration. + /// Go reference: raft.go Section 4 (joint consensus). + /// + public void BeginJointConsensus(IReadOnlyCollection cold, IReadOnlyCollection cnew) + { + _jointOldMembers = new HashSet(cold, StringComparer.Ordinal); + _jointNewMembers = new HashSet(cnew, StringComparer.Ordinal); + // The active member set is the union of both configs + foreach (var member in cnew) + _members.Add(member); + } + + /// + /// Commits the joint configuration by finalizing Cnew as the active member set. + /// Clears both Cold and Cnew, leaving only the new configuration. + /// Call this once the Cnew log entry has reached quorum in both configs. + /// Go reference: raft.go joint consensus commit. + /// + public void CommitJointConsensus() + { + if (_jointNewMembers == null) + return; + + _members.Clear(); + foreach (var m in _jointNewMembers) + _members.Add(m); + + _jointOldMembers = null; + _jointNewMembers = null; + } + + /// + /// During joint consensus, checks whether a set of acknowledging voters satisfies + /// a majority in BOTH the old configuration (Cold) and the new configuration (Cnew). + /// Returns false when not in joint consensus. + /// Go reference: raft.go Section 4 — joint config quorum calculation. + /// + public bool CalculateJointQuorum( + IReadOnlyCollection coldVoters, + IReadOnlyCollection cnewVoters) + { + if (_jointOldMembers == null || _jointNewMembers == null) + return false; + + var oldQuorum = (_jointOldMembers.Count / 2) + 1; + var newQuorum = (_jointNewMembers.Count / 2) + 1; + return coldVoters.Count >= oldQuorum && cnewVoters.Count >= newQuorum; + } + // B5: Snapshot checkpoints and log compaction // Go reference: raft.go CreateSnapshotCheckpoint, DrainAndReplaySnapshot diff --git a/tests/NATS.Server.Tests/Raft/RaftJointConsensusTests.cs b/tests/NATS.Server.Tests/Raft/RaftJointConsensusTests.cs new file mode 100644 index 0000000..2d39a77 --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/RaftJointConsensusTests.cs @@ -0,0 +1,285 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +/// +/// Tests for joint consensus membership changes per Raft paper Section 4. +/// During a joint configuration transition a quorum requires majority from BOTH +/// the old configuration (Cold) and the new configuration (Cnew). +/// Go reference: raft.go joint consensus / two-phase membership transitions. +/// +public class RaftJointConsensusTests +{ + // -- Helpers (self-contained, no shared TestHelpers class) -- + + private static (RaftNode[] nodes, InMemoryRaftTransport transport) CreateCluster(int size) + { + var transport = new InMemoryRaftTransport(); + var nodes = Enumerable.Range(1, size) + .Select(i => new RaftNode($"n{i}", transport)) + .ToArray(); + foreach (var node in nodes) + { + transport.Register(node); + node.ConfigureCluster(nodes); + } + return (nodes, transport); + } + + private static RaftNode ElectLeader(RaftNode[] nodes) + { + var candidate = nodes[0]; + candidate.StartElection(nodes.Length); + foreach (var voter in nodes.Skip(1)) + candidate.ReceiveVote(voter.GrantVote(candidate.Term, candidate.Id), nodes.Length); + return candidate; + } + + // -- BeginJointConsensus / InJointConsensus / JointNewMembers -- + + [Fact] + public void BeginJointConsensus_sets_InJointConsensus_flag() + { + // Go reference: raft.go Section 4 — begin joint config + var node = new RaftNode("n1"); + node.AddMember("n2"); + node.AddMember("n3"); + + node.InJointConsensus.ShouldBeFalse(); + + node.BeginJointConsensus(["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]); + + node.InJointConsensus.ShouldBeTrue(); + } + + [Fact] + public void BeginJointConsensus_exposes_JointNewMembers() + { + // Go reference: raft.go Section 4 — Cnew accessible during joint phase + var node = new RaftNode("n1"); + node.AddMember("n2"); + node.AddMember("n3"); + + node.BeginJointConsensus(["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]); + + node.JointNewMembers.ShouldNotBeNull(); + node.JointNewMembers!.ShouldContain("n4"); + node.JointNewMembers.Count.ShouldBe(4); + } + + [Fact] + public void BeginJointConsensus_adds_new_members_to_active_set() + { + // During joint consensus the active member set is the union of Cold and Cnew + // so that entries are replicated to all participating nodes. + // Go reference: raft.go Section 4 — joint config is union of Cold and Cnew. + var node = new RaftNode("n1"); + node.AddMember("n2"); + node.AddMember("n3"); + + node.BeginJointConsensus(["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]); + + node.Members.ShouldContain("n4"); + } + + // -- CommitJointConsensus -- + + [Fact] + public void CommitJointConsensus_clears_InJointConsensus_flag() + { + // Go reference: raft.go joint consensus commit finalizes Cnew + var node = new RaftNode("n1"); + node.AddMember("n2"); + node.AddMember("n3"); + + node.BeginJointConsensus(["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]); + node.CommitJointConsensus(); + + node.InJointConsensus.ShouldBeFalse(); + } + + [Fact] + public void CommitJointConsensus_finalizes_new_configuration_when_adding_peer() + { + // After commit, Members should exactly equal Cnew. + // Go reference: raft.go joint consensus commit. + var node = new RaftNode("n1"); + node.AddMember("n2"); + node.AddMember("n3"); + + node.BeginJointConsensus(["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]); + node.CommitJointConsensus(); + + node.Members.Count.ShouldBe(4); + node.Members.ShouldContain("n1"); + node.Members.ShouldContain("n2"); + node.Members.ShouldContain("n3"); + node.Members.ShouldContain("n4"); + } + + [Fact] + public void CommitJointConsensus_removes_old_only_members_when_removing_peer() + { + // Removing a peer: Cold={n1,n2,n3}, Cnew={n1,n2}. + // After commit, n3 must be gone. + // Go reference: raft.go joint consensus commit removes Cold-only members. + var node = new RaftNode("n1"); + node.AddMember("n2"); + node.AddMember("n3"); + + node.BeginJointConsensus(["n1", "n2", "n3"], ["n1", "n2"]); + node.CommitJointConsensus(); + + node.Members.Count.ShouldBe(2); + node.Members.ShouldContain("n1"); + node.Members.ShouldContain("n2"); + node.Members.ShouldNotContain("n3"); + } + + [Fact] + public void CommitJointConsensus_is_idempotent_when_not_in_joint() + { + // Calling commit when not in joint consensus must be a no-op. + var node = new RaftNode("n1"); + node.AddMember("n2"); + + node.CommitJointConsensus(); // should not throw + + node.Members.Count.ShouldBe(2); + node.InJointConsensus.ShouldBeFalse(); + } + + // -- CalculateJointQuorum -- + + [Fact] + public void CalculateJointQuorum_returns_false_when_not_in_joint_consensus() + { + // Outside joint consensus the method has no defined result and returns false. + // Go reference: raft.go Section 4. + var node = new RaftNode("n1"); + + node.CalculateJointQuorum(["n1"], ["n1"]).ShouldBeFalse(); + } + + [Fact] + public void Joint_quorum_requires_majority_from_both_old_and_new_configurations() + { + // Cold={n1,n2,n3} (size 3, quorum=2), Cnew={n1,n2,n3,n4} (size 4, quorum=3). + // 2/3 old AND 3/4 new — both majorities satisfied. + // Go reference: raft.go Section 4 — joint config quorum calculation. + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + leader.BeginJointConsensus(["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]); + + leader.CalculateJointQuorum(["n1", "n2"], ["n1", "n2", "n3"]).ShouldBeTrue(); + } + + [Fact] + public void Joint_quorum_fails_when_old_majority_not_met() + { + // Cold={n1,n2,n3} (quorum=2): only 1 old voter — fails old quorum. + // Cnew={n1,n2,n3,n4} (quorum=3): 2 new voters — also fails new quorum. + // Go reference: raft.go Section 4 — must satisfy BOTH majorities. + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + leader.BeginJointConsensus(["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]); + + leader.CalculateJointQuorum(["n1"], ["n1", "n2"]).ShouldBeFalse(); + } + + [Fact] + public void Joint_quorum_fails_when_new_majority_not_met() + { + // Cold={n1,n2,n3} (quorum=2): 2 old voters — passes old quorum. + // Cnew={n1,n2,n3,n4} (quorum=3): only 2 new voters — fails new quorum. + // Go reference: raft.go Section 4 — both are required. + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + leader.BeginJointConsensus(["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]); + + leader.CalculateJointQuorum(["n1", "n2"], ["n1", "n2"]).ShouldBeFalse(); + } + + [Fact] + public void Joint_quorum_exact_majority_boundary_old_config() + { + // Cold={n1,n2,n3,n4,n5} (size 5, quorum=3). + // Exactly 3 old voters meets old quorum boundary. + var node = new RaftNode("n1"); + foreach (var m in new[] { "n2", "n3", "n4", "n5" }) + node.AddMember(m); + + node.BeginJointConsensus( + ["n1", "n2", "n3", "n4", "n5"], + ["n1", "n2", "n3", "n4", "n5", "n6"]); + + // 3/5 old (exact quorum=3) and 4/6 new (quorum=4) — both satisfied + node.CalculateJointQuorum(["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]).ShouldBeTrue(); + } + + [Fact] + public void Joint_quorum_just_below_boundary_old_config_fails() + { + // Cold={n1,n2,n3,n4,n5} (size 5, quorum=3): 2 voters fails. + var node = new RaftNode("n1"); + foreach (var m in new[] { "n2", "n3", "n4", "n5" }) + node.AddMember(m); + + node.BeginJointConsensus( + ["n1", "n2", "n3", "n4", "n5"], + ["n1", "n2", "n3", "n4", "n5", "n6"]); + + // 2/5 old < quorum=3 — must fail + node.CalculateJointQuorum(["n1", "n2"], ["n1", "n2", "n3", "n4"]).ShouldBeFalse(); + } + + // -- Integration: existing ProposeAddPeerAsync/ProposeRemovePeerAsync unchanged -- + + [Fact] + public async Task ProposeAddPeerAsync_still_works_after_joint_consensus_fields_added() + { + // Verify that adding joint consensus fields does not break the existing + // single-phase ProposeAddPeerAsync behaviour. + // Go reference: raft.go:961-990 (proposeAddPeer). + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + await leader.ProposeAddPeerAsync("n4", default); + + leader.Members.ShouldContain("n4"); + leader.Members.Count.ShouldBe(4); + leader.MembershipChangeInProgress.ShouldBeFalse(); + } + + [Fact] + public async Task ProposeRemovePeerAsync_still_works_after_joint_consensus_fields_added() + { + // Verify that adding joint consensus fields does not break the existing + // single-phase ProposeRemovePeerAsync behaviour. + // Go reference: raft.go:992-1019 (proposeRemovePeer). + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + await leader.ProposeRemovePeerAsync("n3", default); + + leader.Members.ShouldNotContain("n3"); + leader.Members.Count.ShouldBe(2); + leader.MembershipChangeInProgress.ShouldBeFalse(); + } + + [Fact] + public async Task MembershipChangeInProgress_is_false_after_completed_add() + { + // The single-change invariant must still hold: flag is cleared after completion. + // Go reference: raft.go:961-1019 single-change invariant. + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + await leader.ProposeAddPeerAsync("n4", default); + + leader.MembershipChangeInProgress.ShouldBeFalse(); + } +}