feat(raft): add commit queue, election timers, and peer health tracking (B1+B2+B3)

- CommitQueue<T>: channel-based queue for committed entries awaiting state machine application
- RaftPeerState: tracks replication and health state (nextIndex, matchIndex, lastContact)
- RaftNode: CommitIndex/ProcessedIndex tracking, election timer with randomized 150-300ms interval,
  peer state integration with heartbeat and replication updates
- 52 new tests across RaftApplyQueueTests, RaftElectionTimerTests, RaftHealthTests
This commit is contained in:
Joseph Doherty
2026-02-24 17:01:00 -05:00
parent 579063dabd
commit 5b706c969d
6 changed files with 1125 additions and 2 deletions

View File

@@ -0,0 +1,43 @@
using System.Threading.Channels;
namespace NATS.Server.Raft;
/// <summary>
/// Channel-based queue for committed log entries awaiting state machine application.
/// Go reference: raft.go:150-160 (applied/processed fields), raft.go:2100-2150 (ApplyQ).
/// </summary>
public sealed class CommitQueue<T>
{
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>(
new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
/// <summary>
/// Approximate number of items waiting to be dequeued.
/// </summary>
public int Count => _channel.Reader.Count;
/// <summary>
/// Enqueues an item for state machine application.
/// </summary>
public ValueTask EnqueueAsync(T item, CancellationToken ct = default)
=> _channel.Writer.WriteAsync(item, ct);
/// <summary>
/// Dequeues the next committed entry, waiting if none are available.
/// </summary>
public ValueTask<T> DequeueAsync(CancellationToken ct = default)
=> _channel.Reader.ReadAsync(ct);
/// <summary>
/// Attempts a non-blocking dequeue. Returns true if an item was available.
/// </summary>
public bool TryDequeue(out T? item)
=> _channel.Reader.TryRead(out item);
/// <summary>
/// Marks the channel as complete so no more items can be enqueued.
/// Readers will drain remaining items and then receive completion.
/// </summary>
public void Complete()
=> _channel.Writer.Complete();
}

View File

@@ -1,6 +1,6 @@
namespace NATS.Server.Raft;
public sealed class RaftNode
public sealed class RaftNode : IDisposable
{
private int _votesReceived;
private readonly List<RaftNode> _cluster = [];
@@ -10,6 +10,15 @@ public sealed class RaftNode
private readonly string? _persistDirectory;
private readonly HashSet<string> _members = new(StringComparer.Ordinal);
// B2: Election timer fields
// Go reference: raft.go:1400-1450 (resetElectionTimeout), raft.go:1500-1550 (campaign logic)
private Timer? _electionTimer;
private CancellationTokenSource? _electionTimerCts;
// B3: Peer state tracking
// Go reference: raft.go peer tracking (nextIndex, matchIndex, last contact)
private readonly Dictionary<string, RaftPeerState> _peerStates = new(StringComparer.Ordinal);
public string Id { get; }
public int Term => TermState.CurrentTerm;
public bool IsLeader => Role == RaftRole.Leader;
@@ -19,6 +28,16 @@ public sealed class RaftNode
public long AppliedIndex { get; set; }
public RaftLog Log { get; private set; } = new();
// B1: Commit tracking
// Go reference: raft.go:150-160 (applied/processed fields), raft.go:2100-2150 (ApplyQ)
public long CommitIndex { get; private set; }
public long ProcessedIndex { get; private set; }
public CommitQueue<RaftLogEntry> CommitQueue { get; } = new();
// B2: Election timeout configuration (milliseconds)
public int ElectionTimeoutMinMs { get; set; } = 150;
public int ElectionTimeoutMaxMs { get; set; } = 300;
public RaftNode(string id, IRaftTransport? transport = null, string? persistDirectory = null)
{
Id = id;
@@ -32,8 +51,16 @@ public sealed class RaftNode
_cluster.Clear();
_cluster.AddRange(peers);
_members.Clear();
_peerStates.Clear();
foreach (var peer in peers)
{
_members.Add(peer.Id);
// B3: Initialize peer state for all peers except self
if (!string.Equals(peer.Id, Id, StringComparison.Ordinal))
{
_peerStates[peer.Id] = new RaftPeerState { PeerId = peer.Id };
}
}
}
public void AddMember(string memberId) => _members.Add(memberId);
@@ -70,13 +97,22 @@ public sealed class RaftNode
return new VoteResponse { Granted = true };
}
public void ReceiveHeartbeat(int term)
public void ReceiveHeartbeat(int term, string? fromPeerId = null)
{
if (term < TermState.CurrentTerm)
return;
TermState.CurrentTerm = term;
Role = RaftRole.Follower;
// B2: Reset election timer on valid heartbeat
ResetElectionTimeout();
// B3: Update peer contact time
if (fromPeerId != null && _peerStates.TryGetValue(fromPeerId, out var peerState))
{
peerState.LastContact = DateTime.UtcNow;
}
}
public void ReceiveVote(VoteResponse response, int clusterSize = 3)
@@ -105,6 +141,21 @@ public sealed class RaftNode
foreach (var node in _cluster)
node.AppliedIndex = Math.Max(node.AppliedIndex, entry.Index);
// B1: Update commit index and enqueue for state machine application
CommitIndex = entry.Index;
await CommitQueue.EnqueueAsync(entry, ct);
// B3: Update peer match/next indices for successful replications
foreach (var result in results.Where(r => r.Success))
{
if (_peerStates.TryGetValue(result.FollowerId, out var peerState))
{
peerState.MatchIndex = Math.Max(peerState.MatchIndex, entry.Index);
peerState.NextIndex = entry.Index + 1;
peerState.LastContact = DateTime.UtcNow;
}
}
foreach (var node in _cluster.Where(n => n._persistDirectory != null))
await node.PersistAsync(ct);
}
@@ -115,6 +166,16 @@ public sealed class RaftNode
return entry.Index;
}
/// <summary>
/// Marks the given index as processed by the state machine.
/// Go reference: raft.go applied/processed tracking.
/// </summary>
public void MarkProcessed(long index)
{
if (index > ProcessedIndex)
ProcessedIndex = index;
}
public void ReceiveReplicatedEntry(RaftLogEntry entry)
{
Log.AppendReplicated(entry);
@@ -126,6 +187,9 @@ public sealed class RaftNode
if (entry.Term < TermState.CurrentTerm)
throw new InvalidOperationException("stale term append rejected");
// B2: Reset election timer when receiving append from leader
ResetElectionTimeout();
ReceiveReplicatedEntry(entry);
return Task.CompletedTask;
}
@@ -155,6 +219,110 @@ public sealed class RaftNode
TermState.VotedFor = null;
}
// B2: Election timer management
// Go reference: raft.go:1400-1450 (resetElectionTimeout)
/// <summary>
/// Resets the election timeout timer with a new randomized interval.
/// Called on heartbeat receipt and append entries from leader.
/// </summary>
public void ResetElectionTimeout()
{
var timeout = Random.Shared.Next(ElectionTimeoutMinMs, ElectionTimeoutMaxMs + 1);
_electionTimer?.Change(timeout, Timeout.Infinite);
}
/// <summary>
/// Starts the background election timer. When it fires and this node is a Follower,
/// an election campaign is triggered automatically.
/// Go reference: raft.go:1500-1550 (campaign logic).
/// </summary>
public void StartElectionTimer(CancellationToken ct = default)
{
_electionTimerCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
var timeout = Random.Shared.Next(ElectionTimeoutMinMs, ElectionTimeoutMaxMs + 1);
_electionTimer = new Timer(ElectionTimerCallback, null, timeout, Timeout.Infinite);
}
/// <summary>
/// Stops and disposes the election timer.
/// </summary>
public void StopElectionTimer()
{
_electionTimer?.Dispose();
_electionTimer = null;
_electionTimerCts?.Cancel();
_electionTimerCts?.Dispose();
_electionTimerCts = null;
}
/// <summary>
/// Bypasses the election timer and immediately starts an election campaign.
/// Useful for testing.
/// </summary>
public void CampaignImmediately()
{
var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count;
StartElection(clusterSize);
}
private void ElectionTimerCallback(object? state)
{
if (_electionTimerCts?.IsCancellationRequested == true)
return;
if (Role == RaftRole.Follower)
{
var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count;
StartElection(clusterSize);
}
else
{
// Re-arm the timer for non-follower states so it can fire again
// if the node transitions back to follower.
ResetElectionTimeout();
}
}
// B3: Peer state accessors
/// <summary>
/// Returns a read-only view of all tracked peer states.
/// </summary>
public IReadOnlyDictionary<string, RaftPeerState> GetPeerStates()
=> _peerStates;
/// <summary>
/// Checks if this node's log is current (within one election timeout of the leader).
/// Go reference: raft.go isCurrent check.
/// </summary>
public bool IsCurrent(TimeSpan electionTimeout)
{
// A leader is always current
if (Role == RaftRole.Leader)
return true;
// Check if any peer (which could be the leader) has contacted us recently
return _peerStates.Values.Any(p => p.IsCurrent(electionTimeout));
}
/// <summary>
/// Overall health check: node is active and peers are responsive.
/// </summary>
public bool IsHealthy(TimeSpan healthThreshold)
{
if (Role == RaftRole.Leader)
{
// Leader is healthy if a majority of peers are responsive
var healthyPeers = _peerStates.Values.Count(p => p.IsHealthy(healthThreshold));
var quorum = (_peerStates.Count + 1) / 2; // +1 for self
return healthyPeers >= quorum;
}
// Follower/candidate: healthy if at least one peer (the leader) is responsive
return _peerStates.Values.Any(p => p.IsHealthy(healthThreshold));
}
private void TryBecomeLeader(int clusterSize)
{
var quorum = (clusterSize / 2) + 1;
@@ -186,4 +354,9 @@ public sealed class RaftNode
else if (Log.Entries.Count > 0)
AppliedIndex = Log.Entries[^1].Index;
}
public void Dispose()
{
StopElectionTimer();
}
}

View File

@@ -0,0 +1,46 @@
namespace NATS.Server.Raft;
/// <summary>
/// Tracks replication and health state for a single RAFT peer.
/// Go reference: raft.go peer tracking fields (nextIndex, matchIndex, last contact).
/// </summary>
public sealed class RaftPeerState
{
/// <summary>
/// The peer's unique node identifier.
/// </summary>
public required string PeerId { get; init; }
/// <summary>
/// The next log index to send to this peer (leader use only).
/// </summary>
public long NextIndex { get; set; } = 1;
/// <summary>
/// The highest log index known to be replicated on this peer.
/// </summary>
public long MatchIndex { get; set; }
/// <summary>
/// Timestamp of the last successful communication with this peer.
/// </summary>
public DateTime LastContact { get; set; } = DateTime.UtcNow;
/// <summary>
/// Whether this peer is considered active in the cluster.
/// </summary>
public bool Active { get; set; } = true;
/// <summary>
/// Returns true if this peer has been contacted within the election timeout window.
/// Go reference: raft.go isCurrent check.
/// </summary>
public bool IsCurrent(TimeSpan electionTimeout)
=> DateTime.UtcNow - LastContact < electionTimeout;
/// <summary>
/// Returns true if this peer is both active and has been contacted within the health threshold.
/// </summary>
public bool IsHealthy(TimeSpan healthThreshold)
=> Active && DateTime.UtcNow - LastContact < healthThreshold;
}