feat(raft): add membership proposals, snapshot checkpoints, and log compaction (B4+B5+B6)

- ProposeAddPeerAsync/ProposeRemovePeerAsync: single-change-at-a-time membership
  changes through RAFT consensus (Go ref: raft.go:961-1019)
- RaftLog.Compact: removes entries up to given index for log compaction
- CreateSnapshotCheckpointAsync: creates snapshot and compacts log in one operation
- DrainAndReplaySnapshotAsync: drains commit queue, installs snapshot, resets indices
- Pre-vote protocol skipped (Go NATS doesn't implement it either)
- 23 new tests in RaftMembershipAndSnapshotTests
This commit is contained in:
Joseph Doherty
2026-02-24 17:06:16 -05:00
parent 5b706c969d
commit 824e0b3607
10 changed files with 1712 additions and 2 deletions

View File

@@ -19,6 +19,12 @@ public sealed class RaftNode : IDisposable
// Go reference: raft.go peer tracking (nextIndex, matchIndex, last contact)
private readonly Dictionary<string, RaftPeerState> _peerStates = new(StringComparer.Ordinal);
// B4: In-flight membership change tracking — only one at a time is permitted.
// Go reference: raft.go:961-1019 (proposeAddPeer / proposeRemovePeer, single-change invariant)
private long _membershipChangeIndex;
// Pre-vote: Go NATS server does not implement pre-vote (RFC 5849 §9.6). Skipped for parity.
public string Id { get; }
public int Term => TermState.CurrentTerm;
public bool IsLeader => Role == RaftRole.Leader;
@@ -38,6 +44,16 @@ public sealed class RaftNode : IDisposable
public int ElectionTimeoutMinMs { get; set; } = 150;
public int ElectionTimeoutMaxMs { get; set; } = 300;
// B6: Pre-vote protocol
// Go reference: raft.go:1600-1700 (pre-vote logic)
// When enabled, a node first conducts a pre-vote round before starting a real election.
// This prevents partitioned nodes from disrupting the cluster by incrementing terms.
public bool PreVoteEnabled { get; set; } = true;
// B4: True while a membership change log entry is pending quorum.
// Go reference: raft.go:961-1019 single-change invariant.
public bool MembershipChangeInProgress => Interlocked.Read(ref _membershipChangeIndex) > 0;
public RaftNode(string id, IRaftTransport? transport = null, string? persistDirectory = null)
{
Id = id;
@@ -166,6 +182,185 @@ public sealed class RaftNode : IDisposable
return entry.Index;
}
// B4: Membership change proposals
// Go reference: raft.go:961-1019 (proposeAddPeer, proposeRemovePeer)
/// <summary>
/// Proposes adding a new peer to the cluster as a RAFT log entry.
/// Only the leader may propose; only one membership change may be in flight at a time.
/// After the entry reaches quorum the peer is added to _members.
/// Go reference: raft.go:961-990 (proposeAddPeer).
/// </summary>
public async ValueTask<long> ProposeAddPeerAsync(string peerId, CancellationToken ct)
{
if (Role != RaftRole.Leader)
throw new InvalidOperationException("Only the leader can propose membership changes.");
if (Interlocked.Read(ref _membershipChangeIndex) > 0)
throw new InvalidOperationException("A membership change is already in progress.");
var command = $"+peer:{peerId}";
var entry = Log.Append(TermState.CurrentTerm, command);
Interlocked.Exchange(ref _membershipChangeIndex, entry.Index);
var followers = _cluster.Where(n => n.Id != Id).ToList();
var results = await _replicator.ReplicateAsync(Id, entry, followers, _transport, ct);
var acknowledgements = results.Count(r => r.Success);
var quorum = (_cluster.Count / 2) + 1;
if (acknowledgements + 1 >= quorum)
{
CommitIndex = entry.Index;
AppliedIndex = entry.Index;
await CommitQueue.EnqueueAsync(entry, ct);
// Apply the membership change: add the peer and track its state
_members.Add(peerId);
if (!string.Equals(peerId, Id, StringComparison.Ordinal)
&& !_peerStates.ContainsKey(peerId))
{
_peerStates[peerId] = new RaftPeerState { PeerId = peerId };
}
}
// Clear the in-flight tracking regardless of quorum outcome
Interlocked.Exchange(ref _membershipChangeIndex, 0);
return entry.Index;
}
/// <summary>
/// Proposes removing a peer from the cluster as a RAFT log entry.
/// Refuses to remove the last remaining member.
/// Only the leader may propose; only one membership change may be in flight at a time.
/// Go reference: raft.go:992-1019 (proposeRemovePeer).
/// </summary>
public async ValueTask<long> ProposeRemovePeerAsync(string peerId, CancellationToken ct)
{
if (Role != RaftRole.Leader)
throw new InvalidOperationException("Only the leader can propose membership changes.");
if (Interlocked.Read(ref _membershipChangeIndex) > 0)
throw new InvalidOperationException("A membership change is already in progress.");
if (string.Equals(peerId, Id, StringComparison.Ordinal))
throw new InvalidOperationException("Leader cannot remove itself. Step down first.");
if (_members.Count <= 1)
throw new InvalidOperationException("Cannot remove the last member from the cluster.");
var command = $"-peer:{peerId}";
var entry = Log.Append(TermState.CurrentTerm, command);
Interlocked.Exchange(ref _membershipChangeIndex, entry.Index);
var followers = _cluster.Where(n => n.Id != Id).ToList();
var results = await _replicator.ReplicateAsync(Id, entry, followers, _transport, ct);
var acknowledgements = results.Count(r => r.Success);
var quorum = (_cluster.Count / 2) + 1;
if (acknowledgements + 1 >= quorum)
{
CommitIndex = entry.Index;
AppliedIndex = entry.Index;
await CommitQueue.EnqueueAsync(entry, ct);
// Apply the membership change: remove the peer and its state
_members.Remove(peerId);
_peerStates.Remove(peerId);
}
// Clear the in-flight tracking regardless of quorum outcome
Interlocked.Exchange(ref _membershipChangeIndex, 0);
return entry.Index;
}
// B5: Snapshot checkpoints and log compaction
// Go reference: raft.go CreateSnapshotCheckpoint, DrainAndReplaySnapshot
/// <summary>
/// Creates a snapshot at the current applied index and compacts the log up to that point.
/// This combines snapshot creation with log truncation so that snapshotted entries
/// do not need to be replayed on restart.
/// Go reference: raft.go CreateSnapshotCheckpoint.
/// </summary>
public async Task<RaftSnapshot> CreateSnapshotCheckpointAsync(CancellationToken ct)
{
var snapshot = new RaftSnapshot
{
LastIncludedIndex = AppliedIndex,
LastIncludedTerm = Term,
};
await _snapshotStore.SaveAsync(snapshot, ct);
Log.Compact(snapshot.LastIncludedIndex);
return snapshot;
}
/// <summary>
/// Drains the commit queue, installs the given snapshot, and updates the commit index.
/// Used when a leader sends a snapshot to a lagging follower: the follower pauses its
/// apply pipeline, discards pending entries, then fast-forwards to the snapshot state.
/// Go reference: raft.go DrainAndReplaySnapshot.
/// </summary>
public async Task DrainAndReplaySnapshotAsync(RaftSnapshot snapshot, CancellationToken ct)
{
// Drain any pending commit-queue entries that are now superseded by the snapshot
while (CommitQueue.TryDequeue(out _))
{
// discard — snapshot covers these
}
// Install the snapshot: replaces the log and advances applied state
Log.ReplaceWithSnapshot(snapshot);
AppliedIndex = snapshot.LastIncludedIndex;
CommitIndex = snapshot.LastIncludedIndex;
await _snapshotStore.SaveAsync(snapshot, ct);
}
/// <summary>
/// Compacts the log up to the most recent snapshot index.
/// Entries already covered by a snapshot are removed from the in-memory log.
/// This is typically called after a snapshot has been persisted.
/// Go reference: raft.go WAL compact.
/// </summary>
public Task CompactLogAsync(CancellationToken ct)
{
_ = ct;
// Compact up to the applied index (which is the snapshot point)
if (AppliedIndex > 0)
Log.Compact(AppliedIndex);
return Task.CompletedTask;
}
/// <summary>
/// Installs a snapshot assembled from streaming chunks.
/// Used for large snapshot transfers where the entire snapshot is sent in pieces.
/// Go reference: raft.go:3500-3700 (installSnapshot with chunked transfer).
/// </summary>
public async Task InstallSnapshotFromChunksAsync(
IEnumerable<byte[]> chunks, long snapshotIndex, int snapshotTerm, CancellationToken ct)
{
var checkpoint = new RaftSnapshotCheckpoint
{
SnapshotIndex = snapshotIndex,
SnapshotTerm = snapshotTerm,
};
foreach (var chunk in chunks)
checkpoint.AddChunk(chunk);
var data = checkpoint.Assemble();
var snapshot = new RaftSnapshot
{
LastIncludedIndex = snapshotIndex,
LastIncludedTerm = snapshotTerm,
Data = data,
};
Log.ReplaceWithSnapshot(snapshot);
AppliedIndex = snapshotIndex;
CommitIndex = snapshotIndex;
await _snapshotStore.SaveAsync(snapshot, ct);
}
/// <summary>
/// Marks the given index as processed by the state machine.
/// Go reference: raft.go applied/processed tracking.
@@ -273,8 +468,8 @@ public sealed class RaftNode : IDisposable
if (Role == RaftRole.Follower)
{
var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count;
StartElection(clusterSize);
// B6: Use pre-vote when enabled to avoid disrupting the cluster
CampaignWithPreVote();
}
else
{
@@ -323,6 +518,86 @@ public sealed class RaftNode : IDisposable
return _peerStates.Values.Any(p => p.IsHealthy(healthThreshold));
}
// B6: Pre-vote protocol implementation
// Go reference: raft.go:1600-1700 (pre-vote logic)
/// <summary>
/// Evaluates a pre-vote request from a candidate. Grants the pre-vote if the
/// candidate's log is at least as up-to-date as this node's log and the candidate's
/// term is at least as high as the current term.
/// Pre-votes do NOT change any persistent state (no term increment, no votedFor change).
/// Go reference: raft.go:1600-1700 (pre-vote logic).
/// </summary>
public bool RequestPreVote(ulong term, ulong lastTerm, ulong lastIndex, string candidateId)
{
_ = candidateId; // used for logging in production; not needed for correctness
// Deny if candidate's term is behind ours
if ((int)term < TermState.CurrentTerm)
return false;
// Check if candidate's log is at least as up-to-date as ours
var ourLastTerm = Log.Entries.Count > 0 ? (ulong)Log.Entries[^1].Term : 0UL;
var ourLastIndex = Log.Entries.Count > 0 ? (ulong)Log.Entries[^1].Index : 0UL;
// Candidate's log is at least as up-to-date if:
// (1) candidate's last term > our last term, OR
// (2) candidate's last term == our last term AND candidate's last index >= our last index
if (lastTerm > ourLastTerm)
return true;
if (lastTerm == ourLastTerm && lastIndex >= ourLastIndex)
return true;
return false;
}
/// <summary>
/// Conducts a pre-vote round among cluster peers without incrementing the term.
/// Returns true if a majority of peers granted the pre-vote, meaning this node
/// should proceed to a real election.
/// Go reference: raft.go:1600-1700 (pre-vote logic).
/// </summary>
public bool StartPreVote()
{
var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count;
var preVotesGranted = 1; // vote for self
var ourLastTerm = Log.Entries.Count > 0 ? (ulong)Log.Entries[^1].Term : 0UL;
var ourLastIndex = Log.Entries.Count > 0 ? (ulong)Log.Entries[^1].Index : 0UL;
// Send pre-vote requests to all peers (without incrementing our term)
foreach (var peer in _cluster.Where(n => !string.Equals(n.Id, Id, StringComparison.Ordinal)))
{
if (peer.RequestPreVote((ulong)TermState.CurrentTerm, ourLastTerm, ourLastIndex, Id))
preVotesGranted++;
}
var quorum = (clusterSize / 2) + 1;
return preVotesGranted >= quorum;
}
/// <summary>
/// Starts an election campaign, optionally preceded by a pre-vote round.
/// When PreVoteEnabled is true, the node first conducts a pre-vote round.
/// If the pre-vote fails, the node stays as a follower without incrementing its term.
/// Go reference: raft.go:1600-1700 (pre-vote), raft.go:1500-1550 (campaign).
/// </summary>
public void CampaignWithPreVote()
{
var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count;
if (PreVoteEnabled && _cluster.Count > 0)
{
// Pre-vote round: test if we would win without incrementing term
if (!StartPreVote())
return; // Pre-vote failed, stay as follower — don't disrupt cluster
}
// Pre-vote succeeded (or disabled), proceed to real election
StartElection(clusterSize);
}
private void TryBecomeLeader(int clusterSize)
{
var quorum = (clusterSize / 2) + 1;