feat(batch30): implement raft group-b snapshot helpers

This commit is contained in:
Joseph Doherty
2026-02-28 20:09:24 -05:00
parent 492030bd4b
commit ed1b62d6a3
3 changed files with 468 additions and 0 deletions

View File

@@ -0,0 +1,357 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Buffers.Binary;
using System.Security.Cryptography;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class Raft
{
private const string SnapshotFilePrefix = "snap";
private const string SnapshotFileSuffix = ".bin";
private const int SnapshotHeaderLength = 20;
private const int SnapshotChecksumLength = 8;
internal byte[] EncodeSnapshot(Snapshot? snap)
{
if (snap is null)
{
return [];
}
var peerStateLength = snap.PeerState.Length;
var dataLength = snap.Data.Length;
var payloadLength = SnapshotHeaderLength + peerStateLength + dataLength;
var buffer = new byte[payloadLength + SnapshotChecksumLength];
BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(0, 8), snap.LastTerm);
BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(8, 8), snap.LastIndex);
BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(16, 4), (uint)peerStateLength);
var writeOffset = SnapshotHeaderLength;
snap.PeerState.CopyTo(buffer, writeOffset);
writeOffset += peerStateLength;
snap.Data.CopyTo(buffer, writeOffset);
var checksum = SHA256.HashData(buffer.AsSpan(0, payloadLength));
checksum.AsSpan(0, SnapshotChecksumLength).CopyTo(buffer.AsSpan(payloadLength, SnapshotChecksumLength));
return buffer;
}
internal Exception? InstallSnapshotInternal(Snapshot snap)
{
ArgumentNullException.ThrowIfNull(snap);
try
{
var snapshotsDirectory = GetSnapshotsDirectory();
Directory.CreateDirectory(snapshotsDirectory);
var newSnapshotFile = Path.Combine(snapshotsDirectory, FormatSnapshotFileName(snap.LastTerm, snap.LastIndex));
File.WriteAllBytes(newSnapshotFile, EncodeSnapshot(snap));
if (!string.IsNullOrWhiteSpace(SnapFile) &&
!string.Equals(SnapFile, newSnapshotFile, StringComparison.Ordinal) &&
File.Exists(SnapFile))
{
File.Delete(SnapFile);
}
SnapFile = newSnapshotFile;
PApplied = snap.LastIndex;
PIndex = snap.LastIndex;
PTerm = snap.LastTerm;
Commit = Math.Max(Commit, snap.LastIndex);
WalBytes = (ulong)(new FileInfo(newSnapshotFile).Length);
return null;
}
catch (Exception ex)
{
return ex;
}
finally
{
Snapshotting = false;
}
}
internal Checkpoint? CreateSnapshotCheckpointLocked(bool force)
{
if (State() == RaftState.Closed)
{
return null;
}
if (Snapshotting)
{
return null;
}
if (!force && Progress_ is { Count: > 0 })
{
return null;
}
if (Applied_ == 0)
{
return null;
}
Snapshotting = true;
var snapshotFile = Path.Combine(GetSnapshotsDirectory(), FormatSnapshotFileName(Term_, Applied_));
return new Checkpoint
{
Node = this,
Term = Term_,
Applied = Applied_,
PApplied = PApplied,
SnapFile = snapshotFile,
PeerState = [.. Wps],
};
}
internal (ulong Term, ulong Index, Exception? Error) TermAndIndexFromSnapFile(string snapFileName)
{
if (string.IsNullOrWhiteSpace(snapFileName))
{
return (0, 0, new InvalidOperationException("bad snapshot file name"));
}
var fileName = Path.GetFileNameWithoutExtension(snapFileName);
var segments = fileName.Split('-', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
if (segments.Length != 3 || !string.Equals(segments[0], SnapshotFilePrefix, StringComparison.Ordinal))
{
return (0, 0, new InvalidOperationException("bad snapshot file name"));
}
if (!ulong.TryParse(segments[1], out var term) || !ulong.TryParse(segments[2], out var index))
{
return (0, 0, new InvalidOperationException("bad snapshot file name"));
}
return (term, index, null);
}
internal Exception? SetupLastSnapshot()
{
var snapshotDirectory = GetSnapshotsDirectory();
if (!Directory.Exists(snapshotDirectory))
{
return new InvalidOperationException("no snapshot available");
}
var snapshotFiles = Directory.GetFiles(snapshotDirectory, $"*{SnapshotFileSuffix}");
if (snapshotFiles.Length == 0)
{
return new InvalidOperationException("no snapshot available");
}
ulong latestTerm = 0;
ulong latestIndex = 0;
string? latestSnapshot = null;
foreach (var candidate in snapshotFiles)
{
var (term, index, parseError) = TermAndIndexFromSnapFile(candidate);
if (parseError is not null)
{
File.Delete(candidate);
continue;
}
if (term > latestTerm || (term == latestTerm && index > latestIndex))
{
latestTerm = term;
latestIndex = index;
latestSnapshot = candidate;
}
}
if (latestSnapshot is null)
{
return null;
}
SnapFile = latestSnapshot;
var (snapshot, loadError) = LoadLastSnapshot();
if (loadError is not null || snapshot is null)
{
return loadError;
}
PIndex = snapshot.LastIndex;
PTerm = snapshot.LastTerm;
Commit = snapshot.LastIndex;
PApplied = snapshot.LastIndex;
Wps = [.. snapshot.PeerState];
foreach (var oldFile in snapshotFiles)
{
if (!string.Equals(oldFile, latestSnapshot, StringComparison.Ordinal))
{
File.Delete(oldFile);
}
}
return null;
}
internal (Snapshot? Snapshot, Exception? Error) LoadLastSnapshot()
{
if (string.IsNullOrWhiteSpace(SnapFile))
{
return (null, new InvalidOperationException("no snapshot available"));
}
try
{
var buffer = File.ReadAllBytes(SnapFile);
if (buffer.Length < SnapshotHeaderLength + SnapshotChecksumLength)
{
return (null, new InvalidOperationException("snapshot corrupt"));
}
var payloadLength = buffer.Length - SnapshotChecksumLength;
var expectedChecksum = buffer.AsSpan(payloadLength, SnapshotChecksumLength);
var computedChecksum = SHA256.HashData(buffer.AsSpan(0, payloadLength));
if (!expectedChecksum.SequenceEqual(computedChecksum.AsSpan(0, SnapshotChecksumLength)))
{
return (null, new InvalidOperationException("snapshot corrupt"));
}
var term = BinaryPrimitives.ReadUInt64LittleEndian(buffer.AsSpan(0, 8));
var index = BinaryPrimitives.ReadUInt64LittleEndian(buffer.AsSpan(8, 8));
var peerStateLength = (int)BinaryPrimitives.ReadUInt32LittleEndian(buffer.AsSpan(16, 4));
if (SnapshotHeaderLength + peerStateLength > payloadLength)
{
return (null, new InvalidOperationException("snapshot corrupt"));
}
var peerState = buffer.AsSpan(SnapshotHeaderLength, peerStateLength).ToArray();
var data = buffer.AsSpan(SnapshotHeaderLength + peerStateLength, payloadLength - SnapshotHeaderLength - peerStateLength).ToArray();
if (index == 0)
{
File.Delete(SnapFile);
SnapFile = string.Empty;
return (null, null);
}
return (new Snapshot
{
LastTerm = term,
LastIndex = index,
PeerState = peerState,
Data = data,
}, null);
}
catch (Exception ex)
{
return (null, ex);
}
}
internal void StepdownLocked(string newLeader)
{
StateValue = (int)RaftState.Follower;
LeaderId = newLeader;
Lsut = DateTime.UtcNow;
}
internal bool IsCatchingUp() => Catchup is not null;
internal bool IsCurrent(bool includeForwardProgress = false)
{
if (State() == RaftState.Closed || Commit == 0 || Catchup is not null)
{
return false;
}
if (Paused && HCommit > Commit)
{
return false;
}
if (Commit == Applied_)
{
HcBehind = false;
return true;
}
if (!includeForwardProgress)
{
return false;
}
var startDelta = Commit > Applied_ ? Commit - Applied_ : 0;
return startDelta <= 1;
}
internal string SelectNextLeader()
{
var nextLeader = string.Empty;
ulong highestIndex = 0;
foreach (var (peer, peerState) in Peers_)
{
if (string.Equals(peer, Id, StringComparison.Ordinal) || peerState.Li <= highestIndex)
{
continue;
}
nextLeader = peer;
highestIndex = peerState.Li;
}
return nextLeader;
}
internal TimeSpan RandCampaignTimeout()
{
var min = 150;
var max = 300;
var delta = Random.Shared.Next(min, max);
return TimeSpan.FromMilliseconds(delta);
}
internal Exception? CampaignInternal(TimeSpan electionTimeout)
{
_ = electionTimeout;
if (State() == RaftState.Leader)
{
return new InvalidOperationException("already leader");
}
Campaign();
return null;
}
internal Exception? XferCampaign()
{
if (State() == RaftState.Leader)
{
Lxfer = false;
return new InvalidOperationException("already leader");
}
Lxfer = true;
Campaign();
return null;
}
internal void UpdateKnownPeersLocked(IReadOnlyList<string> knownPeers)
{
ProposeKnownPeers(knownPeers);
}
private string GetSnapshotsDirectory()
{
if (string.IsNullOrWhiteSpace(StoreDir))
{
StoreDir = Path.Combine(Path.GetTempPath(), "natsnet-raft");
}
return Path.Combine(StoreDir, "snapshots");
}
private static string FormatSnapshotFileName(ulong term, ulong index) =>
$"{SnapshotFilePrefix}-{term:D20}-{index:D20}{SnapshotFileSuffix}";
}

View File

@@ -0,0 +1,111 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using Shouldly;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
public sealed class RaftNodeCoreTests
{
[Fact]
public void SnapshotHelpers_WhenEncodedAndLoaded_ShouldRoundTrip()
{
var storeDir = Path.Combine(Path.GetTempPath(), $"raft-node-core-{Guid.NewGuid():N}");
var raft = new Raft
{
StoreDir = storeDir,
Term_ = 3,
Applied_ = 9,
PApplied = 7,
Wps = [1, 2, 3],
};
try
{
var checkpoint = raft.CreateSnapshotCheckpointLocked(force: true);
checkpoint.ShouldNotBeNull();
var snapshot = new Snapshot
{
LastTerm = 3,
LastIndex = 9,
PeerState = [7, 8],
Data = [9, 10, 11],
};
raft.InstallSnapshotInternal(snapshot).ShouldBeNull();
var (loaded, error) = raft.LoadLastSnapshot();
error.ShouldBeNull();
loaded.ShouldNotBeNull();
loaded!.LastTerm.ShouldBe(3UL);
loaded.LastIndex.ShouldBe(9UL);
loaded.PeerState.ShouldBe([7, 8]);
loaded.Data.ShouldBe([9, 10, 11]);
raft.SetupLastSnapshot().ShouldBeNull();
raft.PIndex.ShouldBe(9UL);
raft.PTerm.ShouldBe(3UL);
}
finally
{
if (Directory.Exists(storeDir))
{
Directory.Delete(storeDir, recursive: true);
}
}
}
[Fact]
public void LeadershipHelpers_WhenSteppingDownAndSelectingLeader_ShouldUpdateState()
{
var raft = new Raft
{
Id = "N1",
StateValue = (int)RaftState.Candidate,
Peers_ = new Dictionary<string, Lps>
{
["N2"] = new() { Li = 3 },
["N3"] = new() { Li = 7 },
},
};
raft.StepdownLocked("N3");
raft.State().ShouldBe(RaftState.Follower);
raft.LeaderId.ShouldBe("N3");
raft.SelectNextLeader().ShouldBe("N3");
}
[Fact]
public void CampaignHelpers_WhenLeaderOrFollower_ShouldReturnExpectedOutcome()
{
var raft = new Raft
{
StateValue = (int)RaftState.Leader,
};
raft.CampaignInternal(TimeSpan.FromMilliseconds(200)).ShouldNotBeNull();
raft.XferCampaign().ShouldNotBeNull();
raft.StateValue = (int)RaftState.Follower;
raft.CampaignInternal(TimeSpan.FromMilliseconds(200)).ShouldBeNull();
raft.State().ShouldBe(RaftState.Candidate);
}
[Fact]
public void ProgressHelpers_WhenCatchupAndKnownPeersChange_ShouldTrackFlags()
{
var raft = new Raft
{
Commit = 10,
Applied_ = 8,
Catchup = new CatchupState(),
};
raft.IsCatchingUp().ShouldBeTrue();
raft.IsCurrent(includeForwardProgress: true).ShouldBeFalse();
raft.Catchup = null;
raft.UpdateKnownPeersLocked(["N2", "N3"]);
raft.Peers_.Count.ShouldBe(2);
raft.RandCampaignTimeout().ShouldBeGreaterThan(TimeSpan.Zero);
}
}

Binary file not shown.