feat(batch30): implement raft group-c run loop behaviors

This commit is contained in:
Joseph Doherty
2026-02-28 20:17:45 -05:00
parent ed1b62d6a3
commit 8bc4dfa58c
6 changed files with 731 additions and 6 deletions

View File

@@ -0,0 +1,222 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
internal static class EntryTypeExtensions
{
internal static string String(this EntryType entryType) => entryType switch
{
EntryType.EntryNormal => "EntryNormal",
EntryType.EntryOldSnapshot => "EntryOldSnapshot",
EntryType.EntryPeerState => "EntryPeerState",
EntryType.EntryAddPeer => "EntryAddPeer",
EntryType.EntryRemovePeer => "EntryRemovePeer",
EntryType.EntryLeaderTransfer => "EntryLeaderTransfer",
EntryType.EntrySnapshot => "EntrySnapshot",
EntryType.EntryCatchup => "EntryCatchup",
_ => "UNKNOWN",
};
}
internal sealed partial class Raft
{
public CommittedEntry NewCommittedEntry(ulong index, IReadOnlyList<Entry>? entries = null)
{
var committed = new CommittedEntry
{
Index = index,
};
if (entries is not null)
{
committed.Entries.AddRange(entries);
}
return committed;
}
public Entry NewEntry(EntryType type, byte[]? data = null) => new()
{
Type = type,
Data = data is null ? [] : [.. data],
};
public AppendEntry NewAppendEntry(string leader, ulong term, ulong commit, ulong prevTerm, ulong prevIndex, IReadOnlyList<Entry>? entries = null)
{
var appendEntry = new AppendEntry
{
Leader = leader,
TermV = term,
Commit = commit,
PTerm = prevTerm,
PIndex = prevIndex,
};
if (entries is not null)
{
appendEntry.Entries.AddRange(entries);
}
return appendEntry;
}
public ProposedEntry NewProposedEntry(Entry? entry = null, string? reply = null) => new()
{
Entry = entry,
Reply = reply ?? string.Empty,
};
public AppendEntry DecodeAppendEntry(byte[] buffer)
{
ArgumentNullException.ThrowIfNull(buffer);
return JsonSerializer.Deserialize<AppendEntry>(buffer) ?? new AppendEntry();
}
public AppendEntryResponse NewAppendEntryResponse(ulong term, ulong index, string peer, string reply, bool success) => new()
{
TermV = term,
Index = index,
Peer = peer,
Reply = reply,
Success = success,
};
public AppendEntryResponse DecodeAppendEntryResponse(byte[] buffer)
{
ArgumentNullException.ThrowIfNull(buffer);
return JsonSerializer.Deserialize<AppendEntryResponse>(buffer) ?? new AppendEntryResponse();
}
public void HandleForwardedRemovePeerProposal(string peer)
{
RemovePeer(peer);
}
public void HandleForwardedProposal(byte[] entry)
{
ForwardProposal(entry);
}
public void AddPeer(string peer)
{
ProposeAddPeer(peer);
}
public void RemovePeer(string peer)
{
ProposeRemovePeer(peer);
}
public void SendMembershipChange(EntryType changeType, string peer)
{
var entry = NewEntry(changeType, System.Text.Encoding.UTF8.GetBytes(peer));
var proposed = NewProposedEntry(entry);
PropQ ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue<ProposedEntry>($"{GroupName}-propose");
PropQ.Push(proposed);
}
public int PeerStateBufSize(PeerState state)
{
ArgumentNullException.ThrowIfNull(state);
return EncodePeerState(state).Length;
}
public byte[] EncodePeerState(PeerState state)
{
ArgumentNullException.ThrowIfNull(state);
return JsonSerializer.SerializeToUtf8Bytes(state);
}
public PeerState DecodePeerState(byte[] buffer)
{
ArgumentNullException.ThrowIfNull(buffer);
return JsonSerializer.Deserialize<PeerState>(buffer) ?? new PeerState();
}
public VoteRequest DecodeVoteRequest(byte[] buffer)
{
ArgumentNullException.ThrowIfNull(buffer);
return JsonSerializer.Deserialize<VoteRequest>(buffer) ?? new VoteRequest();
}
public Exception? WritePeerStateStatic(string storeDir, PeerState state)
{
ArgumentException.ThrowIfNullOrWhiteSpace(storeDir);
ArgumentNullException.ThrowIfNull(state);
try
{
Directory.CreateDirectory(storeDir);
var path = Path.Combine(storeDir, "peerstate.json");
File.WriteAllBytes(path, EncodePeerState(state));
return null;
}
catch (Exception ex)
{
return ex;
}
}
public (PeerState? State, Exception? Error) ReadPeerState(string storeDir)
{
ArgumentException.ThrowIfNullOrWhiteSpace(storeDir);
try
{
var path = Path.Combine(storeDir, "peerstate.json");
if (!File.Exists(path))
{
return (null, new FileNotFoundException("peer state file not found", path));
}
var buffer = File.ReadAllBytes(path);
return (DecodePeerState(buffer), null);
}
catch (Exception ex)
{
return (null, ex);
}
}
public Exception? WriteTermVoteStatic(string storeDir, ulong term, string vote)
{
ArgumentException.ThrowIfNullOrWhiteSpace(storeDir);
try
{
Directory.CreateDirectory(storeDir);
var payload = JsonSerializer.SerializeToUtf8Bytes(new TermVoteFile { Term = term, Vote = vote ?? string.Empty });
var path = Path.Combine(storeDir, "tav.idx");
File.WriteAllBytes(path, payload);
Term_ = term;
Vote = vote ?? string.Empty;
return null;
}
catch (Exception ex)
{
return ex;
}
}
public VoteResponse DecodeVoteResponse(byte[] buffer)
{
ArgumentNullException.ThrowIfNull(buffer);
return JsonSerializer.Deserialize<VoteResponse>(buffer) ?? new VoteResponse();
}
}
internal sealed class TermVoteFile
{
public ulong Term { get; set; }
public string Vote { get; set; } = string.Empty;
}
internal static class AppendEntryExtensions
{
internal static string String(this AppendEntry appendEntry) =>
$"AppendEntry[leader={appendEntry.Leader}, term={appendEntry.TermV}, commit={appendEntry.Commit}, prev={appendEntry.PIndex}]";
internal static byte[] Encode(this AppendEntry appendEntry) =>
JsonSerializer.SerializeToUtf8Bytes(appendEntry);
internal static bool ShouldStore(this AppendEntry appendEntry) =>
appendEntry.Entries.Count > 0 || appendEntry.Commit > appendEntry.PIndex;
}

View File

@@ -0,0 +1,291 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Threading.Channels;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class Raft
{
public void Shutdown()
{
Stop();
}
public string NewCatchupInbox() => $"_INBOX.CATCHUP.{Id}.{Guid.NewGuid():N}";
public string NewInbox() => $"_INBOX.{Id}.{Guid.NewGuid():N}";
public object Subscribe(string subject, Action<byte[]?>? handler = null)
{
ArgumentException.ThrowIfNullOrWhiteSpace(subject);
var subscription = new RaftSubscription(subject, handler);
_lock.EnterWriteLock();
try
{
AeSub = subscription;
Active = DateTime.UtcNow;
}
finally
{
_lock.ExitWriteLock();
}
return subscription;
}
public void Unsubscribe(object subscription)
{
_lock.EnterWriteLock();
try
{
if (ReferenceEquals(AeSub, subscription))
{
AeSub = null;
}
}
finally
{
_lock.ExitWriteLock();
}
}
public Exception? CreateInternalSubs()
{
try
{
var aeSubj = string.IsNullOrWhiteSpace(ASubj) ? $"{GroupName}.append" : ASubj;
Subscribe(aeSubj);
return null;
}
catch (Exception ex)
{
return ex;
}
}
public TimeSpan RandElectionTimeout()
{
var min = 150;
var max = 300;
return TimeSpan.FromMilliseconds(Random.Shared.Next(min, max));
}
public void ResetElectionTimeout()
{
_lock.EnterWriteLock();
try
{
ResetElectionTimeoutWithLock();
}
finally
{
_lock.ExitWriteLock();
}
}
public void ResetElectionTimeoutWithLock()
{
ResetElectWithLock(RandElectionTimeout());
}
public void ResetElect(TimeSpan timeout)
{
_lock.EnterWriteLock();
try
{
ResetElectWithLock(timeout);
}
finally
{
_lock.ExitWriteLock();
}
}
public void ResetElectWithLock(TimeSpan timeout)
{
Elect?.Dispose();
var due = timeout < TimeSpan.Zero ? TimeSpan.Zero : timeout;
Elect = new Timer(_ => { }, null, due, Timeout.InfiniteTimeSpan);
Active = DateTime.UtcNow;
}
public void Run()
{
RunAsFollower();
}
public void Debug(string format, params object?[] args)
{
_ = string.Format(format, args);
}
public void Warn(string format, params object?[] args)
{
_ = string.Format(format, args);
}
public void Error(string format, params object?[] args)
{
_ = string.Format(format, args);
}
public DateTime ElectTimer() => Active;
public void SetObserverInternal(bool isObserver) => SetObserver(isObserver);
public void SetObserverLocked(bool isObserver)
{
Observer_ = isObserver;
}
public void ProcessAppendEntries(AppendEntry appendEntry)
{
ArgumentNullException.ThrowIfNull(appendEntry);
_lock.EnterWriteLock();
try
{
if (appendEntry.TermV >= Term_)
{
Term_ = appendEntry.TermV;
}
if (appendEntry.Commit > Commit)
{
Commit = appendEntry.Commit;
}
if (appendEntry.PIndex > PIndex)
{
PIndex = appendEntry.PIndex;
PTerm = appendEntry.PTerm;
}
LeaderId = appendEntry.Leader;
Interlocked.Exchange(ref HasLeaderV, string.IsNullOrWhiteSpace(LeaderId) ? 0 : 1);
Active = DateTime.UtcNow;
if (EntryQ is null)
{
EntryQ = new ZB.MOM.NatsNet.Server.Internal.IpQueue<AppendEntry>($"{GroupName}-entry");
}
EntryQ.Push(appendEntry);
}
finally
{
_lock.ExitWriteLock();
}
}
public void RunAsFollower()
{
_lock.EnterWriteLock();
try
{
StateValue = (int)RaftState.Follower;
ResetElectionTimeoutWithLock();
}
finally
{
_lock.ExitWriteLock();
}
}
public void RunAsLeader()
{
_lock.EnterWriteLock();
try
{
StateValue = (int)RaftState.Leader;
Lsut = DateTime.UtcNow;
Interlocked.Exchange(ref HasLeaderV, 1);
if (LeadC is null)
{
LeadC = Channel.CreateUnbounded<bool>();
}
LeadC.Writer.TryWrite(true);
}
finally
{
_lock.ExitWriteLock();
}
}
public bool LostQuorum()
{
_lock.EnterReadLock();
try
{
return LostQuorumLocked();
}
finally
{
_lock.ExitReadLock();
}
}
public bool LostQuorumLocked()
{
var expected = Qn > 0 ? Qn : Math.Max(1, (ClusterSize() / 2) + 1);
var activePeers = 1;
var now = DateTime.UtcNow;
foreach (var peer in Peers_.Values)
{
if (now - peer.Ts <= TimeSpan.FromSeconds(30))
{
activePeers++;
}
}
return activePeers < expected;
}
public bool NotActive()
{
return DateTime.UtcNow - Active > TimeSpan.FromSeconds(30);
}
public AppendEntry? LoadFirstEntry()
{
_lock.EnterReadLock();
try
{
if (EntryQ is null || EntryQ.Len() == 0)
{
return null;
}
var batch = EntryQ.Pop();
if (batch is not { Length: > 0 })
{
return null;
}
return batch[0];
}
finally
{
_lock.ExitReadLock();
}
}
public void RunCatchup()
{
_lock.EnterWriteLock();
try
{
Catchup ??= new CatchupState();
Catchup.Active = DateTime.UtcNow;
Catchup.Signal = true;
}
finally
{
_lock.ExitWriteLock();
}
}
}
internal sealed record RaftSubscription(string Subject, Action<byte[]?>? Handler);

View File

@@ -104,6 +104,58 @@ public interface IRaftNode
Exception? RecreateInternalSubsLocked();
bool OutOfResources();
void PauseApplyLocked();
// Group C
void Shutdown();
string NewCatchupInbox();
string NewInbox();
object Subscribe(string subject, Action<byte[]?>? handler = null);
void Unsubscribe(object subscription);
Exception? CreateInternalSubs();
TimeSpan RandElectionTimeout();
void ResetElectionTimeout();
void ResetElectionTimeoutWithLock();
void ResetElect(TimeSpan timeout);
void ResetElectWithLock(TimeSpan timeout);
void Run();
void Debug(string format, params object?[] args);
void Warn(string format, params object?[] args);
void Error(string format, params object?[] args);
DateTime ElectTimer();
void SetObserverInternal(bool isObserver);
void SetObserverLocked(bool isObserver);
void ProcessAppendEntries(AppendEntry appendEntry);
void RunAsFollower();
// Group D
CommittedEntry NewCommittedEntry(ulong index, IReadOnlyList<Entry>? entries = null);
Entry NewEntry(EntryType type, byte[]? data = null);
AppendEntry NewAppendEntry(string leader, ulong term, ulong commit, ulong prevTerm, ulong prevIndex, IReadOnlyList<Entry>? entries = null);
ProposedEntry NewProposedEntry(Entry? entry = null, string? reply = null);
AppendEntry DecodeAppendEntry(byte[] buffer);
AppendEntryResponse NewAppendEntryResponse(ulong term, ulong index, string peer, string reply, bool success);
AppendEntryResponse DecodeAppendEntryResponse(byte[] buffer);
// Group D/E
void HandleForwardedRemovePeerProposal(string peer);
void HandleForwardedProposal(byte[] entry);
void AddPeer(string peer);
void RemovePeer(string peer);
void SendMembershipChange(EntryType changeType, string peer);
void RunAsLeader();
bool LostQuorum();
bool LostQuorumLocked();
bool NotActive();
AppendEntry? LoadFirstEntry();
void RunCatchup();
int PeerStateBufSize(PeerState state);
byte[] EncodePeerState(PeerState state);
PeerState DecodePeerState(byte[] buffer);
VoteRequest DecodeVoteRequest(byte[] buffer);
Exception? WritePeerStateStatic(string storeDir, PeerState state);
(PeerState? State, Exception? Error) ReadPeerState(string storeDir);
Exception? WriteTermVoteStatic(string storeDir, ulong term, string vote);
VoteResponse DecodeVoteResponse(byte[] buffer);
}
// ============================================================================
@@ -832,10 +884,16 @@ internal sealed partial class Raft : IRaftNode
/// An entry that has been proposed to the leader, with an optional reply subject.
/// Mirrors Go <c>proposedEntry</c> struct in server/raft.go lines 253-256.
/// </summary>
internal sealed class ProposedEntry
public sealed class ProposedEntry
{
public Entry? Entry { get; set; }
public string Reply { get; set; } = string.Empty;
public void ReturnToPool()
{
Entry = null;
Reply = string.Empty;
}
}
// ============================================================================
@@ -984,6 +1042,12 @@ public sealed class CommittedEntry
{
public ulong Index { get; set; }
public List<Entry> Entries { get; set; } = new();
public void ReturnToPool()
{
Index = 0;
Entries.Clear();
}
}
// ============================================================================
@@ -994,7 +1058,7 @@ public sealed class CommittedEntry
/// The main struct used to sync Raft peers.
/// Mirrors Go <c>appendEntry</c> struct in server/raft.go lines 2557-2568.
/// </summary>
internal sealed class AppendEntry
public sealed class AppendEntry
{
public string Leader { get; set; } = string.Empty;
public ulong TermV { get; set; }
@@ -1008,6 +1072,20 @@ internal sealed class AppendEntry
/// <summary>Subscription the append entry arrived on (object to avoid session dep).</summary>
public object? Sub { get; set; }
public byte[]? Buf { get; set; }
public void ReturnToPool()
{
Leader = string.Empty;
TermV = 0;
Commit = 0;
PTerm = 0;
PIndex = 0;
Entries.Clear();
LTerm = 0;
Reply = string.Empty;
Sub = null;
Buf = null;
}
}
// ============================================================================
@@ -1060,13 +1138,16 @@ public sealed class Entry
/// Response sent by a follower after receiving an append-entry RPC.
/// Mirrors Go <c>appendEntryResponse</c> struct in server/raft.go lines 2760-2766.
/// </summary>
internal sealed class AppendEntryResponse
public sealed class AppendEntryResponse
{
public ulong TermV { get; set; }
public ulong Index { get; set; }
public string Peer { get; set; } = string.Empty;
public string Reply { get; set; } = string.Empty;
public bool Success { get; set; }
public byte[] Encode() =>
System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(this);
}
// ============================================================================
@@ -1077,7 +1158,7 @@ internal sealed class AppendEntryResponse
/// Encoded peer state attached to snapshots and peer-state entries.
/// Mirrors Go <c>peerState</c> struct in server/raft.go lines 4470-4474.
/// </summary>
internal sealed class PeerState
public sealed class PeerState
{
public List<string> KnownPeers { get; set; } = new();
public int ClusterSize { get; set; }
@@ -1093,7 +1174,7 @@ internal sealed class PeerState
/// A Raft vote request sent during leader election.
/// Mirrors Go <c>voteRequest</c> struct in server/raft.go lines 4549-4556.
/// </summary>
internal sealed class VoteRequest
public sealed class VoteRequest
{
public ulong TermV { get; set; }
public ulong LastTerm { get; set; }
@@ -1101,6 +1182,9 @@ internal sealed class VoteRequest
public string Candidate { get; set; } = string.Empty;
/// <summary>Internal use — reply subject.</summary>
public string Reply { get; set; } = string.Empty;
public byte[] Encode() =>
System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(this);
}
// ============================================================================
@@ -1111,11 +1195,14 @@ internal sealed class VoteRequest
/// A response to a <see cref="VoteRequest"/>.
/// Mirrors Go <c>voteResponse</c> struct in server/raft.go lines 4730-4735.
/// </summary>
internal sealed class VoteResponse
public sealed class VoteResponse
{
public ulong TermV { get; set; }
public string Peer { get; set; } = string.Empty;
public bool Granted { get; set; }
/// <summary>Whether this peer's log is empty.</summary>
public bool Empty { get; set; }
public byte[] Encode() =>
System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(this);
}