From 492030bd4b809c8b2bc394484d28282639b76a1c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 20:06:17 -0500 Subject: [PATCH] feat(batch30): implement raft group-a server integration --- .../JetStream/RaftTypes.ServerIntegration.cs | 113 ++++++++ .../JetStream/RaftTypes.cs | 50 +++- .../ZB.MOM.NatsNet.Server/NatsServer.Raft.cs | 245 ++++++++++++++++++ .../JetStream/RaftTypesTests.cs | 1 + .../Server/NatsServerRaftTests.cs | 66 +++++ porting.db | Bin 6664192 -> 6668288 bytes 6 files changed, 468 insertions(+), 7 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.ServerIntegration.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Raft.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/NatsServerRaftTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.ServerIntegration.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.ServerIntegration.cs new file mode 100644 index 0000000..716a3de --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.ServerIntegration.cs @@ -0,0 +1,113 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Threading; + +namespace ZB.MOM.NatsNet.Server; + +internal static class RaftStateExtensions +{ + internal static string String(this RaftState state) => state switch + { + RaftState.Follower => "FOLLOWER", + RaftState.Candidate => "CANDIDATE", + RaftState.Leader => "LEADER", + RaftState.Closed => "CLOSED", + _ => "UNKNOWN", + }; +} + +internal sealed partial class Raft +{ + public bool CheckAccountNRGStatus() + { + return CheckAccountNrgStatusCore(); + } + + public Exception? RecreateInternalSubsLocked() + { + return RecreateInternalSubsLockedCore(); + } + + public bool OutOfResources() + { + return OutOfResourcesCore(); + } + + public void PauseApplyLocked() + { + PauseApplyLockedCore(); + } + + private bool CheckAccountNrgStatusCore() + { + if (Server_ is not NatsServer server) + { + return false; + } + + if (!server.AccountNrgAllowed) + { + return false; + } + + var enabled = true; + foreach (var peerName in Peers_.Keys) + { + var nodeInfo = server.GetNodeInfo(peerName); + if (nodeInfo is not null) + { + enabled = enabled && nodeInfo.AccountNrg; + } + } + + return enabled; + } + + private Exception? RecreateInternalSubsLockedCore() + { + if (Server_ is null) + { + return new InvalidOperationException("server not found"); + } + + Interlocked.Exchange(ref _isSysAccV, 1); + Active = DateTime.UtcNow; + return null; + } + + private bool OutOfResourcesCore() + { + if (!Track || JetStream_ is null) + { + return false; + } + + if (JetStream_ is IJetStreamResourceLimits limits) + { + return limits.LimitsExceeded(WalType); + } + + return false; + } + + private void PauseApplyLockedCore() + { + if (State() == RaftState.Candidate) + { + StateValue = (int)RaftState.Follower; + Interlocked.Exchange(ref HasLeaderV, 0); + } + + Paused = true; + if (HCommit < Commit) + { + HCommit = Commit; + } + } +} + +internal interface IJetStreamResourceLimits +{ + bool LimitsExceeded(StorageType storageType); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs index 45f9dae..4490c2d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs @@ -98,6 +98,12 @@ public interface IRaftNode void RecreateInternalSubs(); bool IsSystemAccount(); string GetTrafficAccountName(); + + // Batch 30 mapped methods (server/raft.go) + bool CheckAccountNRGStatus(); + Exception? RecreateInternalSubsLocked(); + bool OutOfResources(); + void PauseApplyLocked(); } // ============================================================================ @@ -210,7 +216,7 @@ public sealed class RaftConfig /// Mirrors Go raft struct in server/raft.go lines 151-251. /// All algorithm methods are stubbed — full implementation is session 20+. /// -internal sealed class Raft : IRaftNode +internal sealed partial class Raft : IRaftNode { // Identity / location internal DateTime Created_ { get; set; } @@ -308,6 +314,8 @@ internal sealed class Raft : IRaftNode internal bool Lxfer { get; set; } internal bool HcBehind { get; set; } internal bool MaybeLeader { get; set; } + internal bool Track { get; set; } + internal bool DebugEnabled { get; set; } internal bool Paused { get; set; } internal bool Observer_ { get; set; } internal bool Initializing { get; set; } @@ -725,7 +733,18 @@ internal sealed class Raft : IRaftNode } } public IpQueue ApplyQ() => ApplyQ_ ?? throw new InvalidOperationException("Apply queue not initialized"); - public void PauseApply() => Paused = true; + public void PauseApply() + { + _lock.EnterWriteLock(); + try + { + PauseApplyLocked(); + } + finally + { + _lock.ExitWriteLock(); + } + } public void ResumeApply() => Paused = false; public bool DrainAndReplaySnapshot() @@ -733,11 +752,13 @@ internal sealed class Raft : IRaftNode _lock.EnterWriteLock(); try { - if (Snapshotting) - return false; + var canReplay = !Snapshotting; + if (canReplay) + { + HcBehind = false; + } - HcBehind = false; - return true; + return canReplay; } finally { @@ -782,7 +803,22 @@ internal sealed class Raft : IRaftNode Stop(); } public bool IsDeleted() => Deleted_; - public void RecreateInternalSubs() => Active = DateTime.UtcNow; + public void RecreateInternalSubs() + { + _lock.EnterWriteLock(); + try + { + var error = RecreateInternalSubsLocked(); + if (error is not null) + { + throw error; + } + } + finally + { + _lock.ExitWriteLock(); + } + } public bool IsSystemAccount() => Interlocked.Read(ref _isSysAccV) != 0; public string GetTrafficAccountName() => IsSystemAccount() ? "$SYS" : (string.IsNullOrEmpty(AccName) ? "$G" : AccName); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Raft.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Raft.cs new file mode 100644 index 0000000..469506c --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Raft.cs @@ -0,0 +1,245 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Net; +using System.Text.Json; +using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + private const int RaftPeerIdLength = 8; + private const string PeerStateFileName = "peerstate.json"; + + internal bool AccountNrgAllowed { get; set; } = true; + + internal Exception? BootstrapRaftNode(RaftConfig? cfg, IReadOnlyList? knownPeers, bool allPeersKnown) + { + if (cfg is null) + { + return new InvalidOperationException("raft: nil config"); + } + + knownPeers ??= []; + foreach (var peer in knownPeers) + { + if (string.IsNullOrWhiteSpace(peer) || peer.Length != RaftPeerIdLength) + { + return new InvalidOperationException($"raft: illegal peer: {peer}"); + } + } + + var expected = knownPeers.Count; + if (!allPeersKnown) + { + if (expected < 2) + { + expected = 2; + } + + var opts = GetOpts(); + var routeCount = opts.Routes.Count; + var gatewayPeerCount = 0; + var clusterName = ClusterName(); + foreach (var gateway in opts.Gateway.Gateways) + { + if (string.Equals(gateway.Name, clusterName, StringComparison.Ordinal)) + { + continue; + } + + foreach (var url in gateway.Urls) + { + var host = url.Host; + if (IPAddress.TryParse(host, out _)) + { + gatewayPeerCount++; + } + else + { + try + { + var addrs = Dns.GetHostAddresses(host); + gatewayPeerCount += addrs.Length > 0 ? addrs.Length : 1; + } + catch + { + gatewayPeerCount++; + } + } + } + } + + var inferred = routeCount + gatewayPeerCount; + if (expected < inferred) + { + expected = inferred; + } + } + + if (string.IsNullOrWhiteSpace(cfg.Store)) + { + return new InvalidOperationException("raft: storage directory is not set"); + } + + try + { + Directory.CreateDirectory(cfg.Store); + + var tmpPath = Path.Combine(cfg.Store, $"_test_{Guid.NewGuid():N}"); + using (File.Create(tmpPath)) { } + File.Delete(tmpPath); + + var peerState = new RaftPeerState + { + KnownPeers = [.. knownPeers], + ClusterSize = expected, + DomainExt = 0, + }; + + var peerStatePath = Path.Combine(cfg.Store, PeerStateFileName); + var json = JsonSerializer.Serialize(peerState); + File.WriteAllText(peerStatePath, json); + } + catch (Exception ex) + { + return ex; + } + + return null; + } + + internal (Raft? Node, Exception? Error) InitRaftNode(string accName, RaftConfig? cfg, IReadOnlyDictionary? labels = null) + { + if (cfg is null) + { + return (null, new InvalidOperationException("raft: nil config")); + } + + _mu.EnterReadLock(); + try + { + if (_sys == null) + { + return (null, ServerErrors.ErrNoSysAccount); + } + } + finally + { + _mu.ExitReadLock(); + } + + var node = new Raft + { + Created_ = DateTime.UtcNow, + GroupName = cfg.Name, + StoreDir = cfg.Store, + Wal = cfg.Log, + Track = cfg.Track, + Observer_ = cfg.Observer, + Initializing = !cfg.Recovering, + ScaleUp_ = cfg.ScaleUp, + AccName = accName, + Server_ = this, + Id = (ServerName() ?? string.Empty).PadRight(RaftPeerIdLength, '0')[..RaftPeerIdLength], + Qn = 1, + Csz = 1, + StateValue = (int)RaftState.Follower, + LeadC = Channel.CreateUnbounded(), + Quit = Channel.CreateUnbounded(), + ApplyQ_ = new IpQueue($"{cfg.Name}-committed"), + PropQ = new IpQueue($"{cfg.Name}-propose"), + EntryQ = new IpQueue($"{cfg.Name}-append"), + RespQ = new IpQueue($"{cfg.Name}-append-response"), + Reqs = new IpQueue($"{cfg.Name}-vote-req"), + Votes_ = new IpQueue($"{cfg.Name}-vote-resp"), + }; + + RegisterRaftNode(node.GroupName, node); + _ = labels; + return (node, null); + } + + internal (IRaftNode? Node, Exception? Error) StartRaftNode(string accName, RaftConfig? cfg, IReadOnlyDictionary? labels = null) + { + var (node, error) = InitRaftNode(accName, cfg, labels); + if (error is not null) + { + return (null, error); + } + + return (node, null); + } + + internal string ServerNameForNode(string node) + { + if (_nodeToInfo.TryGetValue(node, out var value) && value is NodeInfo info) + { + return info.Name; + } + + return string.Empty; + } + + internal string ClusterNameForNode(string node) + { + if (_nodeToInfo.TryGetValue(node, out var value) && value is NodeInfo info) + { + return info.Cluster; + } + + return string.Empty; + } + + internal void RegisterRaftNode(string group, IRaftNode node) + { + _raftNodes[group] = node; + } + + internal void UnregisterRaftNode(string group) + { + _raftNodes.TryRemove(group, out _); + } + + internal int NumRaftNodes() => _raftNodes.Count; + + internal IRaftNode? LookupRaftNode(string group) + { + if (_raftNodes.TryGetValue(group, out var value) && value is IRaftNode node) + { + return node; + } + + return null; + } + + internal void ReloadDebugRaftNodes(bool debug) + { + foreach (var value in _raftNodes.Values) + { + if (value is Raft raft) + { + raft.DebugEnabled = debug; + } + } + } + + internal NodeInfo? GetNodeInfo(string nodeId) + { + if (_nodeToInfo.TryGetValue(nodeId, out var value) && value is NodeInfo info) + { + return info; + } + + return null; + } +} + +internal sealed class RaftPeerState +{ + public List KnownPeers { get; set; } = []; + public int ClusterSize { get; set; } + public ushort DomainExt { get; set; } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftTypesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftTypesTests.cs index eeb180f..11f8947 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftTypesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftTypesTests.cs @@ -17,6 +17,7 @@ public sealed class RaftTypesTests Id = "N1", GroupName = "RG", AccName = "ACC", + Server_ = new object(), StateValue = (int)RaftState.Leader, LeaderId = "N1", Csz = 3, diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/NatsServerRaftTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/NatsServerRaftTests.cs new file mode 100644 index 0000000..ba73d63 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/NatsServerRaftTests.cs @@ -0,0 +1,66 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using Shouldly; + +namespace ZB.MOM.NatsNet.Server.Tests.Server; + +public sealed class NatsServerRaftTests +{ + [Fact] + public void RaftStateString_WhenKnownState_ReturnsExpectedText() + { + RaftState.Follower.String().ShouldBe("FOLLOWER"); + RaftState.Candidate.String().ShouldBe("CANDIDATE"); + RaftState.Leader.String().ShouldBe("LEADER"); + RaftState.Closed.String().ShouldBe("CLOSED"); + } + + [Fact] + public void RegisterRaftNode_WhenRegisteredAndUnregistered_TracksLookupAndCount() + { + var (server, error) = NatsServer.NewServer(new ServerOptions()); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var raftNode = new Raft { GroupName = "G1", Id = "N1" }; + + server!.RegisterRaftNode("G1", raftNode); + server.NumRaftNodes().ShouldBe(1); + server.LookupRaftNode("G1").ShouldBe(raftNode); + + server.UnregisterRaftNode("G1"); + server.NumRaftNodes().ShouldBe(0); + server.LookupRaftNode("G1").ShouldBeNull(); + } + + [Fact] + public void BootstrapRaftNode_WhenStoreMissing_CreatesStoreAndPeerState() + { + var (server, error) = NatsServer.NewServer(new ServerOptions()); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var storeDir = Path.Combine(Path.GetTempPath(), $"raft-bootstrap-{Guid.NewGuid():N}"); + var cfg = new RaftConfig + { + Name = "RG", + Store = storeDir, + }; + + try + { + var bootstrapError = server!.BootstrapRaftNode(cfg, ["ABCDEF12", "ABCDEF34"], allPeersKnown: true); + bootstrapError.ShouldBeNull(); + Directory.Exists(storeDir).ShouldBeTrue(); + File.Exists(Path.Combine(storeDir, "peerstate.json")).ShouldBeTrue(); + } + finally + { + if (Directory.Exists(storeDir)) + { + Directory.Delete(storeDir, recursive: true); + } + } + } +} diff --git a/porting.db b/porting.db index e5c1144d214777d0064cf51b31af477155373718..cfae808fc758903e8d84bd0e882ad6d8cc980e3a 100644 GIT binary patch delta 5971 zcmc(jeM}U08pmg5XJ&V2-v)Wn3OkNgVHXO_F0v@Tw6((FTZ@Rch}9XE-O5P;+2vju zqb%2~IkgvSDbJqQcve%OtwrorC)cypzNBZ;o>i|+&WmT&RMT*tMK9Mh_0rt4yD+e@ z|ES~)`Gnsy!*9O7=jHdCXK26FrE6~o-R?|gH$kxLUKAfDcOQ`-DJ1>BdbLn}wUE5p z$FtKcIy?03FuNwD5(^q5%xlFzJ75OKc=N$Uegw@;pY)hjr z=;LzK<6n~}>1|vYK@h##h4$jaG|h6`@t9S#GqdHOqxoxn@ad z&C#s9LNs$nOti8zD;KSi6+6}J0rEl_dx&_B+C=~#;>)2?qK>BTqLTcmQ(RQ1zSOjw zN~`zzeXXs&daevDRD_1YZDPi-Rz`W@{uMu+a!EQ}5v(JYJ( zy+gAwI`kIJ!syUVnuXD!WzE9q&{dj+F4K!O%SogVv_~@?c+;U?m`7PQazDfEx>Pus#hinx zH99-&KF^wAaAzhBC+tj4?CF!@PcQCxu+xLHN;-5-crk>tQUs-D$Tu*`?$l(0N0ET@L$w6L5WmNUZg z7s6A{z6Tpz<~0W%$sSnFHo-Aukq-l%Y64R{ce-%Wf+wBe zI-ZpdpM?bjEbSGl<1<@6HJIVgy_l+KK@DdK{@W`&4_DkAR$Mo2gzndccufpd9Grh$ z@WI*1Og7{k6w=@qZwLt$2?yU0&Oyh4=+gRV4qkavKp#yZtJ ztgbz*^hkQ^0pc6VN~j-K@)S9P8{jT*r(CDcjf3(oRh^FoVn$<_qSNu19T~bU(1yp|grFP}wT}Gack!SMqE%qhHBfE|}Nw zP0;u+c{Av5$QG5mrsRmHN`bS#dSr7zYi#Ouw7TO;C2oFSID>_ zdsMoX->0Jqf?T9_ujiMNJXe;WP!b+g!h?0k2EAmzp6VsuMc078OgiACM491a3U5_U zUQ&8VxOG`MMl*xN4cL7}enLh0|4=k{np_0SugbI32d*hcY1;HW(E{fNl(#7}1>t`x zTWH%MVz0_xSANv>(Eas;1?j|j!j@-!#d6D1Zr*FUVOqp>83&9D4SV$0^u=rg^E>)4 zG^1NUwUIr<`RKvXGbOY%QfngYls!b+Lko)hJ|)=J>I-o2!Iqj#kT)okfX`D*T$SPv z7I*`tt~t=3r99|rXcB`;V3T{E*wWhkY;&-o=@~KDEDrYzVx7<5-0Bm(8-3nQEzJ#0 zK_HhZso6#Ol1r3udlr`#dr*6AGHn-Z-l{w_Mz`c%bY~^%E{@c2=yUqE1eYaPS!pZP zxujw^lBLYJOBcsUJ(WbNa9C>ggi1Y`L@F;PMY$wMd9N-V+U<(Q-a=&MH0N4?aNrwRwo96O3)_;e8J&eKXfECH@PLZCtn)9`P?B# zWMwif&&a0F(DbAU(W^|RS3K4?$ukkYJ&{an_BbuqL})#pOsi;&mTT@fzbft`XDgCv zjd@9Ed2`Jfr+GJZvOJmQsF%ZtUde>$ElZ{sKhH*_JjD}{x~0jaMjd16BgfcSxjWBZ zc4SHX^h(&=V^6R2;axX=li#_dDN+$XwGt$AAhf|-NDUnw(w&weUMa{lH zi}EI0RJ@J1~vv4(_Gx8Q1A;NTxZeOVB<^+|q>Vu1}^L?-oY+I-C{K*8j c@avKZ$6E+aORNhHXDKu9@~gozy8H9|4>??s?EnA( delta 1972 zcmY+^Yiv_x7zgn4o?FkY?b+5LZ1k)hgRL9Tt_V!7RbU`PV6K4U=4t5=5_C9$7%^Bb zIn%IkoqHliGRnrrC1ZYI2bJI;#Gn`)$ejfyfd%k`BLpJ?>eEgd_TksQ|K~k@&gFei zqfua@U3JX1Ol~Vh@e8)QCX%gR>j5vBx^a)!+~Xy8T2#VY87IA_qH6ilcqK~$#fl4IGL_t3MSK4OvTA`1yfNn6wzV$8};w#x@$gB=2^t(*s< zk7S#9$jU9GGwd1Ghf0htqHf3`!DA`ld2T2hVAe< z8SjK`S9t|`HfOT%B!$a`K#|o3zEtiE+{(1%LUtO~TF)>xn3u-YK==gbtaEVQc*$B9 z75Hl2#i97O1qnz*5|WXEtjLD!Cht<8crDUd>M3obAka$dP}QOPhKgO{dGVCkCN_(^ zY%#G&%ohiWHtrv;hr1=#i0j0aVpt4{#oPt%2UF+@(FCH~)G^SpRLw9mZiS9Av{(=K z%hXIL52_yM-yI5@E9R>8G#tIF4}{%8)d>gh>NZFVs>SeQP%VSM?}aX~_^9*2aVX+9 z+22FE8E6TrnWl4&I*Ay%YUnA3o@(f6hVC%*bVKiF=r0+1e?uQ&=oyBdY3KtD-PxvV zLLi4cBa5i-?d7%;*5gW}d`W6xKV(Ame({>{R2XC_<|lKr!R;bD&C>_89W1ohYqb!& zF6P48Jz7EH35)A$N2F3@?D}C4oxwZc+#anO&bK+;5LnL$FzYD8Lv>7ZL)T-`F4X!{ zC^E6!1KSTWX>dNKg}^^bal^Jx87qYLYB{|<=hPPBapUKCpzzqhboklO1o&pJ7VJH^ zwGA7OYS8j}mv}{kwh>OQ#ZA$%j};-kQOgD2g$z5CG-_dpT}Vqz$Z)YyD@^P=aO~4M zU`G-DqP>NcKyK28K~D>76H03H@w_7!z;B;(4)|rT#KAjFS{YQ`5p(M{FdTF@;hn@c zo0MMb!P_3O{(TRW&dE#zf3p^YQm3T={%yu%+MBI*@b1_4Ldse`7as4|@_Vng@PIa@ zcMI=4@c$iD&p)i8EaXCNl#K?V95fgWLAfXoc~CwoKts_mG#q)64;3Om8i8I$Bhe@{ z8Wo{e&=@opjYH$n1T+x^(5vV*Gzm>cQ&2IQil(90(R5UTW}r9FOf(D4My2RYREC0R z4w{SRp|?;unvWKsg=i64jNV4NIsb5|$>)_fQIGI#Q^qJm6-`kUTI!U}DqEDOQl&(c zxym$4j_i}OWrxg5e@XYD+yT^^x5Kmc=3gsJW$=nNMPo zM>I^kqkGH&`yxkJxN%8W;N-oK6Rh__Wib6