// Source path (from the original patch header): tests/NATS.Server.Tests/Raft/RaftReadIndexTests.cs
using NATS.Server.Raft;

namespace NATS.Server.Tests.Raft;

/// <summary>
/// Tests for ReadIndexAsync() — linearizable reads via quorum confirmation (Gap 8.7).
///
/// ReadIndex avoids appending a log entry for reads (no log growth) while still
/// ensuring linearizability: the leader confirms it holds a quorum heartbeat before
/// returning the CommitIndex. A partitioned leader that can no longer reach a majority
/// will time out rather than serve a potentially stale read.
///
/// Go reference: raft.go — read-index optimisation (send AppendEntries with no payload
/// to verify quorum before responding to a linearizable client read).
/// </summary>
public class RaftReadIndexTests
{
    // -- Helpers (self-contained, no shared TestHelpers class) --

    // Short election timeouts so tests finish quickly without real async delays.
    private const int FastElectionTimeoutMinMs = 10;
    private const int FastElectionTimeoutMaxMs = 20;

    /// <summary>Applies the short election timeouts used by every test in this class.</summary>
    private static void UseFastElectionTimeouts(RaftNode node)
    {
        node.ElectionTimeoutMinMs = FastElectionTimeoutMinMs;
        node.ElectionTimeoutMaxMs = FastElectionTimeoutMaxMs;
    }

    /// <summary>
    /// Builds <paramref name="size"/> nodes named n1..nN on one in-memory transport,
    /// registers each with the transport and configures each with the full member list.
    /// </summary>
    private static (RaftNode[] nodes, InMemoryRaftTransport transport) CreateCluster(int size)
    {
        var transport = new InMemoryRaftTransport();
        var nodes = Enumerable.Range(1, size)
            .Select(i => new RaftNode($"n{i}", transport))
            .ToArray();

        foreach (var node in nodes)
        {
            transport.Register(node);
            node.ConfigureCluster(nodes);
            UseFastElectionTimeouts(node);
        }

        return (nodes, transport);
    }

    /// <summary>
    /// Starts an election on the first node and feeds it a vote from every other node,
    /// then returns that first node.
    /// </summary>
    private static RaftNode ElectLeader(RaftNode[] nodes)
    {
        var candidate = nodes[0];
        candidate.StartElection(nodes.Length);

        foreach (var voter in nodes.Skip(1))
        {
            candidate.ReceiveVote(voter.GrantVote(candidate.Term, candidate.Id), nodes.Length);
        }

        return candidate;
    }

    /// <summary>
    /// Back-dates LastContact on every peer state of <paramref name="leader"/> by five
    /// minutes, so a quorum check cannot pass without a fresh heartbeat round.
    /// </summary>
    private static void MarkPeersStale(RaftNode leader)
    {
        foreach (var (_, state) in leader.GetPeerStates())
        {
            state.LastContact = DateTime.UtcNow.AddMinutes(-5);
        }
    }

    /// <summary>
    /// Asserts that ReadIndexAsync on <paramref name="node"/> throws with the
    /// "Only the leader" message.
    /// </summary>
    private static async Task ShouldRejectNonLeaderReadAsync(RaftNode node)
    {
        // NOTE(review): the exception type argument was lost from the original text;
        // InvalidOperationException is assumed — confirm against RaftNode.ReadIndexAsync.
        var ex = await Should.ThrowAsync<InvalidOperationException>(
            () => node.ReadIndexAsync(CancellationToken.None).AsTask());

        ex.Message.ShouldContain("Only the leader");
    }

    // -- Core ReadIndex tests --

    // Go reference: raft.go read-index quorum confirmation — leader returns CommitIndex after quorum.
    [Fact]
    public async Task ReadIndexAsync_returns_commit_index_after_quorum()
    {
        var (nodes, _) = CreateCluster(3);
        var leader = ElectLeader(nodes);

        // Commit a proposal so CommitIndex > 0.
        await leader.ProposeAsync("cmd1", CancellationToken.None);
        leader.CommitIndex.ShouldBeGreaterThan(0);

        // ReadIndex should confirm quorum (all peers are registered and reachable)
        // and return the current CommitIndex.
        var readIndex = await leader.ReadIndexAsync(CancellationToken.None);

        readIndex.ShouldBe(leader.CommitIndex);
    }

    // Go reference: raft.go — only the leader can answer linearizable reads.
    [Fact]
    public async Task ReadIndexAsync_fails_for_non_leader()
    {
        var (nodes, _) = CreateCluster(3);
        _ = ElectLeader(nodes);

        var follower = nodes[1];
        follower.IsLeader.ShouldBeFalse();

        await ShouldRejectNonLeaderReadAsync(follower);
    }

    // Go reference: raft.go checkQuorum — partitioned leader times out on ReadIndex.
    [Fact]
    public async Task ReadIndexAsync_fails_without_quorum()
    {
        // Use a partitioned transport: the leader's peers are unreachable so heartbeat
        // acks never arrive and HasQuorum() stays false.
        var partitionedTransport = new PartitionedRaftTransport();
        var leader = new RaftNode("leader", partitionedTransport);
        var peer1 = new RaftNode("n2", partitionedTransport);
        var peer2 = new RaftNode("n3", partitionedTransport);

        partitionedTransport.Register(leader);
        partitionedTransport.Register(peer1);
        partitionedTransport.Register(peer2);

        leader.ConfigureCluster([leader, peer1, peer2]);
        UseFastElectionTimeouts(leader);

        // Make the node leader through the election path: self-vote plus one granted vote.
        leader.StartElection(3);
        leader.ReceiveVote(new VoteResponse { Granted = true }, 3);
        leader.IsLeader.ShouldBeTrue();

        // Mark all peers' LastContact as stale so HasQuorum() returns false
        // after a heartbeat round that produces no acks.
        MarkPeersStale(leader);

        // With the partitioned transport, no heartbeat acks arrive. The poll loop will
        // exhaust ElectionTimeoutMaxMs (20 ms) and then throw TimeoutException.
        var ex = await Should.ThrowAsync<TimeoutException>(
            () => leader.ReadIndexAsync(CancellationToken.None).AsTask());

        ex.Message.ShouldContain("quorum could not be confirmed");
    }

    // Go reference: raft.go — single-node cluster is always quorum; ReadIndex is synchronous.
    [Fact]
    public async Task ReadIndexAsync_single_node_returns_immediately()
    {
        // Single-node cluster has no peers; self is always majority.
        var transport = new InMemoryRaftTransport();
        var node = new RaftNode("solo", transport);
        node.ConfigureCluster([node]);
        transport.Register(node);
        UseFastElectionTimeouts(node);

        node.StartElection(1); // single-node quorum — becomes leader.
        node.IsLeader.ShouldBeTrue();

        // ReadIndex on a single-node cluster must return immediately (no heartbeat round).
        var readIndex = await node.ReadIndexAsync(CancellationToken.None);

        // CommitIndex is 0 before any proposals; ReadIndex returns 0.
        readIndex.ShouldBe(node.CommitIndex);
    }

    // Go reference: raft.go — ReadIndex reflects the latest committed entries.
    [Fact]
    public async Task ReadIndexAsync_reflects_latest_commit()
    {
        var (nodes, _) = CreateCluster(3);
        var leader = ElectLeader(nodes);

        // Submit several proposals to advance CommitIndex.
        await leader.ProposeAsync("cmd1", CancellationToken.None);
        await leader.ProposeAsync("cmd2", CancellationToken.None);
        await leader.ProposeAsync("cmd3", CancellationToken.None);

        var expectedCommitIndex = leader.CommitIndex;
        expectedCommitIndex.ShouldBe(3);

        // ReadIndex should return the current CommitIndex after quorum confirmation.
        var readIndex = await leader.ReadIndexAsync(CancellationToken.None);

        readIndex.ShouldBe(expectedCommitIndex);
    }

    // Go reference: raft.go read-index does not append a log entry — no log growth.
    [Fact]
    public async Task ReadIndexAsync_does_not_grow_log()
    {
        var (nodes, _) = CreateCluster(3);
        var leader = ElectLeader(nodes);

        // Commit one proposal to give the log a known size.
        await leader.ProposeAsync("cmd1", CancellationToken.None);
        var logSizeBefore = leader.Log.Entries.Count;

        // ReadIndex must NOT append any log entry.
        await leader.ReadIndexAsync(CancellationToken.None);

        var logSizeAfter = leader.Log.Entries.Count;
        logSizeAfter.ShouldBe(logSizeBefore);
    }

    // Candidate node cannot serve reads — it has not confirmed quorum.
    // Go reference: raft.go — only leader role can answer reads.
    [Fact]
    public async Task ReadIndexAsync_fails_for_candidate()
    {
        var (nodes, _) = CreateCluster(3);

        // Force n1 into Candidate role without winning the election.
        var candidate = nodes[0];
        candidate.StartElection(3); // self-vote only; role = Candidate (needs 2 of 3).
        candidate.Role.ShouldBe(RaftRole.Candidate);

        await ShouldRejectNonLeaderReadAsync(candidate);
    }

    // Follower cannot serve reads — only the leader has an authoritative CommitIndex.
    // Go reference: raft.go — only leader can answer linearizable reads.
    [Fact]
    public async Task ReadIndexAsync_fails_for_follower()
    {
        var (nodes, _) = CreateCluster(3);
        _ = ElectLeader(nodes);

        var follower = nodes[2];
        follower.Role.ShouldBe(RaftRole.Follower);

        await ShouldRejectNonLeaderReadAsync(follower);
    }

    // ReadIndex at CommitIndex=0 (before any proposals) is still valid — the leader
    // confirms it is the current leader even before any entries are committed.
    // Go reference: raft.go — read-index at term start (empty log).
    [Fact]
    public async Task ReadIndexAsync_at_zero_commit_index_returns_zero()
    {
        var (nodes, _) = CreateCluster(3);
        var leader = ElectLeader(nodes);

        // No proposals yet — CommitIndex = 0.
        leader.CommitIndex.ShouldBe(0);

        var readIndex = await leader.ReadIndexAsync(CancellationToken.None);

        readIndex.ShouldBe(0);
    }

    // Multiple concurrent ReadIndex calls all complete correctly.
    // Go reference: raft.go — concurrent reads are safe because ReadIndex is idempotent.
    [Fact]
    public async Task ReadIndexAsync_multiple_calls_all_return_commit_index()
    {
        var (nodes, _) = CreateCluster(3);
        var leader = ElectLeader(nodes);

        await leader.ProposeAsync("cmd", CancellationToken.None);
        var expected = leader.CommitIndex;

        // Issue three parallel ReadIndex calls.
        var tasks = Enumerable.Range(0, 3)
            .Select(_ => leader.ReadIndexAsync(CancellationToken.None).AsTask())
            .ToArray();

        var results = await Task.WhenAll(tasks);

        foreach (var r in results)
        {
            r.ShouldBe(expected);
        }
    }

    // After ReadIndex returns, the caller must only serve reads once AppliedIndex
    // reaches the returned value. Verify this contract is held by ProposeAsync
    // (which advances AppliedIndex to CommitIndex after quorum).
    // Go reference: raft.go — caller waits for appliedIndex >= readIndex before responding.
    [Fact]
    public async Task ReadIndexAsync_caller_can_serve_read_when_applied_reaches_index()
    {
        var (nodes, _) = CreateCluster(3);
        var leader = ElectLeader(nodes);

        await leader.ProposeAsync("data1", CancellationToken.None);

        var readIndex = await leader.ReadIndexAsync(CancellationToken.None);

        // After ProposeAsync, AppliedIndex == CommitIndex == readIndex.
        // The caller is safe to serve the read.
        leader.AppliedIndex.ShouldBeGreaterThanOrEqualTo(readIndex);
    }

    // ReadIndex in a 5-node cluster with a majority of reachable peers.
    // Go reference: raft.go quorum calculation — majority of N+1 nodes required.
    [Fact]
    public async Task ReadIndexAsync_five_node_cluster_with_majority_quorum()
    {
        var (nodes, _) = CreateCluster(5);
        var leader = ElectLeader(nodes);

        await leader.ProposeAsync("five-node-cmd", CancellationToken.None);

        var readIndex = await leader.ReadIndexAsync(CancellationToken.None);

        readIndex.ShouldBe(leader.CommitIndex);
    }

    // Heartbeat acks from quorum peers update LastContact, making HasQuorum() true.
    // Go reference: raft.go — heartbeat ACKs refresh peer freshness for quorum check.
    [Fact]
    public async Task ReadIndexAsync_heartbeat_updates_peer_last_contact()
    {
        var (nodes, _) = CreateCluster(3);
        var leader = ElectLeader(nodes);

        // Stale peers — quorum would fail without the heartbeat round.
        MarkPeersStale(leader);

        // ReadIndexAsync sends heartbeats which update LastContact for reachable peers.
        // InMemoryRaftTransport delivers acks synchronously so quorum is restored
        // immediately after the heartbeat call.
        var readIndex = await leader.ReadIndexAsync(CancellationToken.None);

        readIndex.ShouldBe(leader.CommitIndex);

        // Verify peer states were refreshed by the heartbeat round.
        var peerStates = leader.GetPeerStates().Values.ToList();
        peerStates.ShouldAllBe(p => DateTime.UtcNow - p.LastContact < TimeSpan.FromSeconds(5));
    }
}

/// <summary>
/// A transport that drops all heartbeat acks (simulating a partitioned leader).
/// AppendEntries and vote RPCs behave normally, but SendHeartbeatAsync delivers
/// no acks, so the leader's peer LastContact timestamps are never updated.
/// Go reference: raft.go network partition scenario.
/// </summary>
// NOTE(review): generic type arguments below were reconstructed from how the values are
// used in this file; confirm each signature against the IRaftTransport declaration.
file sealed class PartitionedRaftTransport : IRaftTransport
{
    // Registered nodes keyed by node id (ordinal comparison).
    private readonly Dictionary<string, RaftNode> _nodes = new(StringComparer.Ordinal);

    /// <summary>Adds or replaces the node under its own id.</summary>
    public void Register(RaftNode node) => _nodes[node.Id] = node;

    /// <summary>
    /// Delivers <paramref name="entry"/> to every registered follower and reports success
    /// for it; followers that were never registered are reported as failed.
    /// </summary>
    public Task<IReadOnlyList<AppendResult>> AppendEntriesAsync(
        string leaderId, IReadOnlyList<string> followerIds, RaftLogEntry entry, CancellationToken ct)
    {
        var results = new List<AppendResult>(followerIds.Count);
        foreach (var followerId in followerIds)
        {
            if (_nodes.TryGetValue(followerId, out var node))
            {
                node.ReceiveReplicatedEntry(entry);
                results.Add(new AppendResult { FollowerId = followerId, Success = true });
            }
            else
            {
                results.Add(new AppendResult { FollowerId = followerId, Success = false });
            }
        }

        return Task.FromResult<IReadOnlyList<AppendResult>>(results);
    }

    /// <summary>
    /// Forwards the vote request to the registered voter; an unknown voter yields a
    /// response with Granted = false.
    /// </summary>
    public Task<VoteResponse> RequestVoteAsync(
        string candidateId, string voterId, VoteRequest request, CancellationToken ct)
    {
        if (_nodes.TryGetValue(voterId, out var node))
        {
            return Task.FromResult(node.GrantVote(request.Term, candidateId));
        }

        return Task.FromResult(new VoteResponse { Granted = false });
    }

    /// <summary>No-op: snapshots are not delivered by this transport.</summary>
    public Task InstallSnapshotAsync(
        string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct)
        => Task.CompletedTask;

    /// <summary>No-op: TimeoutNow requests are not delivered by this transport.</summary>
    public Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct)
        => Task.CompletedTask;

    /// <summary>
    /// Simulates a network partition: heartbeats are lost and no acks are delivered.
    /// The <paramref name="onAck"/> callback is never invoked.
    /// </summary>
    // NOTE(review): the callback's type argument was lost from the original text;
    // Action<string> (acknowledging follower id) is assumed — confirm against IRaftTransport.
    public Task SendHeartbeatAsync(
        string leaderId, IReadOnlyList<string> followerIds, int term, Action<string> onAck, CancellationToken ct)
        => Task.CompletedTask; // acks dropped — leader stays isolated
}