diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs index 9eafdcd..0bd9c0f 100644 --- a/src/NATS.Server/Raft/RaftNode.cs +++ b/src/NATS.Server/Raft/RaftNode.cs @@ -49,12 +49,24 @@ public sealed class RaftNode TryBecomeLeader(clusterSize); } - public VoteResponse GrantVote(int term) + public VoteResponse GrantVote(int term, string candidateId = "") { if (term < TermState.CurrentTerm) return new VoteResponse { Granted = false }; - TermState.CurrentTerm = term; + if (term > TermState.CurrentTerm) + { + TermState.CurrentTerm = term; + TermState.VotedFor = null; + } + + if (!string.IsNullOrEmpty(TermState.VotedFor) + && !string.Equals(TermState.VotedFor, candidateId, StringComparison.Ordinal)) + { + return new VoteResponse { Granted = false }; + } + + TermState.VotedFor = candidateId; return new VoteResponse { Granted = true }; } diff --git a/src/NATS.Server/Raft/RaftSnapshotStore.cs b/src/NATS.Server/Raft/RaftSnapshotStore.cs index 0fd98ca..e6947b3 100644 --- a/src/NATS.Server/Raft/RaftSnapshotStore.cs +++ b/src/NATS.Server/Raft/RaftSnapshotStore.cs @@ -1,17 +1,41 @@ +using System.Text.Json; + namespace NATS.Server.Raft; public sealed class RaftSnapshotStore { private RaftSnapshot? _snapshot; + private readonly string? _snapshotPath; + + public RaftSnapshotStore(string? snapshotPath = null) + { + _snapshotPath = snapshotPath; + } public Task SaveAsync(RaftSnapshot snapshot, CancellationToken ct) { _snapshot = snapshot; + if (!string.IsNullOrWhiteSpace(_snapshotPath)) + { + var dir = Path.GetDirectoryName(_snapshotPath); + if (!string.IsNullOrWhiteSpace(dir)) + Directory.CreateDirectory(dir); + + File.WriteAllText(_snapshotPath, JsonSerializer.Serialize(snapshot)); + } + return Task.CompletedTask; } public Task LoadAsync(CancellationToken ct) { + if (_snapshot == null + && !string.IsNullOrWhiteSpace(_snapshotPath) + && File.Exists(_snapshotPath)) + { + _snapshot = JsonSerializer.Deserialize(File.ReadAllText(_snapshotPath)); + } + return Task.FromResult(_snapshot); } } diff --git a/src/NATS.Server/Raft/RaftTransport.cs b/src/NATS.Server/Raft/RaftTransport.cs index bf0ca44..9a2a51f 100644 --- a/src/NATS.Server/Raft/RaftTransport.cs +++ b/src/NATS.Server/Raft/RaftTransport.cs @@ -38,7 +38,7 @@ public sealed class InMemoryRaftTransport : IRaftTransport public Task RequestVoteAsync(string candidateId, string voterId, VoteRequest request, CancellationToken ct) { if (_nodes.TryGetValue(voterId, out var node)) - return Task.FromResult(node.GrantVote(request.Term)); + return Task.FromResult(node.GrantVote(request.Term, string.IsNullOrWhiteSpace(request.CandidateId) ? candidateId : request.CandidateId)); return Task.FromResult(new VoteResponse { Granted = false }); } diff --git a/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs b/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs new file mode 100644 index 0000000..4ab3387 --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs @@ -0,0 +1,44 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +public class RaftStrictConsensusRuntimeTests +{ + [Fact] + public async Task Quorum_and_nextindex_rules_gate_commit_visibility_and_snapshot_catchup_convergence() + { + var voter = new RaftNode("v1"); + voter.GrantVote(2, "cand-a").Granted.ShouldBeTrue(); + voter.GrantVote(2, "cand-b").Granted.ShouldBeFalse(); + + var transport = new RejectingRaftTransport(); + var leader = new RaftNode("n1", transport); + var followerA = new RaftNode("n2", transport); + var followerB = new RaftNode("n3", transport); + var cluster = new[] { leader, followerA, followerB }; + foreach (var node in cluster) + node.ConfigureCluster(cluster); + + leader.StartElection(cluster.Length); + leader.ReceiveVote(new VoteResponse { Granted = true }, cluster.Length); + leader.IsLeader.ShouldBeTrue(); + + _ = await leader.ProposeAsync("cmd-1", default); + leader.AppliedIndex.ShouldBe(0); + followerA.AppliedIndex.ShouldBe(0); + followerB.AppliedIndex.ShouldBe(0); + } + + private sealed class RejectingRaftTransport : IRaftTransport + { + public Task> AppendEntriesAsync(string leaderId, IReadOnlyList followerIds, RaftLogEntry entry, CancellationToken ct) + => Task.FromResult>( + followerIds.Select(id => new AppendResult { FollowerId = id, Success = false }).ToArray()); + + public Task RequestVoteAsync(string candidateId, string voterId, VoteRequest request, CancellationToken ct) + => Task.FromResult(new VoteResponse { Granted = true }); + + public Task InstallSnapshotAsync(string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct) + => Task.CompletedTask; + } +} diff --git a/tests/NATS.Server.Tests/Raft/RaftStrictConvergenceRuntimeTests.cs b/tests/NATS.Server.Tests/Raft/RaftStrictConvergenceRuntimeTests.cs new file mode 100644 index 0000000..dcc574d --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/RaftStrictConvergenceRuntimeTests.cs @@ -0,0 +1,33 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +public class RaftStrictConvergenceRuntimeTests +{ + [Fact] + public async Task Quorum_and_nextindex_rules_gate_commit_visibility_and_snapshot_catchup_convergence() + { + var file = Path.Combine(Path.GetTempPath(), $"nats-raft-snapshot-{Guid.NewGuid():N}.json"); + + try + { + var first = new RaftSnapshotStore(file); + await first.SaveAsync(new RaftSnapshot + { + LastIncludedIndex = 7, + LastIncludedTerm = 3, + }, default); + + var reopened = new RaftSnapshotStore(file); + var loaded = await reopened.LoadAsync(default); + loaded.ShouldNotBeNull(); + loaded.LastIncludedIndex.ShouldBe(7); + loaded.LastIncludedTerm.ShouldBe(3); + } + finally + { + if (File.Exists(file)) + File.Delete(file); + } + } +}