diff --git a/src/NATS.Server/Raft/CommitQueue.cs b/src/NATS.Server/Raft/CommitQueue.cs new file mode 100644 index 0000000..5b5f440 --- /dev/null +++ b/src/NATS.Server/Raft/CommitQueue.cs @@ -0,0 +1,43 @@ +using System.Threading.Channels; + +namespace NATS.Server.Raft; + +/// +/// Channel-based queue for committed log entries awaiting state machine application. +/// Go reference: raft.go:150-160 (applied/processed fields), raft.go:2100-2150 (ApplyQ). +/// +public sealed class CommitQueue +{ + private readonly Channel _channel = Channel.CreateUnbounded( + new UnboundedChannelOptions { SingleReader = false, SingleWriter = false }); + + /// + /// Approximate number of items waiting to be dequeued. + /// + public int Count => _channel.Reader.Count; + + /// + /// Enqueues an item for state machine application. + /// + public ValueTask EnqueueAsync(T item, CancellationToken ct = default) + => _channel.Writer.WriteAsync(item, ct); + + /// + /// Dequeues the next committed entry, waiting if none are available. + /// + public ValueTask DequeueAsync(CancellationToken ct = default) + => _channel.Reader.ReadAsync(ct); + + /// + /// Attempts a non-blocking dequeue. Returns true if an item was available. + /// + public bool TryDequeue(out T? item) + => _channel.Reader.TryRead(out item); + + /// + /// Marks the channel as complete so no more items can be enqueued. + /// Readers will drain remaining items and then receive completion. + /// + public void Complete() + => _channel.Writer.Complete(); +} diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs index 0bd9c0f..8412685 100644 --- a/src/NATS.Server/Raft/RaftNode.cs +++ b/src/NATS.Server/Raft/RaftNode.cs @@ -1,6 +1,6 @@ namespace NATS.Server.Raft; -public sealed class RaftNode +public sealed class RaftNode : IDisposable { private int _votesReceived; private readonly List _cluster = []; @@ -10,6 +10,15 @@ public sealed class RaftNode private readonly string? _persistDirectory; private readonly HashSet _members = new(StringComparer.Ordinal); + // B2: Election timer fields + // Go reference: raft.go:1400-1450 (resetElectionTimeout), raft.go:1500-1550 (campaign logic) + private Timer? _electionTimer; + private CancellationTokenSource? _electionTimerCts; + + // B3: Peer state tracking + // Go reference: raft.go peer tracking (nextIndex, matchIndex, last contact) + private readonly Dictionary _peerStates = new(StringComparer.Ordinal); + public string Id { get; } public int Term => TermState.CurrentTerm; public bool IsLeader => Role == RaftRole.Leader; @@ -19,6 +28,16 @@ public sealed class RaftNode public long AppliedIndex { get; set; } public RaftLog Log { get; private set; } = new(); + // B1: Commit tracking + // Go reference: raft.go:150-160 (applied/processed fields), raft.go:2100-2150 (ApplyQ) + public long CommitIndex { get; private set; } + public long ProcessedIndex { get; private set; } + public CommitQueue CommitQueue { get; } = new(); + + // B2: Election timeout configuration (milliseconds) + public int ElectionTimeoutMinMs { get; set; } = 150; + public int ElectionTimeoutMaxMs { get; set; } = 300; + public RaftNode(string id, IRaftTransport? transport = null, string? persistDirectory = null) { Id = id; @@ -32,8 +51,16 @@ public sealed class RaftNode _cluster.Clear(); _cluster.AddRange(peers); _members.Clear(); + _peerStates.Clear(); foreach (var peer in peers) + { _members.Add(peer.Id); + // B3: Initialize peer state for all peers except self + if (!string.Equals(peer.Id, Id, StringComparison.Ordinal)) + { + _peerStates[peer.Id] = new RaftPeerState { PeerId = peer.Id }; + } + } } public void AddMember(string memberId) => _members.Add(memberId); @@ -70,13 +97,22 @@ public sealed class RaftNode return new VoteResponse { Granted = true }; } - public void ReceiveHeartbeat(int term) + public void ReceiveHeartbeat(int term, string? fromPeerId = null) { if (term < TermState.CurrentTerm) return; TermState.CurrentTerm = term; Role = RaftRole.Follower; + + // B2: Reset election timer on valid heartbeat + ResetElectionTimeout(); + + // B3: Update peer contact time + if (fromPeerId != null && _peerStates.TryGetValue(fromPeerId, out var peerState)) + { + peerState.LastContact = DateTime.UtcNow; + } } public void ReceiveVote(VoteResponse response, int clusterSize = 3) @@ -105,6 +141,21 @@ public sealed class RaftNode foreach (var node in _cluster) node.AppliedIndex = Math.Max(node.AppliedIndex, entry.Index); + // B1: Update commit index and enqueue for state machine application + CommitIndex = entry.Index; + await CommitQueue.EnqueueAsync(entry, ct); + + // B3: Update peer match/next indices for successful replications + foreach (var result in results.Where(r => r.Success)) + { + if (_peerStates.TryGetValue(result.FollowerId, out var peerState)) + { + peerState.MatchIndex = Math.Max(peerState.MatchIndex, entry.Index); + peerState.NextIndex = entry.Index + 1; + peerState.LastContact = DateTime.UtcNow; + } + } + foreach (var node in _cluster.Where(n => n._persistDirectory != null)) await node.PersistAsync(ct); } @@ -115,6 +166,16 @@ public sealed class RaftNode return entry.Index; } + /// + /// Marks the given index as processed by the state machine. + /// Go reference: raft.go applied/processed tracking. + /// + public void MarkProcessed(long index) + { + if (index > ProcessedIndex) + ProcessedIndex = index; + } + public void ReceiveReplicatedEntry(RaftLogEntry entry) { Log.AppendReplicated(entry); @@ -126,6 +187,9 @@ public sealed class RaftNode if (entry.Term < TermState.CurrentTerm) throw new InvalidOperationException("stale term append rejected"); + // B2: Reset election timer when receiving append from leader + ResetElectionTimeout(); + ReceiveReplicatedEntry(entry); return Task.CompletedTask; } @@ -155,6 +219,110 @@ public sealed class RaftNode TermState.VotedFor = null; } + // B2: Election timer management + // Go reference: raft.go:1400-1450 (resetElectionTimeout) + + /// + /// Resets the election timeout timer with a new randomized interval. + /// Called on heartbeat receipt and append entries from leader. + /// + public void ResetElectionTimeout() + { + var timeout = Random.Shared.Next(ElectionTimeoutMinMs, ElectionTimeoutMaxMs + 1); + _electionTimer?.Change(timeout, Timeout.Infinite); + } + + /// + /// Starts the background election timer. When it fires and this node is a Follower, + /// an election campaign is triggered automatically. + /// Go reference: raft.go:1500-1550 (campaign logic). + /// + public void StartElectionTimer(CancellationToken ct = default) + { + _electionTimerCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + var timeout = Random.Shared.Next(ElectionTimeoutMinMs, ElectionTimeoutMaxMs + 1); + _electionTimer = new Timer(ElectionTimerCallback, null, timeout, Timeout.Infinite); + } + + /// + /// Stops and disposes the election timer. + /// + public void StopElectionTimer() + { + _electionTimer?.Dispose(); + _electionTimer = null; + _electionTimerCts?.Cancel(); + _electionTimerCts?.Dispose(); + _electionTimerCts = null; + } + + /// + /// Bypasses the election timer and immediately starts an election campaign. + /// Useful for testing. + /// + public void CampaignImmediately() + { + var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count; + StartElection(clusterSize); + } + + private void ElectionTimerCallback(object? state) + { + if (_electionTimerCts?.IsCancellationRequested == true) + return; + + if (Role == RaftRole.Follower) + { + var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count; + StartElection(clusterSize); + } + else + { + // Re-arm the timer for non-follower states so it can fire again + // if the node transitions back to follower. + ResetElectionTimeout(); + } + } + + // B3: Peer state accessors + + /// + /// Returns a read-only view of all tracked peer states. + /// + public IReadOnlyDictionary GetPeerStates() + => _peerStates; + + /// + /// Checks if this node's log is current (within one election timeout of the leader). + /// Go reference: raft.go isCurrent check. + /// + public bool IsCurrent(TimeSpan electionTimeout) + { + // A leader is always current + if (Role == RaftRole.Leader) + return true; + + // Check if any peer (which could be the leader) has contacted us recently + return _peerStates.Values.Any(p => p.IsCurrent(electionTimeout)); + } + + /// + /// Overall health check: node is active and peers are responsive. + /// + public bool IsHealthy(TimeSpan healthThreshold) + { + if (Role == RaftRole.Leader) + { + // Leader is healthy if a majority of peers are responsive + var healthyPeers = _peerStates.Values.Count(p => p.IsHealthy(healthThreshold)); + var quorum = (_peerStates.Count + 1) / 2; // +1 for self + return healthyPeers >= quorum; + } + + // Follower/candidate: healthy if at least one peer (the leader) is responsive + return _peerStates.Values.Any(p => p.IsHealthy(healthThreshold)); + } + private void TryBecomeLeader(int clusterSize) { var quorum = (clusterSize / 2) + 1; @@ -186,4 +354,9 @@ public sealed class RaftNode else if (Log.Entries.Count > 0) AppliedIndex = Log.Entries[^1].Index; } + + public void Dispose() + { + StopElectionTimer(); + } } diff --git a/src/NATS.Server/Raft/RaftPeerState.cs b/src/NATS.Server/Raft/RaftPeerState.cs new file mode 100644 index 0000000..a0a32b8 --- /dev/null +++ b/src/NATS.Server/Raft/RaftPeerState.cs @@ -0,0 +1,46 @@ +namespace NATS.Server.Raft; + +/// +/// Tracks replication and health state for a single RAFT peer. +/// Go reference: raft.go peer tracking fields (nextIndex, matchIndex, last contact). +/// +public sealed class RaftPeerState +{ + /// + /// The peer's unique node identifier. + /// + public required string PeerId { get; init; } + + /// + /// The next log index to send to this peer (leader use only). + /// + public long NextIndex { get; set; } = 1; + + /// + /// The highest log index known to be replicated on this peer. + /// + public long MatchIndex { get; set; } + + /// + /// Timestamp of the last successful communication with this peer. + /// + public DateTime LastContact { get; set; } = DateTime.UtcNow; + + /// + /// Whether this peer is considered active in the cluster. + /// + public bool Active { get; set; } = true; + + /// + /// Returns true if this peer has been contacted within the election timeout window. + /// Go reference: raft.go isCurrent check. + /// + public bool IsCurrent(TimeSpan electionTimeout) + => DateTime.UtcNow - LastContact < electionTimeout; + + /// + /// Returns true if this peer is both active and has been contacted within the health threshold. + /// + public bool IsHealthy(TimeSpan healthThreshold) + => Active && DateTime.UtcNow - LastContact < healthThreshold; +} diff --git a/tests/NATS.Server.Tests/Raft/RaftApplyQueueTests.cs b/tests/NATS.Server.Tests/Raft/RaftApplyQueueTests.cs new file mode 100644 index 0000000..98d3bc0 --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/RaftApplyQueueTests.cs @@ -0,0 +1,256 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +/// +/// Tests for CommitQueue and commit/processed index tracking in RaftNode. +/// Go reference: raft.go:150-160 (applied/processed fields), raft.go:2100-2150 (ApplyQ). +/// +public class RaftApplyQueueTests +{ + // -- Helpers -- + + private static (RaftNode[] nodes, InMemoryRaftTransport transport) CreateCluster(int size) + { + var transport = new InMemoryRaftTransport(); + var nodes = Enumerable.Range(1, size) + .Select(i => new RaftNode($"n{i}", transport)) + .ToArray(); + foreach (var node in nodes) + { + transport.Register(node); + node.ConfigureCluster(nodes); + } + return (nodes, transport); + } + + private static RaftNode ElectLeader(RaftNode[] nodes) + { + var candidate = nodes[0]; + candidate.StartElection(nodes.Length); + foreach (var voter in nodes.Skip(1)) + candidate.ReceiveVote(voter.GrantVote(candidate.Term, candidate.Id), nodes.Length); + return candidate; + } + + // -- CommitQueue unit tests -- + + [Fact] + public async Task Enqueue_and_dequeue_lifecycle() + { + var queue = new CommitQueue(); + + var entry = new RaftLogEntry(1, 1, "cmd-1"); + await queue.EnqueueAsync(entry); + queue.Count.ShouldBe(1); + + var dequeued = await queue.DequeueAsync(); + dequeued.ShouldBe(entry); + queue.Count.ShouldBe(0); + } + + [Fact] + public async Task Multiple_items_dequeue_in_fifo_order() + { + var queue = new CommitQueue(); + + var entry1 = new RaftLogEntry(1, 1, "cmd-1"); + var entry2 = new RaftLogEntry(2, 1, "cmd-2"); + var entry3 = new RaftLogEntry(3, 1, "cmd-3"); + + await queue.EnqueueAsync(entry1); + await queue.EnqueueAsync(entry2); + await queue.EnqueueAsync(entry3); + queue.Count.ShouldBe(3); + + (await queue.DequeueAsync()).ShouldBe(entry1); + (await queue.DequeueAsync()).ShouldBe(entry2); + (await queue.DequeueAsync()).ShouldBe(entry3); + queue.Count.ShouldBe(0); + } + + [Fact] + public void TryDequeue_returns_false_when_empty() + { + var queue = new CommitQueue(); + queue.TryDequeue(out var item).ShouldBeFalse(); + item.ShouldBeNull(); + } + + [Fact] + public async Task TryDequeue_returns_true_when_item_available() + { + var queue = new CommitQueue(); + var entry = new RaftLogEntry(1, 1, "cmd-1"); + await queue.EnqueueAsync(entry); + + queue.TryDequeue(out var item).ShouldBeTrue(); + item.ShouldBe(entry); + } + + [Fact] + public async Task Complete_prevents_further_enqueue() + { + var queue = new CommitQueue(); + await queue.EnqueueAsync(new RaftLogEntry(1, 1, "cmd-1")); + queue.Complete(); + + // After completion, writing should throw ChannelClosedException + await Should.ThrowAsync( + async () => await queue.EnqueueAsync(new RaftLogEntry(2, 1, "cmd-2"))); + } + + [Fact] + public async Task Complete_allows_draining_remaining_items() + { + var queue = new CommitQueue(); + var entry = new RaftLogEntry(1, 1, "cmd-1"); + await queue.EnqueueAsync(entry); + queue.Complete(); + + // Should still be able to read remaining items + var dequeued = await queue.DequeueAsync(); + dequeued.ShouldBe(entry); + } + + [Fact] + public void Count_reflects_current_queue_depth() + { + var queue = new CommitQueue(); + queue.Count.ShouldBe(0); + } + + // -- RaftNode CommitIndex tracking tests -- + + [Fact] + public async Task CommitIndex_advances_when_proposal_succeeds_quorum() + { + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + leader.CommitIndex.ShouldBe(0); + + var index1 = await leader.ProposeAsync("cmd-1", default); + leader.CommitIndex.ShouldBe(index1); + + var index2 = await leader.ProposeAsync("cmd-2", default); + leader.CommitIndex.ShouldBe(index2); + index2.ShouldBeGreaterThan(index1); + } + + [Fact] + public async Task CommitIndex_starts_at_zero() + { + var node = new RaftNode("n1"); + node.CommitIndex.ShouldBe(0); + await Task.CompletedTask; + } + + // -- RaftNode ProcessedIndex tracking tests -- + + [Fact] + public void ProcessedIndex_starts_at_zero() + { + var node = new RaftNode("n1"); + node.ProcessedIndex.ShouldBe(0); + } + + [Fact] + public void MarkProcessed_advances_ProcessedIndex() + { + var node = new RaftNode("n1"); + node.MarkProcessed(5); + node.ProcessedIndex.ShouldBe(5); + } + + [Fact] + public void MarkProcessed_does_not_go_backward() + { + var node = new RaftNode("n1"); + node.MarkProcessed(10); + node.MarkProcessed(5); + node.ProcessedIndex.ShouldBe(10); + } + + [Fact] + public async Task ProcessedIndex_tracks_separately_from_CommitIndex() + { + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + var index1 = await leader.ProposeAsync("cmd-1", default); + var index2 = await leader.ProposeAsync("cmd-2", default); + + // CommitIndex should have advanced + leader.CommitIndex.ShouldBe(index2); + + // ProcessedIndex stays at 0 until explicitly marked + leader.ProcessedIndex.ShouldBe(0); + + // Simulate state machine processing one entry + leader.MarkProcessed(index1); + leader.ProcessedIndex.ShouldBe(index1); + + // CommitIndex is still ahead of ProcessedIndex + leader.CommitIndex.ShouldBeGreaterThan(leader.ProcessedIndex); + + // Process the second entry + leader.MarkProcessed(index2); + leader.ProcessedIndex.ShouldBe(index2); + leader.ProcessedIndex.ShouldBe(leader.CommitIndex); + } + + // -- CommitQueue integration with RaftNode -- + + [Fact] + public async Task CommitQueue_receives_entries_after_successful_quorum() + { + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + var index1 = await leader.ProposeAsync("cmd-1", default); + var index2 = await leader.ProposeAsync("cmd-2", default); + + // CommitQueue should have 2 entries + leader.CommitQueue.Count.ShouldBe(2); + + // Dequeue and verify order + var entry1 = await leader.CommitQueue.DequeueAsync(); + entry1.Index.ShouldBe(index1); + entry1.Command.ShouldBe("cmd-1"); + + var entry2 = await leader.CommitQueue.DequeueAsync(); + entry2.Index.ShouldBe(index2); + entry2.Command.ShouldBe("cmd-2"); + } + + [Fact] + public async Task CommitQueue_entries_match_committed_log_entries() + { + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + await leader.ProposeAsync("alpha", default); + await leader.ProposeAsync("beta", default); + await leader.ProposeAsync("gamma", default); + + // Drain the commit queue and verify entries match log + for (int i = 0; i < 3; i++) + { + var committed = await leader.CommitQueue.DequeueAsync(); + committed.ShouldBe(leader.Log.Entries[i]); + } + } + + [Fact] + public async Task Non_leader_proposal_throws_and_does_not_affect_commit_queue() + { + var node = new RaftNode("follower"); + node.CommitQueue.Count.ShouldBe(0); + + await Should.ThrowAsync( + async () => await node.ProposeAsync("cmd", default)); + + node.CommitQueue.Count.ShouldBe(0); + node.CommitIndex.ShouldBe(0); + } +} diff --git a/tests/NATS.Server.Tests/Raft/RaftElectionTimerTests.cs b/tests/NATS.Server.Tests/Raft/RaftElectionTimerTests.cs new file mode 100644 index 0000000..dc8a8d0 --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/RaftElectionTimerTests.cs @@ -0,0 +1,263 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +/// +/// Tests for election timeout management and campaign triggering in RaftNode. +/// Go reference: raft.go:1400-1450 (resetElectionTimeout), raft.go:1500-1550 (campaign logic). +/// +public class RaftElectionTimerTests : IDisposable +{ + private readonly List _nodesToDispose = []; + + public void Dispose() + { + foreach (var node in _nodesToDispose) + node.Dispose(); + } + + private RaftNode CreateTrackedNode(string id) + { + var node = new RaftNode(id); + _nodesToDispose.Add(node); + return node; + } + + private RaftNode[] CreateTrackedCluster(int size) + { + var nodes = Enumerable.Range(1, size) + .Select(i => CreateTrackedNode($"n{i}")) + .ToArray(); + foreach (var node in nodes) + node.ConfigureCluster(nodes); + return nodes; + } + + [Fact] + public void ResetElectionTimeout_prevents_election_while_receiving_heartbeats() + { + // Node with very short timeout for testing + var nodes = CreateTrackedCluster(3); + var node = nodes[0]; + node.ElectionTimeoutMinMs = 50; + node.ElectionTimeoutMaxMs = 80; + + node.StartElectionTimer(); + + // Keep resetting to prevent election + for (int i = 0; i < 5; i++) + { + Thread.Sleep(30); + node.ResetElectionTimeout(); + } + + // Node should still be a follower since we kept resetting the timer + node.Role.ShouldBe(RaftRole.Follower); + node.StopElectionTimer(); + } + + [Fact] + public void CampaignImmediately_triggers_election_without_timer() + { + var nodes = CreateTrackedCluster(3); + var candidate = nodes[0]; + + candidate.Role.ShouldBe(RaftRole.Follower); + candidate.Term.ShouldBe(0); + + candidate.CampaignImmediately(); + + // Should have started an election + candidate.Role.ShouldBe(RaftRole.Candidate); + candidate.Term.ShouldBe(1); + candidate.TermState.VotedFor.ShouldBe(candidate.Id); + } + + [Fact] + public void CampaignImmediately_single_node_becomes_leader() + { + var node = CreateTrackedNode("solo"); + node.AddMember("solo"); + + node.CampaignImmediately(); + + node.IsLeader.ShouldBeTrue(); + node.Role.ShouldBe(RaftRole.Leader); + } + + [Fact] + public async Task Expired_timer_triggers_campaign_when_follower() + { + var nodes = CreateTrackedCluster(3); + var node = nodes[0]; + + // Use very short timeouts for testing + node.ElectionTimeoutMinMs = 30; + node.ElectionTimeoutMaxMs = 50; + node.Role.ShouldBe(RaftRole.Follower); + + node.StartElectionTimer(); + + // Wait long enough for the timer to fire + await Task.Delay(200); + + // The timer callback should have triggered an election + node.Role.ShouldBe(RaftRole.Candidate); + node.Term.ShouldBeGreaterThan(0); + node.TermState.VotedFor.ShouldBe(node.Id); + + node.StopElectionTimer(); + } + + [Fact] + public async Task Timer_does_not_trigger_campaign_when_leader() + { + var nodes = CreateTrackedCluster(3); + var node = nodes[0]; + + // Make this node the leader first + node.StartElection(nodes.Length); + foreach (var voter in nodes.Skip(1)) + node.ReceiveVote(voter.GrantVote(node.Term, node.Id), nodes.Length); + node.IsLeader.ShouldBeTrue(); + var termBefore = node.Term; + + // Use very short timeouts + node.ElectionTimeoutMinMs = 30; + node.ElectionTimeoutMaxMs = 50; + node.StartElectionTimer(); + + // Wait for timer to fire + await Task.Delay(200); + + // Should still be leader, no new election started + node.IsLeader.ShouldBeTrue(); + // Term may have incremented if re-election happened, but role stays leader + // The key assertion is the node didn't transition to Candidate + node.Role.ShouldBe(RaftRole.Leader); + + node.StopElectionTimer(); + } + + [Fact] + public async Task Timer_does_not_trigger_campaign_when_candidate() + { + var node = CreateTrackedNode("n1"); + node.AddMember("n1"); + node.AddMember("n2"); + node.AddMember("n3"); + + // Start an election manually (becomes Candidate but not Leader since no quorum) + node.StartElection(clusterSize: 3); + node.Role.ShouldBe(RaftRole.Candidate); + var termAfterElection = node.Term; + + // Use very short timeouts + node.ElectionTimeoutMinMs = 30; + node.ElectionTimeoutMaxMs = 50; + node.StartElectionTimer(); + + // Wait for timer to fire + await Task.Delay(200); + + // Timer should not trigger additional campaigns when already candidate + // (the callback only triggers for Follower state) + node.Role.ShouldNotBe(RaftRole.Follower); + + node.StopElectionTimer(); + } + + [Fact] + public void Election_timeout_range_is_configurable() + { + var node = CreateTrackedNode("n1"); + node.ElectionTimeoutMinMs.ShouldBe(150); + node.ElectionTimeoutMaxMs.ShouldBe(300); + + node.ElectionTimeoutMinMs = 500; + node.ElectionTimeoutMaxMs = 1000; + node.ElectionTimeoutMinMs.ShouldBe(500); + node.ElectionTimeoutMaxMs.ShouldBe(1000); + } + + [Fact] + public void StopElectionTimer_is_safe_when_no_timer_started() + { + var node = CreateTrackedNode("n1"); + // Should not throw + node.StopElectionTimer(); + } + + [Fact] + public void StopElectionTimer_can_be_called_multiple_times() + { + var node = CreateTrackedNode("n1"); + node.StartElectionTimer(); + node.StopElectionTimer(); + node.StopElectionTimer(); // Should not throw + } + + [Fact] + public void ReceiveHeartbeat_resets_election_timeout() + { + var nodes = CreateTrackedCluster(3); + var node = nodes[0]; + + node.ElectionTimeoutMinMs = 50; + node.ElectionTimeoutMaxMs = 80; + node.StartElectionTimer(); + + // Simulate heartbeats coming in regularly, preventing election + for (int i = 0; i < 8; i++) + { + Thread.Sleep(30); + node.ReceiveHeartbeat(term: 1); + } + + // Should still be follower since heartbeats kept resetting the timer + node.Role.ShouldBe(RaftRole.Follower); + node.StopElectionTimer(); + } + + [Fact] + public async Task Timer_fires_after_heartbeats_stop() + { + var nodes = CreateTrackedCluster(3); + var node = nodes[0]; + + node.ElectionTimeoutMinMs = 40; + node.ElectionTimeoutMaxMs = 60; + node.StartElectionTimer(); + + // Send a few heartbeats + for (int i = 0; i < 3; i++) + { + Thread.Sleep(20); + node.ReceiveHeartbeat(term: 1); + } + + node.Role.ShouldBe(RaftRole.Follower); + + // Stop sending heartbeats and wait for timer to fire + await Task.Delay(200); + + // Should have started an election + node.Role.ShouldBe(RaftRole.Candidate); + node.StopElectionTimer(); + } + + [Fact] + public void Dispose_stops_election_timer() + { + var node = new RaftNode("n1"); + node.ElectionTimeoutMinMs = 30; + node.ElectionTimeoutMaxMs = 50; + node.StartElectionTimer(); + + // Dispose should stop the timer cleanly + node.Dispose(); + + // Calling dispose again should be safe + node.Dispose(); + } +} diff --git a/tests/NATS.Server.Tests/Raft/RaftHealthTests.cs b/tests/NATS.Server.Tests/Raft/RaftHealthTests.cs new file mode 100644 index 0000000..d5db6ef --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/RaftHealthTests.cs @@ -0,0 +1,342 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +/// +/// Tests for RaftPeerState health classification and peer tracking in RaftNode. +/// Go reference: raft.go peer tracking (nextIndex, matchIndex, last contact, isCurrent). +/// +public class RaftHealthTests +{ + // -- Helpers -- + + private static (RaftNode[] nodes, InMemoryRaftTransport transport) CreateCluster(int size) + { + var transport = new InMemoryRaftTransport(); + var nodes = Enumerable.Range(1, size) + .Select(i => new RaftNode($"n{i}", transport)) + .ToArray(); + foreach (var node in nodes) + { + transport.Register(node); + node.ConfigureCluster(nodes); + } + return (nodes, transport); + } + + private static RaftNode ElectLeader(RaftNode[] nodes) + { + var candidate = nodes[0]; + candidate.StartElection(nodes.Length); + foreach (var voter in nodes.Skip(1)) + candidate.ReceiveVote(voter.GrantVote(candidate.Term, candidate.Id), nodes.Length); + return candidate; + } + + // -- RaftPeerState unit tests -- + + [Fact] + public void PeerState_defaults_are_correct() + { + var peer = new RaftPeerState { PeerId = "n2" }; + peer.PeerId.ShouldBe("n2"); + peer.NextIndex.ShouldBe(1); + peer.MatchIndex.ShouldBe(0); + peer.Active.ShouldBeTrue(); + } + + [Fact] + public void IsCurrent_returns_true_when_within_timeout() + { + var peer = new RaftPeerState { PeerId = "n2" }; + peer.LastContact = DateTime.UtcNow; + + peer.IsCurrent(TimeSpan.FromSeconds(5)).ShouldBeTrue(); + } + + [Fact] + public void IsCurrent_returns_false_when_stale() + { + var peer = new RaftPeerState { PeerId = "n2" }; + peer.LastContact = DateTime.UtcNow.AddSeconds(-10); + + peer.IsCurrent(TimeSpan.FromSeconds(5)).ShouldBeFalse(); + } + + [Fact] + public void IsHealthy_returns_true_for_active_recent_peer() + { + var peer = new RaftPeerState { PeerId = "n2", Active = true }; + peer.LastContact = DateTime.UtcNow; + + peer.IsHealthy(TimeSpan.FromSeconds(5)).ShouldBeTrue(); + } + + [Fact] + public void IsHealthy_returns_false_for_inactive_peer() + { + var peer = new RaftPeerState { PeerId = "n2", Active = false }; + peer.LastContact = DateTime.UtcNow; + + peer.IsHealthy(TimeSpan.FromSeconds(5)).ShouldBeFalse(); + } + + [Fact] + public void IsHealthy_returns_false_for_stale_active_peer() + { + var peer = new RaftPeerState { PeerId = "n2", Active = true }; + peer.LastContact = DateTime.UtcNow.AddSeconds(-10); + + peer.IsHealthy(TimeSpan.FromSeconds(5)).ShouldBeFalse(); + } + + // -- Peer state initialization via ConfigureCluster -- + + [Fact] + public void ConfigureCluster_initializes_peer_states() + { + var (nodes, _) = CreateCluster(3); + var node = nodes[0]; + + var peerStates = node.GetPeerStates(); + peerStates.Count.ShouldBe(2); // 2 peers, not counting self + + peerStates.ContainsKey("n2").ShouldBeTrue(); + peerStates.ContainsKey("n3").ShouldBeTrue(); + peerStates.ContainsKey("n1").ShouldBeFalse(); // Self excluded + } + + [Fact] + public void ConfigureCluster_sets_initial_peer_state_values() + { + var (nodes, _) = CreateCluster(3); + var peerStates = nodes[0].GetPeerStates(); + + foreach (var (peerId, state) in peerStates) + { + state.NextIndex.ShouldBe(1); + state.MatchIndex.ShouldBe(0); + state.Active.ShouldBeTrue(); + } + } + + [Fact] + public void ConfigureCluster_five_node_has_four_peers() + { + var (nodes, _) = CreateCluster(5); + nodes[0].GetPeerStates().Count.ShouldBe(4); + } + + // -- LastContact updates on heartbeat -- + + [Fact] + public void LastContact_updates_on_heartbeat_from_known_peer() + { + var (nodes, _) = CreateCluster(3); + var node = nodes[0]; + + // Set contact time in the past + var peerStates = node.GetPeerStates(); + var oldTime = DateTime.UtcNow.AddMinutes(-5); + peerStates["n2"].LastContact = oldTime; + + // Receive heartbeat from n2 + node.ReceiveHeartbeat(term: 1, fromPeerId: "n2"); + + peerStates["n2"].LastContact.ShouldBeGreaterThan(oldTime); + } + + [Fact] + public void LastContact_not_updated_for_unknown_peer() + { + var (nodes, _) = CreateCluster(3); + var node = nodes[0]; + + // Heartbeat from unknown peer should not crash + node.ReceiveHeartbeat(term: 1, fromPeerId: "unknown-node"); + + // Existing peers should be unchanged + var peerStates = node.GetPeerStates(); + peerStates.ContainsKey("unknown-node").ShouldBeFalse(); + } + + [Fact] + public void LastContact_not_updated_when_fromPeerId_null() + { + var (nodes, _) = CreateCluster(3); + var node = nodes[0]; + + var oldContact = DateTime.UtcNow.AddMinutes(-5); + node.GetPeerStates()["n2"].LastContact = oldContact; + + // Heartbeat without peer ID + node.ReceiveHeartbeat(term: 1); + + // Should not update any peer contact times (no peer specified) + node.GetPeerStates()["n2"].LastContact.ShouldBe(oldContact); + } + + // -- IsCurrent on RaftNode -- + + [Fact] + public void Leader_is_always_current() + { + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + leader.IsCurrent(TimeSpan.FromSeconds(1)).ShouldBeTrue(); + } + + [Fact] + public void Follower_is_current_when_peer_recently_contacted() + { + var (nodes, _) = CreateCluster(3); + var follower = nodes[1]; + + // Peer states are initialized with current time by ConfigureCluster + follower.IsCurrent(TimeSpan.FromSeconds(5)).ShouldBeTrue(); + } + + [Fact] + public void Follower_is_not_current_when_all_peers_stale() + { + var (nodes, _) = CreateCluster(3); + var follower = nodes[1]; + + // Make all peers stale + foreach (var (_, state) in follower.GetPeerStates()) + state.LastContact = DateTime.UtcNow.AddMinutes(-10); + + follower.IsCurrent(TimeSpan.FromSeconds(5)).ShouldBeFalse(); + } + + // -- IsHealthy on RaftNode -- + + [Fact] + public void Leader_is_healthy_when_majority_peers_responsive() + { + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + // All peers recently contacted + leader.IsHealthy(TimeSpan.FromSeconds(5)).ShouldBeTrue(); + } + + [Fact] + public void Leader_is_unhealthy_when_majority_peers_unresponsive() + { + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + // Make all peers stale + foreach (var (_, state) in leader.GetPeerStates()) + state.LastContact = DateTime.UtcNow.AddMinutes(-10); + + leader.IsHealthy(TimeSpan.FromSeconds(5)).ShouldBeFalse(); + } + + [Fact] + public void Follower_is_healthy_when_leader_peer_responsive() + { + var (nodes, _) = CreateCluster(3); + var follower = nodes[1]; + + // At least one peer (simulating leader) is recent + follower.IsHealthy(TimeSpan.FromSeconds(5)).ShouldBeTrue(); + } + + [Fact] + public void Follower_is_unhealthy_when_no_peers_responsive() + { + var (nodes, _) = CreateCluster(3); + var follower = nodes[1]; + + // Make all peers stale + foreach (var (_, state) in follower.GetPeerStates()) + state.LastContact = DateTime.UtcNow.AddMinutes(-10); + + follower.IsHealthy(TimeSpan.FromSeconds(5)).ShouldBeFalse(); + } + + // -- MatchIndex / NextIndex tracking during replication -- + + [Fact] + public async Task MatchIndex_and_NextIndex_update_during_replication() + { + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + var index = await leader.ProposeAsync("cmd-1", default); + + var peerStates = leader.GetPeerStates(); + // Both followers should have updated match/next indices + foreach (var (_, state) in peerStates) + { + state.MatchIndex.ShouldBe(index); + state.NextIndex.ShouldBe(index + 1); + } + } + + [Fact] + public async Task MatchIndex_advances_monotonically_with_proposals() + { + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + var index1 = await leader.ProposeAsync("cmd-1", default); + var index2 = await leader.ProposeAsync("cmd-2", default); + var index3 = await leader.ProposeAsync("cmd-3", default); + + var peerStates = leader.GetPeerStates(); + foreach (var (_, state) in peerStates) + { + state.MatchIndex.ShouldBe(index3); + state.NextIndex.ShouldBe(index3 + 1); + } + } + + [Fact] + public async Task LastContact_updates_on_successful_replication() + { + var (nodes, _) = CreateCluster(3); + var leader = ElectLeader(nodes); + + // Set peer contacts in the past + foreach (var (_, state) in leader.GetPeerStates()) + state.LastContact = DateTime.UtcNow.AddMinutes(-5); + + await leader.ProposeAsync("cmd-1", default); + + // Successful replication should update LastContact + foreach (var (_, state) in leader.GetPeerStates()) + state.LastContact.ShouldBeGreaterThan(DateTime.UtcNow.AddSeconds(-2)); + } + + [Fact] + public void Peer_states_empty_before_cluster_configuration() + { + var node = new RaftNode("n1"); + node.GetPeerStates().Count.ShouldBe(0); + } + + [Fact] + public void ConfigureCluster_clears_previous_peer_states() + { + var (nodes, transport) = CreateCluster(3); + var node = nodes[0]; + node.GetPeerStates().Count.ShouldBe(2); + + // Reconfigure with 5 nodes + var moreNodes = Enumerable.Range(1, 5) + .Select(i => new RaftNode($"m{i}", transport)) + .ToArray(); + foreach (var n in moreNodes) + transport.Register(n); + node.ConfigureCluster(moreNodes); + + // Should now have 4 peers (5 nodes minus self) + // Note: the node's ID is "n1" but cluster members are "m1"-"m5" + // So all 5 are peers since none match "n1" + node.GetPeerStates().Count.ShouldBe(5); + } +}