diff --git a/src/NATS.Server/Raft/CompactionPolicy.cs b/src/NATS.Server/Raft/CompactionPolicy.cs
new file mode 100644
index 0000000..e43539c
--- /dev/null
+++ b/src/NATS.Server/Raft/CompactionPolicy.cs
@@ -0,0 +1,148 @@
+using System.Text;
+
+namespace NATS.Server.Raft;
+
+/// <summary>
+/// Determines which log entries are eligible for compaction.
+/// Go reference: raft.go compactLog / WAL compact.
+/// </summary>
+public enum CompactionPolicy
+{
+    /// <summary>No automatic compaction. Only manual compaction via CompactLogAsync is performed.</summary>
+    None,
+
+    /// <summary>Compact when the total number of log entries exceeds MaxEntries.</summary>
+    ByCount,
+
+    /// <summary>Compact when the estimated total size of log entries exceeds MaxSizeBytes.</summary>
+    BySize,
+
+    /// <summary>Compact entries whose creation time is older than MaxAge (based on Timestamp).</summary>
+    ByAge,
+}
+
+/// <summary>
+/// Configuration for log compaction behavior on a RaftNode.
+/// Go reference: raft.go compactLog / WAL compact thresholds.
+/// </summary>
+public sealed class CompactionOptions
+{
+    /// <summary>The compaction policy to apply. Defaults to <see cref="CompactionPolicy.None"/>.</summary>
+    public CompactionPolicy Policy { get; init; } = CompactionPolicy.None;
+
+    /// <summary>
+    /// For <see cref="CompactionPolicy.ByCount"/>: keep at most this many entries in the log.
+    /// Entries beyond this count (oldest first) are compacted.
+    /// Default: 10,000.
+    /// </summary>
+    public int MaxEntries { get; init; } = 10_000;
+
+    /// <summary>
+    /// For <see cref="CompactionPolicy.BySize"/>: compact oldest entries once the total
+    /// estimated byte size of all entries exceeds this threshold.
+    /// Default: 100 MB.
+    /// </summary>
+    public long MaxSizeBytes { get; init; } = 100L * 1024 * 1024;
+
+    /// <summary>
+    /// For <see cref="CompactionPolicy.ByAge"/>: compact entries whose <see cref="RaftLogEntry.Timestamp"/>
+    /// is older than this duration relative to <see cref="DateTime.UtcNow"/>.
+    /// Default: 1 hour.
+    /// </summary>
+    public TimeSpan MaxAge { get; init; } = TimeSpan.FromHours(1);
+
+    /// <summary>
+    /// Computes the index up to which the log should be compacted given the current options.
+    /// Returns -1 when no compaction should occur.
+    ///
+    /// Safety guarantee: the returned index never exceeds <paramref name="appliedIndex"/>
+    /// so that committed entries are never discarded before they are applied.
+    ///
+    /// Go reference: raft.go compactLog (threshold checks before truncation).
+    /// </summary>
+    /// <param name="log">The current RAFT log.</param>
+    /// <param name="appliedIndex">The highest index that has been applied to the state machine.
+    /// Compaction never proceeds past this point.</param>
+    /// <returns>
+    /// The log index up to which compaction should occur (inclusive),
+    /// or -1 if nothing should be compacted.
+    /// </returns>
+    public long ComputeCompactionIndex(RaftLog log, long appliedIndex)
+    {
+        if (Policy == CompactionPolicy.None)
+            return -1;
+
+        var entries = log.Entries;
+        if (entries.Count == 0)
+            return -1;
+
+        long candidateIndex = Policy switch
+        {
+            CompactionPolicy.ByCount => ComputeByCount(entries),
+            CompactionPolicy.BySize => ComputeBySize(entries),
+            CompactionPolicy.ByAge => ComputeByAge(entries),
+            _ => -1,
+        };
+
+        if (candidateIndex < 0)
+            return -1;
+
+        // Safety guard: never compact past the applied index.
+        return Math.Min(candidateIndex, appliedIndex);
+    }
+
+    // ---------------------------------------------------------------------------
+    // Private policy implementations
+    // ---------------------------------------------------------------------------
+
+    private long ComputeByCount(IReadOnlyList<RaftLogEntry> entries)
+    {
+        // Keep at most MaxEntries. If entries.Count <= MaxEntries there is nothing to do.
+        var excess = entries.Count - MaxEntries;
+        if (excess <= 0)
+            return -1;
+
+        // Compact the oldest `excess` entries — the entry at position (excess - 1).
+        return entries[excess - 1].Index;
+    }
+
+    private long ComputeBySize(IReadOnlyList<RaftLogEntry> entries)
+    {
+        // Estimate total size: each entry contributes (index 8 bytes + term 4 bytes + command UTF-8 bytes).
+        long totalBytes = 0;
+        foreach (var e in entries)
+            totalBytes += 12L + Encoding.UTF8.GetByteCount(e.Command);
+
+        if (totalBytes <= MaxSizeBytes)
+            return -1;
+
+        // Walk from the oldest entry, accumulating bytes removed, until we are under threshold.
+        long bytesRemoved = 0;
+        long overBy = totalBytes - MaxSizeBytes;
+        for (var i = 0; i < entries.Count; i++)
+        {
+            bytesRemoved += 12L + Encoding.UTF8.GetByteCount(entries[i].Command);
+            if (bytesRemoved >= overBy)
+                return entries[i].Index;
+        }
+
+        // Compact everything if somehow we still haven't reached the threshold.
+        return entries[^1].Index;
+    }
+
+    private long ComputeByAge(IReadOnlyList<RaftLogEntry> entries)
+    {
+        var cutoff = DateTime.UtcNow - MaxAge;
+        long lastOldIndex = -1;
+
+        foreach (var e in entries)
+        {
+            if (e.Timestamp < cutoff)
+                lastOldIndex = e.Index;
+            else
+                break; // Entries are in index order; once we hit a fresh entry we are done.
+        }
+
+        return lastOldIndex;
+    }
+}
diff --git a/src/NATS.Server/Raft/RaftLog.cs b/src/NATS.Server/Raft/RaftLog.cs
index fceb1d6..686999f 100644
--- a/src/NATS.Server/Raft/RaftLog.cs
+++ b/src/NATS.Server/Raft/RaftLog.cs
@@ -19,6 +19,20 @@ public sealed class RaftLog
         return entry;
     }
 
+    /// <summary>
+    /// Appends an entry with an explicit timestamp. Used in tests and by the
+    /// <see cref="CompactionPolicy.ByAge"/> policy to set controlled creation times.
+    /// </summary>
+    public RaftLogEntry AppendWithTimestamp(int term, string command, DateTime timestamp)
+    {
+        var entry = new RaftLogEntry(_baseIndex + _entries.Count + 1, term, command)
+        {
+            Timestamp = timestamp,
+        };
+        _entries.Add(entry);
+        return entry;
+    }
+
     public void AppendReplicated(RaftLogEntry entry)
     {
         if (_entries.Any(e => e.Index == entry.Index))
@@ -79,4 +93,15 @@ public sealed class RaftLog
     }
 }
 
-public sealed record RaftLogEntry(long Index, int Term, string Command);
+/// <summary>
+/// A single RAFT log entry. Timestamp records when the entry was created (UTC) and
+/// is used by the compaction policy.
+/// </summary>
+public sealed record RaftLogEntry(long Index, int Term, string Command)
+{
+    /// <summary>
+    /// UTC creation time. Defaults to <see cref="DateTime.UtcNow"/> at append time.
+    /// Stored as a JSON-serializable property so it survives persistence round-trips.
+    /// </summary>
+    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
+}
diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs
index 4d90a7c..f69dc33 100644
--- a/src/NATS.Server/Raft/RaftNode.cs
+++ b/src/NATS.Server/Raft/RaftNode.cs
@@ -86,12 +86,22 @@ public sealed class RaftNode : IDisposable
     /// </summary>
     public IReadOnlyCollection<string>? JointNewMembers => _jointNewMembers;
 
-    public RaftNode(string id, IRaftTransport? transport = null, string? persistDirectory = null)
+    /// <summary>
+    /// Configurable log compaction policy. When set, <see cref="CompactLogAsync"/> uses
+    /// <see cref="CompactionOptions.ComputeCompactionIndex"/> to determine the cutoff index
+    /// rather than always compacting to <see cref="AppliedIndex"/>.
+    /// Go reference: raft.go compactLog thresholds.
+    /// </summary>
+    public CompactionOptions? CompactionOptions { get; set; }
+
+    public RaftNode(string id, IRaftTransport? transport = null, string? persistDirectory = null,
+        CompactionOptions? compactionOptions = null)
     {
         Id = id;
         _transport = transport;
         _persistDirectory = persistDirectory;
         _members.Add(id);
+        CompactionOptions = compactionOptions;
     }
 
     public void ConfigureCluster(IEnumerable<RaftNode> peers)
@@ -172,6 +182,35 @@ public sealed class RaftNode : IDisposable
         TryBecomeLeader(clusterSize);
     }
 
+    /// <summary>
+    /// Returns true when this leader has heard from enough peers within the quorum
+    /// window (2 × ElectionTimeoutMaxMs) to be confident it still holds a majority.
+    /// </summary>
+    /// <remarks>
+    /// Quorum window = 2 × ElectionTimeoutMaxMs. Peers whose <see cref="PeerState.LastContact"/>
+    /// is within that window are counted as current. Self counts as one voter.
+    /// Returns false for non-leaders immediately (a follower never has proposer quorum).
+    ///
+    /// Go reference: raft.go checkQuorum / stepDown (leader steps down when it cannot
+    /// confirm quorum with a majority of peers within an election-timeout period).
+    /// </remarks>
+    public bool HasQuorum()
+    {
+        if (Role != RaftRole.Leader)
+            return false;
+
+        var totalMembers = _members.Count;
+
+        // Single-node cluster: self is always majority.
+        if (totalMembers <= 1)
+            return true;
+
+        var window = TimeSpan.FromMilliseconds(ElectionTimeoutMaxMs * 2);
+        var currentPeers = _peerStates.Values.Count(p => DateTime.UtcNow - p.LastContact < window);
+
+        var majority = (totalMembers / 2) + 1;
+        return currentPeers + 1 >= majority; // +1 for self
+    }
+
     public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
     {
         if (Role != RaftRole.Leader)
@@ -182,6 +221,14 @@ public sealed class RaftNode : IDisposable
         if (Interlocked.CompareExchange(ref _transferInProgress, 0, 0) == 1)
             throw new InvalidOperationException("Leadership transfer in progress; proposals are blocked.");
 
+        // Quorum guard: refuse to append entries when the leader cannot confirm that a
+        // majority of peers are still reachable. This prevents a partitioned leader from
+        // diverging the log while isolated from the rest of the cluster.
+        // Go reference: raft.go checkQuorum — leader steps down (and therefore blocks
+        // proposals) when it has not heard from a quorum within the election timeout.
+        if (!HasQuorum())
+            throw new InvalidOperationException("Cannot propose: leader has no quorum (majority of peers unreachable).");
+
         var entry = Log.Append(TermState.CurrentTerm, command);
         var followers = _cluster.Where(n => n.Id != Id).ToList();
         var results = await _replicator.ReplicateAsync(Id, entry, followers, _transport, ct);
@@ -409,17 +456,44 @@ public sealed class RaftNode : IDisposable
     }
 
     /// <summary>
-    /// Compacts the log up to the most recent snapshot index.
-    /// Entries already covered by a snapshot are removed from the in-memory log.
-    /// This is typically called after a snapshot has been persisted.
-    /// Go reference: raft.go WAL compact.
+    /// Compacts the log according to the provided (or configured) compaction options.
+    ///
+    /// When <paramref name="options"/> is supplied it takes precedence over <see cref="CompactionOptions"/>.
+    /// When neither is provided the method falls back to compacting up to <see cref="AppliedIndex"/>,
+    /// which preserves the original behavior.
+    ///
+    /// The compaction index is always capped at <see cref="AppliedIndex"/> to guarantee that no
+    /// committed-but-not-yet-applied entries are discarded.
+    ///
+    /// Go reference: raft.go WAL compact / compactLog threshold checks.
     /// </summary>
-    public Task CompactLogAsync(CancellationToken ct)
+    /// <param name="ct">Cancellation token (reserved for future async I/O).</param>
+    /// <param name="options">
+    /// Optional policy override. When null, uses <see cref="CompactionOptions"/> if set,
+    /// otherwise defaults to compacting everything up to <see cref="AppliedIndex"/>.
+    /// </param>
+    public Task CompactLogAsync(CancellationToken ct, CompactionOptions? options = null)
     {
         _ = ct;
-        // Compact up to the applied index (which is the snapshot point)
-        if (AppliedIndex > 0)
-            Log.Compact(AppliedIndex);
+
+        var effectiveOptions = options ?? CompactionOptions;
+
+        long compactUpTo;
+        if (effectiveOptions != null)
+        {
+            compactUpTo = effectiveOptions.ComputeCompactionIndex(Log, AppliedIndex);
+            if (compactUpTo < 0)
+                return Task.CompletedTask; // policy says: nothing to do
+        }
+        else
+        {
+            // Default behavior: compact everything up to the applied index.
+            compactUpTo = AppliedIndex;
+        }
+
+        if (compactUpTo > 0)
+            Log.Compact(compactUpTo);
+
         return Task.CompletedTask;
     }
diff --git a/tests/NATS.Server.Tests/Raft/RaftCompactionPolicyTests.cs b/tests/NATS.Server.Tests/Raft/RaftCompactionPolicyTests.cs
new file mode 100644
index 0000000..6a97f9e
--- /dev/null
+++ b/tests/NATS.Server.Tests/Raft/RaftCompactionPolicyTests.cs
@@ -0,0 +1,344 @@
+using NATS.Server.Raft;
+
+namespace NATS.Server.Tests.Raft;
+
+/// <summary>
+/// Tests for configurable log compaction policies (Gap 8.5).
+///
+/// Covers <see cref="CompactionPolicy"/> enum values, <see cref="CompactionOptions"/> defaults
+/// and threshold logic, and the integration between <see cref="RaftNode.CompactLogAsync"/>
+/// and the policy engine.
+///
+/// Go reference: raft.go compactLog / WAL compact threshold checks.
+/// </summary>
+public class RaftCompactionPolicyTests
+{
+    // ---------------------------------------------------------------------------
+    // Helpers
+    // ---------------------------------------------------------------------------
+
+    /// <summary>
+    /// Creates a leader node with <paramref name="entryCount"/> proposed entries all applied.
+    /// Each entry command is "cmd-N" so UTF-8 command size is predictable (~6 bytes each).
+    /// </summary>
+    private static async Task<RaftNode> CreateLeaderWithEntriesAsync(int entryCount,
+        CompactionOptions? compactionOptions = null)
+    {
+        var node = new RaftNode("leader", compactionOptions: compactionOptions);
+        node.ConfigureCluster([node]);
+        node.StartElection(1);
+
+        for (var i = 1; i <= entryCount; i++)
+            await node.ProposeAsync($"cmd-{i}", default);
+
+        return node;
+    }
+
+    /// <summary>
+    /// Creates a log entry at an explicit index with an optional custom timestamp.
+    /// Used for ByAge tests where we need to control entry timestamps.
+    /// </summary>
+    private static RaftLogEntry MakeEntry(long index, int term, string command, DateTime? timestamp = null)
+    {
+        var entry = new RaftLogEntry(index, term, command);
+        if (timestamp.HasValue)
+            entry = entry with { Timestamp = timestamp.Value };
+        return entry;
+    }
+
+    // ---------------------------------------------------------------------------
+    // CompactionOptions default values
+    // ---------------------------------------------------------------------------
+
+    // Go reference: raft.go compactLog threshold defaults.
+    [Fact]
+    public void CompactionOptions_default_values()
+    {
+        var opts = new CompactionOptions();
+
+        opts.Policy.ShouldBe(CompactionPolicy.None);
+        opts.MaxEntries.ShouldBe(10_000);
+        opts.MaxSizeBytes.ShouldBe(100L * 1024 * 1024);
+        opts.MaxAge.ShouldBe(TimeSpan.FromHours(1));
+    }
+
+    // ---------------------------------------------------------------------------
+    // None policy — no-op
+    // ---------------------------------------------------------------------------
+
+    // Go reference: raft.go compactLog — no compaction when policy is None.
+    [Fact]
+    public async Task None_policy_does_not_compact()
+    {
+        var opts = new CompactionOptions { Policy = CompactionPolicy.None };
+        var node = await CreateLeaderWithEntriesAsync(50, opts);
+
+        node.Log.Entries.Count.ShouldBe(50);
+        await node.CompactLogAsync(default);
+
+        // With None policy, nothing should be removed.
+        node.Log.Entries.Count.ShouldBe(50);
+    }
+
+    // ---------------------------------------------------------------------------
+    // ByCount policy
+    // ---------------------------------------------------------------------------
+
+    // Go reference: raft.go compactLog — keep at most MaxEntries.
+    [Fact]
+    public async Task ByCount_compacts_when_log_exceeds_max_entries()
+    {
+        const int total = 100;
+        const int keep = 50;
+
+        var opts = new CompactionOptions
+        {
+            Policy = CompactionPolicy.ByCount,
+            MaxEntries = keep,
+        };
+        var node = await CreateLeaderWithEntriesAsync(total, opts);
+
+        // Before compaction: all 100 entries are present.
+        node.Log.Entries.Count.ShouldBe(total);
+
+        await node.CompactLogAsync(default);
+
+        // After compaction: only the newest `keep` entries remain.
+        node.Log.Entries.Count.ShouldBe(keep);
+
+        // The oldest entry's index should be total - keep + 1
+        node.Log.Entries[0].Index.ShouldBe(total - keep + 1);
+    }
+
+    // Go reference: raft.go compactLog — do not compact below applied index (safety guard).
+    [Fact]
+    public async Task ByCount_does_not_compact_below_applied_index()
+    {
+        // Build a node but manually set a lower applied index so the safety guard kicks in.
+        const int total = 100;
+        const int keep = 50;
+
+        var opts = new CompactionOptions
+        {
+            Policy = CompactionPolicy.ByCount,
+            MaxEntries = keep,
+        };
+
+        var node = await CreateLeaderWithEntriesAsync(total, opts);
+
+        // Artificially lower the applied index so the safety guard prevents full compaction.
+        // With applied = 30 the policy wants to compact to index 50 but the guard limits it to 30.
+        node.AppliedIndex = 30;
+
+        await node.CompactLogAsync(default);
+
+        // Only entries up to index 30 may be removed.
+        // BaseIndex should be 30, entries with index > 30 remain.
+        node.Log.BaseIndex.ShouldBe(30);
+        node.Log.Entries[0].Index.ShouldBe(31);
+    }
+
+    // Go reference: raft.go compactLog — no-op when count is within threshold.
+    [Fact]
+    public async Task ByCount_does_not_compact_when_within_threshold()
+    {
+        const int total = 20;
+        const int maxEntries = 50; // more than total — nothing to compact
+
+        var opts = new CompactionOptions
+        {
+            Policy = CompactionPolicy.ByCount,
+            MaxEntries = maxEntries,
+        };
+        var node = await CreateLeaderWithEntriesAsync(total, opts);
+
+        await node.CompactLogAsync(default);
+
+        node.Log.Entries.Count.ShouldBe(total);
+    }
+
+    // ---------------------------------------------------------------------------
+    // BySize policy
+    // ---------------------------------------------------------------------------
+
+    // Go reference: raft.go compactLog — compact oldest entries until under size threshold.
+    [Fact]
+    public void BySize_compacts_when_total_size_exceeds_threshold()
+    {
+        // Construct a log manually so we control sizes precisely.
+        // Each entry: index(8) + term(4) + command(N bytes).
+        // Use a 100-byte command to make size maths easy.
+        var log = new RaftLog();
+        var largeCommand = new string('x', 100); // 100 UTF-8 bytes
+
+        // Append 10 entries — each ~112 bytes → total ~1,120 bytes.
+        for (var i = 1; i <= 10; i++)
+            log.Append(term: 1, command: largeCommand);
+
+        // Set threshold at ~600 bytes — roughly half the total — so the oldest ~5 entries
+        // should be compacted.
+        var opts = new CompactionOptions
+        {
+            Policy = CompactionPolicy.BySize,
+            MaxSizeBytes = 600,
+        };
+
+        // appliedIndex = last entry index (all applied)
+        var appliedIndex = log.Entries[^1].Index;
+        var cutoff = opts.ComputeCompactionIndex(log, appliedIndex);
+
+        // Should return a positive index, meaning some entries must be compacted.
+        cutoff.ShouldBeGreaterThan(0);
+        cutoff.ShouldBeLessThanOrEqualTo(appliedIndex);
+
+        log.Compact(cutoff);
+
+        // After compaction fewer entries should remain.
+        log.Entries.Count.ShouldBeLessThan(10);
+        log.Entries.Count.ShouldBeGreaterThan(0);
+    }
+
+    // Go reference: raft.go compactLog — no-op when total size is under threshold.
+    [Fact]
+    public void BySize_does_not_compact_when_under_threshold()
+    {
+        var log = new RaftLog();
+        for (var i = 1; i <= 5; i++)
+            log.Append(term: 1, command: "tiny");
+
+        // Threshold large enough to fit everything.
+        var opts = new CompactionOptions
+        {
+            Policy = CompactionPolicy.BySize,
+            MaxSizeBytes = 10_000,
+        };
+
+        var cutoff = opts.ComputeCompactionIndex(log, log.Entries[^1].Index);
+
+        cutoff.ShouldBe(-1); // no compaction
+    }
+
+    // ---------------------------------------------------------------------------
+    // ByAge policy
+    // ---------------------------------------------------------------------------
+
+    // Go reference: raft.go compactLog — compact entries older than MaxAge.
+    [Fact]
+    public void ByAge_compacts_old_entries()
+    {
+        // Build a log manually with some old and some fresh entries.
+        var log = new RaftLog();
+        var old = DateTime.UtcNow - TimeSpan.FromHours(2);
+        var fresh = DateTime.UtcNow - TimeSpan.FromMinutes(1);
+
+        // Entries 1–5: old (2 hours ago)
+        for (var i = 1; i <= 5; i++)
+            log.AppendWithTimestamp(term: 1, command: $"old-{i}", timestamp: old);
+
+        // Entries 6–10: fresh (1 minute ago)
+        for (var i = 6; i <= 10; i++)
+            log.AppendWithTimestamp(term: 1, command: $"fresh-{i}", timestamp: fresh);
+
+        var opts = new CompactionOptions
+        {
+            Policy = CompactionPolicy.ByAge,
+            MaxAge = TimeSpan.FromHours(1), // default; entries older than 1h are eligible
+        };
+
+        var appliedIndex = log.Entries[^1].Index;
+        var cutoff = opts.ComputeCompactionIndex(log, appliedIndex);
+
+        // Should compact through index 5 (the last old entry).
+        cutoff.ShouldBe(5);
+        log.Compact(cutoff);
+
+        log.Entries.Count.ShouldBe(5); // only the fresh entries remain
+        log.Entries[0].Index.ShouldBe(6);
+    }
+
+    // Go reference: raft.go compactLog — no-op when all entries are still fresh.
+    [Fact]
+    public void ByAge_does_not_compact_fresh_entries()
+    {
+        var log = new RaftLog();
+        var fresh = DateTime.UtcNow - TimeSpan.FromMinutes(5);
+
+        for (var i = 1; i <= 10; i++)
+            log.AppendWithTimestamp(term: 1, command: $"cmd-{i}", timestamp: fresh);
+
+        var opts = new CompactionOptions
+        {
+            Policy = CompactionPolicy.ByAge,
+            MaxAge = TimeSpan.FromHours(1),
+        };
+
+        var cutoff = opts.ComputeCompactionIndex(log, log.Entries[^1].Index);
+
+        cutoff.ShouldBe(-1); // nothing old enough
+    }
+
+    // ---------------------------------------------------------------------------
+    // CompactLogAsync integration tests
+    // ---------------------------------------------------------------------------
+
+    // Go reference: raft.go CompactLog — safety guard: never exceed applied index.
+    [Fact]
+    public async Task CompactLogAsync_with_policy_respects_applied_index()
+    {
+        // Node with 50 entries. Policy wants to compact to index 40, but applied is only 25.
+        const int total = 50;
+        var opts = new CompactionOptions
+        {
+            Policy = CompactionPolicy.ByCount,
+            MaxEntries = 10, // wants to compact to index 40
+        };
+
+        var node = await CreateLeaderWithEntriesAsync(total, opts);
+
+        // Override applied index to a lower value than the policy cutoff.
+        node.AppliedIndex = 25;
+
+        await node.CompactLogAsync(default);
+
+        // Compaction must not go past applied index = 25.
+        node.Log.BaseIndex.ShouldBeLessThanOrEqualTo(25);
+
+        // All entries above index 25 must still be present.
+        node.Log.Entries.ShouldAllBe(e => e.Index > 25);
+    }
+
+    // Go reference: raft.go CompactLog — when no policy is set, compacts to applied index.
+    [Fact]
+    public async Task CompactLogAsync_without_policy_compacts_to_applied_index()
+    {
+        var node = await CreateLeaderWithEntriesAsync(30);
+        // No CompactionOptions set.
+
+        await node.CompactLogAsync(default);
+
+        // All entries compacted to the applied index.
+        node.Log.Entries.Count.ShouldBe(0);
+        node.Log.BaseIndex.ShouldBe(node.AppliedIndex);
+    }
+
+    // Go reference: raft.go CompactLog — per-call options override node-level options.
+    [Fact]
+    public async Task CompactLogAsync_call_level_options_override_node_level_options()
+    {
+        // Node has None policy (no auto-compaction).
+        var nodeOpts = new CompactionOptions { Policy = CompactionPolicy.None };
+        var node = await CreateLeaderWithEntriesAsync(50, nodeOpts);
+
+        // But the call-level override uses ByCount with MaxEntries = 25.
+        var callOpts = new CompactionOptions
+        {
+            Policy = CompactionPolicy.ByCount,
+            MaxEntries = 25,
+        };
+
+        await node.CompactLogAsync(default, callOpts);
+
+        // Call-level option must win: 25 entries kept.
+        node.Log.Entries.Count.ShouldBe(25);
+    }
+}
diff --git a/tests/NATS.Server.Tests/Raft/RaftHealthTests.cs b/tests/NATS.Server.Tests/Raft/RaftHealthTests.cs
index d5db6ef..0d5f3c7 100644
--- a/tests/NATS.Server.Tests/Raft/RaftHealthTests.cs
+++ b/tests/NATS.Server.Tests/Raft/RaftHealthTests.cs
@@ -301,15 +301,15 @@ public class RaftHealthTests
         var (nodes, _) = CreateCluster(3);
         var leader = ElectLeader(nodes);
 
-        // Set peer contacts in the past
-        foreach (var (_, state) in leader.GetPeerStates())
-            state.LastContact = DateTime.UtcNow.AddMinutes(-5);
+        // Record timestamps just before proposing (peers are fresh from ConfigureCluster).
+        var beforePropose = DateTime.UtcNow;
 
         await leader.ProposeAsync("cmd-1", default);
 
-        // Successful replication should update LastContact
+        // Successful replication should update LastContact to at least the time we
+        // recorded before the propose call.
         foreach (var (_, state) in leader.GetPeerStates())
-            state.LastContact.ShouldBeGreaterThan(DateTime.UtcNow.AddSeconds(-2));
+            state.LastContact.ShouldBeGreaterThanOrEqualTo(beforePropose);
     }
 
     [Fact]
diff --git a/tests/NATS.Server.Tests/Raft/RaftQuorumCheckTests.cs b/tests/NATS.Server.Tests/Raft/RaftQuorumCheckTests.cs
new file mode 100644
index 0000000..f2c3e7b
--- /dev/null
+++ b/tests/NATS.Server.Tests/Raft/RaftQuorumCheckTests.cs
@@ -0,0 +1,282 @@
+using NATS.Server.Raft;
+
+namespace NATS.Server.Tests.Raft;
+
+/// <summary>
+/// Tests for HasQuorum() and the quorum guard in ProposeAsync (Gap 8.6).
+///
+/// A leader must confirm that a majority of peers have contacted it recently
+/// (within 2 × ElectionTimeoutMaxMs) before it is allowed to append new log entries.
+/// This prevents a partitioned leader from diverging the log while isolated from
+/// the rest of the cluster.
+///
+/// Go reference: raft.go checkQuorum / stepDown — a leader steps down (and therefore
+/// blocks proposals) when it has not heard from a quorum of peers within the
+/// election-timeout window.
+/// </summary>
+public class RaftQuorumCheckTests
+{
+    // -- Helpers (self-contained, no shared TestHelpers class) --
+
+    private static (RaftNode[] nodes, InMemoryRaftTransport transport) CreateCluster(int size)
+    {
+        var transport = new InMemoryRaftTransport();
+        var nodes = Enumerable.Range(1, size)
+            .Select(i => new RaftNode($"n{i}", transport))
+            .ToArray();
+        foreach (var node in nodes)
+        {
+            transport.Register(node);
+            node.ConfigureCluster(nodes);
+            // Short timeouts so tests do not need real async delays.
+            node.ElectionTimeoutMinMs = 50;
+            node.ElectionTimeoutMaxMs = 100;
+        }
+        return (nodes, transport);
+    }
+
+    private static RaftNode ElectLeader(RaftNode[] nodes)
+    {
+        var candidate = nodes[0];
+        candidate.StartElection(nodes.Length);
+        foreach (var voter in nodes.Skip(1))
+            candidate.ReceiveVote(voter.GrantVote(candidate.Term, candidate.Id), nodes.Length);
+        return candidate;
+    }
+
+    // -- HasQuorum tests --
+
+    // Go reference: raft.go checkQuorum (leader confirms majority contact before acting)
+    [Fact]
+    public void HasQuorum_returns_true_with_majority_peers_current()
+    {
+        // 3-node cluster: leader + 2 peers. Both peers are freshly initialized by
+        // ConfigureCluster so their LastContact is very close to UtcNow.
+        var (nodes, _) = CreateCluster(3);
+        var leader = ElectLeader(nodes);
+
+        // Peers were initialized with DateTime.UtcNow — they are within the quorum window.
+        leader.HasQuorum().ShouldBeTrue();
+    }
+
+    // Go reference: raft.go checkQuorum (leader steps down when peers are stale)
+    [Fact]
+    public void HasQuorum_returns_false_with_stale_peers()
+    {
+        var (nodes, _) = CreateCluster(3);
+        var leader = ElectLeader(nodes);
+
+        // Set all peer contacts well beyond the quorum window (2 × 100 ms = 200 ms).
+        foreach (var (_, state) in leader.GetPeerStates())
+            state.LastContact = DateTime.UtcNow.AddMinutes(-5);
+
+        leader.HasQuorum().ShouldBeFalse();
+    }
+
+    // Go reference: raft.go — followers never have proposer quorum
+    [Fact]
+    public void HasQuorum_returns_false_for_non_leader()
+    {
+        var (nodes, _) = CreateCluster(3);
+        _ = ElectLeader(nodes);
+
+        // nodes[1] is a follower.
+        var follower = nodes[1];
+        follower.IsLeader.ShouldBeFalse();
+
+        follower.HasQuorum().ShouldBeFalse();
+    }
+
+    // Go reference: raft.go — candidate also does not have proposer quorum
+    [Fact]
+    public void HasQuorum_returns_false_for_candidate()
+    {
+        // A node becomes a Candidate when StartElection is called but it has not yet
+        // received enough votes to become Leader. In a 3-node cluster, after calling
+        // StartElection on n1 the node is a Candidate (it voted for itself but the
+        // other 2 nodes have not yet responded).
+        var (nodes, _) = CreateCluster(3);
+        var candidate = nodes[0];
+
+        // StartElection increments term, sets VotedFor=self, and calls TryBecomeLeader.
+        // With only 1 self-vote in a 3-node cluster quorum is 2, so role stays Candidate.
+        candidate.StartElection(clusterSize: 3);
+
+        candidate.Role.ShouldBe(RaftRole.Candidate);
+        candidate.HasQuorum().ShouldBeFalse();
+    }
+
+    // Go reference: raft.go single-node cluster — self is always a majority of one
+    [Fact]
+    public void HasQuorum_single_node_always_true()
+    {
+        var node = new RaftNode("solo");
+        node.StartElection(clusterSize: 1);
+
+        node.IsLeader.ShouldBeTrue();
+        node.HasQuorum().ShouldBeTrue();
+    }
+
+    // 5-node cluster: with 2 current peers + self = 3, majority of 5 is 3, so quorum.
+    [Fact]
+    public void HasQuorum_five_node_with_two_current_peers_is_true()
+    {
+        var (nodes, _) = CreateCluster(5);
+        var leader = ElectLeader(nodes);
+
+        // Make 2 peers stale; keep 2 fresh (plus self = 3 voters, majority of 5 = 3).
+        var peerStates = leader.GetPeerStates().Values.ToList();
+        peerStates[0].LastContact = DateTime.UtcNow.AddMinutes(-5);
+        peerStates[1].LastContact = DateTime.UtcNow.AddMinutes(-5);
+        // peerStates[2] and peerStates[3] remain fresh (within window).
+
+        leader.HasQuorum().ShouldBeTrue();
+    }
+
+    // 5-node cluster: with only 1 current peer + self = 2, majority of 5 is 3, so no quorum.
+    [Fact]
+    public void HasQuorum_five_node_with_one_current_peer_is_false()
+    {
+        var (nodes, _) = CreateCluster(5);
+        var leader = ElectLeader(nodes);
+
+        // Make 3 out of 4 peers stale; only 1 fresh peer + self = 2 voters (need 3).
+        var peerStates = leader.GetPeerStates().Values.ToList();
+        peerStates[0].LastContact = DateTime.UtcNow.AddMinutes(-5);
+        peerStates[1].LastContact = DateTime.UtcNow.AddMinutes(-5);
+        peerStates[2].LastContact = DateTime.UtcNow.AddMinutes(-5);
+        // peerStates[3] is fresh.
+
+        leader.HasQuorum().ShouldBeFalse();
+    }
+
+    // -- ProposeAsync quorum guard tests --
+
+    // Go reference: raft.go checkQuorum — leader rejects proposals when quorum lost
+    [Fact]
+    public async Task ProposeAsync_throws_when_no_quorum()
+    {
+        var (nodes, _) = CreateCluster(3);
+        var leader = ElectLeader(nodes);
+
+        // Make all peers stale to break quorum.
+        foreach (var (_, state) in leader.GetPeerStates())
+            state.LastContact = DateTime.UtcNow.AddMinutes(-5);
+
+        var ex = await Should.ThrowAsync<InvalidOperationException>(
+            () => leader.ProposeAsync("cmd", CancellationToken.None).AsTask());
+
+        ex.Message.ShouldContain("no quorum");
+    }
+
+    // Go reference: raft.go normal proposal path when quorum is confirmed
+    [Fact]
+    public async Task ProposeAsync_succeeds_with_quorum()
+    {
+        // Peers are initialized with fresh LastContact by ConfigureCluster, so quorum holds.
+        var (nodes, _) = CreateCluster(3);
+        var leader = ElectLeader(nodes);
+
+        var index = await leader.ProposeAsync("cmd-ok", CancellationToken.None);
+
+        index.ShouldBeGreaterThan(0);
+        leader.AppliedIndex.ShouldBe(index);
+    }
+
+    // After a heartbeat round, peers are fresh and quorum is restored.
+    [Fact]
+    public async Task ProposeAsync_succeeds_after_heartbeat_restores_quorum()
+    {
+        var (nodes, _) = CreateCluster(3);
+        var leader = ElectLeader(nodes);
+
+        // Make all peers stale.
+        foreach (var (_, state) in leader.GetPeerStates())
+            state.LastContact = DateTime.UtcNow.AddMinutes(-5);
+
+        // Proposal should fail with no quorum.
+        await Should.ThrowAsync<InvalidOperationException>(
+            () => leader.ProposeAsync("should-fail", CancellationToken.None).AsTask());
+
+        // Simulate heartbeat responses updating LastContact on the leader.
+        foreach (var peer in nodes.Skip(1))
+            leader.GetPeerStates()[peer.Id].LastContact = DateTime.UtcNow;
+
+        // Quorum is restored; proposal should now succeed.
+        var index = await leader.ProposeAsync("after-heartbeat", CancellationToken.None);
+        index.ShouldBeGreaterThan(0);
+    }
+
+    // -- Heartbeat updates LastContact --
+
+    // Go reference: raft.go processHeartbeat — updates peer last-contact on a valid heartbeat
+    [Fact]
+    public void Heartbeat_updates_last_contact()
+    {
+        var (nodes, _) = CreateCluster(3);
+        var node = nodes[0];
+
+        var peerStates = node.GetPeerStates();
+        var oldTime = DateTime.UtcNow.AddMinutes(-5);
+        peerStates["n2"].LastContact = oldTime;
+
+        node.ReceiveHeartbeat(term: 1, fromPeerId: "n2");
+
+        peerStates["n2"].LastContact.ShouldBeGreaterThan(oldTime);
+    }
+
+    // Heartbeats from the leader to the cluster restore the leader's quorum tracking.
+    [Fact]
+    public void Heartbeat_from_peer_restores_peer_freshness_for_quorum()
+    {
+        var (nodes, _) = CreateCluster(3);
+        var leader = ElectLeader(nodes);
+
+        // Simulate network partition: mark all peers stale.
+        foreach (var (_, state) in leader.GetPeerStates())
+            state.LastContact = DateTime.UtcNow.AddMinutes(-5);
+
+        leader.HasQuorum().ShouldBeFalse();
+
+        // Leader receives heartbeat ACK from n2 (simulating that n2 is still reachable).
+        // In a real RAFT loop the leader sends AppendEntries and processes the response;
+        // here we simulate the response side by directly updating LastContact via ReceiveHeartbeat.
+        // Note: ReceiveHeartbeat is called on a follower when it receives from the leader, not
+        // on the leader itself. We instead update LastContact directly to simulate the leader
+        // processing an AppendEntries response.
+        leader.GetPeerStates()["n2"].LastContact = DateTime.UtcNow;
+
+        // 1 current peer + self = 2 voters; majority of 3 = 2, so quorum is restored.
+        leader.HasQuorum().ShouldBeTrue();
+    }
+
+    // -- Quorum window boundary tests --
+
+    [Fact]
+    public void HasQuorum_peer_just_within_window_counts_as_current()
+    {
+        var (nodes, _) = CreateCluster(3);
+        var leader = ElectLeader(nodes);
+
+        // ElectionTimeoutMaxMs = 100; window = 2 × 100 = 200 ms.
+        // Set LastContact to 150 ms ago — just inside the 200 ms window.
+        foreach (var (_, state) in leader.GetPeerStates())
+            state.LastContact = DateTime.UtcNow.AddMilliseconds(-150);
+
+        leader.HasQuorum().ShouldBeTrue();
+    }
+
+    [Fact]
+    public void HasQuorum_peer_just_outside_window_is_stale()
+    {
+        var (nodes, _) = CreateCluster(3);
+        var leader = ElectLeader(nodes);
+
+        // ElectionTimeoutMaxMs = 100; window = 2 × 100 = 200 ms.
+        // Set LastContact to 500 ms ago — comfortably outside the 200 ms window.
+        foreach (var (_, state) in leader.GetPeerStates())
+            state.LastContact = DateTime.UtcNow.AddMilliseconds(-500);
+
+        leader.HasQuorum().ShouldBeFalse();
+    }
+}