feat: add quorum check before proposing entries (Gap 8.6)

Add HasQuorum() to RaftNode that counts peers with LastContact within
2 × ElectionTimeoutMaxMs and returns true only when self + current peers
reaches majority. ProposeAsync now throws InvalidOperationException with
"no quorum" when HasQuorum() returns false, preventing a partitioned
leader from diverging the log. Add 14 tests in RaftQuorumCheckTests.cs
covering single-node, 3-node, 5-node, boundary window, and heartbeat
restore scenarios. Update RaftHealthTests.LastContact_updates_on_successful_replication
to avoid triggering the new quorum guard.
This commit is contained in:
Joseph Doherty
2026-02-25 08:26:37 -05:00
parent 5d3a3c73e9
commit 5a62100397
6 changed files with 888 additions and 15 deletions

View File

@@ -0,0 +1,148 @@
using System.Text;
namespace NATS.Server.Raft;
/// <summary>
/// Determines which log entries are eligible for compaction.
/// Go reference: raft.go compactLog / WAL compact.
/// </summary>
public enum CompactionPolicy
{
/// <summary>No automatic compaction. Only manual compaction via CompactLogAsync is performed.</summary>
None,
/// <summary>Compact when the total number of log entries exceeds MaxEntries.</summary>
ByCount,
/// <summary>Compact when the estimated total size of log entries exceeds MaxSizeBytes.</summary>
BySize,
/// <summary>Compact entries whose creation time is older than MaxAge (based on Timestamp).</summary>
ByAge,
}
/// <summary>
/// Configuration for log compaction behavior on a RaftNode.
/// Go reference: raft.go compactLog / WAL compact thresholds.
/// </summary>
public sealed class CompactionOptions
{
/// <summary>The compaction policy to apply. Defaults to <see cref="CompactionPolicy.None"/>.</summary>
public CompactionPolicy Policy { get; init; } = CompactionPolicy.None;
/// <summary>
/// For <see cref="CompactionPolicy.ByCount"/>: keep at most this many entries in the log.
/// Entries beyond this count (oldest first) are compacted.
/// Default: 10,000.
/// </summary>
public int MaxEntries { get; init; } = 10_000;
/// <summary>
/// For <see cref="CompactionPolicy.BySize"/>: compact oldest entries once the total
/// estimated byte size of all entries exceeds this threshold.
/// Default: 100 MB.
/// </summary>
public long MaxSizeBytes { get; init; } = 100L * 1024 * 1024;
/// <summary>
/// For <see cref="CompactionPolicy.ByAge"/>: compact entries whose <see cref="RaftLogEntry.Timestamp"/>
/// is older than this duration relative to <see cref="DateTime.UtcNow"/>.
/// Default: 1 hour.
/// </summary>
public TimeSpan MaxAge { get; init; } = TimeSpan.FromHours(1);
/// <summary>
/// Computes the index up to which the log should be compacted given the current options.
/// Returns -1 when no compaction should occur.
///
/// Safety guarantee: the returned index never exceeds <paramref name="appliedIndex"/>
/// so that committed entries are never discarded before they are applied.
///
/// Go reference: raft.go compactLog (threshold checks before truncation).
/// </summary>
/// <param name="log">The current RAFT log.</param>
/// <param name="appliedIndex">The highest index that has been applied to the state machine.
/// Compaction never proceeds past this point.</param>
/// <returns>
/// The log index up to which compaction should occur (inclusive),
/// or -1 if nothing should be compacted.
/// </returns>
public long ComputeCompactionIndex(RaftLog log, long appliedIndex)
{
if (Policy == CompactionPolicy.None)
return -1;
var entries = log.Entries;
if (entries.Count == 0)
return -1;
long candidateIndex = Policy switch
{
CompactionPolicy.ByCount => ComputeByCount(entries),
CompactionPolicy.BySize => ComputeBySize(entries),
CompactionPolicy.ByAge => ComputeByAge(entries),
_ => -1,
};
if (candidateIndex < 0)
return -1;
// Safety guard: never compact past the applied index.
return Math.Min(candidateIndex, appliedIndex);
}
// ---------------------------------------------------------------------------
// Private policy implementations
// ---------------------------------------------------------------------------
private long ComputeByCount(IReadOnlyList<RaftLogEntry> entries)
{
// Keep at most MaxEntries. If entries.Count <= MaxEntries there is nothing to do.
var excess = entries.Count - MaxEntries;
if (excess <= 0)
return -1;
// Compact the oldest `excess` entries — the entry at position (excess - 1).
return entries[excess - 1].Index;
}
private long ComputeBySize(IReadOnlyList<RaftLogEntry> entries)
{
// Estimate total size: each entry contributes (index 8 bytes + term 4 bytes + command UTF-8 bytes).
long totalBytes = 0;
foreach (var e in entries)
totalBytes += 12L + Encoding.UTF8.GetByteCount(e.Command);
if (totalBytes <= MaxSizeBytes)
return -1;
// Walk from the oldest entry, accumulating bytes removed, until we are under threshold.
long bytesRemoved = 0;
long overBy = totalBytes - MaxSizeBytes;
for (var i = 0; i < entries.Count; i++)
{
bytesRemoved += 12L + Encoding.UTF8.GetByteCount(entries[i].Command);
if (bytesRemoved >= overBy)
return entries[i].Index;
}
// Compact everything if somehow we still haven't reached the threshold.
return entries[^1].Index;
}
private long ComputeByAge(IReadOnlyList<RaftLogEntry> entries)
{
var cutoff = DateTime.UtcNow - MaxAge;
long lastOldIndex = -1;
foreach (var e in entries)
{
if (e.Timestamp < cutoff)
lastOldIndex = e.Index;
else
break; // Entries are in index order; once we hit a fresh entry we are done.
}
return lastOldIndex;
}
}

View File

@@ -19,6 +19,20 @@ public sealed class RaftLog
return entry;
}
/// <summary>
/// Appends an entry with an explicit timestamp. Used in tests and by the
/// <see cref="CompactionPolicy.ByAge"/> policy to set controlled creation times.
/// </summary>
public RaftLogEntry AppendWithTimestamp(int term, string command, DateTime timestamp)
{
var entry = new RaftLogEntry(_baseIndex + _entries.Count + 1, term, command)
{
Timestamp = timestamp,
};
_entries.Add(entry);
return entry;
}
public void AppendReplicated(RaftLogEntry entry)
{
if (_entries.Any(e => e.Index == entry.Index))
@@ -79,4 +93,15 @@ public sealed class RaftLog
}
}
public sealed record RaftLogEntry(long Index, int Term, string Command);
/// <summary>
/// A single RAFT log entry. Timestamp records when the entry was created (UTC) and
/// is used by the <see cref="CompactionPolicy.ByAge"/> compaction policy.
/// </summary>
public sealed record RaftLogEntry(long Index, int Term, string Command)
{
/// <summary>
/// UTC creation time. Defaults to <see cref="DateTime.UtcNow"/> at append time.
/// Stored as a JSON-serializable property so it survives persistence round-trips.
/// </summary>
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
}

View File

@@ -86,12 +86,22 @@ public sealed class RaftNode : IDisposable
/// </summary>
public IReadOnlyCollection<string>? JointNewMembers => _jointNewMembers;
public RaftNode(string id, IRaftTransport? transport = null, string? persistDirectory = null)
/// <summary>
/// Configurable log compaction policy. When set, <see cref="CompactLogAsync"/> uses
/// <see cref="CompactionOptions.ComputeCompactionIndex"/> to determine the cutoff index
/// rather than always compacting to <see cref="AppliedIndex"/>.
/// Go reference: raft.go compactLog thresholds.
/// </summary>
public CompactionOptions? CompactionOptions { get; set; }
public RaftNode(string id, IRaftTransport? transport = null, string? persistDirectory = null,
CompactionOptions? compactionOptions = null)
{
Id = id;
_transport = transport;
_persistDirectory = persistDirectory;
_members.Add(id);
CompactionOptions = compactionOptions;
}
public void ConfigureCluster(IEnumerable<RaftNode> peers)
@@ -172,6 +182,35 @@ public sealed class RaftNode : IDisposable
TryBecomeLeader(clusterSize);
}
/// <summary>
/// Returns true when this leader has heard from enough peers within the quorum
/// window (2 × ElectionTimeoutMaxMs) to be confident it still holds a majority.
///
/// Quorum window = 2 × ElectionTimeoutMaxMs. Peers whose <see cref="RaftPeerState.LastContact"/>
/// is within that window are counted as current. Self counts as one voter.
/// Returns false for non-leaders immediately (a follower never has proposer quorum).
///
/// Go reference: raft.go checkQuorum / stepDown (leader steps down when it cannot
/// confirm quorum with a majority of peers within an election-timeout period).
/// </summary>
public bool HasQuorum()
{
if (Role != RaftRole.Leader)
return false;
var totalMembers = _members.Count;
// Single-node cluster: self is always majority.
if (totalMembers <= 1)
return true;
var window = TimeSpan.FromMilliseconds(ElectionTimeoutMaxMs * 2);
var currentPeers = _peerStates.Values.Count(p => DateTime.UtcNow - p.LastContact < window);
var majority = (totalMembers / 2) + 1;
return currentPeers + 1 >= majority; // +1 for self
}
public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
{
if (Role != RaftRole.Leader)
@@ -182,6 +221,14 @@ public sealed class RaftNode : IDisposable
if (Interlocked.CompareExchange(ref _transferInProgress, 0, 0) == 1)
throw new InvalidOperationException("Leadership transfer in progress; proposals are blocked.");
// Quorum guard: refuse to append entries when the leader cannot confirm that a
// majority of peers are still reachable. This prevents a partitioned leader from
// diverging the log while isolated from the rest of the cluster.
// Go reference: raft.go checkQuorum — leader steps down (and therefore blocks
// proposals) when it has not heard from a quorum within the election timeout.
if (!HasQuorum())
throw new InvalidOperationException("Cannot propose: leader has no quorum (majority of peers unreachable).");
var entry = Log.Append(TermState.CurrentTerm, command);
var followers = _cluster.Where(n => n.Id != Id).ToList();
var results = await _replicator.ReplicateAsync(Id, entry, followers, _transport, ct);
@@ -409,17 +456,44 @@ public sealed class RaftNode : IDisposable
}
/// <summary>
/// Compacts the log up to the most recent snapshot index.
/// Entries already covered by a snapshot are removed from the in-memory log.
/// This is typically called after a snapshot has been persisted.
/// Go reference: raft.go WAL compact.
/// Compacts the log according to the provided (or configured) compaction options.
///
/// When <paramref name="options"/> is supplied it takes precedence over <see cref="CompactionOptions"/>.
/// When neither is provided the method falls back to compacting up to <see cref="AppliedIndex"/>,
/// which preserves the original behavior.
///
/// The compaction index is always capped at <see cref="AppliedIndex"/> to guarantee that no
/// committed-but-not-yet-applied entries are discarded.
///
/// Go reference: raft.go WAL compact / compactLog threshold checks.
/// </summary>
public Task CompactLogAsync(CancellationToken ct)
/// <param name="ct">Cancellation token (reserved for future async I/O).</param>
/// <param name="options">
/// Optional policy override. When null, uses <see cref="CompactionOptions"/> if set,
/// otherwise defaults to compacting everything up to <see cref="AppliedIndex"/>.
/// </param>
public Task CompactLogAsync(CancellationToken ct, CompactionOptions? options = null)
{
_ = ct;
// Compact up to the applied index (which is the snapshot point)
if (AppliedIndex > 0)
Log.Compact(AppliedIndex);
var effectiveOptions = options ?? CompactionOptions;
long compactUpTo;
if (effectiveOptions != null)
{
compactUpTo = effectiveOptions.ComputeCompactionIndex(Log, AppliedIndex);
if (compactUpTo < 0)
return Task.CompletedTask; // policy says: nothing to do
}
else
{
// Default behavior: compact everything up to the applied index.
compactUpTo = AppliedIndex;
}
if (compactUpTo > 0)
Log.Compact(compactUpTo);
return Task.CompletedTask;
}