feat(batch30): implement raft group-a server integration

This commit is contained in:
Joseph Doherty
2026-02-28 20:06:17 -05:00
parent a5b4dd39b1
commit 492030bd4b
6 changed files with 468 additions and 7 deletions

View File

@@ -0,0 +1,113 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Threading;
namespace ZB.MOM.NatsNet.Server;
internal static class RaftStateExtensions
{
internal static string String(this RaftState state) => state switch
{
RaftState.Follower => "FOLLOWER",
RaftState.Candidate => "CANDIDATE",
RaftState.Leader => "LEADER",
RaftState.Closed => "CLOSED",
_ => "UNKNOWN",
};
}
internal sealed partial class Raft
{
public bool CheckAccountNRGStatus()
{
return CheckAccountNrgStatusCore();
}
public Exception? RecreateInternalSubsLocked()
{
return RecreateInternalSubsLockedCore();
}
public bool OutOfResources()
{
return OutOfResourcesCore();
}
public void PauseApplyLocked()
{
PauseApplyLockedCore();
}
private bool CheckAccountNrgStatusCore()
{
if (Server_ is not NatsServer server)
{
return false;
}
if (!server.AccountNrgAllowed)
{
return false;
}
var enabled = true;
foreach (var peerName in Peers_.Keys)
{
var nodeInfo = server.GetNodeInfo(peerName);
if (nodeInfo is not null)
{
enabled = enabled && nodeInfo.AccountNrg;
}
}
return enabled;
}
private Exception? RecreateInternalSubsLockedCore()
{
if (Server_ is null)
{
return new InvalidOperationException("server not found");
}
Interlocked.Exchange(ref _isSysAccV, 1);
Active = DateTime.UtcNow;
return null;
}
private bool OutOfResourcesCore()
{
if (!Track || JetStream_ is null)
{
return false;
}
if (JetStream_ is IJetStreamResourceLimits limits)
{
return limits.LimitsExceeded(WalType);
}
return false;
}
private void PauseApplyLockedCore()
{
if (State() == RaftState.Candidate)
{
StateValue = (int)RaftState.Follower;
Interlocked.Exchange(ref HasLeaderV, 0);
}
Paused = true;
if (HCommit < Commit)
{
HCommit = Commit;
}
}
}
internal interface IJetStreamResourceLimits
{
bool LimitsExceeded(StorageType storageType);
}

View File

@@ -98,6 +98,12 @@ public interface IRaftNode
void RecreateInternalSubs();
bool IsSystemAccount();
string GetTrafficAccountName();
// Batch 30 mapped methods (server/raft.go)
bool CheckAccountNRGStatus();
Exception? RecreateInternalSubsLocked();
bool OutOfResources();
void PauseApplyLocked();
}
// ============================================================================
@@ -210,7 +216,7 @@ public sealed class RaftConfig
/// Mirrors Go <c>raft</c> struct in server/raft.go lines 151-251.
/// All algorithm methods are stubbed — full implementation is session 20+.
/// </summary>
internal sealed class Raft : IRaftNode
internal sealed partial class Raft : IRaftNode
{
// Identity / location
internal DateTime Created_ { get; set; }
@@ -308,6 +314,8 @@ internal sealed class Raft : IRaftNode
internal bool Lxfer { get; set; }
internal bool HcBehind { get; set; }
internal bool MaybeLeader { get; set; }
internal bool Track { get; set; }
internal bool DebugEnabled { get; set; }
internal bool Paused { get; set; }
internal bool Observer_ { get; set; }
internal bool Initializing { get; set; }
@@ -725,7 +733,18 @@ internal sealed class Raft : IRaftNode
}
}
public IpQueue<CommittedEntry> ApplyQ() => ApplyQ_ ?? throw new InvalidOperationException("Apply queue not initialized");
public void PauseApply() => Paused = true;
public void PauseApply()
{
_lock.EnterWriteLock();
try
{
PauseApplyLocked();
}
finally
{
_lock.ExitWriteLock();
}
}
public void ResumeApply() => Paused = false;
public bool DrainAndReplaySnapshot()
@@ -733,11 +752,13 @@ internal sealed class Raft : IRaftNode
_lock.EnterWriteLock();
try
{
if (Snapshotting)
return false;
var canReplay = !Snapshotting;
if (canReplay)
{
HcBehind = false;
}
HcBehind = false;
return true;
return canReplay;
}
finally
{
@@ -782,7 +803,22 @@ internal sealed class Raft : IRaftNode
Stop();
}
public bool IsDeleted() => Deleted_;
public void RecreateInternalSubs() => Active = DateTime.UtcNow;
public void RecreateInternalSubs()
{
_lock.EnterWriteLock();
try
{
var error = RecreateInternalSubsLocked();
if (error is not null)
{
throw error;
}
}
finally
{
_lock.ExitWriteLock();
}
}
public bool IsSystemAccount() => Interlocked.Read(ref _isSysAccV) != 0;
public string GetTrafficAccountName()
=> IsSystemAccount() ? "$SYS" : (string.IsNullOrEmpty(AccName) ? "$G" : AccName);

View File

@@ -0,0 +1,245 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Net;
using System.Text.Json;
using System.Threading.Channels;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
private const int RaftPeerIdLength = 8;
private const string PeerStateFileName = "peerstate.json";
internal bool AccountNrgAllowed { get; set; } = true;
internal Exception? BootstrapRaftNode(RaftConfig? cfg, IReadOnlyList<string>? knownPeers, bool allPeersKnown)
{
if (cfg is null)
{
return new InvalidOperationException("raft: nil config");
}
knownPeers ??= [];
foreach (var peer in knownPeers)
{
if (string.IsNullOrWhiteSpace(peer) || peer.Length != RaftPeerIdLength)
{
return new InvalidOperationException($"raft: illegal peer: {peer}");
}
}
var expected = knownPeers.Count;
if (!allPeersKnown)
{
if (expected < 2)
{
expected = 2;
}
var opts = GetOpts();
var routeCount = opts.Routes.Count;
var gatewayPeerCount = 0;
var clusterName = ClusterName();
foreach (var gateway in opts.Gateway.Gateways)
{
if (string.Equals(gateway.Name, clusterName, StringComparison.Ordinal))
{
continue;
}
foreach (var url in gateway.Urls)
{
var host = url.Host;
if (IPAddress.TryParse(host, out _))
{
gatewayPeerCount++;
}
else
{
try
{
var addrs = Dns.GetHostAddresses(host);
gatewayPeerCount += addrs.Length > 0 ? addrs.Length : 1;
}
catch
{
gatewayPeerCount++;
}
}
}
}
var inferred = routeCount + gatewayPeerCount;
if (expected < inferred)
{
expected = inferred;
}
}
if (string.IsNullOrWhiteSpace(cfg.Store))
{
return new InvalidOperationException("raft: storage directory is not set");
}
try
{
Directory.CreateDirectory(cfg.Store);
var tmpPath = Path.Combine(cfg.Store, $"_test_{Guid.NewGuid():N}");
using (File.Create(tmpPath)) { }
File.Delete(tmpPath);
var peerState = new RaftPeerState
{
KnownPeers = [.. knownPeers],
ClusterSize = expected,
DomainExt = 0,
};
var peerStatePath = Path.Combine(cfg.Store, PeerStateFileName);
var json = JsonSerializer.Serialize(peerState);
File.WriteAllText(peerStatePath, json);
}
catch (Exception ex)
{
return ex;
}
return null;
}
internal (Raft? Node, Exception? Error) InitRaftNode(string accName, RaftConfig? cfg, IReadOnlyDictionary<string, string>? labels = null)
{
if (cfg is null)
{
return (null, new InvalidOperationException("raft: nil config"));
}
_mu.EnterReadLock();
try
{
if (_sys == null)
{
return (null, ServerErrors.ErrNoSysAccount);
}
}
finally
{
_mu.ExitReadLock();
}
var node = new Raft
{
Created_ = DateTime.UtcNow,
GroupName = cfg.Name,
StoreDir = cfg.Store,
Wal = cfg.Log,
Track = cfg.Track,
Observer_ = cfg.Observer,
Initializing = !cfg.Recovering,
ScaleUp_ = cfg.ScaleUp,
AccName = accName,
Server_ = this,
Id = (ServerName() ?? string.Empty).PadRight(RaftPeerIdLength, '0')[..RaftPeerIdLength],
Qn = 1,
Csz = 1,
StateValue = (int)RaftState.Follower,
LeadC = Channel.CreateUnbounded<bool>(),
Quit = Channel.CreateUnbounded<bool>(),
ApplyQ_ = new IpQueue<CommittedEntry>($"{cfg.Name}-committed"),
PropQ = new IpQueue<ProposedEntry>($"{cfg.Name}-propose"),
EntryQ = new IpQueue<AppendEntry>($"{cfg.Name}-append"),
RespQ = new IpQueue<AppendEntryResponse>($"{cfg.Name}-append-response"),
Reqs = new IpQueue<VoteRequest>($"{cfg.Name}-vote-req"),
Votes_ = new IpQueue<VoteResponse>($"{cfg.Name}-vote-resp"),
};
RegisterRaftNode(node.GroupName, node);
_ = labels;
return (node, null);
}
internal (IRaftNode? Node, Exception? Error) StartRaftNode(string accName, RaftConfig? cfg, IReadOnlyDictionary<string, string>? labels = null)
{
var (node, error) = InitRaftNode(accName, cfg, labels);
if (error is not null)
{
return (null, error);
}
return (node, null);
}
internal string ServerNameForNode(string node)
{
if (_nodeToInfo.TryGetValue(node, out var value) && value is NodeInfo info)
{
return info.Name;
}
return string.Empty;
}
internal string ClusterNameForNode(string node)
{
if (_nodeToInfo.TryGetValue(node, out var value) && value is NodeInfo info)
{
return info.Cluster;
}
return string.Empty;
}
internal void RegisterRaftNode(string group, IRaftNode node)
{
_raftNodes[group] = node;
}
internal void UnregisterRaftNode(string group)
{
_raftNodes.TryRemove(group, out _);
}
internal int NumRaftNodes() => _raftNodes.Count;
internal IRaftNode? LookupRaftNode(string group)
{
if (_raftNodes.TryGetValue(group, out var value) && value is IRaftNode node)
{
return node;
}
return null;
}
internal void ReloadDebugRaftNodes(bool debug)
{
foreach (var value in _raftNodes.Values)
{
if (value is Raft raft)
{
raft.DebugEnabled = debug;
}
}
}
internal NodeInfo? GetNodeInfo(string nodeId)
{
if (_nodeToInfo.TryGetValue(nodeId, out var value) && value is NodeInfo info)
{
return info;
}
return null;
}
}
internal sealed class RaftPeerState
{
public List<string> KnownPeers { get; set; } = [];
public int ClusterSize { get; set; }
public ushort DomainExt { get; set; }
}

View File

@@ -17,6 +17,7 @@ public sealed class RaftTypesTests
Id = "N1",
GroupName = "RG",
AccName = "ACC",
Server_ = new object(),
StateValue = (int)RaftState.Leader,
LeaderId = "N1",
Csz = 3,

View File

@@ -0,0 +1,66 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using Shouldly;
namespace ZB.MOM.NatsNet.Server.Tests.Server;
public sealed class NatsServerRaftTests
{
[Fact]
public void RaftStateString_WhenKnownState_ReturnsExpectedText()
{
RaftState.Follower.String().ShouldBe("FOLLOWER");
RaftState.Candidate.String().ShouldBe("CANDIDATE");
RaftState.Leader.String().ShouldBe("LEADER");
RaftState.Closed.String().ShouldBe("CLOSED");
}
[Fact]
public void RegisterRaftNode_WhenRegisteredAndUnregistered_TracksLookupAndCount()
{
var (server, error) = NatsServer.NewServer(new ServerOptions());
error.ShouldBeNull();
server.ShouldNotBeNull();
var raftNode = new Raft { GroupName = "G1", Id = "N1" };
server!.RegisterRaftNode("G1", raftNode);
server.NumRaftNodes().ShouldBe(1);
server.LookupRaftNode("G1").ShouldBe(raftNode);
server.UnregisterRaftNode("G1");
server.NumRaftNodes().ShouldBe(0);
server.LookupRaftNode("G1").ShouldBeNull();
}
[Fact]
public void BootstrapRaftNode_WhenStoreMissing_CreatesStoreAndPeerState()
{
var (server, error) = NatsServer.NewServer(new ServerOptions());
error.ShouldBeNull();
server.ShouldNotBeNull();
var storeDir = Path.Combine(Path.GetTempPath(), $"raft-bootstrap-{Guid.NewGuid():N}");
var cfg = new RaftConfig
{
Name = "RG",
Store = storeDir,
};
try
{
var bootstrapError = server!.BootstrapRaftNode(cfg, ["ABCDEF12", "ABCDEF34"], allPeersKnown: true);
bootstrapError.ShouldBeNull();
Directory.Exists(storeDir).ShouldBeTrue();
File.Exists(Path.Combine(storeDir, "peerstate.json")).ShouldBeTrue();
}
finally
{
if (Directory.Exists(storeDir))
{
Directory.Delete(storeDir, recursive: true);
}
}
}
}

Binary file not shown.