feat: add chunk-based snapshot streaming with CRC32 validation (Gap 8.3)

- Add SnapshotChunkEnumerator: IEnumerable<byte[]> that splits snapshot data
  into fixed-size chunks (default 65536 bytes) and computes CRC32 over the
  full payload for integrity validation during streaming transfer
- Add RaftInstallSnapshotChunkWire: 24-byte header + variable data wire type
  encoding [snapshotIndex:8][snapshotTerm:4][chunkIndex:4][totalChunks:4][crc32:4][data:N]
- Extend InstallSnapshotFromChunksAsync with optional expectedCrc32 parameter;
  validates assembled data against CRC32 before applying snapshot state, throwing
  InvalidDataException on mismatch to prevent corrupt state installation
- Fix stub IRaftTransport implementations in test files missing SendTimeoutNowAsync
- Fix incorrect role assertion in RaftLeadershipTransferTests (single-node quorum = 1)
- 15 new tests in RaftSnapshotStreamingTests covering enumeration, reassembly,
  CRC correctness, validation success/failure, and wire format roundtrip
This commit is contained in:
Joseph Doherty
2026-02-25 08:21:36 -05:00
parent 7434844a39
commit 7e0bed2447
7 changed files with 1037 additions and 4 deletions

View File

@@ -23,6 +23,11 @@ public sealed class RaftNode : IDisposable
// Go reference: raft.go:961-1019 (proposeAddPeer / proposeRemovePeer, single-change invariant)
private long _membershipChangeIndex;
// Leadership transfer: when true, the leader is handing off to a target and
// must reject new proposals until the transfer completes or times out.
// Go reference: raft.go stepDown / sendTimeoutNow (leader blocks proposals during transfer)
private int _transferInProgress; // 0 = false, 1 = true (Interlocked-compatible)
// Joint consensus (two-phase membership change) per Raft paper Section 4.
// During the joint phase both the old config (Cold) and new config (Cnew) are stored.
// A quorum decision requires majority from BOTH configurations simultaneously.
@@ -61,6 +66,13 @@ public sealed class RaftNode : IDisposable
// Go reference: raft.go:961-1019 single-change invariant.
public bool MembershipChangeInProgress => Interlocked.Read(ref _membershipChangeIndex) > 0;
/// <summary>
/// Indicates whether this node is currently handing leadership off to another peer.
/// While this is <c>true</c> the leader refuses new proposals.
/// Go reference: raft.go stepDown / sendTimeoutNow.
/// </summary>
public bool TransferInProgress
{
    get
    {
        // CompareExchange(ref x, 0, 0) never changes the stored value; it is used
        // here purely as an atomic read of the 0/1 flag.
        var flag = Interlocked.CompareExchange(ref _transferInProgress, 0, 0);
        return flag == 1;
    }
}
/// <summary>
/// True when this node is in the joint consensus phase (transitioning between
/// two membership configurations).
@@ -165,6 +177,11 @@ public sealed class RaftNode : IDisposable
if (Role != RaftRole.Leader)
throw new InvalidOperationException("Only leader can propose entries.");
// Block proposals while a leadership transfer is in progress.
// Go reference: raft.go — leader rejects new entries during stepDown / sendTimeoutNow.
if (Interlocked.CompareExchange(ref _transferInProgress, 0, 0) == 1)
throw new InvalidOperationException("Leadership transfer in progress; proposals are blocked.");
var entry = Log.Append(TermState.CurrentTerm, command);
var followers = _cluster.Where(n => n.Id != Id).ToList();
var results = await _replicator.ReplicateAsync(Id, entry, followers, _transport, ct);
@@ -407,12 +424,34 @@ public sealed class RaftNode : IDisposable
}
/// <summary>
/// Installs a snapshot assembled from streaming chunks.
/// Used for large snapshot transfers where the entire snapshot is sent in pieces.
/// Go reference: raft.go:3500-3700 (installSnapshot with chunked transfer).
/// Installs a snapshot assembled from streaming chunks, with optional CRC32 integrity validation.
///
/// After all chunks are accumulated and reassembled into a contiguous byte array, the CRC32
/// (IEEE 802.3) checksum of the assembled data is computed and compared against
/// <paramref name="expectedCrc32"/> when provided. A mismatch indicates data corruption during
/// transfer and causes an <see cref="InvalidDataException"/> to be thrown before any state is
/// mutated, so the follower can request a retransmit rather than applying corrupt state.
///
/// Go reference: raft.go:3500-3700 (installSnapshot with chunked transfer and CRC validation).
/// </summary>
/// <param name="chunks">Ordered sequence of byte arrays that form the snapshot when concatenated.</param>
/// <param name="snapshotIndex">The last log index covered by this snapshot.</param>
/// <param name="snapshotTerm">The term of the last log entry covered by this snapshot.</param>
/// <param name="expectedCrc32">
/// Optional CRC32 checksum of the full assembled snapshot data. When non-null the assembled
/// data is validated before installation; a mismatch throws <see cref="InvalidDataException"/>.
/// </param>
/// <param name="ct">Cancellation token.</param>
/// <exception cref="InvalidDataException">
/// Thrown when <paramref name="expectedCrc32"/> is provided and the assembled data does not
/// match the expected checksum.
/// </exception>
public async Task InstallSnapshotFromChunksAsync(
IEnumerable<byte[]> chunks, long snapshotIndex, int snapshotTerm, CancellationToken ct)
IEnumerable<byte[]> chunks,
long snapshotIndex,
int snapshotTerm,
CancellationToken ct,
uint? expectedCrc32 = null)
{
var checkpoint = new RaftSnapshotCheckpoint
{
@@ -424,6 +463,19 @@ public sealed class RaftNode : IDisposable
checkpoint.AddChunk(chunk);
var data = checkpoint.Assemble();
// CRC32 validation: compute over assembled data and compare to the sender's checksum.
// Go reference: raft.go installSnapshot CRC check before applying snapshot state.
if (expectedCrc32.HasValue)
{
var crc = new System.IO.Hashing.Crc32();
crc.Append(data);
var actualCrc32 = crc.GetCurrentHashAsUInt32();
if (actualCrc32 != expectedCrc32.Value)
throw new InvalidDataException(
$"Snapshot CRC32 mismatch: expected 0x{expectedCrc32.Value:X8}, got 0x{actualCrc32:X8}.");
}
var snapshot = new RaftSnapshot
{
LastIncludedIndex = snapshotIndex,
@@ -537,6 +589,78 @@ public sealed class RaftNode : IDisposable
StartElection(clusterSize);
}
// Leadership transfer
// Go reference: raft.go sendTimeoutNow / processTimeoutNow
/// <summary>
/// Transfers leadership to the target peer by sending a TimeoutNow RPC.
/// Must only be called on the current leader. Blocks new proposals during
/// the transfer window (up to 2× the election timeout, measured in wall-clock time).
///
/// Returns <c>true</c> when the target becomes leader within the deadline,
/// or <c>false</c> on timeout.
///
/// Go reference: raft.go stepDown (leadership transfer variant) / sendTimeoutNow.
/// </summary>
/// <param name="targetId">Id of the peer that should take over leadership.</param>
/// <param name="ct">Cancellation token; cancelling aborts the RPC or the polling wait.</param>
/// <returns>
/// <c>true</c> if the target reported itself as leader before the deadline; otherwise <c>false</c>.
/// </returns>
/// <exception cref="InvalidOperationException">
/// Thrown when this node is not the leader, when no transport is configured, or when
/// another leadership transfer started by this node is still in progress.
/// </exception>
public async Task<bool> TransferLeadershipAsync(string targetId, CancellationToken ct)
{
    if (Role != RaftRole.Leader)
        throw new InvalidOperationException("Only the leader can transfer leadership.");
    if (_transport == null)
        throw new InvalidOperationException("No transport configured; cannot send TimeoutNow RPC.");

    // Claim the transfer flag atomically so ProposeAsync is blocked. If the flag is
    // already set, another transfer owns it: proceeding would let whichever call
    // finishes first clear the flag while the other is still running, re-enabling
    // proposals in the middle of a transfer. The claim happens outside the try block
    // so that a rejected call never resets a flag it does not own.
    if (Interlocked.CompareExchange(ref _transferInProgress, 1, 0) != 0)
        throw new InvalidOperationException("A leadership transfer is already in progress.");

    try
    {
        await _transport.SendTimeoutNowAsync(Id, targetId, (ulong)TermState.CurrentTerm, ct);

        // Wait for the target to become leader (poll the in-process cluster).
        // Deadline = 2× election timeout maximum.
        var deadlineMs = ElectionTimeoutMaxMs * 2;
        var pollIntervalMs = Math.Max(1, ElectionTimeoutMinMs / 10);

        // Measure real elapsed time: Task.Delay can take noticeably longer than the
        // requested interval, so summing nominal intervals would overshoot the deadline.
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();
        while (stopwatch.ElapsedMilliseconds < deadlineMs)
        {
            var target = _cluster.FirstOrDefault(
                n => string.Equals(n.Id, targetId, StringComparison.Ordinal));
            if (target != null && target.IsLeader)
                return true;

            await Task.Delay(pollIntervalMs, ct);
        }

        return false;
    }
    finally
    {
        // Always release the flag (success, timeout, cancellation, or transport failure)
        // so proposals are accepted again.
        Interlocked.Exchange(ref _transferInProgress, 0);
    }
}
/// <summary>
/// Handles a received TimeoutNow RPC: immediately starts an election,
/// bypassing both the election timer and the pre-vote round.
/// Called on the target node when the current leader sends a TimeoutNow message.
///
/// Go reference: raft.go processTimeoutNow — triggers immediate campaign.
/// </summary>
/// <param name="term">The sender's current term as carried on the wire.</param>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when <paramref name="term"/> exceeds <see cref="int.MaxValue"/> and therefore
/// cannot be narrowed to the signed value this method compares and stores.
/// </exception>
public void ReceiveTimeoutNow(ulong term)
{
    // The wire term is an unsigned 64-bit value but is narrowed with an (int) cast below.
    // An unchecked cast of a value above int.MaxValue wraps to a small or negative number,
    // which would silently be treated as a stale term. Reject it explicitly instead.
    if (term > (ulong)int.MaxValue)
        throw new ArgumentOutOfRangeException(
            nameof(term), term, $"Term must not exceed {int.MaxValue}.");

    // Accept the sender's term if it's higher.
    var senderTerm = (int)term;
    if (senderTerm > TermState.CurrentTerm)
        TermState.CurrentTerm = senderTerm;

    // Immediately campaign, skipping pre-vote (the leader has already decided
    // this node should win, so pre-vote would be redundant and wasteful).
    var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count;
    StartElection(clusterSize);
}
private void ElectionTimerCallback(object? state)
{
if (_electionTimerCts?.IsCancellationRequested == true)

View File

@@ -443,6 +443,118 @@ public readonly record struct RaftPreVoteResponseWire(
}
}
/// <summary>
/// Wire representation of the RAFT TimeoutNow RPC used for leadership transfer.
/// The current leader sends it to a chosen follower so that the follower starts
/// an election right away instead of waiting for its election timer.
/// Fixed 16-byte layout (little-endian):
///   [0..7]   term      uint64 — sender's current term
///   [8..15]  leaderId  8-byte ASCII, zero-padded
/// Go reference: raft.go sendTimeoutNow / processTimeoutNow
/// </summary>
public readonly record struct RaftTimeoutNowWire(ulong Term, string LeaderId)
{
    /// <summary>
    /// Size in bytes of every encoded TimeoutNow message:
    /// 8 bytes of term followed by the fixed-width leader id (16 bytes total).
    /// </summary>
    public const int MessageLen = 8 + RaftWireConstants.IdLen; // 16

    /// <summary>
    /// Serializes this message into a newly allocated buffer of <see cref="MessageLen"/> bytes.
    /// </summary>
    public byte[] Encode()
    {
        var message = new byte[MessageLen];
        Span<byte> span = message;

        // Term occupies the first 8 bytes; the leader id fills the remainder.
        BinaryPrimitives.WriteUInt64LittleEndian(span, Term);
        RaftWireHelpers.WriteId(span.Slice(8), LeaderId);

        return message;
    }

    /// <summary>
    /// Parses a TimeoutNow message. The input must be exactly
    /// <see cref="MessageLen"/> bytes long, otherwise an
    /// <see cref="ArgumentException"/> is thrown.
    /// </summary>
    public static RaftTimeoutNowWire Decode(ReadOnlySpan<byte> msg)
    {
        if (msg.Length == MessageLen)
        {
            var term = BinaryPrimitives.ReadUInt64LittleEndian(msg);
            var leaderId = RaftWireHelpers.ReadId(msg.Slice(8));
            return new RaftTimeoutNowWire(term, leaderId);
        }

        throw new ArgumentException(
            $"TimeoutNow requires exactly {MessageLen} bytes, got {msg.Length}.",
            nameof(msg));
    }
}
/// <summary>
/// Binary wire encoding of a RAFT InstallSnapshot chunk message (variable length).
///
/// This message type carries a single chunk of a snapshot during a streaming
/// InstallSnapshot RPC. The leader sends chunks sequentially; the follower
/// accumulates them and validates the CRC32 after receiving the final chunk.
///
/// Wire layout (little-endian):
///   [0..7]   snapshotIndex  uint64 — last log index covered by this snapshot
///   [8..11]  snapshotTerm   uint32 — term of the last included entry
///   [12..15] chunkIndex     uint32 — zero-based index of this chunk in the sequence
///   [16..19] totalChunks    uint32 — total number of chunks in this snapshot transfer
///   [20..23] crc32          uint32 — CRC32 of the ENTIRE assembled snapshot (sent on every chunk)
///   [24+]    data           bytes  — raw chunk payload
///
/// Note: the compiler-generated record equality compares <see cref="Data"/> by array
/// reference, not by content, so a decoded message is never <c>==</c> to the message it
/// was encoded from. Compare the payload bytes explicitly when content equality is needed.
///
/// Go reference: raft.go:3500-3700 (installSnapshot chunked transfer and CRC validation)
/// </summary>
public readonly record struct RaftInstallSnapshotChunkWire(
    ulong SnapshotIndex,
    uint SnapshotTerm,
    uint ChunkIndex,
    uint TotalChunks,
    uint Crc32,
    byte[] Data)
{
    /// <summary>Fixed-size header before the variable-length data payload (24 bytes).</summary>
    public const int HeaderLen = 8 + 4 + 4 + 4 + 4; // 24

    /// <summary>
    /// Encodes this snapshot chunk message to a byte array.
    /// Total length = <see cref="HeaderLen"/> + payload length. A null <see cref="Data"/>
    /// (as found on a default-initialized struct) is encoded as an empty payload.
    /// </summary>
    /// <returns>A newly allocated buffer containing the header followed by the payload.</returns>
    public byte[] Encode()
    {
        // The implicit array-to-span conversion maps a null array to an empty span, so
        // default(RaftInstallSnapshotChunkWire) encodes as a header-only message instead
        // of throwing NullReferenceException on Data.Length.
        ReadOnlySpan<byte> payload = Data;

        var buf = new byte[HeaderLen + payload.Length];
        var span = buf.AsSpan();
        BinaryPrimitives.WriteUInt64LittleEndian(span[0..], SnapshotIndex);
        BinaryPrimitives.WriteUInt32LittleEndian(span[8..], SnapshotTerm);
        BinaryPrimitives.WriteUInt32LittleEndian(span[12..], ChunkIndex);
        BinaryPrimitives.WriteUInt32LittleEndian(span[16..], TotalChunks);
        BinaryPrimitives.WriteUInt32LittleEndian(span[20..], Crc32);
        payload.CopyTo(span[HeaderLen..]);
        return buf;
    }

    /// <summary>
    /// Decodes an InstallSnapshot chunk message from <paramref name="msg"/>.
    /// Everything after the fixed header is copied into a new <see cref="Data"/> array,
    /// so the result does not alias the input buffer.
    /// </summary>
    /// <param name="msg">The encoded message: header plus zero or more payload bytes.</param>
    /// <returns>The decoded message.</returns>
    /// <exception cref="ArgumentException">
    /// Thrown when the buffer is shorter than the fixed <see cref="HeaderLen"/> bytes.
    /// </exception>
    public static RaftInstallSnapshotChunkWire Decode(ReadOnlySpan<byte> msg)
    {
        if (msg.Length < HeaderLen)
            throw new ArgumentException(
                $"InstallSnapshotChunk requires at least {HeaderLen} bytes, got {msg.Length}.",
                nameof(msg));

        return new RaftInstallSnapshotChunkWire(
            SnapshotIndex: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]),
            SnapshotTerm: BinaryPrimitives.ReadUInt32LittleEndian(msg[8..]),
            ChunkIndex: BinaryPrimitives.ReadUInt32LittleEndian(msg[12..]),
            TotalChunks: BinaryPrimitives.ReadUInt32LittleEndian(msg[16..]),
            Crc32: BinaryPrimitives.ReadUInt32LittleEndian(msg[20..]),
            Data: msg[HeaderLen..].ToArray());
    }
}
/// <summary>
/// Shared encoding helpers for all RAFT wire format types.
/// </summary>

View File

@@ -0,0 +1,90 @@
using System.IO.Hashing;
namespace NATS.Server.Raft;
/// <summary>
/// Splits a snapshot byte array into fixed-size chunks for streaming transfer
/// and computes a CRC32 checksum over the entire data for integrity validation.
///
/// During an InstallSnapshot RPC the leader streams a large snapshot to a lagging
/// follower chunk-by-chunk rather than sending it in a single message. The follower
/// accumulates the chunks, reassembles them, then validates the CRC32 before
/// applying the snapshot — preventing silent data corruption in transit.
///
/// Go reference: raft.go:3500-3700 (installSnapshot chunked transfer and CRC validation)
/// </summary>
public sealed class SnapshotChunkEnumerator : IEnumerable<byte[]>
{
    private readonly byte[] _data;
    private readonly int _chunkSize;

    // Lazily computed checksum of _data; null until Crc32Value is first read.
    private uint? _crc32Value;

    /// <summary>
    /// Default chunk size matching Go's snapshot chunk transfer size (64 KiB).
    /// Go reference: raft.go snapshotChunkSize constant.
    /// </summary>
    public const int DefaultChunkSize = 65536;

    /// <summary>
    /// Creates a new chunk enumerator over <paramref name="data"/>.
    /// The array is referenced, not copied.
    /// </summary>
    /// <param name="data">The full snapshot data to split into chunks.</param>
    /// <param name="chunkSize">Maximum size of each chunk in bytes. Defaults to 64 KiB.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="data"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown when <paramref name="chunkSize"/> is less than or equal to zero.
    /// </exception>
    public SnapshotChunkEnumerator(byte[] data, int chunkSize = DefaultChunkSize)
    {
        // Fail at construction rather than with a NullReferenceException on first use.
        ArgumentNullException.ThrowIfNull(data);
        ArgumentOutOfRangeException.ThrowIfNegativeOrZero(chunkSize);

        _data = data;
        _chunkSize = chunkSize;
    }

    /// <summary>
    /// The CRC32 (IEEE 802.3) checksum of the entire snapshot data.
    /// Computed lazily on first access and cached for subsequent reads.
    /// The receiver uses this value to validate the reassembled snapshot after
    /// all chunks have been received.
    /// Go reference: raft.go installSnapshot CRC validation.
    /// </summary>
    public uint Crc32Value
    {
        get
        {
            if (!_crc32Value.HasValue)
            {
                var crc = new Crc32();
                crc.Append(_data);
                _crc32Value = crc.GetCurrentHashAsUInt32();
            }

            return _crc32Value.Value;
        }
    }

    /// <summary>
    /// The total number of chunks this data splits into given the configured chunk size.
    /// Empty data counts as one chunk, matching the single empty chunk the enumerator yields.
    /// </summary>
    public int ChunkCount =>
        _data.Length == 0
            ? 1
            // Ceiling division in 64-bit arithmetic: Length + chunkSize - 1 can exceed
            // int.MaxValue for large chunk sizes and would wrap in 32-bit arithmetic.
            // The quotient never exceeds Length, so the cast back to int is safe.
            : (int)(((long)_data.Length + _chunkSize - 1) / _chunkSize);

    /// <summary>
    /// Yields the snapshot data as consecutive chunks of at most the configured chunk size.
    /// Each chunk is a freshly allocated copy; only the last one may be shorter.
    /// Empty data yields exactly one empty chunk.
    /// </summary>
    public IEnumerator<byte[]> GetEnumerator()
    {
        if (_data.Length == 0)
        {
            yield return [];
            yield break;
        }

        var offset = 0;
        while (offset < _data.Length)
        {
            var length = Math.Min(_chunkSize, _data.Length - offset);
            var chunk = new byte[length];
            _data.AsSpan(offset, length).CopyTo(chunk);
            offset += length;
            yield return chunk;
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
}