diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Snapshots.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Snapshots.cs
new file mode 100644
index 0000000..0654335
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Snapshots.cs
@@ -0,0 +1,473 @@
+// Copyright 2012-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0
+
+using System.Buffers.Binary;
+using System.Globalization;
+using System.Security.Cryptography;
+
+namespace ZB.MOM.NatsNet.Server;
+
+// Snapshot persistence and leadership/campaign helpers for the Raft node.
+//
+// Snapshot file layout (integers are little-endian):
+//   bytes 0-7     last term (ulong)
+//   bytes 8-15    last index (ulong)
+//   bytes 16-19   peer-state length (uint)
+//   bytes 20-     peer state followed by snapshot data
+//   last 8 bytes  first 8 bytes of the SHA-256 hash of everything before them
+internal sealed partial class Raft
+{
+    private const string SnapshotFilePrefix = "snap";
+    private const string SnapshotFileSuffix = ".bin";
+    private const int SnapshotHeaderLength = 20;
+    private const int SnapshotChecksumLength = 8;
+
+    /// <summary>
+    /// Serializes a snapshot as header, peer state, data, and a truncated SHA-256 checksum.
+    /// </summary>
+    /// <param name="snap">Snapshot to encode; may be <see langword="null"/>.</param>
+    /// <returns>The encoded bytes, or an empty array when <paramref name="snap"/> is null.</returns>
+    internal byte[] EncodeSnapshot(Snapshot? snap)
+    {
+        if (snap is null)
+        {
+            return [];
+        }
+
+        var peerStateLength = snap.PeerState.Length;
+        var dataLength = snap.Data.Length;
+        var payloadLength = SnapshotHeaderLength + peerStateLength + dataLength;
+        var buffer = new byte[payloadLength + SnapshotChecksumLength];
+
+        BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(0, 8), snap.LastTerm);
+        BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(8, 8), snap.LastIndex);
+        BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(16, 4), (uint)peerStateLength);
+
+        var writeOffset = SnapshotHeaderLength;
+        snap.PeerState.CopyTo(buffer, writeOffset);
+        writeOffset += peerStateLength;
+        snap.Data.CopyTo(buffer, writeOffset);
+
+        // The checksum covers header + peer state + data and is stored right after them.
+        var checksum = SHA256.HashData(buffer.AsSpan(0, payloadLength));
+        checksum.AsSpan(0, SnapshotChecksumLength).CopyTo(buffer.AsSpan(payloadLength, SnapshotChecksumLength));
+        return buffer;
+    }
+
+    /// <summary>
+    /// Writes <paramref name="snap"/> into the snapshots directory, deletes the previously
+    /// recorded snapshot file when it is a different file, and updates the snapshot bookkeeping.
+    /// <c>Snapshotting</c> is cleared once the write attempt finishes, whether or not it succeeded.
+    /// </summary>
+    /// <param name="snap">Snapshot to persist.</param>
+    /// <returns><see langword="null"/> on success; otherwise the exception that was caught.</returns>
+    /// <exception cref="ArgumentNullException"><paramref name="snap"/> is null.</exception>
+    internal Exception? InstallSnapshotInternal(Snapshot snap)
+    {
+        ArgumentNullException.ThrowIfNull(snap);
+
+        try
+        {
+            var snapshotsDirectory = GetSnapshotsDirectory();
+            Directory.CreateDirectory(snapshotsDirectory);
+
+            var newSnapshotFile = Path.Combine(snapshotsDirectory, FormatSnapshotFileName(snap.LastTerm, snap.LastIndex));
+            var encoded = EncodeSnapshot(snap);
+            File.WriteAllBytes(newSnapshotFile, encoded);
+
+            if (!string.IsNullOrWhiteSpace(SnapFile) &&
+                !string.Equals(SnapFile, newSnapshotFile, StringComparison.Ordinal) &&
+                File.Exists(SnapFile))
+            {
+                File.Delete(SnapFile);
+            }
+
+            SnapFile = newSnapshotFile;
+            PApplied = snap.LastIndex;
+            PIndex = snap.LastIndex;
+            PTerm = snap.LastTerm;
+            Commit = Math.Max(Commit, snap.LastIndex);
+
+            // Same value as the length of the file just written, without re-reading file metadata.
+            WalBytes = (ulong)encoded.Length;
+            return null;
+        }
+        catch (Exception ex)
+        {
+            return ex;
+        }
+        finally
+        {
+            Snapshotting = false;
+        }
+    }
+
+    /// <summary>
+    /// Marks the node as snapshotting and captures the current term, applied index,
+    /// previously applied index, target snapshot path, and a copy of <c>Wps</c>.
+    /// </summary>
+    /// <param name="force">When <see langword="true"/>, a non-empty <c>Progress_</c> does not block the checkpoint.</param>
+    /// <returns>
+    /// The checkpoint, or <see langword="null"/> when the node is closed, a snapshot is already in
+    /// progress, <c>Progress_</c> is non-empty without <paramref name="force"/>, or <c>Applied_</c> is 0.
+    /// </returns>
+    internal Checkpoint? CreateSnapshotCheckpointLocked(bool force)
+    {
+        if (State() == RaftState.Closed)
+        {
+            return null;
+        }
+
+        if (Snapshotting)
+        {
+            return null;
+        }
+
+        if (!force && Progress_ is { Count: > 0 })
+        {
+            return null;
+        }
+
+        if (Applied_ == 0)
+        {
+            return null;
+        }
+
+        Snapshotting = true;
+        var snapshotFile = Path.Combine(GetSnapshotsDirectory(), FormatSnapshotFileName(Term_, Applied_));
+        return new Checkpoint
+        {
+            Node = this,
+            Term = Term_,
+            Applied = Applied_,
+            PApplied = PApplied,
+            SnapFile = snapshotFile,
+            PeerState = [.. Wps],
+        };
+    }
+
+    /// <summary>
+    /// Parses the term and index from a snapshot file name of the form <c>snap-{term}-{index}</c>;
+    /// any directory part and the final extension are ignored.
+    /// </summary>
+    /// <param name="snapFileName">Snapshot file name or path.</param>
+    /// <returns>The parsed term and index with a null error, or zeros and an error for a malformed name.</returns>
+    internal (ulong Term, ulong Index, Exception? Error) TermAndIndexFromSnapFile(string snapFileName)
+    {
+        if (string.IsNullOrWhiteSpace(snapFileName))
+        {
+            return (0, 0, new InvalidOperationException("bad snapshot file name"));
+        }
+
+        var fileName = Path.GetFileNameWithoutExtension(snapFileName);
+        var segments = fileName.Split('-', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
+        if (segments.Length != 3 || !string.Equals(segments[0], SnapshotFilePrefix, StringComparison.Ordinal))
+        {
+            return (0, 0, new InvalidOperationException("bad snapshot file name"));
+        }
+
+        // File names are machine-generated: accept plain digits only, independent of the current culture.
+        if (!ulong.TryParse(segments[1], NumberStyles.None, CultureInfo.InvariantCulture, out var term) ||
+            !ulong.TryParse(segments[2], NumberStyles.None, CultureInfo.InvariantCulture, out var index))
+        {
+            return (0, 0, new InvalidOperationException("bad snapshot file name"));
+        }
+
+        return (term, index, null);
+    }
+
+    /// <summary>
+    /// Selects the snapshot file with the highest (term, index) in the snapshots directory, loads it,
+    /// restores <c>PIndex</c>, <c>PTerm</c>, <c>Commit</c>, <c>PApplied</c> and <c>Wps</c> from it, and
+    /// deletes the other snapshot files. Files whose names cannot be parsed are deleted as well.
+    /// </summary>
+    /// <returns>
+    /// <see langword="null"/> on success or when no candidate snapshot is left; otherwise an error,
+    /// including any exception raised by the file system.
+    /// </returns>
+    internal Exception? SetupLastSnapshot()
+    {
+        try
+        {
+            var snapshotDirectory = GetSnapshotsDirectory();
+            if (!Directory.Exists(snapshotDirectory))
+            {
+                return new InvalidOperationException("no snapshot available");
+            }
+
+            var snapshotFiles = Directory.GetFiles(snapshotDirectory, $"*{SnapshotFileSuffix}");
+            if (snapshotFiles.Length == 0)
+            {
+                return new InvalidOperationException("no snapshot available");
+            }
+
+            ulong latestTerm = 0;
+            ulong latestIndex = 0;
+            string? latestSnapshot = null;
+            foreach (var candidate in snapshotFiles)
+            {
+                var (term, index, parseError) = TermAndIndexFromSnapFile(candidate);
+                if (parseError is not null)
+                {
+                    File.Delete(candidate);
+                    continue;
+                }
+
+                if (term > latestTerm || (term == latestTerm && index > latestIndex))
+                {
+                    latestTerm = term;
+                    latestIndex = index;
+                    latestSnapshot = candidate;
+                }
+            }
+
+            if (latestSnapshot is null)
+            {
+                return null;
+            }
+
+            SnapFile = latestSnapshot;
+            var (snapshot, loadError) = LoadLastSnapshot();
+            if (loadError is not null || snapshot is null)
+            {
+                return loadError;
+            }
+
+            PIndex = snapshot.LastIndex;
+            PTerm = snapshot.LastTerm;
+            Commit = snapshot.LastIndex;
+            PApplied = snapshot.LastIndex;
+            Wps = [.. snapshot.PeerState];
+
+            // Older snapshots are removed only after the newest one loaded successfully.
+            foreach (var oldFile in snapshotFiles)
+            {
+                if (!string.Equals(oldFile, latestSnapshot, StringComparison.Ordinal))
+                {
+                    File.Delete(oldFile);
+                }
+            }
+
+            return null;
+        }
+        catch (Exception ex)
+        {
+            // Directory enumeration and deletes can throw; report the failure through the
+            // Exception? return value instead of letting it escape.
+            return ex;
+        }
+    }
+
+    /// <summary>
+    /// Reads and validates the snapshot file referenced by <c>SnapFile</c>.
+    /// </summary>
+    /// <returns>
+    /// The decoded snapshot with a null error; a null snapshot with an error when no file is recorded,
+    /// the file is too short, the checksum or peer-state length is invalid, or reading fails; or
+    /// (null, null) when the stored index is 0, in which case the file is deleted and
+    /// <c>SnapFile</c> is cleared.
+    /// </returns>
+    internal (Snapshot? Snapshot, Exception? Error) LoadLastSnapshot()
+    {
+        if (string.IsNullOrWhiteSpace(SnapFile))
+        {
+            return (null, new InvalidOperationException("no snapshot available"));
+        }
+
+        try
+        {
+            var buffer = File.ReadAllBytes(SnapFile);
+            if (buffer.Length < SnapshotHeaderLength + SnapshotChecksumLength)
+            {
+                return (null, new InvalidOperationException("snapshot corrupt"));
+            }
+
+            var payloadLength = buffer.Length - SnapshotChecksumLength;
+            var expectedChecksum = buffer.AsSpan(payloadLength, SnapshotChecksumLength);
+            var computedChecksum = SHA256.HashData(buffer.AsSpan(0, payloadLength));
+            if (!expectedChecksum.SequenceEqual(computedChecksum.AsSpan(0, SnapshotChecksumLength)))
+            {
+                return (null, new InvalidOperationException("snapshot corrupt"));
+            }
+
+            var term = BinaryPrimitives.ReadUInt64LittleEndian(buffer.AsSpan(0, 8));
+            var index = BinaryPrimitives.ReadUInt64LittleEndian(buffer.AsSpan(8, 8));
+
+            // Validate the stored length as unsigned: casting a value above int.MaxValue to int
+            // first would make it negative and let it pass a signed bounds check.
+            var storedPeerStateLength = BinaryPrimitives.ReadUInt32LittleEndian(buffer.AsSpan(16, 4));
+            var bodyLength = payloadLength - SnapshotHeaderLength;
+            if (storedPeerStateLength > (uint)bodyLength)
+            {
+                return (null, new InvalidOperationException("snapshot corrupt"));
+            }
+
+            var peerStateLength = (int)storedPeerStateLength;
+            var peerState = buffer.AsSpan(SnapshotHeaderLength, peerStateLength).ToArray();
+            var data = buffer.AsSpan(SnapshotHeaderLength + peerStateLength, bodyLength - peerStateLength).ToArray();
+            if (index == 0)
+            {
+                File.Delete(SnapFile);
+                SnapFile = string.Empty;
+                return (null, null);
+            }
+
+            return (new Snapshot
+            {
+                LastTerm = term,
+                LastIndex = index,
+                PeerState = peerState,
+                Data = data,
+            }, null);
+        }
+        catch (Exception ex)
+        {
+            return (null, ex);
+        }
+    }
+
+    /// <summary>
+    /// Switches the node to the follower state, records <paramref name="newLeader"/> as leader,
+    /// and stamps <c>Lsut</c> with the current UTC time.
+    /// </summary>
+    /// <param name="newLeader">Identifier stored in <c>LeaderId</c>.</param>
+    internal void StepdownLocked(string newLeader)
+    {
+        StateValue = (int)RaftState.Follower;
+        LeaderId = newLeader;
+        Lsut = DateTime.UtcNow;
+    }
+
+    /// <summary>Returns <see langword="true"/> while a catch-up state is set.</summary>
+    internal bool IsCatchingUp() => Catchup is not null;
+
+    /// <summary>
+    /// Reports whether the node is current: not closed, has a non-zero commit, is not catching up,
+    /// is not paused with <c>HCommit</c> ahead of <c>Commit</c>, and has applied its commit index.
+    /// </summary>
+    /// <param name="includeForwardProgress">
+    /// When <see langword="true"/>, a node whose commit is at most one entry ahead of its applied
+    /// index also counts as current.
+    /// </param>
+    /// <returns><see langword="true"/> when the node is considered current.</returns>
+    internal bool IsCurrent(bool includeForwardProgress = false)
+    {
+        if (State() == RaftState.Closed || Commit == 0 || Catchup is not null)
+        {
+            return false;
+        }
+
+        if (Paused && HCommit > Commit)
+        {
+            return false;
+        }
+
+        if (Commit == Applied_)
+        {
+            // Side effect: being fully applied also clears the HcBehind flag.
+            HcBehind = false;
+            return true;
+        }
+
+        if (!includeForwardProgress)
+        {
+            return false;
+        }
+
+        var startDelta = Commit > Applied_ ? Commit - Applied_ : 0;
+        return startDelta <= 1;
+    }
+
+    /// <summary>
+    /// Picks the peer, other than this node, with the highest <c>Li</c> value.
+    /// </summary>
+    /// <returns>The chosen peer id, or an empty string when no other peer has a non-zero <c>Li</c>.</returns>
+    internal string SelectNextLeader()
+    {
+        var nextLeader = string.Empty;
+        ulong highestIndex = 0;
+        foreach (var (peer, peerState) in Peers_)
+        {
+            if (string.Equals(peer, Id, StringComparison.Ordinal) || peerState.Li <= highestIndex)
+            {
+                continue;
+            }
+
+            nextLeader = peer;
+            highestIndex = peerState.Li;
+        }
+
+        return nextLeader;
+    }
+
+    /// <summary>Returns a random campaign timeout in the range [150 ms, 300 ms).</summary>
+    internal TimeSpan RandCampaignTimeout()
+    {
+        const int minMilliseconds = 150;
+        const int maxMilliseconds = 300;
+        return TimeSpan.FromMilliseconds(Random.Shared.Next(minMilliseconds, maxMilliseconds));
+    }
+
+    /// <summary>
+    /// Calls <c>Campaign</c> unless this node is already the leader.
+    /// </summary>
+    /// <param name="electionTimeout">Currently unused.</param>
+    /// <returns><see langword="null"/> after campaigning; otherwise an "already leader" error.</returns>
+    internal Exception? CampaignInternal(TimeSpan electionTimeout)
+    {
+        _ = electionTimeout;
+        if (State() == RaftState.Leader)
+        {
+            return new InvalidOperationException("already leader");
+        }
+
+        Campaign();
+        return null;
+    }
+
+    /// <summary>
+    /// Sets <c>Lxfer</c> and calls <c>Campaign</c>, unless this node is already the leader,
+    /// in which case <c>Lxfer</c> is cleared instead.
+    /// </summary>
+    /// <returns><see langword="null"/> after campaigning; otherwise an "already leader" error.</returns>
+    internal Exception? XferCampaign()
+    {
+        if (State() == RaftState.Leader)
+        {
+            Lxfer = false;
+            return new InvalidOperationException("already leader");
+        }
+
+        Lxfer = true;
+        Campaign();
+        return null;
+    }
+
+    /// <summary>Forwards <paramref name="knownPeers"/> to <c>ProposeKnownPeers</c>.</summary>
+    /// <param name="knownPeers">Peer identifiers to propose.</param>
+    internal void UpdateKnownPeersLocked(IReadOnlyList<string> knownPeers)
+    {
+        ProposeKnownPeers(knownPeers);
+    }
+
+    /// <summary>
+    /// Returns the <c>snapshots</c> directory under <c>StoreDir</c>. When <c>StoreDir</c> is
+    /// blank it is first set to a <c>natsnet-raft</c> folder under the system temp path.
+    /// </summary>
+    private string GetSnapshotsDirectory()
+    {
+        if (string.IsNullOrWhiteSpace(StoreDir))
+        {
+            StoreDir = Path.Combine(Path.GetTempPath(), "natsnet-raft");
+        }
+
+        return Path.Combine(StoreDir, "snapshots");
+    }
+
+    /// <summary>
+    /// Builds a snapshot file name with term and index zero-padded to 20 digits, so every
+    /// <see cref="ulong"/> value yields a name of the same length.
+    /// </summary>
+    private static string FormatSnapshotFileName(ulong term, ulong index) =>
+        string.Create(CultureInfo.InvariantCulture, $"{SnapshotFilePrefix}-{term:D20}-{index:D20}{SnapshotFileSuffix}");
+}
diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftNodeCoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftNodeCoreTests.cs
new file mode 100644
index 0000000..f5fb5f7
--- /dev/null
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftNodeCoreTests.cs
@@ -0,0 +1,159 @@
+// Copyright 2012-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0
+
+using System.Security.Cryptography;
+using Shouldly;
+
+namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
+
+public sealed class RaftNodeCoreTests
+{
+    [Fact]
+    public void SnapshotHelpers_WhenEncodedAndLoaded_ShouldRoundTrip()
+    {
+        var storeDir = Path.Combine(Path.GetTempPath(), $"raft-node-core-{Guid.NewGuid():N}");
+        var raft = new Raft
+        {
+            StoreDir = storeDir,
+            Term_ = 3,
+            Applied_ = 9,
+            PApplied = 7,
+            Wps = [1, 2, 3],
+        };
+
+        try
+        {
+            var checkpoint = raft.CreateSnapshotCheckpointLocked(force: true);
+            checkpoint.ShouldNotBeNull();
+
+            var snapshot = new Snapshot
+            {
+                LastTerm = 3,
+                LastIndex = 9,
+                PeerState = [7, 8],
+                Data = [9, 10, 11],
+            };
+
+            raft.InstallSnapshotInternal(snapshot).ShouldBeNull();
+            var (loaded, error) = raft.LoadLastSnapshot();
+            error.ShouldBeNull();
+            loaded.ShouldNotBeNull();
+            loaded!.LastTerm.ShouldBe(3UL);
+            loaded.LastIndex.ShouldBe(9UL);
+            loaded.PeerState.ShouldBe([7, 8]);
+            loaded.Data.ShouldBe([9, 10, 11]);
+
+            raft.SetupLastSnapshot().ShouldBeNull();
+            raft.PIndex.ShouldBe(9UL);
+            raft.PTerm.ShouldBe(3UL);
+        }
+        finally
+        {
+            if (Directory.Exists(storeDir))
+            {
+                Directory.Delete(storeDir, recursive: true);
+            }
+        }
+    }
+
+    [Fact]
+    public void LoadLastSnapshot_WhenPeerStateLengthExceedsPayload_ShouldReportCorruptSnapshot()
+    {
+        var storeDir = Path.Combine(Path.GetTempPath(), $"raft-node-core-{Guid.NewGuid():N}");
+        var raft = new Raft
+        {
+            StoreDir = storeDir,
+        };
+
+        try
+        {
+            var snapshot = new Snapshot
+            {
+                LastTerm = 1,
+                LastIndex = 1,
+                PeerState = [1],
+                Data = [2],
+            };
+
+            raft.InstallSnapshotInternal(snapshot).ShouldBeNull();
+
+            // Set the stored peer-state length to uint.MaxValue and recompute the checksum so
+            // that only the length validation can reject the file.
+            var snapshotFiles = Directory.GetFiles(Path.Combine(storeDir, "snapshots"));
+            snapshotFiles.Length.ShouldBe(1);
+            var bytes = File.ReadAllBytes(snapshotFiles[0]);
+            bytes.AsSpan(16, 4).Fill(0xFF);
+            var payloadLength = bytes.Length - 8;
+            SHA256.HashData(bytes.AsSpan(0, payloadLength)).AsSpan(0, 8).CopyTo(bytes.AsSpan(payloadLength, 8));
+            File.WriteAllBytes(snapshotFiles[0], bytes);
+
+            var (loaded, error) = raft.LoadLastSnapshot();
+            loaded.ShouldBeNull();
+            error.ShouldBeOfType<InvalidOperationException>();
+            error!.Message.ShouldBe("snapshot corrupt");
+        }
+        finally
+        {
+            if (Directory.Exists(storeDir))
+            {
+                Directory.Delete(storeDir, recursive: true);
+            }
+        }
+    }
+
+    [Fact]
+    public void LeadershipHelpers_WhenSteppingDownAndSelectingLeader_ShouldUpdateState()
+    {
+        var raft = new Raft
+        {
+            Id = "N1",
+            StateValue = (int)RaftState.Candidate,
+            // NOTE(review): the generic arguments of this dictionary were lost in the patch text;
+            // target-typed new() assumes Peers_ is declared as a concrete dictionary type - confirm.
+            Peers_ = new()
+            {
+                ["N2"] = new() { Li = 3 },
+                ["N3"] = new() { Li = 7 },
+            },
+        };
+
+        raft.StepdownLocked("N3");
+        raft.State().ShouldBe(RaftState.Follower);
+        raft.LeaderId.ShouldBe("N3");
+        raft.SelectNextLeader().ShouldBe("N3");
+    }
+
+    [Fact]
+    public void CampaignHelpers_WhenLeaderOrFollower_ShouldReturnExpectedOutcome()
+    {
+        var raft = new Raft
+        {
+            StateValue = (int)RaftState.Leader,
+        };
+
+        raft.CampaignInternal(TimeSpan.FromMilliseconds(200)).ShouldNotBeNull();
+        raft.XferCampaign().ShouldNotBeNull();
+
+        raft.StateValue = (int)RaftState.Follower;
+        raft.CampaignInternal(TimeSpan.FromMilliseconds(200)).ShouldBeNull();
+        raft.State().ShouldBe(RaftState.Candidate);
+    }
+
+    [Fact]
+    public void ProgressHelpers_WhenCatchupAndKnownPeersChange_ShouldTrackFlags()
+    {
+        var raft = new Raft
+        {
+            Commit = 10,
+            Applied_ = 8,
+            Catchup = new CatchupState(),
+        };
+
+        raft.IsCatchingUp().ShouldBeTrue();
+        raft.IsCurrent(includeForwardProgress: true).ShouldBeFalse();
+        raft.Catchup = null;
+        raft.UpdateKnownPeersLocked(["N2", "N3"]);
+        raft.Peers_.Count.ShouldBe(2);
+        raft.RandCampaignTimeout().ShouldBeGreaterThan(TimeSpan.Zero);
+    }
+}
diff --git a/porting.db b/porting.db
index cfae808..69aae47 100644
Binary files a/porting.db and b/porting.db differ