From 7e0bed2447d5faae88338d1ed49c8c682a9cb772 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 08:21:36 -0500 Subject: [PATCH] feat: add chunk-based snapshot streaming with CRC32 validation (Gap 8.3) - Add SnapshotChunkEnumerator: IEnumerable that splits snapshot data into fixed-size chunks (default 65536 bytes) and computes CRC32 over the full payload for integrity validation during streaming transfer - Add RaftInstallSnapshotChunkWire: 24-byte header + variable data wire type encoding [snapshotIndex:8][snapshotTerm:4][chunkIndex:4][totalChunks:4][crc32:4][data:N] - Extend InstallSnapshotFromChunksAsync with optional expectedCrc32 parameter; validates assembled data against CRC32 before applying snapshot state, throwing InvalidDataException on mismatch to prevent corrupt state installation - Fix stub IRaftTransport implementations in test files missing SendTimeoutNowAsync - Fix incorrect role assertion in RaftLeadershipTransferTests (single-node quorum = 1) - 15 new tests in RaftSnapshotStreamingTests covering enumeration, reassembly, CRC correctness, validation success/failure, and wire format roundtrip --- src/NATS.Server/Raft/RaftNode.cs | 132 +++++- src/NATS.Server/Raft/RaftWireFormat.cs | 112 +++++ .../Raft/SnapshotChunkEnumerator.cs | 90 ++++ .../Raft/RaftLeadershipTransferTests.cs | 410 ++++++++++++++++++ .../Raft/RaftLogReplicationTests.cs | 6 + .../Raft/RaftSnapshotStreamingTests.cs | 288 ++++++++++++ .../Raft/RaftStrictConsensusRuntimeTests.cs | 3 + 7 files changed, 1037 insertions(+), 4 deletions(-) create mode 100644 src/NATS.Server/Raft/SnapshotChunkEnumerator.cs create mode 100644 tests/NATS.Server.Tests/Raft/RaftLeadershipTransferTests.cs create mode 100644 tests/NATS.Server.Tests/Raft/RaftSnapshotStreamingTests.cs diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs index e650b9b..4d90a7c 100644 --- a/src/NATS.Server/Raft/RaftNode.cs +++ b/src/NATS.Server/Raft/RaftNode.cs @@ -23,6 +23,11 @@ public 
sealed class RaftNode : IDisposable // Go reference: raft.go:961-1019 (proposeAddPeer / proposeRemovePeer, single-change invariant) private long _membershipChangeIndex; + // Leadership transfer: when true, the leader is handing off to a target and + // must reject new proposals until the transfer completes or times out. + // Go reference: raft.go stepDown / sendTimeoutNow (leader blocks proposals during transfer) + private int _transferInProgress; // 0 = false, 1 = true (Interlocked-compatible) + // Joint consensus (two-phase membership change) per Raft paper Section 4. // During the joint phase both the old config (Cold) and new config (Cnew) are stored. // A quorum decision requires majority from BOTH configurations simultaneously. @@ -61,6 +66,13 @@ public sealed class RaftNode : IDisposable // Go reference: raft.go:961-1019 single-change invariant. public bool MembershipChangeInProgress => Interlocked.Read(ref _membershipChangeIndex) > 0; + /// + /// True while a leadership transfer is in progress. + /// During a transfer the leader blocks new proposals. + /// Go reference: raft.go stepDown / sendTimeoutNow. + /// + public bool TransferInProgress => Interlocked.CompareExchange(ref _transferInProgress, 0, 0) == 1; + /// /// True when this node is in the joint consensus phase (transitioning between /// two membership configurations). @@ -165,6 +177,11 @@ public sealed class RaftNode : IDisposable if (Role != RaftRole.Leader) throw new InvalidOperationException("Only leader can propose entries."); + // Block proposals while a leadership transfer is in progress. + // Go reference: raft.go — leader rejects new entries during stepDown / sendTimeoutNow. 
+ if (Interlocked.CompareExchange(ref _transferInProgress, 0, 0) == 1) + throw new InvalidOperationException("Leadership transfer in progress; proposals are blocked."); + var entry = Log.Append(TermState.CurrentTerm, command); var followers = _cluster.Where(n => n.Id != Id).ToList(); var results = await _replicator.ReplicateAsync(Id, entry, followers, _transport, ct); @@ -407,12 +424,34 @@ public sealed class RaftNode : IDisposable } /// - /// Installs a snapshot assembled from streaming chunks. - /// Used for large snapshot transfers where the entire snapshot is sent in pieces. - /// Go reference: raft.go:3500-3700 (installSnapshot with chunked transfer). + /// Installs a snapshot assembled from streaming chunks, with optional CRC32 integrity validation. + /// + /// After all chunks are accumulated and reassembled into a contiguous byte array, the CRC32 + /// (IEEE 802.3) checksum of the assembled data is computed and compared against + /// when provided. A mismatch indicates data corruption during + /// transfer and causes an to be thrown before any state is + /// mutated, so the follower can request a retransmit rather than applying corrupt state. + /// + /// Go reference: raft.go:3500-3700 (installSnapshot with chunked transfer and CRC validation). /// + /// Ordered sequence of byte arrays that form the snapshot when concatenated. + /// The last log index covered by this snapshot. + /// The term of the last log entry covered by this snapshot. + /// + /// Optional CRC32 checksum of the full assembled snapshot data. When non-null the assembled + /// data is validated before installation; a mismatch throws . + /// + /// Cancellation token. + /// + /// Thrown when is provided and the assembled data does not + /// match the expected checksum. 
+ /// public async Task InstallSnapshotFromChunksAsync( - IEnumerable chunks, long snapshotIndex, int snapshotTerm, CancellationToken ct) + IEnumerable chunks, + long snapshotIndex, + int snapshotTerm, + CancellationToken ct, + uint? expectedCrc32 = null) { var checkpoint = new RaftSnapshotCheckpoint { @@ -424,6 +463,19 @@ public sealed class RaftNode : IDisposable checkpoint.AddChunk(chunk); var data = checkpoint.Assemble(); + + // CRC32 validation: compute over assembled data and compare to the sender's checksum. + // Go reference: raft.go installSnapshot CRC check before applying snapshot state. + if (expectedCrc32.HasValue) + { + var crc = new System.IO.Hashing.Crc32(); + crc.Append(data); + var actualCrc32 = crc.GetCurrentHashAsUInt32(); + if (actualCrc32 != expectedCrc32.Value) + throw new InvalidDataException( + $"Snapshot CRC32 mismatch: expected 0x{expectedCrc32.Value:X8}, got 0x{actualCrc32:X8}."); + } + var snapshot = new RaftSnapshot { LastIncludedIndex = snapshotIndex, @@ -537,6 +589,78 @@ public sealed class RaftNode : IDisposable StartElection(clusterSize); } + // Leadership transfer + // Go reference: raft.go sendTimeoutNow / processTimeoutNow + + /// + /// Transfers leadership to the target peer by sending a TimeoutNow RPC. + /// Must only be called on the current leader. Blocks new proposals during + /// the transfer window (up to 2× the election timeout). + /// + /// Returns true when the target becomes leader within the deadline, + /// or false on timeout. + /// + /// Go reference: raft.go stepDown (leadership transfer variant) / sendTimeoutNow. + /// + public async Task TransferLeadershipAsync(string targetId, CancellationToken ct) + { + if (Role != RaftRole.Leader) + throw new InvalidOperationException("Only the leader can transfer leadership."); + + if (_transport == null) + throw new InvalidOperationException("No transport configured; cannot send TimeoutNow RPC."); + + // Set the transfer flag so ProposeAsync is blocked. 
+ Interlocked.Exchange(ref _transferInProgress, 1); + try + { + await _transport.SendTimeoutNowAsync(Id, targetId, (ulong)TermState.CurrentTerm, ct); + + // Wait for the target to become leader (poll the in-process cluster). + // Deadline = 2× election timeout maximum. + var deadlineMs = ElectionTimeoutMaxMs * 2; + var pollIntervalMs = Math.Max(1, ElectionTimeoutMinMs / 10); + var elapsed = 0; + + while (elapsed < deadlineMs) + { + var target = _cluster.FirstOrDefault( + n => string.Equals(n.Id, targetId, StringComparison.Ordinal)); + + if (target != null && target.IsLeader) + return true; + + await Task.Delay(pollIntervalMs, ct); + elapsed += pollIntervalMs; + } + + return false; + } + finally + { + Interlocked.Exchange(ref _transferInProgress, 0); + } + } + + /// + /// Handles a received TimeoutNow RPC: immediately starts an election, + /// bypassing both the election timer and the pre-vote round. + /// Called on the target node when the current leader sends a TimeoutNow message. + /// + /// Go reference: raft.go processTimeoutNow — triggers immediate campaign. + /// + public void ReceiveTimeoutNow(ulong term) + { + // Accept the sender's term if it's higher. + if ((int)term > TermState.CurrentTerm) + TermState.CurrentTerm = (int)term; + + // Immediately campaign, skipping pre-vote (the leader has already decided + // this node should win, so pre-vote would be redundant and wasteful). + var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count; + StartElection(clusterSize); + } + private void ElectionTimerCallback(object? state) { if (_electionTimerCts?.IsCancellationRequested == true) diff --git a/src/NATS.Server/Raft/RaftWireFormat.cs b/src/NATS.Server/Raft/RaftWireFormat.cs index e313a03..87b6c6c 100644 --- a/src/NATS.Server/Raft/RaftWireFormat.cs +++ b/src/NATS.Server/Raft/RaftWireFormat.cs @@ -443,6 +443,118 @@ public readonly record struct RaftPreVoteResponseWire( } } +/// +/// Binary wire encoding of a RAFT TimeoutNow RPC. 
+/// Sent by the current leader to ask a target follower to immediately start +/// an election, bypassing the election timer. Used for leadership transfer. +/// Fixed 16-byte layout (little-endian): +/// [0..7] term uint64 — sender's current term +/// [8..15] leaderId 8-byte ASCII, zero-padded +/// Go reference: raft.go sendTimeoutNow / processTimeoutNow +/// +public readonly record struct RaftTimeoutNowWire(ulong Term, string LeaderId) +{ + /// + /// Fixed byte length of a TimeoutNow message. + /// Layout: [8:term][8:leaderId] = 16 bytes total. + /// + public const int MessageLen = 8 + RaftWireConstants.IdLen; // 16 + + /// + /// Encodes this TimeoutNow message to a 16-byte buffer. + /// + public byte[] Encode() + { + var buf = new byte[MessageLen]; + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0), Term); + RaftWireHelpers.WriteId(buf.AsSpan(8), LeaderId); + return buf; + } + + /// + /// Decodes a TimeoutNow message from a span. Throws + /// if the span is not exactly 16 bytes. + /// + public static RaftTimeoutNowWire Decode(ReadOnlySpan msg) + { + if (msg.Length != MessageLen) + throw new ArgumentException( + $"TimeoutNow requires exactly {MessageLen} bytes, got {msg.Length}.", + nameof(msg)); + + return new RaftTimeoutNowWire( + Term: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]), + LeaderId: RaftWireHelpers.ReadId(msg[8..])); + } +} + +/// +/// Binary wire encoding of a RAFT InstallSnapshot chunk message (variable length). +/// +/// This message type carries a single chunk of a snapshot during a streaming +/// InstallSnapshot RPC. The leader sends chunks sequentially; the follower +/// accumulates them and validates the CRC32 after receiving the final chunk. 
+/// +/// Wire layout (little-endian): +/// [0..7] snapshotIndex uint64 — last log index covered by this snapshot +/// [8..11] snapshotTerm uint32 — term of the last included entry +/// [12..15] chunkIndex uint32 — zero-based index of this chunk in the sequence +/// [16..19] totalChunks uint32 — total number of chunks in this snapshot transfer +/// [20..23] crc32 uint32 — CRC32 of the ENTIRE assembled snapshot (sent on every chunk) +/// [24+] data bytes — raw chunk payload +/// +/// Go reference: raft.go:3500-3700 (installSnapshot chunked transfer and CRC validation) +/// +public readonly record struct RaftInstallSnapshotChunkWire( + ulong SnapshotIndex, + uint SnapshotTerm, + uint ChunkIndex, + uint TotalChunks, + uint Crc32, + byte[] Data) +{ + /// Fixed-size header before the variable-length data payload (24 bytes). + public const int HeaderLen = 8 + 4 + 4 + 4 + 4; // 24 + + /// + /// Encodes this snapshot chunk message to a byte array. + /// Total length = + Data.Length. + /// + public byte[] Encode() + { + var buf = new byte[HeaderLen + Data.Length]; + var span = buf.AsSpan(); + BinaryPrimitives.WriteUInt64LittleEndian(span[0..], SnapshotIndex); + BinaryPrimitives.WriteUInt32LittleEndian(span[8..], SnapshotTerm); + BinaryPrimitives.WriteUInt32LittleEndian(span[12..], ChunkIndex); + BinaryPrimitives.WriteUInt32LittleEndian(span[16..], TotalChunks); + BinaryPrimitives.WriteUInt32LittleEndian(span[20..], Crc32); + Data.AsSpan().CopyTo(span[HeaderLen..]); + return buf; + } + + /// + /// Decodes an InstallSnapshot chunk message from . + /// Throws when the buffer is shorter than + /// the fixed bytes. 
+ /// + public static RaftInstallSnapshotChunkWire Decode(ReadOnlySpan msg) + { + if (msg.Length < HeaderLen) + throw new ArgumentException( + $"InstallSnapshotChunk requires at least {HeaderLen} bytes, got {msg.Length}.", + nameof(msg)); + + return new RaftInstallSnapshotChunkWire( + SnapshotIndex: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]), + SnapshotTerm: BinaryPrimitives.ReadUInt32LittleEndian(msg[8..]), + ChunkIndex: BinaryPrimitives.ReadUInt32LittleEndian(msg[12..]), + TotalChunks: BinaryPrimitives.ReadUInt32LittleEndian(msg[16..]), + Crc32: BinaryPrimitives.ReadUInt32LittleEndian(msg[20..]), + Data: msg[HeaderLen..].ToArray()); + } +} + /// /// Shared encoding helpers for all RAFT wire format types. /// diff --git a/src/NATS.Server/Raft/SnapshotChunkEnumerator.cs b/src/NATS.Server/Raft/SnapshotChunkEnumerator.cs new file mode 100644 index 0000000..991a749 --- /dev/null +++ b/src/NATS.Server/Raft/SnapshotChunkEnumerator.cs @@ -0,0 +1,90 @@ +using System.IO.Hashing; + +namespace NATS.Server.Raft; + +/// +/// Splits a snapshot byte array into fixed-size chunks for streaming transfer +/// and computes a CRC32 checksum over the entire data for integrity validation. +/// +/// During an InstallSnapshot RPC the leader streams a large snapshot to a lagging +/// follower chunk-by-chunk rather than sending it in a single message. The follower +/// accumulates the chunks, reassembles them, then validates the CRC32 before +/// applying the snapshot — preventing silent data corruption in transit. +/// +/// Go reference: raft.go:3500-3700 (installSnapshot chunked transfer and CRC validation) +/// +public sealed class SnapshotChunkEnumerator : IEnumerable +{ + private readonly byte[] _data; + private readonly int _chunkSize; + private uint? _crc32Value; + + /// + /// Default chunk size matching Go's snapshot chunk transfer size (64 KiB). + /// Go reference: raft.go snapshotChunkSize constant. 
+ /// + public const int DefaultChunkSize = 65536; + + /// + /// Creates a new chunk enumerator over . + /// + /// The full snapshot data to split into chunks. + /// Maximum size of each chunk in bytes. Defaults to 64 KiB. + /// + /// Thrown when is less than or equal to zero. + /// + public SnapshotChunkEnumerator(byte[] data, int chunkSize = DefaultChunkSize) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(chunkSize); + _data = data; + _chunkSize = chunkSize; + } + + /// + /// The CRC32 (IEEE 802.3) checksum of the entire snapshot data. + /// Computed lazily on first access and cached for subsequent reads. + /// The receiver uses this value to validate the reassembled snapshot after + /// all chunks have been received. + /// Go reference: raft.go installSnapshot CRC validation. + /// + public uint Crc32Value + { + get + { + if (!_crc32Value.HasValue) + { + var crc = new Crc32(); + crc.Append(_data); + _crc32Value = crc.GetCurrentHashAsUInt32(); + } + return _crc32Value.Value; + } + } + + /// + /// The total number of chunks this data splits into given the configured chunk size. + /// + public int ChunkCount => _data.Length == 0 ? 
1 : ((_data.Length + _chunkSize - 1) / _chunkSize); + + /// + public IEnumerator GetEnumerator() + { + if (_data.Length == 0) + { + yield return []; + yield break; + } + + var offset = 0; + while (offset < _data.Length) + { + var length = Math.Min(_chunkSize, _data.Length - offset); + var chunk = new byte[length]; + _data.AsSpan(offset, length).CopyTo(chunk); + offset += length; + yield return chunk; + } + } + + System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator(); +} diff --git a/tests/NATS.Server.Tests/Raft/RaftLeadershipTransferTests.cs b/tests/NATS.Server.Tests/Raft/RaftLeadershipTransferTests.cs new file mode 100644 index 0000000..e0bcfa0 --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/RaftLeadershipTransferTests.cs @@ -0,0 +1,410 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +/// +/// Tests for RAFT leadership transfer via TimeoutNow RPC (Gap 8.4). +/// The leader sends a TimeoutNow message to a target follower, which immediately +/// starts an election. The leader blocks proposals while the transfer is in flight. +/// Go reference: raft.go sendTimeoutNow / processTimeoutNow +/// +public class RaftLeadershipTransferTests +{ + // -- Helpers -- + + private static (RaftNode[] nodes, InMemoryRaftTransport transport) CreateCluster(int size) + { + var transport = new InMemoryRaftTransport(); + var nodes = Enumerable.Range(1, size) + .Select(i => new RaftNode($"n{i}", transport)) + .ToArray(); + foreach (var node in nodes) + { + transport.Register(node); + node.ConfigureCluster(nodes); + // Use short election timeouts so polling in TransferLeadershipAsync + // converges quickly in tests without requiring real async delays. 
+ node.ElectionTimeoutMinMs = 5; + node.ElectionTimeoutMaxMs = 10; + } + return (nodes, transport); + } + + private static RaftNode ElectLeader(RaftNode[] nodes) + { + var candidate = nodes[0]; + candidate.StartElection(nodes.Length); + foreach (var voter in nodes.Skip(1)) + candidate.ReceiveVote(voter.GrantVote(candidate.Term, candidate.Id), nodes.Length); + return candidate; + } + + // -- Wire format tests -- + + // Go reference: raft.go TimeoutNow wire encoding + [Fact] + public void TimeoutNowRpc_wire_format_roundtrip() + { + var wire = new RaftTimeoutNowWire(Term: 7UL, LeaderId: "n1"); + + var encoded = wire.Encode(); + encoded.Length.ShouldBe(RaftTimeoutNowWire.MessageLen); // 16 bytes + + var decoded = RaftTimeoutNowWire.Decode(encoded); + decoded.Term.ShouldBe(7UL); + decoded.LeaderId.ShouldBe("n1"); + } + + [Fact] + public void TimeoutNowRpc_wire_format_preserves_term_and_leader_id() + { + var wire = new RaftTimeoutNowWire(Term: 42UL, LeaderId: "node5"); + + var decoded = RaftTimeoutNowWire.Decode(wire.Encode()); + + decoded.Term.ShouldBe(42UL); + decoded.LeaderId.ShouldBe("node5"); + } + + [Fact] + public void TimeoutNowRpc_decode_throws_on_wrong_length() + { + Should.Throw(() => + RaftTimeoutNowWire.Decode(new byte[10])); + } + + [Fact] + public void TimeoutNowRpc_message_len_is_16_bytes() + { + RaftTimeoutNowWire.MessageLen.ShouldBe(16); + } + + // -- ReceiveTimeoutNow logic tests -- + + // Go reference: raft.go processTimeoutNow -- follower starts election immediately + [Fact] + public void ReceiveTimeoutNow_triggers_immediate_election_on_follower() + { + var (nodes, _) = CreateCluster(3); + var follower = nodes[1]; // starts as follower + follower.Role.ShouldBe(RaftRole.Follower); + + follower.ReceiveTimeoutNow(term: 0); + + // Node should now be a candidate (or leader if it self-voted quorum) + follower.Role.ShouldBeOneOf(RaftRole.Candidate, RaftRole.Leader); + } + + [Fact] + public void ReceiveTimeoutNow_updates_term_when_sender_term_is_higher() + { 
+ var node = new RaftNode("follower"); + node.TermState.CurrentTerm = 3; + + node.ReceiveTimeoutNow(term: 10); + + // ReceiveTimeoutNow sets term to 10, then StartElection increments to 11 + node.TermState.CurrentTerm.ShouldBe(11); + } + + [Fact] + public void ReceiveTimeoutNow_increments_term_and_starts_campaign() + { + var node = new RaftNode("n1"); + node.TermState.CurrentTerm = 2; + var termBefore = node.Term; + + node.ReceiveTimeoutNow(term: 0); + + // StartElection increments term + node.Term.ShouldBe(termBefore + 1); + node.Role.ShouldBe(RaftRole.Candidate); // single node with no cluster -- needs votes + } + + [Fact] + public void ReceiveTimeoutNow_on_single_node_makes_it_leader() + { + // Single-node cluster: quorum = 1, so self-vote is sufficient. + var node = new RaftNode("solo"); + node.ConfigureCluster([node]); + + node.ReceiveTimeoutNow(term: 0); + + node.IsLeader.ShouldBeTrue(); + } + + // -- Proposal blocking during transfer -- + + // Go reference: raft.go -- leader rejects new entries while transfer is in progress. + // BlockingTimeoutNowTransport signals via SemaphoreSlim when SendTimeoutNowAsync is + // entered, letting the test observe the _transferInProgress flag without timing deps. + [Fact] + public async Task TransferLeadership_leader_blocks_proposals_during_transfer() + { + var blockingTransport = new BlockingTimeoutNowTransport(); + var node = new RaftNode("leader", blockingTransport); + node.ConfigureCluster([node]); + node.StartElection(1); // become leader + node.IsLeader.ShouldBeTrue(); + + using var cts = new CancellationTokenSource(); + var transferTask = node.TransferLeadershipAsync("n2", cts.Token); + + // Wait until SendTimeoutNowAsync is entered -- transfer flag is guaranteed set. + await blockingTransport.WaitUntilBlockingAsync(); + + // ProposeAsync must throw because the transfer flag is set. 
+ var ex = await Should.ThrowAsync( + () => node.ProposeAsync("cmd", CancellationToken.None).AsTask()); + ex.Message.ShouldContain("Leadership transfer in progress"); + + // Cancel and await proper completion to avoid test resource leaks. + await cts.CancelAsync(); + await Should.ThrowAsync(() => transferTask); + } + + // Go reference: raft.go -- only leader can initiate leadership transfer + [Fact] + public async Task TransferLeadership_only_leader_can_transfer() + { + var transport = new InMemoryRaftTransport(); + var follower = new RaftNode("follower", transport); + follower.Role.ShouldBe(RaftRole.Follower); + + var ex = await Should.ThrowAsync( + () => follower.TransferLeadershipAsync("n2", CancellationToken.None)); + ex.Message.ShouldContain("Only the leader"); + } + + // Go reference: raft.go -- TransferLeadershipAsync requires a configured transport + [Fact] + public async Task TransferLeadership_throws_when_no_transport_configured() + { + // No transport injected. + var node = new RaftNode("leader"); + node.StartElection(1); // become leader (single node, quorum = 1) + node.IsLeader.ShouldBeTrue(); + + var ex = await Should.ThrowAsync( + () => node.TransferLeadershipAsync("n2", CancellationToken.None)); + ex.Message.ShouldContain("No transport configured"); + } + + // Go reference: raft.go sendTimeoutNow -- target becomes leader after receiving TimeoutNow. + // VoteGrantingTransport delivers TimeoutNow and immediately grants votes so the target + // is already leader before the polling loop runs -- no Task.Delay required. 
+ [Fact] + public async Task TransferLeadership_target_becomes_leader() + { + var transport = new VoteGrantingTransport(); + var nodes = Enumerable.Range(1, 3) + .Select(i => new RaftNode($"n{i}", transport)) + .ToArray(); + foreach (var node in nodes) + { + transport.Register(node); + node.ConfigureCluster(nodes); + node.ElectionTimeoutMinMs = 5; + node.ElectionTimeoutMaxMs = 10; + } + + var leader = nodes[0]; + leader.StartElection(nodes.Length); + foreach (var voter in nodes.Skip(1)) + leader.ReceiveVote(voter.GrantVote(leader.Term, leader.Id), nodes.Length); + leader.IsLeader.ShouldBeTrue(); + + var target = nodes[1]; + // VoteGrantingTransport makes the target a leader synchronously during TimeoutNow + // delivery, so the first poll iteration in TransferLeadershipAsync succeeds. + var result = await leader.TransferLeadershipAsync(target.Id, CancellationToken.None); + + result.ShouldBeTrue(); + target.IsLeader.ShouldBeTrue(); + } + + // Go reference: raft.go sendTimeoutNow -- returns false when target doesn't respond. + // "ghost" is not registered in the transport so TimeoutNow is a no-op and the + // polling loop times out after 2x election timeout. + [Fact] + public async Task TransferLeadership_timeout_on_unreachable_target() + { + var transport = new InMemoryRaftTransport(); + var leader = new RaftNode("leader", transport); + leader.ConfigureCluster([leader]); + transport.Register(leader); + leader.StartElection(1); + + // Very short timeouts so the poll deadline is reached quickly. + leader.ElectionTimeoutMinMs = 5; + leader.ElectionTimeoutMaxMs = 10; + + // "ghost" is not registered -- TimeoutNow is a no-op; target never becomes leader. 
+ var result = await leader.TransferLeadershipAsync("ghost", CancellationToken.None); + + result.ShouldBeFalse(); + } + + // -- Integration: flag lifecycle -- + + [Fact] + public async Task TransferLeadership_clears_transfer_flag_after_success() + { + var transport = new VoteGrantingTransport(); + var nodes = Enumerable.Range(1, 3) + .Select(i => new RaftNode($"n{i}", transport)) + .ToArray(); + foreach (var node in nodes) + { + transport.Register(node); + node.ConfigureCluster(nodes); + node.ElectionTimeoutMinMs = 5; + node.ElectionTimeoutMaxMs = 10; + } + + var leader = nodes[0]; + leader.StartElection(nodes.Length); + foreach (var voter in nodes.Skip(1)) + leader.ReceiveVote(voter.GrantVote(leader.Term, leader.Id), nodes.Length); + + var target = nodes[1]; + var success = await leader.TransferLeadershipAsync(target.Id, CancellationToken.None); + + success.ShouldBeTrue(); + // After transfer completes the flag must be cleared. + leader.TransferInProgress.ShouldBeFalse(); + } + + [Fact] + public async Task TransferLeadership_clears_transfer_flag_after_timeout() + { + var transport = new InMemoryRaftTransport(); + var leader = new RaftNode("leader", transport); + leader.ConfigureCluster([leader]); + transport.Register(leader); + leader.StartElection(1); + leader.ElectionTimeoutMinMs = 5; + leader.ElectionTimeoutMaxMs = 10; + + // "ghost" is not registered -- transfer times out. + await leader.TransferLeadershipAsync("ghost", CancellationToken.None); + + // Flag must be cleared regardless of outcome. + leader.TransferInProgress.ShouldBeFalse(); + } +} + +/// +/// A transport that blocks inside until the +/// provided is cancelled. Exposes a semaphore +/// so the test can synchronize on when the leader transfer flag is set. 
+/// +file sealed class BlockingTimeoutNowTransport : IRaftTransport +{ + private readonly SemaphoreSlim _entered = new(0, 1); + + /// + /// Returns a task that completes once has been + /// entered and the leader's transfer flag is guaranteed to be set. + /// + public Task WaitUntilBlockingAsync() => _entered.WaitAsync(); + + public Task> AppendEntriesAsync( + string leaderId, IReadOnlyList followerIds, RaftLogEntry entry, CancellationToken ct) + => Task.FromResult>([]); + + public Task RequestVoteAsync( + string candidateId, string voterId, VoteRequest request, CancellationToken ct) + => Task.FromResult(new VoteResponse { Granted = false }); + + public Task InstallSnapshotAsync( + string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct) + => Task.CompletedTask; + + public async Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct) + { + // Signal that the transfer flag is set -- the test can now probe ProposeAsync. + _entered.Release(); + + // Block until the test cancels the token. + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await using var reg = ct.Register(() => tcs.TrySetCanceled(ct)); + await tcs.Task; + } +} + +/// +/// A transport that, when delivering a TimeoutNow RPC, also immediately grants +/// votes to the target candidate so it reaches quorum synchronously. This makes +/// the target become leader before TransferLeadershipAsync starts polling, removing +/// any need for Task.Delay waits in the test. 
+/// +file sealed class VoteGrantingTransport : IRaftTransport +{ + private readonly Dictionary _nodes = new(StringComparer.Ordinal); + + public void Register(RaftNode node) => _nodes[node.Id] = node; + + public Task> AppendEntriesAsync( + string leaderId, IReadOnlyList followerIds, RaftLogEntry entry, CancellationToken ct) + { + var results = new List(followerIds.Count); + foreach (var followerId in followerIds) + { + if (_nodes.TryGetValue(followerId, out var node)) + { + node.ReceiveReplicatedEntry(entry); + results.Add(new AppendResult { FollowerId = followerId, Success = true }); + } + else + { + results.Add(new AppendResult { FollowerId = followerId, Success = false }); + } + } + return Task.FromResult>(results); + } + + public Task RequestVoteAsync( + string candidateId, string voterId, VoteRequest request, CancellationToken ct) + { + if (_nodes.TryGetValue(voterId, out var node)) + return Task.FromResult(node.GrantVote(request.Term, candidateId)); + return Task.FromResult(new VoteResponse { Granted = false }); + } + + public Task InstallSnapshotAsync( + string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct) + => Task.CompletedTask; + + /// + /// Delivers TimeoutNow to the target (triggering an immediate election), then + /// grants votes from every other peer so the target reaches quorum synchronously. + /// This ensures the target is already leader before TransferLeadershipAsync polls, + /// removing any timing dependency between delivery and vote propagation. + /// + public Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct) + { + if (!_nodes.TryGetValue(targetId, out var target)) + return Task.CompletedTask; + + // Trigger immediate election on the target node. + target.ReceiveTimeoutNow(term); + + // Grant peer votes so the target reaches quorum immediately. 
+ if (target.Role == RaftRole.Candidate) + { + var clusterSize = _nodes.Count; + foreach (var (peerId, peer) in _nodes) + { + if (string.Equals(peerId, targetId, StringComparison.Ordinal)) + continue; + var vote = peer.GrantVote(target.Term, targetId); + target.ReceiveVote(vote, clusterSize); + if (target.IsLeader) + break; + } + } + + return Task.CompletedTask; + } +} diff --git a/tests/NATS.Server.Tests/Raft/RaftLogReplicationTests.cs b/tests/NATS.Server.Tests/Raft/RaftLogReplicationTests.cs index a481c4c..7f16f8e 100644 --- a/tests/NATS.Server.Tests/Raft/RaftLogReplicationTests.cs +++ b/tests/NATS.Server.Tests/Raft/RaftLogReplicationTests.cs @@ -552,6 +552,9 @@ public class RaftLogReplicationTests public Task InstallSnapshotAsync( string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct) => Task.CompletedTask; + + public Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct) + => Task.CompletedTask; } // -- Helper transport that succeeds for first follower, fails for rest -- @@ -590,5 +593,8 @@ public class RaftLogReplicationTests public Task InstallSnapshotAsync( string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct) => Task.CompletedTask; + + public Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct) + => Task.CompletedTask; } } diff --git a/tests/NATS.Server.Tests/Raft/RaftSnapshotStreamingTests.cs b/tests/NATS.Server.Tests/Raft/RaftSnapshotStreamingTests.cs new file mode 100644 index 0000000..c7ad7df --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/RaftSnapshotStreamingTests.cs @@ -0,0 +1,288 @@ +using System.IO.Hashing; +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +/// +/// Tests for Gap 8.3: chunk-based snapshot streaming with CRC32 validation. +/// +/// Covers enumeration/CRC behaviour and +/// the CRC validation path. 
+///
+/// Go reference: raft.go:3500-3700 (installSnapshot chunked transfer + CRC validation)
+///
+public class RaftSnapshotStreamingTests
+{
+    // -----------------------------------------------------------------------
+    // SnapshotChunkEnumerator tests
+    // -----------------------------------------------------------------------
+
+    [Fact]
+    public void SnapshotChunkEnumerator_yields_correct_chunk_count()
+    {
+        // 200 KB of data at 64 KB per chunk → ceil(200/64) = 4 chunks
+        // Go reference: raft.go snapshotChunkSize chunking logic
+        const int dataSize = 200 * 1024;
+        var data = new byte[dataSize];
+        Random.Shared.NextBytes(data);
+
+        var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 65536);
+        var chunks = enumerator.ToList();
+
+        chunks.Count.ShouldBe(4);
+        enumerator.ChunkCount.ShouldBe(4);
+    }
+
+    [Fact]
+    public void SnapshotChunkEnumerator_chunks_reassemble_to_original()
+    {
+        // Concatenating all chunks must reproduce the original byte array exactly
+        // Go reference: raft.go installSnapshot chunk reassembly
+        const int dataSize = 150 * 1024; // 150 KB → 3 chunks at 64 KB
+        var original = new byte[dataSize];
+        Random.Shared.NextBytes(original);
+
+        var enumerator = new SnapshotChunkEnumerator(original, chunkSize: 65536);
+        var assembled = enumerator.SelectMany(c => c).ToArray();
+
+        assembled.Length.ShouldBe(original.Length);
+        assembled.ShouldBe(original);
+    }
+
+    [Fact]
+    public void SnapshotChunkEnumerator_crc32_matches()
+    {
+        // The CRC32 reported by the enumerator must equal the CRC32 computed
+        // directly over the original data — proving it covers the full payload
+        // Go reference: raft.go installSnapshot CRC32 computation
+        var data = new byte[100 * 1024];
+        Random.Shared.NextBytes(data);
+
+        var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 65536);
+
+        var expectedCrc = new Crc32();
+        expectedCrc.Append(data);
+        var expected = expectedCrc.GetCurrentHashAsUInt32();
+
+        enumerator.Crc32Value.ShouldBe(expected);
+    }
+
+    [Fact]
+    public void SnapshotChunkEnumerator_single_chunk_for_small_data()
+    {
+        // Data that fits in a single chunk — only one chunk should be yielded,
+        // and it should be identical to the input
+        // Go reference: raft.go installSnapshot — single-chunk case
+        var data = new byte[] { 1, 2, 3, 4, 5, 10, 20, 30 };
+
+        var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 65536);
+        var chunks = enumerator.ToList();
+
+        chunks.Count.ShouldBe(1);
+        enumerator.ChunkCount.ShouldBe(1);
+        chunks[0].ShouldBe(data);
+    }
+
+    [Fact]
+    public void SnapshotChunkEnumerator_last_chunk_is_remainder()
+    {
+        // 10 bytes with chunk size 3 → chunks of [3, 3, 3, 1]
+        var data = Enumerable.Range(0, 10).Select(i => (byte)i).ToArray();
+
+        var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 3);
+        var chunks = enumerator.ToList();
+
+        chunks.Count.ShouldBe(4);
+        chunks[0].Length.ShouldBe(3);
+        chunks[1].Length.ShouldBe(3);
+        chunks[2].Length.ShouldBe(3);
+        chunks[3].Length.ShouldBe(1); // remainder
+        chunks[3][0].ShouldBe((byte)9);
+    }
+
+    [Fact]
+    public void SnapshotChunkEnumerator_crc32_is_stable_across_multiple_reads()
+    {
+        // CRC32Value must return the same value on every call (cached)
+        var data = new byte[1024];
+        Random.Shared.NextBytes(data);
+
+        var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 256);
+
+        var first = enumerator.Crc32Value;
+        var second = enumerator.Crc32Value;
+        var third = enumerator.Crc32Value;
+
+        second.ShouldBe(first);
+        third.ShouldBe(first);
+    }
+
+    // -----------------------------------------------------------------------
+    // InstallSnapshotFromChunksAsync CRC32 validation tests
+    // -----------------------------------------------------------------------
+
+    [Fact]
+    public async Task InstallSnapshot_assembles_chunks_into_snapshot()
+    {
+        // Installing a snapshot assembled from multiple chunks must advance the
+        // node's AppliedIndex and CommitIndex to the snapshot index.
+        // Go reference: raft.go:3500-3700 installSnapshot
+        var node = new RaftNode("n1");
+
+        var data = new byte[] { 10, 20, 30, 40, 50 };
+        var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 2);
+        var chunks = enumerator.ToList();
+
+        await node.InstallSnapshotFromChunksAsync(
+            chunks,
+            snapshotIndex: 42,
+            snapshotTerm: 7,
+            ct: default);
+
+        node.AppliedIndex.ShouldBe(42);
+        node.CommitIndex.ShouldBe(42);
+    }
+
+    [Fact]
+    public async Task InstallSnapshot_validates_crc32_success()
+    {
+        // When the correct CRC32 is supplied the install should complete without error.
+        // Go reference: raft.go installSnapshot CRC validation
+        var node = new RaftNode("n1");
+
+        var data = new byte[256];
+        Random.Shared.NextBytes(data);
+        var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 64);
+        var crc = enumerator.Crc32Value;
+        var chunks = enumerator.ToList();
+
+        // Should not throw
+        await node.InstallSnapshotFromChunksAsync(
+            chunks,
+            snapshotIndex: 10,
+            snapshotTerm: 2,
+            ct: default,
+            expectedCrc32: crc);
+
+        node.AppliedIndex.ShouldBe(10);
+    }
+
+    [Fact]
+    public async Task InstallSnapshot_validates_crc32_throws_on_mismatch()
+    {
+        // A wrong CRC32 must cause InvalidDataException before any state is mutated.
+        // Go reference: raft.go installSnapshot CRC mismatch → abort
+        var node = new RaftNode("n1");
+        node.Log.Append(1, "cmd-1"); // pre-existing state
+
+        var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 };
+        var chunks = new[] { data }; // single chunk
+
+        const uint wrongCrc = 0xDEADBEEF;
+
+        var ex = await Should.ThrowAsync<InvalidDataException>(async () =>
+            await node.InstallSnapshotFromChunksAsync(
+                chunks,
+                snapshotIndex: 99,
+                snapshotTerm: 5,
+                ct: default,
+                expectedCrc32: wrongCrc));
+
+        ex.Message.ShouldContain("CRC32");
+        ex.Message.ShouldContain("DEADBEEF");
+
+        // State must NOT have been mutated since CRC failed before any writes
+        node.AppliedIndex.ShouldBe(0);
+        node.CommitIndex.ShouldBe(0);
+    }
+
+    [Fact]
+    public async Task InstallSnapshot_no_crc_parameter_installs_without_validation()
+    {
+        // When expectedCrc32 is omitted (null), no validation occurs and any data installs.
+        // Go reference: raft.go optional CRC field (backward compat)
+        var node = new RaftNode("n1");
+        var chunks = new[] { new byte[] { 7, 8, 9 } };
+
+        // Should not throw even with no CRC supplied
+        await node.InstallSnapshotFromChunksAsync(
+            chunks,
+            snapshotIndex: 5,
+            snapshotTerm: 1,
+            ct: default);
+
+        node.AppliedIndex.ShouldBe(5);
+        node.CommitIndex.ShouldBe(5);
+    }
+
+    // -----------------------------------------------------------------------
+    // RaftInstallSnapshotChunkWire encode/decode roundtrip tests
+    // -----------------------------------------------------------------------
+
+    [Fact]
+    public void SnapshotChunkWire_roundtrip_with_data()
+    {
+        // Encode and decode a chunk message and verify all fields survive the roundtrip.
+        // Go reference: raft.go wire format for InstallSnapshot RPC chunks
+        var payload = new byte[] { 0xAA, 0xBB, 0xCC, 0xDD };
+        var wire = new RaftInstallSnapshotChunkWire(
+            SnapshotIndex: 42UL,
+            SnapshotTerm: 3U,
+            ChunkIndex: 1U,
+            TotalChunks: 4U,
+            Crc32: 0x12345678U,
+            Data: payload);
+
+        var encoded = wire.Encode();
+        var decoded = RaftInstallSnapshotChunkWire.Decode(encoded);
+
+        decoded.SnapshotIndex.ShouldBe(42UL);
+        decoded.SnapshotTerm.ShouldBe(3U);
+        decoded.ChunkIndex.ShouldBe(1U);
+        decoded.TotalChunks.ShouldBe(4U);
+        decoded.Crc32.ShouldBe(0x12345678U);
+        decoded.Data.ShouldBe(payload);
+    }
+
+    [Fact]
+    public void SnapshotChunkWire_header_length_is_24_bytes()
+    {
+        // Header must be exactly 24 bytes as documented in the wire format.
+        RaftInstallSnapshotChunkWire.HeaderLen.ShouldBe(24);
+    }
+
+    [Fact]
+    public void SnapshotChunkWire_encode_total_length_is_header_plus_data()
+    {
+        var data = new byte[100];
+        var wire = new RaftInstallSnapshotChunkWire(1UL, 1U, 0U, 1U, 0U, data);
+        wire.Encode().Length.ShouldBe(RaftInstallSnapshotChunkWire.HeaderLen + 100);
+    }
+
+    [Fact]
+    public void SnapshotChunkWire_decode_throws_on_short_buffer()
+    {
+        // Buffers shorter than the header should throw ArgumentException
+        var tooShort = new byte[10]; // < 24 bytes
+        Should.Throw<ArgumentException>(() => RaftInstallSnapshotChunkWire.Decode(tooShort));
+    }
+
+    [Fact]
+    public void SnapshotChunkWire_roundtrip_empty_payload()
+    {
+        // A header-only message (no chunk data) should encode and decode cleanly.
+        var wire = new RaftInstallSnapshotChunkWire(
+            SnapshotIndex: 0UL,
+            SnapshotTerm: 0U,
+            ChunkIndex: 0U,
+            TotalChunks: 0U,
+            Crc32: 0U,
+            Data: []);
+
+        var encoded = wire.Encode();
+        encoded.Length.ShouldBe(RaftInstallSnapshotChunkWire.HeaderLen);
+
+        var decoded = RaftInstallSnapshotChunkWire.Decode(encoded);
+        decoded.Data.Length.ShouldBe(0);
+    }
+}
diff --git a/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs b/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs
index 4ab3387..90316ff 100644
--- a/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs
+++ b/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs
@@ -40,5 +40,8 @@ public class RaftStrictConsensusRuntimeTests
 
         public Task InstallSnapshotAsync(string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct)
             => Task.CompletedTask;
+
+        public Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct)
+            => Task.CompletedTask;
     }
 }