diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs
index e650b9b..4d90a7c 100644
--- a/src/NATS.Server/Raft/RaftNode.cs
+++ b/src/NATS.Server/Raft/RaftNode.cs
@@ -23,6 +23,11 @@ public sealed class RaftNode : IDisposable
// Go reference: raft.go:961-1019 (proposeAddPeer / proposeRemovePeer, single-change invariant)
private long _membershipChangeIndex;
+ // Leadership transfer: when true, the leader is handing off to a target and
+ // must reject new proposals until the transfer completes or times out.
+ // Go reference: raft.go stepDown / sendTimeoutNow (leader blocks proposals during transfer)
+ private int _transferInProgress; // 0 = false, 1 = true (Interlocked-compatible)
+
// Joint consensus (two-phase membership change) per Raft paper Section 4.
// During the joint phase both the old config (Cold) and new config (Cnew) are stored.
// A quorum decision requires majority from BOTH configurations simultaneously.
@@ -61,6 +66,13 @@ public sealed class RaftNode : IDisposable
// Go reference: raft.go:961-1019 single-change invariant.
public bool MembershipChangeInProgress => Interlocked.Read(ref _membershipChangeIndex) > 0;
+ /// <summary>
+ /// True while a leadership transfer is in progress (flag read via a no-op CompareExchange).
+ /// During a transfer the leader blocks new proposals.
+ /// Go reference: raft.go stepDown / sendTimeoutNow.
+ /// </summary>
+ public bool TransferInProgress => Interlocked.CompareExchange(ref _transferInProgress, 0, 0) == 1;
+
///
/// True when this node is in the joint consensus phase (transitioning between
/// two membership configurations).
@@ -165,6 +177,11 @@ public sealed class RaftNode : IDisposable
if (Role != RaftRole.Leader)
throw new InvalidOperationException("Only leader can propose entries.");
+ // Block proposals while a leadership transfer is in progress.
+ // Go reference: raft.go — leader rejects new entries during stepDown / sendTimeoutNow.
+ if (Interlocked.CompareExchange(ref _transferInProgress, 0, 0) == 1)
+ throw new InvalidOperationException("Leadership transfer in progress; proposals are blocked.");
+
var entry = Log.Append(TermState.CurrentTerm, command);
var followers = _cluster.Where(n => n.Id != Id).ToList();
var results = await _replicator.ReplicateAsync(Id, entry, followers, _transport, ct);
@@ -407,12 +424,34 @@ public sealed class RaftNode : IDisposable
}
///
- /// Installs a snapshot assembled from streaming chunks.
- /// Used for large snapshot transfers where the entire snapshot is sent in pieces.
- /// Go reference: raft.go:3500-3700 (installSnapshot with chunked transfer).
+ /// Installs a snapshot assembled from streaming chunks, with optional CRC32 integrity validation.
+ /// <para>
+ /// After all chunks are accumulated and reassembled into a contiguous byte array, the CRC32
+ /// (IEEE 802.3) checksum of the assembled data is computed and compared against
+ /// <paramref name="expectedCrc32"/> when provided. A mismatch indicates data corruption during
+ /// transfer and causes an <see cref="InvalidDataException"/> to be thrown before any state is
+ /// mutated, so the follower can request a retransmit rather than applying corrupt state.
+ /// </para>
+ /// Go reference: raft.go:3500-3700 (installSnapshot with chunked transfer and CRC validation).
///
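+ /// <param name="chunks">Ordered sequence of byte arrays that form the snapshot when concatenated.</param>
+ /// <param name="snapshotIndex">The last log index covered by this snapshot.</param>
+ /// <param name="snapshotTerm">The term of the last log entry covered by this snapshot.</param>
+ /// <param name="expectedCrc32">
+ /// Optional CRC32 checksum of the full assembled snapshot data. When non-null the assembled
+ /// data is validated before installation; a mismatch throws <see cref="InvalidDataException"/>.
+ /// </param>
+ /// <param name="ct">Cancellation token.</param>
+ /// <exception cref="InvalidDataException">
+ /// Thrown when <paramref name="expectedCrc32"/> is provided and the assembled data does not
+ /// match the expected checksum.
+ /// </exception>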
public async Task InstallSnapshotFromChunksAsync(
- IEnumerable chunks, long snapshotIndex, int snapshotTerm, CancellationToken ct)
+ IEnumerable chunks,
+ long snapshotIndex,
+ int snapshotTerm,
+ CancellationToken ct,
+ uint? expectedCrc32 = null)
{
var checkpoint = new RaftSnapshotCheckpoint
{
@@ -424,6 +463,19 @@ public sealed class RaftNode : IDisposable
checkpoint.AddChunk(chunk);
var data = checkpoint.Assemble();
+
+ // CRC32 validation: compute over assembled data and compare to the sender's checksum.
+ // Go reference: raft.go installSnapshot CRC check before applying snapshot state.
+ if (expectedCrc32.HasValue)
+ {
+ var crc = new System.IO.Hashing.Crc32();
+ crc.Append(data);
+ var actualCrc32 = crc.GetCurrentHashAsUInt32();
+ if (actualCrc32 != expectedCrc32.Value)
+ throw new InvalidDataException(
+ $"Snapshot CRC32 mismatch: expected 0x{expectedCrc32.Value:X8}, got 0x{actualCrc32:X8}.");
+ }
+
var snapshot = new RaftSnapshot
{
LastIncludedIndex = snapshotIndex,
@@ -537,6 +589,78 @@ public sealed class RaftNode : IDisposable
StartElection(clusterSize);
}
+ // Leadership transfer
+ // Go reference: raft.go sendTimeoutNow / processTimeoutNow
+
+    /// <summary>
+    /// Transfers leadership to <paramref name="targetId"/> by sending it a TimeoutNow RPC.
+    /// Must only be called on the current leader. New proposals are blocked while the
+    /// transfer is in flight (the wait is bounded by 2× <c>ElectionTimeoutMaxMs</c>).
+    /// Go reference: raft.go stepDown (leadership transfer variant) / sendTimeoutNow.
+    /// </summary>
+    /// <returns><c>true</c> when the target is observed as leader before the deadline; <c>false</c> on timeout.</returns>
+    /// <exception cref="InvalidOperationException">Not leader, no transport, or a transfer is already running.</exception>
+    public async Task<bool> TransferLeadershipAsync(string targetId, CancellationToken ct)
+    {
+        ArgumentException.ThrowIfNullOrEmpty(targetId);
+        if (Role != RaftRole.Leader)
+            throw new InvalidOperationException("Only the leader can transfer leadership.");
+
+        if (_transport == null)
+            throw new InvalidOperationException("No transport configured; cannot send TimeoutNow RPC.");
+
+        // Claim the flag atomically: with a plain Exchange an overlapping call's finally
+        // block would clear the flag while the first transfer is still in flight.
+        if (Interlocked.CompareExchange(ref _transferInProgress, 1, 0) != 0)
+            throw new InvalidOperationException("Leadership transfer already in progress.");
+
+        try
+        {
+            await _transport.SendTimeoutNowAsync(Id, targetId, (ulong)TermState.CurrentTerm, ct);
+
+            // Poll the in-process cluster until the target reports leadership. The deadline is
+            // wall-clock based because Task.Delay may sleep noticeably longer than requested.
+            var pollIntervalMs = Math.Max(1, ElectionTimeoutMinMs / 10);
+            var deadline = Environment.TickCount64 + (ElectionTimeoutMaxMs * 2L);
+
+            while (Environment.TickCount64 < deadline)
+            {
+                var target = _cluster.FirstOrDefault(
+                    n => string.Equals(n.Id, targetId, StringComparison.Ordinal));
+
+                if (target != null && target.IsLeader)
+                    return true;
+
+                await Task.Delay(pollIntervalMs, ct);
+            }
+
+            return false;
+        }
+        finally
+        {
+            Interlocked.Exchange(ref _transferInProgress, 0);
+        }
+    }
+
+    /// <summary>
+    /// Handles a received TimeoutNow RPC: immediately starts an election, bypassing both the
+    /// election timer and the pre-vote round. Called on the target node of a leadership transfer.
+    /// Go reference: raft.go processTimeoutNow — triggers immediate campaign.
+    /// </summary>
+    /// <exception cref="ArgumentOutOfRangeException"><paramref name="term"/> exceeds <see cref="int.MaxValue"/>.</exception>
+    public void ReceiveTimeoutNow(ulong term)
+    {
+        // The term is narrowed to int below; an unchecked cast would silently wrap a large wire value.
+        ArgumentOutOfRangeException.ThrowIfGreaterThan(term, (ulong)int.MaxValue);
+        if ((int)term > TermState.CurrentTerm)
+            TermState.CurrentTerm = (int)term;
+
+        // Immediately campaign, skipping pre-vote (the leader has already decided
+        // this node should win, so pre-vote would be redundant and wasteful).
+        var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count;
+        StartElection(clusterSize);
+    }
+
private void ElectionTimerCallback(object? state)
{
if (_electionTimerCts?.IsCancellationRequested == true)
diff --git a/src/NATS.Server/Raft/RaftWireFormat.cs b/src/NATS.Server/Raft/RaftWireFormat.cs
index e313a03..87b6c6c 100644
--- a/src/NATS.Server/Raft/RaftWireFormat.cs
+++ b/src/NATS.Server/Raft/RaftWireFormat.cs
@@ -443,6 +443,118 @@ public readonly record struct RaftPreVoteResponseWire(
}
}
+/// <summary>
+/// Binary wire encoding of a RAFT TimeoutNow RPC.
+/// Sent by the current leader to ask a target follower to immediately start
+/// an election, bypassing the election timer. Used for leadership transfer.
+/// Fixed 16-byte layout (little-endian):
+///   [0..7]   term      uint64 — sender's current term
+///   [8..15]  leaderId  8-byte ASCII, zero-padded
+/// Go reference: raft.go sendTimeoutNow / processTimeoutNow
+/// </summary>
+public readonly record struct RaftTimeoutNowWire(ulong Term, string LeaderId)
+{
+    /// <summary>
+    /// Fixed byte length of a TimeoutNow message.
+    /// Layout: [8:term][8:leaderId] = 16 bytes total.
+    /// </summary>
+    public const int MessageLen = 8 + RaftWireConstants.IdLen; // 16
+
+    /// <summary>
+    /// Encodes this TimeoutNow message to a <see cref="MessageLen"/>-byte buffer.
+    /// </summary>
+    public byte[] Encode()
+    {
+        var buf = new byte[MessageLen];
+        BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0), Term);
+        RaftWireHelpers.WriteId(buf.AsSpan(8), LeaderId);
+        return buf;
+    }
+
+    /// <summary>
+    /// Decodes a TimeoutNow message from a span. Throws <see cref="ArgumentException"/>
+    /// if the span is not exactly <see cref="MessageLen"/> bytes.
+    /// </summary>
+    public static RaftTimeoutNowWire Decode(ReadOnlySpan<byte> msg)
+    {
+        if (msg.Length != MessageLen)
+            throw new ArgumentException(
+                $"TimeoutNow requires exactly {MessageLen} bytes, got {msg.Length}.",
+                nameof(msg));
+
+        return new RaftTimeoutNowWire(
+            Term: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]),
+            LeaderId: RaftWireHelpers.ReadId(msg[8..]));
+    }
+}
+
+/// <summary>
+/// Binary wire encoding of a RAFT InstallSnapshot chunk message (variable length).
+/// <para>
+/// This message type carries a single chunk of a snapshot during a streaming
+/// InstallSnapshot RPC. The leader sends chunks sequentially; the follower
+/// accumulates them and validates the CRC32 after receiving the final chunk.
+/// </para>
+/// Wire layout (little-endian):
+///   [0..7]    snapshotIndex  uint64 — last log index covered by this snapshot
+///   [8..11]   snapshotTerm   uint32 — term of the last included entry
+///   [12..15]  chunkIndex     uint32 — zero-based index of this chunk in the sequence
+///   [16..19]  totalChunks    uint32 — total number of chunks in this snapshot transfer
+///   [20..23]  crc32          uint32 — CRC32 of the ENTIRE assembled snapshot (sent on every chunk)
+///   [24+]     data           bytes  — raw chunk payload
+///
+/// Go reference: raft.go:3500-3700 (installSnapshot chunked transfer and CRC validation)
+/// </summary>
+public readonly record struct RaftInstallSnapshotChunkWire(
+    ulong SnapshotIndex,
+    uint SnapshotTerm,
+    uint ChunkIndex,
+    uint TotalChunks,
+    uint Crc32,
+    byte[] Data)
+{
+    /// <summary>Fixed-size header before the variable-length data payload (24 bytes).</summary>
+    public const int HeaderLen = 8 + 4 + 4 + 4 + 4; // 24
+
+    /// <summary>
+    /// Encodes this snapshot chunk message to a byte array of <see cref="HeaderLen"/> + payload bytes.
+    /// A null <see cref="Data"/> (e.g. on a default-initialized struct) is encoded as an empty payload.
+    /// </summary>
+    public byte[] Encode()
+    {
+        ReadOnlySpan<byte> payload = Data; // a null array converts to an empty span instead of throwing
+        var buf = new byte[HeaderLen + payload.Length];
+        var span = buf.AsSpan();
+        BinaryPrimitives.WriteUInt64LittleEndian(span[0..], SnapshotIndex);
+        BinaryPrimitives.WriteUInt32LittleEndian(span[8..], SnapshotTerm);
+        BinaryPrimitives.WriteUInt32LittleEndian(span[12..], ChunkIndex);
+        BinaryPrimitives.WriteUInt32LittleEndian(span[16..], TotalChunks);
+        BinaryPrimitives.WriteUInt32LittleEndian(span[20..], Crc32);
+        payload.CopyTo(span[HeaderLen..]);
+        return buf;
+    }
+
+    /// <summary>
+    /// Decodes an InstallSnapshot chunk message from <paramref name="msg"/>. Throws
+    /// <see cref="ArgumentException"/> when the buffer is shorter than <see cref="HeaderLen"/> bytes.
+    /// </summary>
+    public static RaftInstallSnapshotChunkWire Decode(ReadOnlySpan<byte> msg)
+    {
+        if (msg.Length < HeaderLen)
+            throw new ArgumentException(
+                $"InstallSnapshotChunk requires at least {HeaderLen} bytes, got {msg.Length}.",
+                nameof(msg));
+
+        return new RaftInstallSnapshotChunkWire(
+            SnapshotIndex: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]),
+            SnapshotTerm: BinaryPrimitives.ReadUInt32LittleEndian(msg[8..]),
+            ChunkIndex: BinaryPrimitives.ReadUInt32LittleEndian(msg[12..]),
+            TotalChunks: BinaryPrimitives.ReadUInt32LittleEndian(msg[16..]),
+            Crc32: BinaryPrimitives.ReadUInt32LittleEndian(msg[20..]),
+            Data: msg[HeaderLen..].ToArray());
+    }
+}
+
///
/// Shared encoding helpers for all RAFT wire format types.
///
diff --git a/src/NATS.Server/Raft/SnapshotChunkEnumerator.cs b/src/NATS.Server/Raft/SnapshotChunkEnumerator.cs
new file mode 100644
index 0000000..991a749
--- /dev/null
+++ b/src/NATS.Server/Raft/SnapshotChunkEnumerator.cs
@@ -0,0 +1,90 @@
+using System.IO.Hashing;
+
+namespace NATS.Server.Raft;
+
+/// <summary>
+/// Splits a snapshot byte array into fixed-size chunks for streaming transfer
+/// and computes a CRC32 checksum over the entire data for integrity validation.
+/// <para>
+/// During an InstallSnapshot RPC the leader streams a large snapshot to a lagging
+/// follower chunk-by-chunk rather than sending it in a single message. The follower
+/// accumulates the chunks, reassembles them, then validates the CRC32 before
+/// applying the snapshot — preventing silent data corruption in transit.
+/// </para>
+/// Go reference: raft.go:3500-3700 (installSnapshot chunked transfer and CRC validation)
+/// </summary>
+public sealed class SnapshotChunkEnumerator : IEnumerable<byte[]>
+{
+    private readonly byte[] _data;
+    private readonly int _chunkSize;
+    private uint? _crc32Value;
+
+    /// <summary>
+    /// Default chunk size matching Go's snapshot chunk transfer size (64 KiB).
+    /// Go reference: raft.go snapshotChunkSize constant.
+    /// </summary>
+    public const int DefaultChunkSize = 65536;
+
+    /// <summary>
+    /// Creates a new chunk enumerator over <paramref name="data"/>.
+    /// </summary>
+    /// <param name="data">The full snapshot data to split into chunks.</param>
+    /// <param name="chunkSize">Maximum size of each chunk in bytes. Defaults to 64 KiB.</param>
+    /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
+    /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is less than or equal to zero.</exception>
+    public SnapshotChunkEnumerator(byte[] data, int chunkSize = DefaultChunkSize)
+    {
+        // Fail at construction rather than later inside ChunkCount, Crc32Value or enumeration.
+        ArgumentNullException.ThrowIfNull(data);
+        ArgumentOutOfRangeException.ThrowIfNegativeOrZero(chunkSize);
+        _data = data;
+        _chunkSize = chunkSize;
+    }
+
+    /// <summary>
+    /// The CRC32 (IEEE 802.3) checksum of the entire snapshot data.
+    /// Computed lazily on first access and cached for subsequent reads.
+    /// The receiver uses this value to validate the reassembled snapshot after
+    /// all chunks have been received.
+    /// Go reference: raft.go installSnapshot CRC validation.
+    /// </summary>
+    public uint Crc32Value
+    {
+        get
+        {
+            if (!_crc32Value.HasValue)
+            {
+                var crc = new Crc32();
+                crc.Append(_data);
+                _crc32Value = crc.GetCurrentHashAsUInt32();
+            }
+            return _crc32Value.Value;
+        }
+    }
+
+    /// <summary>
+    /// The total number of chunks this data splits into given the configured chunk size.
+    /// </summary>
+    public int ChunkCount => _data.Length == 0 ? 1 : ((_data.Length + _chunkSize - 1) / _chunkSize);
+
+    /// <inheritdoc />
+    public IEnumerator<byte[]> GetEnumerator()
+    {
+        if (_data.Length == 0)
+        {
+            yield return [];
+            yield break;
+        }
+
+        var offset = 0;
+        while (offset < _data.Length)
+        {
+            var length = Math.Min(_chunkSize, _data.Length - offset);
+            var chunk = new byte[length];
+            _data.AsSpan(offset, length).CopyTo(chunk);
+            offset += length;
+            yield return chunk;
+        }
+    }
+
+    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
+}
diff --git a/tests/NATS.Server.Tests/Raft/RaftLeadershipTransferTests.cs b/tests/NATS.Server.Tests/Raft/RaftLeadershipTransferTests.cs
new file mode 100644
index 0000000..e0bcfa0
--- /dev/null
+++ b/tests/NATS.Server.Tests/Raft/RaftLeadershipTransferTests.cs
@@ -0,0 +1,410 @@
+using NATS.Server.Raft;
+
+namespace NATS.Server.Tests.Raft;
+
+///
+/// Tests for RAFT leadership transfer via TimeoutNow RPC (Gap 8.4).
+/// The leader sends a TimeoutNow message to a target follower, which immediately
+/// starts an election. The leader blocks proposals while the transfer is in flight.
+/// Go reference: raft.go sendTimeoutNow / processTimeoutNow
+///
+public class RaftLeadershipTransferTests
+{
+ // -- Helpers --
+
+ private static (RaftNode[] nodes, InMemoryRaftTransport transport) CreateCluster(int size)
+ {
+ var transport = new InMemoryRaftTransport();
+ var nodes = Enumerable.Range(1, size)
+ .Select(i => new RaftNode($"n{i}", transport))
+ .ToArray();
+ foreach (var node in nodes)
+ {
+ transport.Register(node);
+ node.ConfigureCluster(nodes);
+ // Use short election timeouts so polling in TransferLeadershipAsync
+ // converges quickly in tests without requiring real async delays.
+ node.ElectionTimeoutMinMs = 5;
+ node.ElectionTimeoutMaxMs = 10;
+ }
+ return (nodes, transport);
+ }
+
+ private static RaftNode ElectLeader(RaftNode[] nodes)
+ {
+ var candidate = nodes[0];
+ candidate.StartElection(nodes.Length);
+ foreach (var voter in nodes.Skip(1))
+ candidate.ReceiveVote(voter.GrantVote(candidate.Term, candidate.Id), nodes.Length);
+ return candidate;
+ }
+
+ // -- Wire format tests --
+
+ // Go reference: raft.go TimeoutNow wire encoding
+ [Fact]
+ public void TimeoutNowRpc_wire_format_roundtrip()
+ {
+ var wire = new RaftTimeoutNowWire(Term: 7UL, LeaderId: "n1");
+
+ var encoded = wire.Encode();
+ encoded.Length.ShouldBe(RaftTimeoutNowWire.MessageLen); // 16 bytes
+
+ var decoded = RaftTimeoutNowWire.Decode(encoded);
+ decoded.Term.ShouldBe(7UL);
+ decoded.LeaderId.ShouldBe("n1");
+ }
+
+ [Fact]
+ public void TimeoutNowRpc_wire_format_preserves_term_and_leader_id()
+ {
+ var wire = new RaftTimeoutNowWire(Term: 42UL, LeaderId: "node5");
+
+ var decoded = RaftTimeoutNowWire.Decode(wire.Encode());
+
+ decoded.Term.ShouldBe(42UL);
+ decoded.LeaderId.ShouldBe("node5");
+ }
+
+    [Fact]
+    public void TimeoutNowRpc_decode_throws_on_wrong_length()
+    {
+        Should.Throw<ArgumentException>(() =>
+            RaftTimeoutNowWire.Decode(new byte[10]));
+    }
+
+ [Fact]
+ public void TimeoutNowRpc_message_len_is_16_bytes()
+ {
+ RaftTimeoutNowWire.MessageLen.ShouldBe(16);
+ }
+
+ // -- ReceiveTimeoutNow logic tests --
+
+ // Go reference: raft.go processTimeoutNow -- follower starts election immediately
+ [Fact]
+ public void ReceiveTimeoutNow_triggers_immediate_election_on_follower()
+ {
+ var (nodes, _) = CreateCluster(3);
+ var follower = nodes[1]; // starts as follower
+ follower.Role.ShouldBe(RaftRole.Follower);
+
+ follower.ReceiveTimeoutNow(term: 0);
+
+ // Node should now be a candidate (or leader if it self-voted quorum)
+ follower.Role.ShouldBeOneOf(RaftRole.Candidate, RaftRole.Leader);
+ }
+
+ [Fact]
+ public void ReceiveTimeoutNow_updates_term_when_sender_term_is_higher()
+ {
+ var node = new RaftNode("follower");
+ node.TermState.CurrentTerm = 3;
+
+ node.ReceiveTimeoutNow(term: 10);
+
+ // ReceiveTimeoutNow sets term to 10, then StartElection increments to 11
+ node.TermState.CurrentTerm.ShouldBe(11);
+ }
+
+ [Fact]
+ public void ReceiveTimeoutNow_increments_term_and_starts_campaign()
+ {
+ var node = new RaftNode("n1");
+ node.TermState.CurrentTerm = 2;
+ var termBefore = node.Term;
+
+ node.ReceiveTimeoutNow(term: 0);
+
+ // StartElection increments term
+ node.Term.ShouldBe(termBefore + 1);
+ node.Role.ShouldBe(RaftRole.Candidate); // single node with no cluster -- needs votes
+ }
+
+ [Fact]
+ public void ReceiveTimeoutNow_on_single_node_makes_it_leader()
+ {
+ // Single-node cluster: quorum = 1, so self-vote is sufficient.
+ var node = new RaftNode("solo");
+ node.ConfigureCluster([node]);
+
+ node.ReceiveTimeoutNow(term: 0);
+
+ node.IsLeader.ShouldBeTrue();
+ }
+
+ // -- Proposal blocking during transfer --
+
+ // Go reference: raft.go -- leader rejects new entries while transfer is in progress.
+ // BlockingTimeoutNowTransport signals via SemaphoreSlim when SendTimeoutNowAsync is
+ // entered, letting the test observe the _transferInProgress flag without timing deps.
+    [Fact]
+    public async Task TransferLeadership_leader_blocks_proposals_during_transfer()
+    {
+        var blockingTransport = new BlockingTimeoutNowTransport();
+        var node = new RaftNode("leader", blockingTransport);
+        node.ConfigureCluster([node]);
+        node.StartElection(1); // become leader
+        node.IsLeader.ShouldBeTrue();
+
+        using var cts = new CancellationTokenSource();
+        var transferTask = node.TransferLeadershipAsync("n2", cts.Token);
+
+        // Wait until SendTimeoutNowAsync is entered -- transfer flag is guaranteed set.
+        await blockingTransport.WaitUntilBlockingAsync();
+
+        // ProposeAsync must throw because the transfer flag is set.
+        var ex = await Should.ThrowAsync<InvalidOperationException>(
+            () => node.ProposeAsync("cmd", CancellationToken.None).AsTask());
+        ex.Message.ShouldContain("Leadership transfer in progress");
+
+        // Cancel and await proper completion to avoid test resource leaks.
+        await cts.CancelAsync();
+        await Should.ThrowAsync<OperationCanceledException>(() => transferTask);
+    }
+
+ // Go reference: raft.go -- only leader can initiate leadership transfer
+    [Fact]
+    public async Task TransferLeadership_only_leader_can_transfer()
+    {
+        var transport = new InMemoryRaftTransport();
+        var follower = new RaftNode("follower", transport);
+        follower.Role.ShouldBe(RaftRole.Follower);
+
+        var ex = await Should.ThrowAsync<InvalidOperationException>(
+            () => follower.TransferLeadershipAsync("n2", CancellationToken.None));
+        ex.Message.ShouldContain("Only the leader");
+    }
+
+ // Go reference: raft.go -- TransferLeadershipAsync requires a configured transport
+    [Fact]
+    public async Task TransferLeadership_throws_when_no_transport_configured()
+    {
+        // No transport injected.
+        var node = new RaftNode("leader");
+        node.StartElection(1); // become leader (single node, quorum = 1)
+        node.IsLeader.ShouldBeTrue();
+
+        var ex = await Should.ThrowAsync<InvalidOperationException>(
+            () => node.TransferLeadershipAsync("n2", CancellationToken.None));
+        ex.Message.ShouldContain("No transport configured");
+    }
+
+ // Go reference: raft.go sendTimeoutNow -- target becomes leader after receiving TimeoutNow.
+ // VoteGrantingTransport delivers TimeoutNow and immediately grants votes so the target
+ // is already leader before the polling loop runs -- no Task.Delay required.
+ [Fact]
+ public async Task TransferLeadership_target_becomes_leader()
+ {
+ var transport = new VoteGrantingTransport();
+ var nodes = Enumerable.Range(1, 3)
+ .Select(i => new RaftNode($"n{i}", transport))
+ .ToArray();
+ foreach (var node in nodes)
+ {
+ transport.Register(node);
+ node.ConfigureCluster(nodes);
+ node.ElectionTimeoutMinMs = 5;
+ node.ElectionTimeoutMaxMs = 10;
+ }
+
+ var leader = nodes[0];
+ leader.StartElection(nodes.Length);
+ foreach (var voter in nodes.Skip(1))
+ leader.ReceiveVote(voter.GrantVote(leader.Term, leader.Id), nodes.Length);
+ leader.IsLeader.ShouldBeTrue();
+
+ var target = nodes[1];
+ // VoteGrantingTransport makes the target a leader synchronously during TimeoutNow
+ // delivery, so the first poll iteration in TransferLeadershipAsync succeeds.
+ var result = await leader.TransferLeadershipAsync(target.Id, CancellationToken.None);
+
+ result.ShouldBeTrue();
+ target.IsLeader.ShouldBeTrue();
+ }
+
+ // Go reference: raft.go sendTimeoutNow -- returns false when target doesn't respond.
+ // "ghost" is not registered in the transport so TimeoutNow is a no-op and the
+ // polling loop times out after 2x election timeout.
+ [Fact]
+ public async Task TransferLeadership_timeout_on_unreachable_target()
+ {
+ var transport = new InMemoryRaftTransport();
+ var leader = new RaftNode("leader", transport);
+ leader.ConfigureCluster([leader]);
+ transport.Register(leader);
+ leader.StartElection(1);
+
+ // Very short timeouts so the poll deadline is reached quickly.
+ leader.ElectionTimeoutMinMs = 5;
+ leader.ElectionTimeoutMaxMs = 10;
+
+ // "ghost" is not registered -- TimeoutNow is a no-op; target never becomes leader.
+ var result = await leader.TransferLeadershipAsync("ghost", CancellationToken.None);
+
+ result.ShouldBeFalse();
+ }
+
+ // -- Integration: flag lifecycle --
+
+ [Fact]
+ public async Task TransferLeadership_clears_transfer_flag_after_success()
+ {
+ var transport = new VoteGrantingTransport();
+ var nodes = Enumerable.Range(1, 3)
+ .Select(i => new RaftNode($"n{i}", transport))
+ .ToArray();
+ foreach (var node in nodes)
+ {
+ transport.Register(node);
+ node.ConfigureCluster(nodes);
+ node.ElectionTimeoutMinMs = 5;
+ node.ElectionTimeoutMaxMs = 10;
+ }
+
+ var leader = nodes[0];
+ leader.StartElection(nodes.Length);
+ foreach (var voter in nodes.Skip(1))
+ leader.ReceiveVote(voter.GrantVote(leader.Term, leader.Id), nodes.Length);
+
+ var target = nodes[1];
+ var success = await leader.TransferLeadershipAsync(target.Id, CancellationToken.None);
+
+ success.ShouldBeTrue();
+ // After transfer completes the flag must be cleared.
+ leader.TransferInProgress.ShouldBeFalse();
+ }
+
+ [Fact]
+ public async Task TransferLeadership_clears_transfer_flag_after_timeout()
+ {
+ var transport = new InMemoryRaftTransport();
+ var leader = new RaftNode("leader", transport);
+ leader.ConfigureCluster([leader]);
+ transport.Register(leader);
+ leader.StartElection(1);
+ leader.ElectionTimeoutMinMs = 5;
+ leader.ElectionTimeoutMaxMs = 10;
+
+ // "ghost" is not registered -- transfer times out.
+ await leader.TransferLeadershipAsync("ghost", CancellationToken.None);
+
+ // Flag must be cleared regardless of outcome.
+ leader.TransferInProgress.ShouldBeFalse();
+ }
+}
+
+/// <summary>
+/// A transport that blocks inside <see cref="SendTimeoutNowAsync"/> until the
+/// provided <see cref="CancellationToken"/> is cancelled. Exposes a semaphore
+/// so the test can synchronize on when the leader transfer flag is set.
+/// </summary>
+file sealed class BlockingTimeoutNowTransport : IRaftTransport
+{
+    private readonly SemaphoreSlim _entered = new(0, 1);
+
+    /// <summary>
+    /// Returns a task that completes once <see cref="SendTimeoutNowAsync"/> has been
+    /// entered and the leader's transfer flag is guaranteed to be set.
+    /// </summary>
+    public Task WaitUntilBlockingAsync() => _entered.WaitAsync();
+
+    public Task<IReadOnlyList<AppendResult>> AppendEntriesAsync(
+        string leaderId, IReadOnlyList<string> followerIds, RaftLogEntry entry, CancellationToken ct)
+        => Task.FromResult<IReadOnlyList<AppendResult>>([]);
+
+    public Task<VoteResponse> RequestVoteAsync(
+        string candidateId, string voterId, VoteRequest request, CancellationToken ct)
+        => Task.FromResult(new VoteResponse { Granted = false });
+
+    public Task InstallSnapshotAsync(
+        string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct)
+        => Task.CompletedTask;
+
+    public async Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct)
+    {
+        // Signal that the transfer flag is set -- the test can now probe ProposeAsync.
+        _entered.Release();
+
+        // Block until the test cancels the token.
+        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+        await using var reg = ct.Register(() => tcs.TrySetCanceled(ct));
+        await tcs.Task;
+    }
+}
+
+/// <summary>
+/// A transport that, when delivering a TimeoutNow RPC, also immediately grants
+/// votes to the target candidate so it reaches quorum synchronously. This makes
+/// the target become leader before TransferLeadershipAsync starts polling, removing
+/// any need for Task.Delay waits in the test.
+/// </summary>
+file sealed class VoteGrantingTransport : IRaftTransport
+{
+    private readonly Dictionary<string, RaftNode> _nodes = new(StringComparer.Ordinal);
+
+    public void Register(RaftNode node) => _nodes[node.Id] = node;
+
+    public Task<IReadOnlyList<AppendResult>> AppendEntriesAsync(
+        string leaderId, IReadOnlyList<string> followerIds, RaftLogEntry entry, CancellationToken ct)
+    {
+        var results = new List<AppendResult>(followerIds.Count);
+        foreach (var followerId in followerIds)
+        {
+            if (_nodes.TryGetValue(followerId, out var node))
+            {
+                node.ReceiveReplicatedEntry(entry);
+                results.Add(new AppendResult { FollowerId = followerId, Success = true });
+            }
+            else
+            {
+                results.Add(new AppendResult { FollowerId = followerId, Success = false });
+            }
+        }
+        return Task.FromResult<IReadOnlyList<AppendResult>>(results);
+    }
+
+    public Task<VoteResponse> RequestVoteAsync(
+        string candidateId, string voterId, VoteRequest request, CancellationToken ct)
+    {
+        if (_nodes.TryGetValue(voterId, out var node))
+            return Task.FromResult(node.GrantVote(request.Term, candidateId));
+        return Task.FromResult(new VoteResponse { Granted = false });
+    }
+
+    public Task InstallSnapshotAsync(
+        string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct)
+        => Task.CompletedTask;
+
+    /// <summary>
+    /// Delivers TimeoutNow to the target (triggering an immediate election), then
+    /// grants votes from every other peer so the target reaches quorum synchronously.
+    /// This ensures the target is already leader before TransferLeadershipAsync polls,
+    /// removing any timing dependency between delivery and vote propagation.
+    /// </summary>
+    public Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct)
+    {
+        if (!_nodes.TryGetValue(targetId, out var target))
+            return Task.CompletedTask;
+
+        // Trigger immediate election on the target node.
+        target.ReceiveTimeoutNow(term);
+
+        // Grant peer votes so the target reaches quorum immediately.
+        if (target.Role == RaftRole.Candidate)
+        {
+            var clusterSize = _nodes.Count;
+            foreach (var (peerId, peer) in _nodes)
+            {
+                if (string.Equals(peerId, targetId, StringComparison.Ordinal))
+                    continue;
+                var vote = peer.GrantVote(target.Term, targetId);
+                target.ReceiveVote(vote, clusterSize);
+                if (target.IsLeader)
+                    break;
+            }
+        }
+
+        return Task.CompletedTask;
+    }
+}
diff --git a/tests/NATS.Server.Tests/Raft/RaftLogReplicationTests.cs b/tests/NATS.Server.Tests/Raft/RaftLogReplicationTests.cs
index a481c4c..7f16f8e 100644
--- a/tests/NATS.Server.Tests/Raft/RaftLogReplicationTests.cs
+++ b/tests/NATS.Server.Tests/Raft/RaftLogReplicationTests.cs
@@ -552,6 +552,9 @@ public class RaftLogReplicationTests
public Task InstallSnapshotAsync(
string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct)
=> Task.CompletedTask;
+
+ public Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct)
+ => Task.CompletedTask;
}
// -- Helper transport that succeeds for first follower, fails for rest --
@@ -590,5 +593,8 @@ public class RaftLogReplicationTests
public Task InstallSnapshotAsync(
string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct)
=> Task.CompletedTask;
+
+ public Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct)
+ => Task.CompletedTask;
}
}
diff --git a/tests/NATS.Server.Tests/Raft/RaftSnapshotStreamingTests.cs b/tests/NATS.Server.Tests/Raft/RaftSnapshotStreamingTests.cs
new file mode 100644
index 0000000..c7ad7df
--- /dev/null
+++ b/tests/NATS.Server.Tests/Raft/RaftSnapshotStreamingTests.cs
@@ -0,0 +1,288 @@
+using System.IO.Hashing;
+using NATS.Server.Raft;
+
+namespace NATS.Server.Tests.Raft;
+
+///
+/// Tests for Gap 8.3: chunk-based snapshot streaming with CRC32 validation.
+///
+/// Covers enumeration/CRC behaviour and
+/// the CRC validation path.
+///
+/// Go reference: raft.go:3500-3700 (installSnapshot chunked transfer + CRC validation)
+///
+public class RaftSnapshotStreamingTests
+{
+ // -----------------------------------------------------------------------
+ // SnapshotChunkEnumerator tests
+ // -----------------------------------------------------------------------
+
+ [Fact]
+ public void SnapshotChunkEnumerator_yields_correct_chunk_count()
+ {
+ // 200 KB of data at 64 KB per chunk → ceil(200/64) = 4 chunks
+ // Go reference: raft.go snapshotChunkSize chunking logic
+ const int dataSize = 200 * 1024;
+ var data = new byte[dataSize];
+ Random.Shared.NextBytes(data);
+
+ var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 65536);
+ var chunks = enumerator.ToList();
+
+ chunks.Count.ShouldBe(4);
+ enumerator.ChunkCount.ShouldBe(4);
+ }
+
+ [Fact]
+ public void SnapshotChunkEnumerator_chunks_reassemble_to_original()
+ {
+ // Concatenating all chunks must reproduce the original byte array exactly
+ // Go reference: raft.go installSnapshot chunk reassembly
+ const int dataSize = 150 * 1024; // 150 KB → 3 chunks at 64 KB
+ var original = new byte[dataSize];
+ Random.Shared.NextBytes(original);
+
+ var enumerator = new SnapshotChunkEnumerator(original, chunkSize: 65536);
+ var assembled = enumerator.SelectMany(c => c).ToArray();
+
+ assembled.Length.ShouldBe(original.Length);
+ assembled.ShouldBe(original);
+ }
+
+ [Fact]
+ public void SnapshotChunkEnumerator_crc32_matches()
+ {
+ // The CRC32 reported by the enumerator must equal the CRC32 computed
+ // directly over the original data — proving it covers the full payload
+ // Go reference: raft.go installSnapshot CRC32 computation
+ var data = new byte[100 * 1024];
+ Random.Shared.NextBytes(data);
+
+ var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 65536);
+
+ var expectedCrc = new Crc32();
+ expectedCrc.Append(data);
+ var expected = expectedCrc.GetCurrentHashAsUInt32();
+
+ enumerator.Crc32Value.ShouldBe(expected);
+ }
+
+ [Fact]
+ public void SnapshotChunkEnumerator_single_chunk_for_small_data()
+ {
+ // Data that fits in a single chunk — only one chunk should be yielded,
+ // and it should be identical to the input
+ // Go reference: raft.go installSnapshot — single-chunk case
+ var data = new byte[] { 1, 2, 3, 4, 5, 10, 20, 30 };
+
+ var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 65536);
+ var chunks = enumerator.ToList();
+
+ chunks.Count.ShouldBe(1);
+ enumerator.ChunkCount.ShouldBe(1);
+ chunks[0].ShouldBe(data);
+ }
+
+ [Fact]
+ public void SnapshotChunkEnumerator_last_chunk_is_remainder()
+ {
+ // 10 bytes with chunk size 3 → chunks of [3, 3, 3, 1]
+ var data = Enumerable.Range(0, 10).Select(i => (byte)i).ToArray();
+
+ var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 3);
+ var chunks = enumerator.ToList();
+
+ chunks.Count.ShouldBe(4);
+ chunks[0].Length.ShouldBe(3);
+ chunks[1].Length.ShouldBe(3);
+ chunks[2].Length.ShouldBe(3);
+ chunks[3].Length.ShouldBe(1); // remainder
+ chunks[3][0].ShouldBe((byte)9);
+ }
+
+ [Fact]
+ public void SnapshotChunkEnumerator_crc32_is_stable_across_multiple_reads()
+ {
+ // CRC32Value must return the same value on every call (cached)
+ var data = new byte[1024];
+ Random.Shared.NextBytes(data);
+
+ var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 256);
+
+ var first = enumerator.Crc32Value;
+ var second = enumerator.Crc32Value;
+ var third = enumerator.Crc32Value;
+
+ second.ShouldBe(first);
+ third.ShouldBe(first);
+ }
+
+ // -----------------------------------------------------------------------
+ // InstallSnapshotFromChunksAsync CRC32 validation tests
+ // -----------------------------------------------------------------------
+
+ [Fact]
+ public async Task InstallSnapshot_assembles_chunks_into_snapshot()
+ {
+ // Snapshot assembled from multiple chunks should produce the correct
+ // LastIncludedIndex, LastIncludedTerm, and Data on the node.
+ // Go reference: raft.go:3500-3700 installSnapshot
+ var node = new RaftNode("n1");
+
+ var data = new byte[] { 10, 20, 30, 40, 50 };
+ var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 2);
+ var chunks = enumerator.ToList();
+
+ await node.InstallSnapshotFromChunksAsync(
+ chunks,
+ snapshotIndex: 42,
+ snapshotTerm: 7,
+ ct: default);
+
+ node.AppliedIndex.ShouldBe(42);
+ node.CommitIndex.ShouldBe(42);
+ }
+
+ [Fact]
+ public async Task InstallSnapshot_validates_crc32_success()
+ {
+ // When the correct CRC32 is supplied the install should complete without error.
+ // Go reference: raft.go installSnapshot CRC validation
+ var node = new RaftNode("n1");
+
+ var data = new byte[256];
+ Random.Shared.NextBytes(data);
+ var enumerator = new SnapshotChunkEnumerator(data, chunkSize: 64);
+ var crc = enumerator.Crc32Value;
+ var chunks = enumerator.ToList();
+
+ // Should not throw
+ await node.InstallSnapshotFromChunksAsync(
+ chunks,
+ snapshotIndex: 10,
+ snapshotTerm: 2,
+ ct: default,
+ expectedCrc32: crc);
+
+ node.AppliedIndex.ShouldBe(10);
+ }
+
+ [Fact]
+ public async Task InstallSnapshot_validates_crc32_throws_on_mismatch()
+ {
+ // A wrong CRC32 must cause InvalidDataException before any state is mutated.
+ // Go reference: raft.go installSnapshot CRC mismatch → abort
+ var node = new RaftNode("n1");
+ node.Log.Append(1, "cmd-1"); // pre-existing state
+
+ var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 };
+ var chunks = new[] { data }; // single chunk
+
+ const uint wrongCrc = 0xDEADBEEF;
+
+ var ex = await Should.ThrowAsync(async () =>
+ await node.InstallSnapshotFromChunksAsync(
+ chunks,
+ snapshotIndex: 99,
+ snapshotTerm: 5,
+ ct: default,
+ expectedCrc32: wrongCrc));
+
+ ex.Message.ShouldContain("CRC32");
+ ex.Message.ShouldContain("DEADBEEF");
+
+ // State must NOT have been mutated since CRC failed before any writes
+ node.AppliedIndex.ShouldBe(0);
+ node.CommitIndex.ShouldBe(0);
+ }
+
+ [Fact]
+ public async Task InstallSnapshot_no_crc_parameter_installs_without_validation()
+ {
+ // When expectedCrc32 is omitted (null), no validation occurs and any data installs.
+ // Go reference: raft.go optional CRC field (backward compat)
+ var node = new RaftNode("n1");
+ var chunks = new[] { new byte[] { 7, 8, 9 } };
+
+ // Should not throw even with no CRC supplied
+ await node.InstallSnapshotFromChunksAsync(
+ chunks,
+ snapshotIndex: 5,
+ snapshotTerm: 1,
+ ct: default);
+
+ node.AppliedIndex.ShouldBe(5);
+ node.CommitIndex.ShouldBe(5);
+ }
+
+ // -----------------------------------------------------------------------
+ // RaftInstallSnapshotChunkWire encode/decode roundtrip tests
+ // -----------------------------------------------------------------------
+
+ [Fact]
+ public void SnapshotChunkWire_roundtrip_with_data()
+ {
+ // Encode and decode a chunk message and verify all fields survive the roundtrip.
+ // Go reference: raft.go wire format for InstallSnapshot RPC chunks
+ var payload = new byte[] { 0xAA, 0xBB, 0xCC, 0xDD };
+ var wire = new RaftInstallSnapshotChunkWire(
+ SnapshotIndex: 42UL,
+ SnapshotTerm: 3U,
+ ChunkIndex: 1U,
+ TotalChunks: 4U,
+ Crc32: 0x12345678U,
+ Data: payload);
+
+ var encoded = wire.Encode();
+ var decoded = RaftInstallSnapshotChunkWire.Decode(encoded);
+
+ decoded.SnapshotIndex.ShouldBe(42UL);
+ decoded.SnapshotTerm.ShouldBe(3U);
+ decoded.ChunkIndex.ShouldBe(1U);
+ decoded.TotalChunks.ShouldBe(4U);
+ decoded.Crc32.ShouldBe(0x12345678U);
+ decoded.Data.ShouldBe(payload);
+ }
+
+ [Fact]
+ public void SnapshotChunkWire_header_length_is_24_bytes()
+ {
+ // Header must be exactly 24 bytes as documented in the wire format.
+ RaftInstallSnapshotChunkWire.HeaderLen.ShouldBe(24);
+ }
+
+ [Fact]
+ public void SnapshotChunkWire_encode_total_length_is_header_plus_data()
+ {
+ var data = new byte[100];
+ var wire = new RaftInstallSnapshotChunkWire(1UL, 1U, 0U, 1U, 0U, data);
+ wire.Encode().Length.ShouldBe(RaftInstallSnapshotChunkWire.HeaderLen + 100);
+ }
+
+ [Fact]
+ public void SnapshotChunkWire_decode_throws_on_short_buffer()
+ {
+ // Buffers shorter than the header should throw ArgumentException
+ var tooShort = new byte[10]; // < 24 bytes
+ Should.Throw(() => RaftInstallSnapshotChunkWire.Decode(tooShort));
+ }
+
+ [Fact]
+ public void SnapshotChunkWire_roundtrip_empty_payload()
+ {
+ // A header-only message (no chunk data) should encode and decode cleanly.
+ var wire = new RaftInstallSnapshotChunkWire(
+ SnapshotIndex: 0UL,
+ SnapshotTerm: 0U,
+ ChunkIndex: 0U,
+ TotalChunks: 0U,
+ Crc32: 0U,
+ Data: []);
+
+ var encoded = wire.Encode();
+ encoded.Length.ShouldBe(RaftInstallSnapshotChunkWire.HeaderLen);
+
+ var decoded = RaftInstallSnapshotChunkWire.Decode(encoded);
+ decoded.Data.Length.ShouldBe(0);
+ }
+}
diff --git a/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs b/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs
index 4ab3387..90316ff 100644
--- a/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs
+++ b/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs
@@ -40,5 +40,8 @@ public class RaftStrictConsensusRuntimeTests
public Task InstallSnapshotAsync(string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct)
=> Task.CompletedTask;
+
+ public Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct)
+ { return Task.CompletedTask; } // no-op: hands back an already-completed task
}
}