feat(batch31): merge raft-part-2

This commit is contained in:
Joseph Doherty
2026-02-28 20:56:08 -05:00
8 changed files with 1235 additions and 7 deletions

View File

@@ -0,0 +1,239 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Text;
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class Raft
{
/// <summary>
/// Runs an incoming append entry through <c>ProcessAppendEntries</c> and then hands it to
/// <see cref="StoreToWAL"/> when the entry reports that it should be stored.
/// </summary>
/// <param name="appendEntry">The append entry to process.</param>
/// <exception cref="ArgumentNullException"><paramref name="appendEntry"/> is null.</exception>
public void ProcessAppendEntry(AppendEntry appendEntry)
{
    ArgumentNullException.ThrowIfNull(appendEntry);

    ProcessAppendEntries(appendEntry);

    if (!appendEntry.ShouldStore())
    {
        return;
    }

    StoreToWAL(appendEntry);
}
/// <summary>Clears the <c>Initializing</c> flag.</summary>
public void ResetInitializing() => Initializing = false;
/// <summary>
/// Applies a peer-state entry: decodes its payload, passes the known peers to
/// <c>ProposeKnownPeers</c>, and recomputes the cluster size (at least 1) and the quorum.
/// Entries of any other type, or with an empty payload, are ignored.
/// </summary>
/// <param name="entry">The log entry to inspect.</param>
/// <exception cref="ArgumentNullException"><paramref name="entry"/> is null.</exception>
public void ProcessPeerState(Entry entry)
{
    ArgumentNullException.ThrowIfNull(entry);

    var isPeerState = entry.Type == EntryType.EntryPeerState;
    if (!isPeerState || entry.Data.Length == 0)
    {
        return;
    }

    // Decoding happens before the lock is taken; only the state mutation is guarded.
    var decoded = DecodePeerState(entry.Data);

    _lock.EnterWriteLock();
    try
    {
        ProposeKnownPeers(decoded.KnownPeers);
        Csz = Math.Max(decoded.ClusterSize, 1);
        Qn = (Csz / 2) + 1; // simple majority of the cluster size
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>Validates the response and forwards it to <see cref="TrackResponse"/>.</summary>
/// <param name="response">The append-entry response received from a peer.</param>
/// <exception cref="ArgumentNullException"><paramref name="response"/> is null.</exception>
public void ProcessAppendEntryResponse(AppendEntryResponse response)
{
    ArgumentNullException.ThrowIfNull(response);

    TrackResponse(response);
}
/// <summary>Entry point that simply delegates to <see cref="ProcessAppendEntryResponse"/>.</summary>
/// <param name="response">The append-entry response received from a peer.</param>
public void HandleAppendEntryResponse(AppendEntryResponse response) =>
    ProcessAppendEntryResponse(response);
/// <summary>
/// Builds an append entry from the current term, commit, and previous term/index, copying
/// (via <c>NewEntry</c>) any entries cached in <c>Pae</c> under the current <c>PIndex</c>.
/// </summary>
/// <param name="peer">
/// Target peer. When non-blank the reply subject is <c>{AReply}.{peer}</c>; otherwise it is
/// <c>AReply</c>.
/// </param>
/// <returns>The newly built append entry.</returns>
public AppendEntry BuildAppendEntry(string peer)
{
    _lock.EnterReadLock();
    try
    {
        var built = NewAppendEntry(Id, Term_, Commit, PTerm, PIndex);

        if (Pae.TryGetValue(PIndex, out var cached))
        {
            // Entries are re-created rather than shared with the cached append entry.
            foreach (var source in cached.Entries)
            {
                built.Entries.Add(NewEntry(source.Type, source.Data));
            }
        }

        built.Reply = string.IsNullOrWhiteSpace(peer) ? AReply : $"{AReply}.{peer}";
        return built;
    }
    finally
    {
        _lock.ExitReadLock();
    }
}
/// <summary>
/// Records an append entry in the pending cache. Each contained entry is cached as its own
/// single-entry append entry at consecutive indexes following <c>appendEntry.PIndex</c>.
/// Afterwards <c>PIndex</c> and <c>Commit</c> are raised (never lowered), <c>PTerm</c> is set
/// to the entry's term, and the encoded size is added to <c>WalBytes</c>.
/// </summary>
/// <param name="appendEntry">The append entry to record.</param>
/// <returns><c>false</c> when the entry reports it should not be stored; otherwise <c>true</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="appendEntry"/> is null.</exception>
public bool StoreToWAL(AppendEntry appendEntry)
{
    ArgumentNullException.ThrowIfNull(appendEntry);

    var storable = appendEntry.ShouldStore();
    if (!storable)
    {
        return false;
    }

    _lock.EnterWriteLock();
    try
    {
        var cursor = appendEntry.PIndex;
        foreach (var item in appendEntry.Entries)
        {
            cursor++;
            var single = NewAppendEntry(appendEntry.Leader, appendEntry.TermV, appendEntry.Commit, appendEntry.PTerm, cursor, [item]);
            CachePendingEntry(single, cursor);
        }

        PIndex = Math.Max(PIndex, cursor);
        PTerm = appendEntry.TermV;
        Commit = Math.Max(Commit, appendEntry.Commit);
        WalBytes += (ulong)appendEntry.Encode().Length;

        return true;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Sends an append entry either to one peer or, when <paramref name="peer"/> is blank,
/// to every peer returned by <see cref="PeerNames"/>.
/// </summary>
/// <param name="appendEntry">The append entry to send.</param>
/// <param name="peer">Optional single target; null or blank means broadcast.</param>
/// <exception cref="ArgumentNullException"><paramref name="appendEntry"/> is null.</exception>
/// <remarks>
/// NOTE(review): <see cref="PeerNames"/> and the helpers behind <see cref="SendAppendEntryLocked"/>
/// enter <c>_lock</c> again while this method holds the write lock; presumably the lock is
/// created with recursion support — confirm.
/// </remarks>
public void SendAppendEntry(AppendEntry appendEntry, string? peer = null)
{
    ArgumentNullException.ThrowIfNull(appendEntry);

    _lock.EnterWriteLock();
    try
    {
        if (string.IsNullOrWhiteSpace(peer))
        {
            foreach (var target in PeerNames())
            {
                SendAppendEntryLocked(target, appendEntry);
            }
        }
        else
        {
            SendAppendEntryLocked(peer, appendEntry);
        }
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Caches the append entry as pending, records the peer's index via <see cref="TrackPeer"/>,
/// and queues the encoded entry on the subject <c>{GroupName}.append.{peer}</c>.
/// </summary>
/// <param name="peer">Target peer name; must not be null or blank.</param>
/// <param name="appendEntry">The append entry to send.</param>
/// <exception cref="ArgumentException"><paramref name="peer"/> is null or blank.</exception>
/// <exception cref="ArgumentNullException"><paramref name="appendEntry"/> is null.</exception>
public void SendAppendEntryLocked(string peer, AppendEntry appendEntry)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(peer);
    ArgumentNullException.ThrowIfNull(appendEntry);

    CachePendingEntry(appendEntry);
    TrackPeer(peer, appendEntry.PIndex);

    var subject = $"{GroupName}.append.{peer}";
    var wire = appendEntry.Encode();
    SendRPC(subject, wire);
}
/// <summary>
/// Stores an append entry in the pending cache <c>Pae</c>, replacing whatever is already there.
/// </summary>
/// <param name="appendEntry">The append entry to cache.</param>
/// <param name="index">
/// Explicit cache key. When omitted the key is <c>appendEntry.PIndex</c> if that is non-zero,
/// otherwise the node's <c>PIndex + 1</c>.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="appendEntry"/> is null.</exception>
public void CachePendingEntry(AppendEntry appendEntry, ulong? index = null)
{
    ArgumentNullException.ThrowIfNull(appendEntry);

    ulong slot;
    if (index.HasValue)
    {
        slot = index.Value;
    }
    else if (appendEntry.PIndex > 0)
    {
        slot = appendEntry.PIndex;
    }
    else
    {
        slot = PIndex + 1;
    }

    Pae[slot] = appendEntry;
}
/// <summary>Returns a snapshot of the tracked peer names, sorted with ordinal comparison.</summary>
/// <returns>A new list; later changes to the peer table do not affect it.</returns>
public IReadOnlyList<string> PeerNames()
{
    _lock.EnterReadLock();
    try
    {
        var ordered = Peers_.Keys.OrderBy(static name => name, StringComparer.Ordinal);
        return [.. ordered];
    }
    finally
    {
        _lock.ExitReadLock();
    }
}
/// <summary>
/// Takes the read lock and returns the result of <see cref="CurrentPeerStateLocked"/>.
/// </summary>
/// <returns>A freshly built peer-state snapshot.</returns>
public PeerState CurrentPeerState()
{
    _lock.EnterReadLock();
    try
    {
        var snapshot = CurrentPeerStateLocked();
        return snapshot;
    }
    finally
    {
        _lock.ExitReadLock();
    }
}
/// <summary>
/// Builds a peer-state snapshot from <see cref="PeerNames"/> and <c>ClusterSize()</c>.
/// This method does not take the lock itself, although <see cref="PeerNames"/> does.
/// </summary>
/// <returns>A new <c>PeerState</c> holding the known peers and the cluster size.</returns>
public PeerState CurrentPeerStateLocked() => new PeerState
{
    KnownPeers = [.. PeerNames()],
    ClusterSize = ClusterSize(),
};
/// <summary>
/// Encodes the current peer state into <c>Wps</c>, wraps it in a peer-state entry, and
/// broadcasts it through <see cref="SendAppendEntry"/>.
/// </summary>
public void SendPeerState()
{
    Wps = EncodePeerState(CurrentPeerState());

    var peerStateEntry = NewEntry(EntryType.EntryPeerState, Wps);
    SendAppendEntry(NewAppendEntry(Id, Term_, Commit, PTerm, PIndex, [peerStateEntry]));
}
/// <summary>
/// Broadcasts an append entry built without entries from the current term, commit, and
/// previous term/index.
/// </summary>
public void SendHeartbeat() =>
    SendAppendEntry(NewAppendEntry(Id, Term_, Commit, PTerm, PIndex));
/// <summary>
/// Passes the current peer state and <c>StoreDir</c> to <c>WritePeerStateStatic</c>.
/// </summary>
/// <returns>Whatever <c>WritePeerStateStatic</c> returns (an exception or null).</returns>
public Exception? WritePeerState() => WritePeerStateStatic(StoreDir, CurrentPeerState());
/// <summary>
/// Loads the term and vote from <c>tav.idx</c> under <c>StoreDir</c>. On success
/// <c>Term_</c>, <c>Vote</c>, and <c>Wtv</c> (the raw file bytes) are updated.
/// </summary>
/// <returns>
/// The term and vote with a null error on success. On any failure (missing file, invalid
/// content, I/O or JSON exception) returns <c>(0, "", error)</c> and leaves the node state
/// untouched.
/// </returns>
public (ulong Term, string Vote, Exception? Error) ReadTermVote()
{
    try
    {
        var path = Path.Combine(StoreDir, "tav.idx");
        if (!File.Exists(path))
        {
            return (0, string.Empty, new FileNotFoundException("term vote file not found", path));
        }

        var payload = File.ReadAllBytes(path);
        var state = JsonSerializer.Deserialize<TermVoteFile>(payload);
        if (state is null)
        {
            return (0, string.Empty, new InvalidDataException("term vote file is invalid"));
        }

        // A file can carry an explicit JSON null for the vote; normalise it so neither the
        // node state nor the non-nullable tuple slot ever holds null.
        var vote = state.Vote ?? string.Empty;

        Term_ = state.Term;
        Vote = vote;
        Wtv = payload;
        return (state.Term, vote, null);
    }
    catch (Exception ex)
    {
        // Errors are reported through the tuple rather than thrown.
        return (0, string.Empty, ex);
    }
}
}

View File

@@ -0,0 +1,329 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Threading;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class Raft
{
/// <summary>
/// Prepares catch-up state for a follower, installs the given snapshot with
/// <c>force: true</c>, and raises <c>Commit</c> and <c>Applied_</c> to the catch-up index.
/// </summary>
/// <param name="peer">Follower name; must not be null or blank.</param>
/// <param name="snapshot">Snapshot bytes.</param>
/// <param name="term">Catch-up term; 0 means use the node's current term.</param>
/// <param name="index">Catch-up index; 0 means use the node's current commit index.</param>
/// <exception cref="ArgumentException"><paramref name="peer"/> is null or blank.</exception>
/// <exception cref="ArgumentNullException"><paramref name="snapshot"/> is null.</exception>
public void SendSnapshotToFollower(string peer, byte[] snapshot, ulong term = 0, ulong index = 0)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(peer);
    ArgumentNullException.ThrowIfNull(snapshot);

    _lock.EnterWriteLock();
    try
    {
        var effectiveTerm = term == 0 ? Term_ : term;
        var effectiveIndex = index == 0 ? Commit : index;

        var progress = CatchupFollower(peer, effectiveTerm, effectiveIndex);
        progress.Signal = false;

        InstallSnapshot(snapshot, force: true);

        Commit = Math.Max(Commit, progress.CIndex);
        Applied_ = Math.Max(Applied_, progress.CIndex);
        TrackPeer(peer, progress.CIndex);
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Fills the catch-up state returned by <see cref="CreateCatchup"/> with the given target
/// term/index, the node's current previous term/index, and the peer, then raises its signal.
/// </summary>
/// <param name="peer">Follower name; stored in the state's <c>Sub</c> slot.</param>
/// <param name="term">Target catch-up term.</param>
/// <param name="index">Target catch-up index.</param>
/// <returns>The updated catch-up state.</returns>
/// <exception cref="ArgumentException"><paramref name="peer"/> is null or blank.</exception>
public CatchupState CatchupFollower(string peer, ulong term, ulong index)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(peer);

    _lock.EnterWriteLock();
    try
    {
        var state = CreateCatchup();

        state.CTerm = term;
        state.CIndex = index;
        state.PTerm = PTerm;
        state.PIndex = PIndex;
        state.Sub = peer;
        state.Active = DateTime.UtcNow;
        state.Signal = true;

        return state;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>Returns the first entry of the append entry cached under <paramref name="index"/>.</summary>
/// <param name="index">Cache key to look up; 0 never matches.</param>
/// <returns>
/// The first cached entry, or null when the index is 0, nothing is cached there, or the
/// cached append entry has no entries.
/// </returns>
public Entry? LoadEntry(ulong index)
{
    _lock.EnterReadLock();
    try
    {
        if (index == 0)
        {
            return null;
        }

        if (!Pae.TryGetValue(index, out var cached))
        {
            return null;
        }

        return cached.Entries.Count == 0 ? null : cached.Entries[0];
    }
    finally
    {
        _lock.ExitReadLock();
    }
}
/// <summary>
/// Advances the applied index to <paramref name="commitIndex"/>, gathering the entries cached
/// for every index in <c>(Applied_, commitIndex]</c> into one committed entry that is pushed
/// onto the apply queue.
/// </summary>
/// <param name="commitIndex">The index to commit up to (inclusive).</param>
/// <returns>
/// The committed entry that was queued, or null when <paramref name="commitIndex"/> is not
/// beyond the applied index.
/// </returns>
public CommittedEntry? ApplyCommit(ulong commitIndex)
{
    _lock.EnterWriteLock();
    try
    {
        if (commitIndex <= Applied_)
        {
            return null;
        }

        var committed = NewCommittedEntry(commitIndex);

        // Increment inside the loop so the cursor never goes past commitIndex. The previous
        // "index <= commitIndex; index++" form wrapped to 0 and never terminated when
        // commitIndex was ulong.MaxValue.
        var index = Applied_;
        while (index < commitIndex)
        {
            index++;
            if (Pae.TryGetValue(index, out var appendEntry))
            {
                committed.Entries.AddRange(appendEntry.Entries);
            }
        }

        Applied_ = commitIndex;
        Commit = Math.Max(Commit, commitIndex);
        ApplyQ_ ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue<CommittedEntry>($"{GroupName}-apply");
        ApplyQ_.Push(committed);
        return committed;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>Calls <see cref="ApplyCommit"/> and reports whether anything was committed.</summary>
/// <param name="commitIndex">The index to commit up to (inclusive).</param>
/// <returns><c>true</c> when <see cref="ApplyCommit"/> produced a committed entry.</returns>
public bool TryCommit(ulong commitIndex)
{
    var committed = ApplyCommit(commitIndex);
    return committed is not null;
}
/// <summary>
/// Records an append-entry response while this node is leader. The peer's index is tracked
/// for every response; a successful response is added to the ack set for its index, and the
/// index is committed only once a quorum has acknowledged it.
/// </summary>
/// <param name="response">The response received from a peer.</param>
/// <exception cref="ArgumentNullException"><paramref name="response"/> is null.</exception>
public void TrackResponse(AppendEntryResponse response)
{
    ArgumentNullException.ThrowIfNull(response);

    _lock.EnterWriteLock();
    try
    {
        if (State() != RaftState.Leader)
        {
            return;
        }

        TrackPeer(response.Peer, response.Index);

        if (!response.Success)
        {
            return;
        }

        if (!Acks.TryGetValue(response.Index, out var acks))
        {
            acks = new Dictionary<string, bool>(StringComparer.Ordinal);
            Acks[response.Index] = acks;
        }

        acks[response.Peer] = true;

        // The leader implicitly acknowledges its own entries, so count it once unless it is
        // already present in the ack set. Committing on the first ack (the previous
        // behaviour) is only safe for clusters of three nodes or fewer.
        var ackCount = acks.Count;
        if (!acks.ContainsKey(Id))
        {
            ackCount++;
        }

        if (ackCount >= QuorumNeeded())
        {
            TryCommit(response.Index);
        }
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Recomputes the cluster size as the number of tracked peers plus one (never below 1) and
/// sets the quorum to a simple majority of that size.
/// </summary>
public void AdjustClusterSizeAndQuorum()
{
    _lock.EnterWriteLock();
    try
    {
        var members = Peers_.Count + 1; // tracked peers plus this node
        Csz = Math.Max(members, 1);
        Qn = (Csz / 2) + 1;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Records activity for a peer, creating its tracking record on first sight. The peer's last
/// index only ever moves forward; its timestamp is refreshed on every call.
/// </summary>
/// <param name="peer">Peer name; must not be null or blank.</param>
/// <param name="index">Index reported for the peer.</param>
/// <exception cref="ArgumentException"><paramref name="peer"/> is null or blank.</exception>
public void TrackPeer(string peer, ulong index)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(peer);

    _lock.EnterWriteLock();
    try
    {
        if (!Peers_.TryGetValue(peer, out var tracked))
        {
            tracked = new Lps();
            Peers_[peer] = tracked;
        }

        tracked.Li = Math.Max(tracked.Li, index);
        tracked.Ts = DateTime.UtcNow;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Becomes a candidate via <c>SwitchToCandidate</c> and then issues a vote request via
/// <c>RequestVote</c>.
/// </summary>
public void RunAsCandidate()
{
    SwitchToCandidate();

    RequestVote();
}
/// <summary>Entry point that simply delegates to <c>ProcessAppendEntries</c>.</summary>
/// <param name="appendEntry">The append entry received.</param>
public void HandleAppendEntry(AppendEntry appendEntry) => ProcessAppendEntries(appendEntry);
/// <summary>Discards the current catch-up state, if any, under the write lock.</summary>
public void CancelCatchup()
{
    _lock.EnterWriteLock();
    try
    {
        Catchup = null;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Reports whether the current catch-up has been inactive for longer than the threshold.
/// </summary>
/// <param name="threshold">Maximum allowed inactivity; defaults to 10 seconds.</param>
/// <returns>
/// <c>false</c> when there is no catch-up in progress; otherwise whether the time since its
/// last activity exceeds the threshold.
/// </returns>
public bool CatchupStalled(TimeSpan? threshold = null)
{
    _lock.EnterReadLock();
    try
    {
        var current = Catchup;
        if (current is null)
        {
            return false;
        }

        var limit = threshold ?? TimeSpan.FromSeconds(10);
        var idleFor = DateTime.UtcNow - current.Active;
        return idleFor > limit;
    }
    finally
    {
        _lock.ExitReadLock();
    }
}
/// <summary>
/// Returns the node's catch-up state, creating it when absent, and refreshes its activity
/// timestamp. An existing state is reused rather than replaced.
/// </summary>
/// <returns>The (possibly newly created) catch-up state.</returns>
public CatchupState CreateCatchup()
{
    _lock.EnterWriteLock();
    try
    {
        var state = Catchup ??= new CatchupState();
        state.Active = DateTime.UtcNow;
        return state;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Raises the signal flag on the catch-up state (creating the state if needed) and refreshes
/// its activity timestamp.
/// </summary>
/// <returns>Always <c>true</c>.</returns>
public bool SendCatchupSignal()
{
    _lock.EnterWriteLock();
    try
    {
        var state = CreateCatchup();
        state.Signal = true;
        state.Active = DateTime.UtcNow;

        return true;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Lowers the signal flag on the current catch-up state. Does nothing when no catch-up
/// state exists.
/// </summary>
public void CancelCatchupSignal()
{
    _lock.EnterWriteLock();
    try
    {
        if (Catchup is { } active)
        {
            active.Signal = false;
        }
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Drops every pending entry cached above <paramref name="index"/> and lowers
/// <c>PIndex</c>, <c>Commit</c>, <c>Applied_</c>, and <c>Processed_</c> so none exceeds it.
/// </summary>
/// <param name="index">Highest index to keep.</param>
public void TruncateWAL(ulong index)
{
    _lock.EnterWriteLock();
    try
    {
        // Materialise the keys first so the cache is not modified while being enumerated.
        var stale = Pae.Keys.Where(key => key > index).ToArray();
        foreach (var staleKey in stale)
        {
            Pae.Remove(staleKey);
        }

        PIndex = Math.Min(PIndex, index);
        Commit = Math.Min(Commit, index);
        Applied_ = Math.Min(Applied_, index);
        Processed_ = Math.Min(Processed_, index);
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Clears the pending-entry cache and the ack table and zeroes the log counters
/// (<c>PIndex</c>, <c>PTerm</c>, <c>Commit</c>, <c>Applied_</c>, <c>Processed_</c>,
/// <c>WalBytes</c>).
/// </summary>
public void ResetWAL()
{
    _lock.EnterWriteLock();
    try
    {
        // Cached data.
        Pae.Clear();
        Acks.Clear();

        // Log position and progress counters.
        PIndex = 0;
        PTerm = 0;
        Commit = 0;
        Applied_ = 0;
        Processed_ = 0;
        WalBytes = 0;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Records the current leader. When a different, non-blank leader is named while this node
/// is leader, the node's state is set to follower. Also updates the has-leader flag, sets the
/// had-a-leader flag when a leader is named, and refreshes the activity timestamp.
/// </summary>
/// <param name="leaderId">New leader id; null is treated as empty (no leader).</param>
public void UpdateLeader(string leaderId)
{
    _lock.EnterWriteLock();
    try
    {
        LeaderId = leaderId ?? string.Empty;
        var hasLeader = !string.IsNullOrWhiteSpace(LeaderId);

        if (hasLeader &&
            !string.Equals(LeaderId, Id, StringComparison.Ordinal) &&
            State() == RaftState.Leader)
        {
            StateValue = (int)RaftState.Follower;
        }

        Interlocked.Exchange(ref HasLeaderV, hasLeader ? 1 : 0);
        if (hasLeader)
        {
            Interlocked.Exchange(ref PLeaderV, 1);
        }

        Active = DateTime.UtcNow;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
}

View File

@@ -0,0 +1,305 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Threading;
using System.Threading.Channels;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class Raft
{
/// <summary>
/// Remembers the first write error. Null errors are ignored, and an error that is already
/// recorded is never overwritten. Does not take the lock itself.
/// </summary>
/// <param name="error">The error to record, or null.</param>
public void SetWriteErrLocked(Exception? error)
{
    if (error is null || WriteErr is not null)
    {
        return;
    }

    WriteErr = error;
}
/// <summary>Reports whether the node's state is <c>RaftState.Closed</c>.</summary>
public bool IsClosed()
{
    return State() == RaftState.Closed;
}
/// <summary>Takes the write lock and delegates to <see cref="SetWriteErrLocked"/>.</summary>
/// <param name="error">The error to record, or null.</param>
public void SetWriteErr(Exception? error)
{
    _lock.EnterWriteLock();
    try
    {
        SetWriteErrLocked(error);
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Passes the current term and vote, together with <c>StoreDir</c>, to
/// <c>WriteTermVoteStatic</c>.
/// </summary>
/// <returns>Whatever <c>WriteTermVoteStatic</c> returns (an exception or null).</returns>
public Exception? WriteTermVote()
{
    return WriteTermVoteStatic(StoreDir, Term_, Vote);
}
/// <summary>
/// Processes a vote response. A response from a higher term makes this node adopt that term,
/// clear its vote, and step down to follower. Otherwise, while this node is a candidate, a
/// granted response for the current term is queued and the election is re-evaluated.
/// </summary>
/// <param name="response">The vote response received from a peer.</param>
/// <exception cref="ArgumentNullException"><paramref name="response"/> is null.</exception>
public void HandleVoteResponse(VoteResponse response)
{
    ArgumentNullException.ThrowIfNull(response);

    _lock.EnterWriteLock();
    try
    {
        if (response.TermV > Term_)
        {
            Term_ = response.TermV;
            // A vote cast in the old term must not carry over into the new term; this mirrors
            // the reset ProcessVoteRequest performs when it sees a higher term.
            Vote = string.Empty;
            SwitchToFollowerLocked(string.Empty);
            return;
        }

        if (State() != RaftState.Candidate)
        {
            return;
        }

        // Only a granted vote for the term being contested may count. Denials and
        // responses left over from an earlier term are dropped here because WonElection
        // counts every queued response.
        if (!response.Granted || response.TermV != Term_)
        {
            return;
        }

        Votes_ ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue<VoteResponse>($"{GroupName}-votes");
        Votes_.Push(response);

        if (WonElection())
        {
            SwitchToLeader();
        }
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Decides whether to grant a vote. Requests from an older term are refused. A request from a
/// newer term makes this node adopt the term, clear its vote, and become a follower. The vote
/// is granted only when the candidate's log is at least as up to date as this node's and this
/// node has not voted for a different candidate.
/// </summary>
/// <param name="request">The vote request received from a candidate.</param>
/// <returns><c>true</c> when the vote is granted.</returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
public bool ProcessVoteRequest(VoteRequest request)
{
    ArgumentNullException.ThrowIfNull(request);

    _lock.EnterWriteLock();
    try
    {
        if (request.TermV < Term_)
        {
            return false;
        }

        if (request.TermV > Term_)
        {
            Term_ = request.TermV;
            Vote = string.Empty;
            SwitchToFollowerLocked(string.Empty);
        }

        // The candidate's log must not be behind ours: a later last term wins, and an equal
        // last term needs an index at least as large.
        var upToDate = request.LastTerm > PTerm || (request.LastTerm == PTerm && request.LastIndex >= PIndex);
        if (!upToDate)
        {
            return false;
        }

        if (string.IsNullOrWhiteSpace(Vote) || string.Equals(Vote, request.Candidate, StringComparison.Ordinal))
        {
            Vote = request.Candidate;
            // Keep a failed term/vote write visible instead of discarding it; the vote is
            // still granted, matching the previous behaviour.
            SetWriteErrLocked(WriteTermVote());
            Active = DateTime.UtcNow;
            return true;
        }

        return false;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Evaluates a vote request via <see cref="ProcessVoteRequest"/> and, when the request
/// carries a reply subject, sends back a vote response with the outcome.
/// </summary>
/// <param name="request">The vote request received from a candidate.</param>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
public void HandleVoteRequest(VoteRequest request)
{
    ArgumentNullException.ThrowIfNull(request);

    var granted = ProcessVoteRequest(request);

    if (string.IsNullOrWhiteSpace(request.Reply))
    {
        // Nowhere to answer; the decision has still been applied above.
        return;
    }

    var reply = new VoteResponse
    {
        TermV = Term_,
        Peer = Id,
        Granted = granted,
        Empty = PIndex == 0,
    };
    SendReply(request.Reply, reply.Encode());
}
/// <summary>
/// Builds a vote request from the current term and last log term/index, records it in the
/// request queue, and queues the encoded request on the vote subject.
/// </summary>
public void RequestVote()
{
    _lock.EnterWriteLock();
    try
    {
        var ballot = new VoteRequest
        {
            TermV = Term_,
            LastTerm = PTerm,
            LastIndex = PIndex,
            Candidate = Id,
            Reply = VReply,
        };

        Reqs ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue<VoteRequest>($"{GroupName}-requests");
        Reqs.Push(ballot);

        SendRPC(VSubj, ballot.Encode());
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Appends a message to the outbound list held in <c>SendQ</c> and refreshes the activity
/// timestamp. The payload is copied, so later changes to the caller's array do not affect
/// the queued message.
/// </summary>
/// <param name="subject">Destination subject; must not be null or blank.</param>
/// <param name="payload">Message bytes.</param>
/// <exception cref="ArgumentException"><paramref name="subject"/> is null or blank.</exception>
/// <exception cref="ArgumentNullException"><paramref name="payload"/> is null.</exception>
/// <remarks>
/// NOTE(review): when <c>SendQ</c> holds anything other than the expected list type
/// (including null), it is replaced with a new empty list — confirm that nothing else stores
/// a different queue type there.
/// </remarks>
public void SendRPC(string subject, byte[] payload)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(subject);
    ArgumentNullException.ThrowIfNull(payload);

    _lock.EnterWriteLock();
    try
    {
        if (SendQ is not List<(string Subject, byte[] Payload)> outbox)
        {
            outbox = [];
            SendQ = outbox;
        }

        outbox.Add((subject, [.. payload]));
        Active = DateTime.UtcNow;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>Sends a reply; identical to calling <see cref="SendRPC"/> with the same arguments.</summary>
/// <param name="subject">Reply subject.</param>
/// <param name="payload">Message bytes.</param>
public void SendReply(string subject, byte[] payload) => SendRPC(subject, payload);
/// <summary>
/// Reports whether this candidate has reached quorum. The tally is one (this node's own
/// vote) plus the number of responses currently held in the vote queue.
/// </summary>
/// <returns>
/// <c>false</c> when the node is not a candidate; otherwise whether the tally is at least
/// <see cref="QuorumNeeded"/>.
/// </returns>
public bool WonElection()
{
    if (State() != RaftState.Candidate)
    {
        return false;
    }

    var tally = 1 + (Votes_?.Len() ?? 0); // 1 = self-vote
    return tally >= QuorumNeeded();
}
/// <summary>
/// Returns the number of votes/acks that form a quorum: the configured <c>Qn</c> when it is
/// positive, otherwise a simple majority of the cluster size (treated as at least 1).
/// </summary>
public int QuorumNeeded()
{
    if (Qn > 0)
    {
        return Qn;
    }

    var size = Math.Max(ClusterSize(), 1);
    return (size / 2) + 1;
}
/// <summary>
/// Publishes a leadership change on the <c>LeadC</c> channel, creating an unbounded channel
/// on first use. The result of the write attempt is not checked.
/// </summary>
/// <param name="isLeader">Whether this node is now the leader.</param>
public void UpdateLeadChange(bool isLeader)
{
    _lock.EnterWriteLock();
    try
    {
        var channel = LeadC ??= Channel.CreateUnbounded<bool>();
        channel.Writer.TryWrite(isLeader);
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Moves the node to the requested state. Follower, candidate, and leader go through their
/// dedicated switch methods; any other state is written directly under the write lock.
/// </summary>
/// <param name="newState">The state to switch to.</param>
public void SwitchState(RaftState newState)
{
    if (newState == RaftState.Follower)
    {
        SwitchToFollower();
        return;
    }

    if (newState == RaftState.Candidate)
    {
        SwitchToCandidate();
        return;
    }

    if (newState == RaftState.Leader)
    {
        SwitchToLeader();
        return;
    }

    _lock.EnterWriteLock();
    try
    {
        StateValue = (int)newState;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>Takes the write lock and delegates to <see cref="SwitchToFollowerLocked"/>.</summary>
/// <param name="newLeader">Leader to record; blank leaves the recorded leader unchanged.</param>
public void SwitchToFollower(string newLeader = "")
{
    _lock.EnterWriteLock();
    try
    {
        SwitchToFollowerLocked(newLeader);
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Sets the node's state to follower, optionally records a new leader, refreshes the
/// has-leader flag, publishes a "not leader" change, and resets the election timeout.
/// Does not take the lock itself.
/// </summary>
/// <param name="newLeader">
/// Leader to record. A blank value leaves the previously recorded leader in place.
/// </param>
public void SwitchToFollowerLocked(string newLeader = "")
{
    StateValue = (int)RaftState.Follower;

    if (!string.IsNullOrWhiteSpace(newLeader))
    {
        LeaderId = newLeader;
    }

    var leaderFlag = string.IsNullOrWhiteSpace(LeaderId) ? 0 : 1;
    Interlocked.Exchange(ref HasLeaderV, leaderFlag);

    UpdateLeadChange(false);
    ResetElectionTimeoutWithLock();
}
/// <summary>
/// Starts a candidacy: sets the state to candidate, increments the term, votes for this
/// node, makes sure the vote queue exists, publishes a "not leader" change, and resets the
/// election timeout.
/// </summary>
/// <remarks>
/// NOTE(review): responses queued during an earlier candidacy are not removed here; confirm
/// whether the queue should be drained when a new term starts.
/// </remarks>
public void SwitchToCandidate()
{
    _lock.EnterWriteLock();
    try
    {
        StateValue = (int)RaftState.Candidate;
        Term_++;
        Vote = Id;

        // The queue holds peer responses only. WonElection already adds one for this node's
        // own vote, so pushing a self-vote here as well counted it twice and let a single
        // peer response (or none, in small clusters) reach quorum.
        Votes_ ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue<VoteResponse>($"{GroupName}-votes");

        UpdateLeadChange(false);
        ResetElectionTimeoutWithLock();
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Promotes this node to leader: sets the state, records itself as leader, stamps
/// <c>Lsut</c> with the current UTC time, raises the has-leader and had-a-leader flags,
/// publishes the leadership change, and sends a heartbeat.
/// </summary>
/// <remarks>
/// NOTE(review): <see cref="UpdateLeadChange"/> and the send path behind
/// <c>SendHeartbeat</c> enter <c>_lock</c> again while the write lock is held here;
/// presumably the lock is created with recursion support — confirm.
/// </remarks>
public void SwitchToLeader()
{
    _lock.EnterWriteLock();
    try
    {
        // Identity and state first.
        StateValue = (int)RaftState.Leader;
        LeaderId = Id;
        Lsut = DateTime.UtcNow;

        // Leader flags.
        Interlocked.Exchange(ref HasLeaderV, 1);
        Interlocked.Exchange(ref PLeaderV, 1);

        // Announce locally, then to the peers.
        UpdateLeadChange(true);
        SendHeartbeat();
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
}

View File

@@ -126,6 +126,24 @@ public interface IRaftNode
void SetObserverLocked(bool isObserver);
void ProcessAppendEntries(AppendEntry appendEntry);
void RunAsFollower();
void SendSnapshotToFollower(string peer, byte[] snapshot, ulong term = 0, ulong index = 0);
CatchupState CatchupFollower(string peer, ulong term, ulong index);
Entry? LoadEntry(ulong index);
CommittedEntry? ApplyCommit(ulong commitIndex);
bool TryCommit(ulong commitIndex);
void TrackResponse(AppendEntryResponse response);
void AdjustClusterSizeAndQuorum();
void TrackPeer(string peer, ulong index);
void RunAsCandidate();
void HandleAppendEntry(AppendEntry appendEntry);
void CancelCatchup();
bool CatchupStalled(TimeSpan? threshold = null);
CatchupState CreateCatchup();
bool SendCatchupSignal();
void CancelCatchupSignal();
void TruncateWAL(ulong index);
void ResetWAL();
void UpdateLeader(string leaderId);
// Group D
CommittedEntry NewCommittedEntry(ulong index, IReadOnlyList<Entry>? entries = null);
@@ -156,6 +174,41 @@ public interface IRaftNode
(PeerState? State, Exception? Error) ReadPeerState(string storeDir);
Exception? WriteTermVoteStatic(string storeDir, ulong term, string vote);
VoteResponse DecodeVoteResponse(byte[] buffer);
void ProcessAppendEntry(AppendEntry appendEntry);
void ResetInitializing();
void ProcessPeerState(Entry entry);
void ProcessAppendEntryResponse(AppendEntryResponse response);
void HandleAppendEntryResponse(AppendEntryResponse response);
AppendEntry BuildAppendEntry(string peer);
bool StoreToWAL(AppendEntry appendEntry);
void SendAppendEntry(AppendEntry appendEntry, string? peer = null);
void SendAppendEntryLocked(string peer, AppendEntry appendEntry);
void CachePendingEntry(AppendEntry appendEntry, ulong? index = null);
IReadOnlyList<string> PeerNames();
PeerState CurrentPeerState();
PeerState CurrentPeerStateLocked();
void SendPeerState();
void SendHeartbeat();
Exception? WritePeerState();
(ulong Term, string Vote, Exception? Error) ReadTermVote();
void SetWriteErrLocked(Exception? error);
bool IsClosed();
void SetWriteErr(Exception? error);
Exception? WriteTermVote();
void HandleVoteResponse(VoteResponse response);
bool ProcessVoteRequest(VoteRequest request);
void HandleVoteRequest(VoteRequest request);
void RequestVote();
void SendRPC(string subject, byte[] payload);
void SendReply(string subject, byte[] payload);
bool WonElection();
int QuorumNeeded();
void UpdateLeadChange(bool isLeader);
void SwitchState(RaftState newState);
void SwitchToFollower(string newLeader = "");
void SwitchToFollowerLocked(string newLeader = "");
void SwitchToCandidate();
void SwitchToLeader();
}
// ============================================================================
@@ -904,7 +957,7 @@ public sealed class ProposedEntry
/// Tracks the state of a follower catch-up operation.
/// Mirrors Go <c>catchupState</c> struct in server/raft.go lines 259-268.
/// </summary>
internal sealed class CatchupState
public sealed class CatchupState
{
/// <summary>Subscription that catchup messages arrive on (object to avoid session dep).</summary>
public object? Sub { get; set; }