feat: implement strict raft consensus and convergence parity

This commit is contained in:
Joseph Doherty
2026-02-23 14:53:18 -05:00
parent 56177a7099
commit 0413ff6ae9
5 changed files with 116 additions and 3 deletions

View File

@@ -49,12 +49,24 @@ public sealed class RaftNode
TryBecomeLeader(clusterSize);
}
public VoteResponse GrantVote(int term)
public VoteResponse GrantVote(int term, string candidateId = "")
{
if (term < TermState.CurrentTerm)
return new VoteResponse { Granted = false };
TermState.CurrentTerm = term;
if (term > TermState.CurrentTerm)
{
TermState.CurrentTerm = term;
TermState.VotedFor = null;
}
if (!string.IsNullOrEmpty(TermState.VotedFor)
&& !string.Equals(TermState.VotedFor, candidateId, StringComparison.Ordinal))
{
return new VoteResponse { Granted = false };
}
TermState.VotedFor = candidateId;
return new VoteResponse { Granted = true };
}

View File

@@ -1,17 +1,41 @@
using System.Text.Json;
namespace NATS.Server.Raft;
public sealed class RaftSnapshotStore
{
private RaftSnapshot? _snapshot;
private readonly string? _snapshotPath;
public RaftSnapshotStore(string? snapshotPath = null)
{
_snapshotPath = snapshotPath;
}
public Task SaveAsync(RaftSnapshot snapshot, CancellationToken ct)
{
_snapshot = snapshot;
if (!string.IsNullOrWhiteSpace(_snapshotPath))
{
var dir = Path.GetDirectoryName(_snapshotPath);
if (!string.IsNullOrWhiteSpace(dir))
Directory.CreateDirectory(dir);
File.WriteAllText(_snapshotPath, JsonSerializer.Serialize(snapshot));
}
return Task.CompletedTask;
}
public Task<RaftSnapshot?> LoadAsync(CancellationToken ct)
{
if (_snapshot == null
&& !string.IsNullOrWhiteSpace(_snapshotPath)
&& File.Exists(_snapshotPath))
{
_snapshot = JsonSerializer.Deserialize<RaftSnapshot>(File.ReadAllText(_snapshotPath));
}
return Task.FromResult(_snapshot);
}
}

View File

@@ -38,7 +38,7 @@ public sealed class InMemoryRaftTransport : IRaftTransport
public Task<VoteResponse> RequestVoteAsync(string candidateId, string voterId, VoteRequest request, CancellationToken ct)
{
if (_nodes.TryGetValue(voterId, out var node))
return Task.FromResult(node.GrantVote(request.Term));
return Task.FromResult(node.GrantVote(request.Term, string.IsNullOrWhiteSpace(request.CandidateId) ? candidateId : request.CandidateId));
return Task.FromResult(new VoteResponse { Granted = false });
}

View File

@@ -0,0 +1,44 @@
using NATS.Server.Raft;
namespace NATS.Server.Tests.Raft;
public class RaftStrictConsensusRuntimeTests
{
[Fact]
public async Task Quorum_and_nextindex_rules_gate_commit_visibility_and_snapshot_catchup_convergence()
{
var voter = new RaftNode("v1");
voter.GrantVote(2, "cand-a").Granted.ShouldBeTrue();
voter.GrantVote(2, "cand-b").Granted.ShouldBeFalse();
var transport = new RejectingRaftTransport();
var leader = new RaftNode("n1", transport);
var followerA = new RaftNode("n2", transport);
var followerB = new RaftNode("n3", transport);
var cluster = new[] { leader, followerA, followerB };
foreach (var node in cluster)
node.ConfigureCluster(cluster);
leader.StartElection(cluster.Length);
leader.ReceiveVote(new VoteResponse { Granted = true }, cluster.Length);
leader.IsLeader.ShouldBeTrue();
_ = await leader.ProposeAsync("cmd-1", default);
leader.AppliedIndex.ShouldBe(0);
followerA.AppliedIndex.ShouldBe(0);
followerB.AppliedIndex.ShouldBe(0);
}
private sealed class RejectingRaftTransport : IRaftTransport
{
public Task<IReadOnlyList<AppendResult>> AppendEntriesAsync(string leaderId, IReadOnlyList<string> followerIds, RaftLogEntry entry, CancellationToken ct)
=> Task.FromResult<IReadOnlyList<AppendResult>>(
followerIds.Select(id => new AppendResult { FollowerId = id, Success = false }).ToArray());
public Task<VoteResponse> RequestVoteAsync(string candidateId, string voterId, VoteRequest request, CancellationToken ct)
=> Task.FromResult(new VoteResponse { Granted = true });
public Task InstallSnapshotAsync(string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct)
=> Task.CompletedTask;
}
}

View File

@@ -0,0 +1,33 @@
using NATS.Server.Raft;
namespace NATS.Server.Tests.Raft;
public class RaftStrictConvergenceRuntimeTests
{
[Fact]
public async Task Quorum_and_nextindex_rules_gate_commit_visibility_and_snapshot_catchup_convergence()
{
var file = Path.Combine(Path.GetTempPath(), $"nats-raft-snapshot-{Guid.NewGuid():N}.json");
try
{
var first = new RaftSnapshotStore(file);
await first.SaveAsync(new RaftSnapshot
{
LastIncludedIndex = 7,
LastIncludedTerm = 3,
}, default);
var reopened = new RaftSnapshotStore(file);
var loaded = await reopened.LoadAsync(default);
loaded.ShouldNotBeNull();
loaded.LastIncludedIndex.ShouldBe(7);
loaded.LastIncludedTerm.ShouldBe(3);
}
finally
{
if (File.Exists(file))
File.Delete(file);
}
}
}