From ed1b62d6a3bc4bedab301e5d8934e80d69b08893 Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Sat, 28 Feb 2026 20:09:24 -0500
Subject: [PATCH] feat(batch30): implement raft group-b snapshot helpers

---
 .../JetStream/RaftTypes.Snapshots.cs          | 453 ++++++++++++++++++
 .../JetStream/RaftNodeCoreTests.cs            | 127 +++++
 porting.db                                    | Bin 6668288 -> 6672384 bytes
 3 files changed, 580 insertions(+)
 create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Snapshots.cs
 create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftNodeCoreTests.cs

diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Snapshots.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Snapshots.cs
new file mode 100644
index 0000000..0654335
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Snapshots.cs
@@ -0,0 +1,453 @@
+// Copyright 2012-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0
+
+using System.Buffers.Binary;
+using System.Globalization;
+using System.Security.Cryptography;
+
+namespace ZB.MOM.NatsNet.Server;
+
+/// <summary>
+/// Snapshot persistence and leadership helper methods for <see cref="Raft"/>.
+/// </summary>
+internal sealed partial class Raft
+{
+    private const string SnapshotFilePrefix = "snap";
+    private const string SnapshotFileSuffix = ".bin";
+
+    // Encoded layout: term (8) + index (8) + peer-state length (4), all little-endian.
+    private const int SnapshotHeaderLength = 20;
+
+    // Number of leading SHA-256 bytes appended after the payload as a checksum.
+    private const int SnapshotChecksumLength = 8;
+
+    /// <summary>
+    /// Encodes a snapshot as header, peer state, data and a trailing checksum taken from
+    /// the first <see cref="SnapshotChecksumLength"/> bytes of the payload's SHA-256.
+    /// </summary>
+    /// <param name="snap">Snapshot to encode, or <c>null</c>.</param>
+    /// <returns>The encoded bytes; an empty array when <paramref name="snap"/> is <c>null</c>.</returns>
+    internal byte[] EncodeSnapshot(Snapshot? snap)
+    {
+        if (snap is null)
+        {
+            return [];
+        }
+
+        var peerStateLength = snap.PeerState.Length;
+        var dataLength = snap.Data.Length;
+        var payloadLength = SnapshotHeaderLength + peerStateLength + dataLength;
+        var buffer = new byte[payloadLength + SnapshotChecksumLength];
+
+        BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(0, 8), snap.LastTerm);
+        BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(8, 8), snap.LastIndex);
+        BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(16, 4), (uint)peerStateLength);
+
+        var writeOffset = SnapshotHeaderLength;
+        snap.PeerState.CopyTo(buffer, writeOffset);
+        writeOffset += peerStateLength;
+        snap.Data.CopyTo(buffer, writeOffset);
+
+        var checksum = SHA256.HashData(buffer.AsSpan(0, payloadLength));
+        checksum.AsSpan(0, SnapshotChecksumLength).CopyTo(buffer.AsSpan(payloadLength, SnapshotChecksumLength));
+        return buffer;
+    }
+
+    /// <summary>
+    /// Writes <paramref name="snap"/> to the snapshots directory, deletes the previously
+    /// tracked snapshot file if it is a different file, and advances the tracked
+    /// term/index/commit state. <c>Snapshotting</c> is cleared whether or not this succeeds.
+    /// </summary>
+    /// <param name="snap">Snapshot to persist.</param>
+    /// <returns><c>null</c> on success; otherwise the exception that was caught.</returns>
+    /// <exception cref="ArgumentNullException"><paramref name="snap"/> is <c>null</c>.</exception>
+    internal Exception? InstallSnapshotInternal(Snapshot snap)
+    {
+        ArgumentNullException.ThrowIfNull(snap);
+
+        try
+        {
+            var snapshotsDirectory = GetSnapshotsDirectory();
+            Directory.CreateDirectory(snapshotsDirectory);
+
+            var newSnapshotFile = Path.Combine(snapshotsDirectory, FormatSnapshotFileName(snap.LastTerm, snap.LastIndex));
+
+            // Write to a temporary sibling and rename it into place, so a failure part-way
+            // through the write does not leave a truncated file under the snapshot name.
+            // The ".tmp" name does not match the "*.bin" scan in SetupLastSnapshot.
+            var tempSnapshotFile = newSnapshotFile + ".tmp";
+            File.WriteAllBytes(tempSnapshotFile, EncodeSnapshot(snap));
+            File.Move(tempSnapshotFile, newSnapshotFile, overwrite: true);
+
+            if (!string.IsNullOrWhiteSpace(SnapFile) &&
+                !string.Equals(SnapFile, newSnapshotFile, StringComparison.Ordinal) &&
+                File.Exists(SnapFile))
+            {
+                File.Delete(SnapFile);
+            }
+
+            SnapFile = newSnapshotFile;
+            PApplied = snap.LastIndex;
+            PIndex = snap.LastIndex;
+            PTerm = snap.LastTerm;
+            Commit = Math.Max(Commit, snap.LastIndex);
+            WalBytes = (ulong)(new FileInfo(newSnapshotFile).Length);
+            return null;
+        }
+        catch (Exception ex)
+        {
+            return ex;
+        }
+        finally
+        {
+            Snapshotting = false;
+        }
+    }
+
+    /// <summary>
+    /// Creates a checkpoint describing the current applied state and marks the node as snapshotting.
+    /// </summary>
+    /// <param name="force">When <c>false</c>, no checkpoint is created while <c>Progress_</c> is non-empty.</param>
+    /// <returns>
+    /// The checkpoint, or <c>null</c> when the node is closed, already snapshotting,
+    /// blocked by pending progress, or has applied nothing yet.
+    /// </returns>
+    internal Checkpoint? CreateSnapshotCheckpointLocked(bool force)
+    {
+        if (State() == RaftState.Closed)
+        {
+            return null;
+        }
+
+        if (Snapshotting)
+        {
+            return null;
+        }
+
+        if (!force && Progress_ is { Count: > 0 })
+        {
+            return null;
+        }
+
+        if (Applied_ == 0)
+        {
+            return null;
+        }
+
+        Snapshotting = true;
+        var snapshotFile = Path.Combine(GetSnapshotsDirectory(), FormatSnapshotFileName(Term_, Applied_));
+        return new Checkpoint
+        {
+            Node = this,
+            Term = Term_,
+            Applied = Applied_,
+            PApplied = PApplied,
+            SnapFile = snapshotFile,
+            PeerState = [.. Wps],
+        };
+    }
+
+    /// <summary>
+    /// Parses the term and index out of a snapshot file name of the form
+    /// <c>snap-{term}-{index}</c> plus an extension.
+    /// </summary>
+    /// <param name="snapFileName">File name or path of the snapshot file.</param>
+    /// <returns>The parsed term and index, or zeros with an error when the name is malformed.</returns>
+    internal (ulong Term, ulong Index, Exception? Error) TermAndIndexFromSnapFile(string snapFileName)
+    {
+        if (string.IsNullOrWhiteSpace(snapFileName))
+        {
+            return (0, 0, new InvalidOperationException("bad snapshot file name"));
+        }
+
+        var fileName = Path.GetFileNameWithoutExtension(snapFileName);
+        var segments = fileName.Split('-', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
+        if (segments.Length != 3 || !string.Equals(segments[0], SnapshotFilePrefix, StringComparison.Ordinal))
+        {
+            return (0, 0, new InvalidOperationException("bad snapshot file name"));
+        }
+
+        // Digits only, culture-independent: file names are machine-generated, so signs
+        // and culture-specific formatting are rejected rather than tolerated.
+        if (!ulong.TryParse(segments[1], NumberStyles.None, CultureInfo.InvariantCulture, out var term) ||
+            !ulong.TryParse(segments[2], NumberStyles.None, CultureInfo.InvariantCulture, out var index))
+        {
+            return (0, 0, new InvalidOperationException("bad snapshot file name"));
+        }
+
+        return (term, index, null);
+    }
+
+    /// <summary>
+    /// Scans the snapshots directory for the snapshot with the highest (term, index),
+    /// loads it, restores the tracked state from it and deletes the other scanned files.
+    /// Files whose names cannot be parsed are deleted during the scan.
+    /// </summary>
+    /// <returns>
+    /// <c>null</c> on success or when no candidate was selected; an error when the directory
+    /// is missing or has no snapshot files, or the error from loading the selected snapshot.
+    /// </returns>
+    internal Exception? SetupLastSnapshot()
+    {
+        var snapshotDirectory = GetSnapshotsDirectory();
+        if (!Directory.Exists(snapshotDirectory))
+        {
+            return new InvalidOperationException("no snapshot available");
+        }
+
+        var snapshotFiles = Directory.GetFiles(snapshotDirectory, $"*{SnapshotFileSuffix}");
+        if (snapshotFiles.Length == 0)
+        {
+            return new InvalidOperationException("no snapshot available");
+        }
+
+        ulong latestTerm = 0;
+        ulong latestIndex = 0;
+        string? latestSnapshot = null;
+        foreach (var candidate in snapshotFiles)
+        {
+            var (term, index, parseError) = TermAndIndexFromSnapFile(candidate);
+            if (parseError is not null)
+            {
+                File.Delete(candidate);
+                continue;
+            }
+
+            if (term > latestTerm || (term == latestTerm && index > latestIndex))
+            {
+                latestTerm = term;
+                latestIndex = index;
+                latestSnapshot = candidate;
+            }
+        }
+
+        if (latestSnapshot is null)
+        {
+            return null;
+        }
+
+        SnapFile = latestSnapshot;
+        var (snapshot, loadError) = LoadLastSnapshot();
+        if (loadError is not null || snapshot is null)
+        {
+            return loadError;
+        }
+
+        PIndex = snapshot.LastIndex;
+        PTerm = snapshot.LastTerm;
+        Commit = snapshot.LastIndex;
+        PApplied = snapshot.LastIndex;
+        Wps = [.. snapshot.PeerState];
+
+        foreach (var oldFile in snapshotFiles)
+        {
+            if (!string.Equals(oldFile, latestSnapshot, StringComparison.Ordinal))
+            {
+                File.Delete(oldFile);
+            }
+        }
+
+        return null;
+    }
+
+    /// <summary>
+    /// Reads and validates the snapshot file referenced by <c>SnapFile</c>.
+    /// </summary>
+    /// <returns>
+    /// The decoded snapshot; an error when no file is tracked, the file is too short, the
+    /// checksum or peer-state length is invalid, or reading fails; or <c>(null, null)</c>
+    /// when the stored index is zero, in which case the file is deleted and untracked.
+    /// </returns>
+    internal (Snapshot? Snapshot, Exception? Error) LoadLastSnapshot()
+    {
+        if (string.IsNullOrWhiteSpace(SnapFile))
+        {
+            return (null, new InvalidOperationException("no snapshot available"));
+        }
+
+        try
+        {
+            var buffer = File.ReadAllBytes(SnapFile);
+            if (buffer.Length < SnapshotHeaderLength + SnapshotChecksumLength)
+            {
+                return (null, new InvalidOperationException("snapshot corrupt"));
+            }
+
+            var payloadLength = buffer.Length - SnapshotChecksumLength;
+            var expectedChecksum = buffer.AsSpan(payloadLength, SnapshotChecksumLength);
+            var computedChecksum = SHA256.HashData(buffer.AsSpan(0, payloadLength));
+            if (!expectedChecksum.SequenceEqual(computedChecksum.AsSpan(0, SnapshotChecksumLength)))
+            {
+                return (null, new InvalidOperationException("snapshot corrupt"));
+            }
+
+            var term = BinaryPrimitives.ReadUInt64LittleEndian(buffer.AsSpan(0, 8));
+            var index = BinaryPrimitives.ReadUInt64LittleEndian(buffer.AsSpan(8, 8));
+
+            // Validate the stored length in 64-bit before narrowing: a value above
+            // int.MaxValue would otherwise wrap negative and slip past the bounds check.
+            var storedPeerStateLength = BinaryPrimitives.ReadUInt32LittleEndian(buffer.AsSpan(16, 4));
+            if (SnapshotHeaderLength + (long)storedPeerStateLength > payloadLength)
+            {
+                return (null, new InvalidOperationException("snapshot corrupt"));
+            }
+
+            var peerStateLength = (int)storedPeerStateLength;
+            var peerState = buffer.AsSpan(SnapshotHeaderLength, peerStateLength).ToArray();
+            var data = buffer.AsSpan(SnapshotHeaderLength + peerStateLength, payloadLength - SnapshotHeaderLength - peerStateLength).ToArray();
+            if (index == 0)
+            {
+                File.Delete(SnapFile);
+                SnapFile = string.Empty;
+                return (null, null);
+            }
+
+            return (new Snapshot
+            {
+                LastTerm = term,
+                LastIndex = index,
+                PeerState = peerState,
+                Data = data,
+            }, null);
+        }
+        catch (Exception ex)
+        {
+            return (null, ex);
+        }
+    }
+
+    /// <summary>Switches the node to follower state and records the new leader and the current UTC time.</summary>
+    /// <param name="newLeader">Identifier stored as the leader.</param>
+    internal void StepdownLocked(string newLeader)
+    {
+        StateValue = (int)RaftState.Follower;
+        LeaderId = newLeader;
+        Lsut = DateTime.UtcNow;
+    }
+
+    /// <summary>Returns <c>true</c> while <c>Catchup</c> is set.</summary>
+    internal bool IsCatchingUp() => Catchup is not null;
+
+    /// <summary>
+    /// Reports whether the node has applied everything it has committed.
+    /// </summary>
+    /// <param name="includeForwardProgress">
+    /// When <c>true</c>, a node that is at most one entry behind its commit index also counts as current.
+    /// </param>
+    /// <returns><c>true</c> when the node is considered current.</returns>
+    internal bool IsCurrent(bool includeForwardProgress = false)
+    {
+        if (State() == RaftState.Closed || Commit == 0 || Catchup is not null)
+        {
+            return false;
+        }
+
+        if (Paused && HCommit > Commit)
+        {
+            return false;
+        }
+
+        if (Commit == Applied_)
+        {
+            HcBehind = false;
+            return true;
+        }
+
+        if (!includeForwardProgress)
+        {
+            return false;
+        }
+
+        var startDelta = Commit > Applied_ ? Commit - Applied_ : 0;
+        return startDelta <= 1;
+    }
+
+    /// <summary>
+    /// Picks the peer, other than this node, with the highest <c>Li</c> value.
+    /// </summary>
+    /// <returns>The chosen peer id, or an empty string when no other peer has an <c>Li</c> above zero.</returns>
+    internal string SelectNextLeader()
+    {
+        var nextLeader = string.Empty;
+        ulong highestIndex = 0;
+        foreach (var (peer, peerState) in Peers_)
+        {
+            if (string.Equals(peer, Id, StringComparison.Ordinal) || peerState.Li <= highestIndex)
+            {
+                continue;
+            }
+
+            nextLeader = peer;
+            highestIndex = peerState.Li;
+        }
+
+        return nextLeader;
+    }
+
+    /// <summary>Returns a random campaign timeout of at least 150 ms and less than 300 ms.</summary>
+    internal TimeSpan RandCampaignTimeout()
+    {
+        const int minMilliseconds = 150;
+        const int maxMilliseconds = 300;
+        return TimeSpan.FromMilliseconds(Random.Shared.Next(minMilliseconds, maxMilliseconds));
+    }
+
+    /// <summary>
+    /// Starts a campaign unless this node is already the leader.
+    /// </summary>
+    /// <param name="electionTimeout">Currently unused.</param>
+    /// <returns><c>null</c> when a campaign was started; an error when already leader.</returns>
+    internal Exception? CampaignInternal(TimeSpan electionTimeout)
+    {
+        _ = electionTimeout;
+        if (State() == RaftState.Leader)
+        {
+            return new InvalidOperationException("already leader");
+        }
+
+        Campaign();
+        return null;
+    }
+
+    /// <summary>
+    /// Starts a campaign with <c>Lxfer</c> set; when this node is already the leader,
+    /// <c>Lxfer</c> is cleared instead and no campaign is started.
+    /// </summary>
+    /// <returns><c>null</c> when a campaign was started; an error when already leader.</returns>
+    internal Exception? XferCampaign()
+    {
+        if (State() == RaftState.Leader)
+        {
+            Lxfer = false;
+            return new InvalidOperationException("already leader");
+        }
+
+        Lxfer = true;
+        Campaign();
+        return null;
+    }
+
+    /// <summary>Forwards <paramref name="knownPeers"/> to <c>ProposeKnownPeers</c>.</summary>
+    internal void UpdateKnownPeersLocked(IReadOnlyList<string> knownPeers)
+    {
+        ProposeKnownPeers(knownPeers);
+    }
+
+    /// <summary>
+    /// Returns the <c>snapshots</c> directory under <c>StoreDir</c>. When <c>StoreDir</c> is
+    /// unset it is first assigned a <c>natsnet-raft</c> folder under the system temp path.
+    /// </summary>
+    private string GetSnapshotsDirectory()
+    {
+        if (string.IsNullOrWhiteSpace(StoreDir))
+        {
+            StoreDir = Path.Combine(Path.GetTempPath(), "natsnet-raft");
+        }
+
+        return Path.Combine(StoreDir, "snapshots");
+    }
+
+    /// <summary>Builds a snapshot file name with zero-padded 20-digit term and index.</summary>
+    private static string FormatSnapshotFileName(ulong term, ulong index) =>
+        $"{SnapshotFilePrefix}-{term:D20}-{index:D20}{SnapshotFileSuffix}";
+}
diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftNodeCoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftNodeCoreTests.cs
new file mode 100644
index 0000000..f5fb5f7
--- /dev/null
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftNodeCoreTests.cs
@@ -0,0 +1,127 @@
+// Copyright 2012-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0
+
+using Shouldly;
+
+namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
+
+public sealed class RaftNodeCoreTests
+{
+    [Fact]
+    public void SnapshotHelpers_WhenEncodedAndLoaded_ShouldRoundTrip()
+    {
+        var storeDir = Path.Combine(Path.GetTempPath(), $"raft-node-core-{Guid.NewGuid():N}");
+        var raft = new Raft
+        {
+            StoreDir = storeDir,
+            Term_ = 3,
+            Applied_ = 9,
+            PApplied = 7,
+            Wps = [1, 2, 3],
+        };
+
+        try
+        {
+            var checkpoint = raft.CreateSnapshotCheckpointLocked(force: true);
+            checkpoint.ShouldNotBeNull();
+
+            var snapshot = new Snapshot
+            {
+                LastTerm = 3,
+                LastIndex = 9,
+                PeerState = [7, 8],
+                Data = [9, 10, 11],
+            };
+
+            raft.InstallSnapshotInternal(snapshot).ShouldBeNull();
+            var (loaded, error) = raft.LoadLastSnapshot();
+            error.ShouldBeNull();
+            loaded.ShouldNotBeNull();
+            loaded!.LastTerm.ShouldBe(3UL);
+            loaded.LastIndex.ShouldBe(9UL);
+            loaded.PeerState.ShouldBe([7, 8]);
+            loaded.Data.ShouldBe([9, 10, 11]);
+
+            raft.SetupLastSnapshot().ShouldBeNull();
+            raft.PIndex.ShouldBe(9UL);
+            raft.PTerm.ShouldBe(3UL);
+        }
+        finally
+        {
+            if (Directory.Exists(storeDir))
+            {
+                Directory.Delete(storeDir, recursive: true);
+            }
+        }
+    }
+
+    [Fact]
+    public void LeadershipHelpers_WhenSteppingDownAndSelectingLeader_ShouldUpdateState()
+    {
+        var raft = new Raft
+        {
+            Id = "N1",
+            StateValue = (int)RaftState.Candidate,
+            // NOTE(review): the dictionary's type arguments were lost from this patch text;
+            // target-typed new assumes Peers_ is declared as a concrete Dictionary - confirm.
+            Peers_ = new()
+            {
+                ["N2"] = new() { Li = 3 },
+                ["N3"] = new() { Li = 7 },
+            },
+        };
+
+        raft.StepdownLocked("N3");
+        raft.State().ShouldBe(RaftState.Follower);
+        raft.LeaderId.ShouldBe("N3");
+        raft.SelectNextLeader().ShouldBe("N3");
+    }
+
+    [Fact]
+    public void CampaignHelpers_WhenLeaderOrFollower_ShouldReturnExpectedOutcome()
+    {
+        var raft = new Raft
+        {
+            StateValue = (int)RaftState.Leader,
+        };
+
+        raft.CampaignInternal(TimeSpan.FromMilliseconds(200)).ShouldNotBeNull();
+        raft.XferCampaign().ShouldNotBeNull();
+
+        raft.StateValue = (int)RaftState.Follower;
+        raft.CampaignInternal(TimeSpan.FromMilliseconds(200)).ShouldBeNull();
+        raft.State().ShouldBe(RaftState.Candidate);
+    }
+
+    [Fact]
+    public void ProgressHelpers_WhenCatchupAndKnownPeersChange_ShouldTrackFlags()
+    {
+        var raft = new Raft
+        {
+            Commit = 10,
+            Applied_ = 8,
+            Catchup = new CatchupState(),
+        };
+
+        raft.IsCatchingUp().ShouldBeTrue();
+        raft.IsCurrent(includeForwardProgress: true).ShouldBeFalse();
+        raft.Catchup = null;
+        raft.UpdateKnownPeersLocked(["N2", "N3"]);
+        raft.Peers_.Count.ShouldBe(2);
+        raft.RandCampaignTimeout().ShouldBeGreaterThan(TimeSpan.Zero);
+    }
+
+    [Fact]
+    public void TermAndIndexFromSnapFile_WhenNameIsWellFormedOrMalformed_ShouldParseOrReject()
+    {
+        var raft = new Raft();
+
+        var (term, index, error) = raft.TermAndIndexFromSnapFile("snap-00000000000000000003-00000000000000000009.bin");
+        error.ShouldBeNull();
+        term.ShouldBe(3UL);
+        index.ShouldBe(9UL);
+
+        raft.TermAndIndexFromSnapFile("snap-+3-9.bin").Error.ShouldNotBeNull();
+        raft.TermAndIndexFromSnapFile("other-3-9.bin").Error.ShouldNotBeNull();
+    }
+}
diff --git a/porting.db b/porting.db
index cfae808fc758903e8d84bd0e882ad6d8cc980e3a..69aae47810f3fbe12a1a8e319ef5fc082ca2e9ea 100644
GIT binary patch
delta 5361
zcmbuBYj6|S6~}j1cO~sZTHyyYCIZ=jjm*Q61xVP&0h73iu#h465iBQ>CD~R(z?C0? zG+>J+Wu_smozh&Bq!cn4hD0gAWdnZEu~~8Et3f}0VnR%^z6#4 zt=<>Tt{&n~K&wcDYdwY7YuIC`u_0DGdC`#YhXS;{)d-KuNMRf1?4;9IWifHjy zlF#sa${gDEc-xjpM}+sv?_Q8bEn8RY%z=s!)@)(0R9}YCw&T*DvS)i@tcD!SETih1pAC#|BFwUhiPe zW68;6D1Uf4*U5aK?-r_<3*ks}Bo>Q=`536vnu>yYS5vK^j%um})Im*!Ks~RiMo>>{ zDgdfWQw^Y6HB~2+Rxz73Sqq(2nyLX+t|>n#ho*d>^5j=MjJtDHgE>HzwRdzj@paHD zR$H~uTBx>apfyKr`Jt7iwtUdy)s`2}Jj&FpQg$+7q3mM9LfOHDg|d4I3uWgL7Rs(A zECTY8qf8*z_>!pf>KkQoh*?H6VhHzDO9A=7S>`0KhmJsh*PiGlMvh+6_5Wk8uACC7 zZXLZ>DxkW}8K&hz3HOoyBi4?JjDIkEiF0lABXaiuI?f<)y@4wRQGq2M+gF!C(e8MG zJ$ng5XXEvQ=tp?ydG;=RtYm?SsS4uqL398!gE?097py;l7GT?Jb2#KFak^#C2~
Veuu9Op~Lvfll)i)`7E-JXTYDI zML)z>PNRJMvvU)NjL)1y$8p7N(^%Lq&!d9zF!{gd(Ho3Kv{Iw$y12*IG12HpubsQZT}3B_Gfn5v*#)+YWxd2Eq{Fp{f5S|%sfc?SF~t6 zAAa_a=$p4LBib^Tbx~$rwa$zU%k-sap03*T4A;xC;)s|l)Ui8d+jW%5B7O$t#qOJ^ z62(jQ#qe({&@3DtMQajM{csc&O)z)eM2GP1htSwmM{Xu9#Nu19Lc5$0m5-2|cBMK+vyvH} z{}lGi6`z4i1$I0Bw=97Dok!G{e9vu>1@wRcFajLF0|FodCcq3xfCZQVWB{2!7LX0h z1ZDx>0`3BGfZ4#^fEAbn%mr*fE|3Sz1Lgw@fPA0;SP0mGLSPZF7+3-<1&RO%-~@^R z7qARi4y*v&fCpF!lmMkb8L$c{2krq@18aZ^U@dSja34?!d>gnQcmVJM>wqf22doGD zKsB%d*a$obYyxV4TA&W72Oa`80}Vg`XapVx9swSeZ;L_pJbg=MF9+Ar8>LT#r-d%* zl+Y@@BWxC`gmS?lJI9@n>bbYLBixIUS1Oa1 za(lU5TpQQKZQ?394_7GJ@Mw=^qx{w1gU3XX+SL__MVq6MFxQ7IHI}Ek`Iz!ezVj;| zO;W8%swGKWUN2c*j6&umuhSa8S7PJZIivz za!`)^+*oFU?_dsdfs(eHubW#kXc@%4+0aorS6&7^6oQi!HJN)u_Q9PH_hQY1}7>L{l= zZGkjwoR%R^8V06i$dQH((=s%YhIP|2G?0e1(=ybPhBebNWJyE+v_EV&hT4T~4Qhzt;q!yQS zht$7|G*S!7q(dTZayv;UwUVUo^c3l&mXP$FzDGK#wIh9}CrKx@Xr%A7gLG1>Mfy(L ZNhh^Tr0?_u>7>?&Nu5mEOdf7vB0`9l0}Jx4}%)r z8ubt(Ap;qQU_?d&fj}lk;l^l$AOZ#gV^c_EVPqOZL}d$sjTyzaEvfvt$@84|z0Z4| z=bZL^B)7Md+>xtv2!biP-920E_`I<)D6V<8D`<2DMeVqwK)oc`n_pYI?$wr!EmoiL z&9CBV)+PwTp>(l~vps26%GuU5E8*;&G@H)Zx-^S$)|6&p&X%TGh_mW6E9R^`&4Pko z5CUmh$PXu`S%9BB7K-BAt{(BAJv#B9)Xx$T-w3MLp&{S}U)aqK4KZ zO&4kSinP+$yToz9x}~*WGFtoP-M{3ilAsA~vbfdZ6WZ(lCeDMaq z)5Ra72?MN?x*Y06YP`yTt`6|@c}j{D4xPBkL_JUaR%6i_63G}h-E4?^FHgM9pNeYB#m&h$pA`5cmRXj_^3{GGaLOj!W}0McK^#3rBbjy zWK&a(S;q;{}e zupi!I!L--={O4(LF6oslPFg$0K6-2;e1AXM2+s^YX}%RUv0QLi;E49nnrRN0wy+9q zywoq2+h!Vk$~mG@E-JE>CTq%Os?XpHd-DNbtQ$5ZnDo_Ir);D8n#@P5$wVdP7ePTV zA*cvu1Pg){!G@qA0D>LCfuJLdL&!kLM94yLB8*3vfbaxDHbM?UF2Y0v7s4cjJOnp_ z2jNKsFT!MmDG2!pK7;}UKSBVZ5Me69G=w6AAVM)h2qBCRL70wEf>4T3hER_16v7OI znFvoKR3OYkn2k`0FbAOu;TeRv2=frC5#}R2i?9IUIfR7>ix6rMY7rJA)FCWEs7H7n zVJSib!V3t?5MD%h31K