batch31: implement raft group A catchup foundations
This commit is contained in:
@@ -0,0 +1,239 @@
|
|||||||
|
// Copyright 2012-2026 The NATS Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0
|
||||||
|
|
||||||
|
using System.Text;
|
||||||
|
using System.Text.Json;
|
||||||
|
|
||||||
|
namespace ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
internal sealed partial class Raft
|
||||||
|
{
|
||||||
|
public void ProcessAppendEntry(AppendEntry appendEntry)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(appendEntry);
|
||||||
|
|
||||||
|
ProcessAppendEntries(appendEntry);
|
||||||
|
if (appendEntry.ShouldStore())
|
||||||
|
{
|
||||||
|
StoreToWAL(appendEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ResetInitializing()
|
||||||
|
{
|
||||||
|
Initializing = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ProcessPeerState(Entry entry)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(entry);
|
||||||
|
if (entry.Type != EntryType.EntryPeerState || entry.Data.Length == 0)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var peerState = DecodePeerState(entry.Data);
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ProposeKnownPeers(peerState.KnownPeers);
|
||||||
|
Csz = Math.Max(peerState.ClusterSize, 1);
|
||||||
|
Qn = (Csz / 2) + 1;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ProcessAppendEntryResponse(AppendEntryResponse response)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(response);
|
||||||
|
TrackResponse(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void HandleAppendEntryResponse(AppendEntryResponse response)
|
||||||
|
{
|
||||||
|
ProcessAppendEntryResponse(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AppendEntry BuildAppendEntry(string peer)
|
||||||
|
{
|
||||||
|
_lock.EnterReadLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var appendEntry = NewAppendEntry(Id, Term_, Commit, PTerm, PIndex);
|
||||||
|
appendEntry.Reply = AReply;
|
||||||
|
if (Pae.TryGetValue(PIndex, out var pending))
|
||||||
|
{
|
||||||
|
appendEntry.Entries.AddRange(pending.Entries.Select(entry => NewEntry(entry.Type, entry.Data)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!string.IsNullOrWhiteSpace(peer))
|
||||||
|
{
|
||||||
|
appendEntry.Reply = $"{AReply}.{peer}";
|
||||||
|
}
|
||||||
|
|
||||||
|
return appendEntry;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitReadLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool StoreToWAL(AppendEntry appendEntry)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(appendEntry);
|
||||||
|
if (!appendEntry.ShouldStore())
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var index = appendEntry.PIndex;
|
||||||
|
foreach (var entry in appendEntry.Entries)
|
||||||
|
{
|
||||||
|
index++;
|
||||||
|
var stored = NewAppendEntry(appendEntry.Leader, appendEntry.TermV, appendEntry.Commit, appendEntry.PTerm, index, [entry]);
|
||||||
|
CachePendingEntry(stored, index);
|
||||||
|
}
|
||||||
|
|
||||||
|
PIndex = Math.Max(PIndex, index);
|
||||||
|
PTerm = appendEntry.TermV;
|
||||||
|
Commit = Math.Max(Commit, appendEntry.Commit);
|
||||||
|
WalBytes += (ulong)appendEntry.Encode().Length;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SendAppendEntry(AppendEntry appendEntry, string? peer = null)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(appendEntry);
|
||||||
|
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (!string.IsNullOrWhiteSpace(peer))
|
||||||
|
{
|
||||||
|
SendAppendEntryLocked(peer, appendEntry);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var peerName in PeerNames())
|
||||||
|
{
|
||||||
|
SendAppendEntryLocked(peerName, appendEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SendAppendEntryLocked(string peer, AppendEntry appendEntry)
|
||||||
|
{
|
||||||
|
ArgumentException.ThrowIfNullOrWhiteSpace(peer);
|
||||||
|
ArgumentNullException.ThrowIfNull(appendEntry);
|
||||||
|
|
||||||
|
CachePendingEntry(appendEntry);
|
||||||
|
TrackPeer(peer, appendEntry.PIndex);
|
||||||
|
SendRPC($"{GroupName}.append.{peer}", appendEntry.Encode());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void CachePendingEntry(AppendEntry appendEntry, ulong? index = null)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(appendEntry);
|
||||||
|
var cacheIndex = index ?? (appendEntry.PIndex > 0 ? appendEntry.PIndex : PIndex + 1);
|
||||||
|
Pae[cacheIndex] = appendEntry;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IReadOnlyList<string> PeerNames()
|
||||||
|
{
|
||||||
|
_lock.EnterReadLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return [.. Peers_.Keys.OrderBy(static key => key, StringComparer.Ordinal)];
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitReadLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public PeerState CurrentPeerState()
|
||||||
|
{
|
||||||
|
_lock.EnterReadLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return CurrentPeerStateLocked();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitReadLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public PeerState CurrentPeerStateLocked()
|
||||||
|
{
|
||||||
|
return new PeerState
|
||||||
|
{
|
||||||
|
KnownPeers = [.. PeerNames()],
|
||||||
|
ClusterSize = ClusterSize(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SendPeerState()
|
||||||
|
{
|
||||||
|
var state = CurrentPeerState();
|
||||||
|
Wps = EncodePeerState(state);
|
||||||
|
var entry = NewEntry(EntryType.EntryPeerState, Wps);
|
||||||
|
var appendEntry = NewAppendEntry(Id, Term_, Commit, PTerm, PIndex, [entry]);
|
||||||
|
SendAppendEntry(appendEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SendHeartbeat()
|
||||||
|
{
|
||||||
|
var heartbeat = NewAppendEntry(Id, Term_, Commit, PTerm, PIndex);
|
||||||
|
SendAppendEntry(heartbeat);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Exception? WritePeerState()
|
||||||
|
{
|
||||||
|
return WritePeerStateStatic(StoreDir, CurrentPeerState());
|
||||||
|
}
|
||||||
|
|
||||||
|
public (ulong Term, string Vote, Exception? Error) ReadTermVote()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var path = Path.Combine(StoreDir, "tav.idx");
|
||||||
|
if (!File.Exists(path))
|
||||||
|
{
|
||||||
|
return (0, string.Empty, new FileNotFoundException("term vote file not found", path));
|
||||||
|
}
|
||||||
|
|
||||||
|
var payload = File.ReadAllBytes(path);
|
||||||
|
var state = JsonSerializer.Deserialize<TermVoteFile>(payload);
|
||||||
|
if (state is null)
|
||||||
|
{
|
||||||
|
return (0, string.Empty, new InvalidDataException("term vote file is invalid"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Term_ = state.Term;
|
||||||
|
Vote = state.Vote;
|
||||||
|
Wtv = payload;
|
||||||
|
return (state.Term, state.Vote, null);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return (0, string.Empty, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
329
dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Catchup.cs
Normal file
329
dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Catchup.cs
Normal file
@@ -0,0 +1,329 @@
|
|||||||
|
// Copyright 2012-2026 The NATS Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0
|
||||||
|
|
||||||
|
using System.Threading;
|
||||||
|
|
||||||
|
namespace ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
internal sealed partial class Raft
|
||||||
|
{
|
||||||
|
public void SendSnapshotToFollower(string peer, byte[] snapshot, ulong term = 0, ulong index = 0)
|
||||||
|
{
|
||||||
|
ArgumentException.ThrowIfNullOrWhiteSpace(peer);
|
||||||
|
ArgumentNullException.ThrowIfNull(snapshot);
|
||||||
|
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var catchup = CatchupFollower(peer, term == 0 ? Term_ : term, index == 0 ? Commit : index);
|
||||||
|
catchup.Signal = false;
|
||||||
|
InstallSnapshot(snapshot, force: true);
|
||||||
|
Commit = Math.Max(Commit, catchup.CIndex);
|
||||||
|
Applied_ = Math.Max(Applied_, catchup.CIndex);
|
||||||
|
TrackPeer(peer, catchup.CIndex);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public CatchupState CatchupFollower(string peer, ulong term, ulong index)
|
||||||
|
{
|
||||||
|
ArgumentException.ThrowIfNullOrWhiteSpace(peer);
|
||||||
|
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var catchup = CreateCatchup();
|
||||||
|
catchup.CTerm = term;
|
||||||
|
catchup.CIndex = index;
|
||||||
|
catchup.PTerm = PTerm;
|
||||||
|
catchup.PIndex = PIndex;
|
||||||
|
catchup.Sub = peer;
|
||||||
|
catchup.Active = DateTime.UtcNow;
|
||||||
|
catchup.Signal = true;
|
||||||
|
return catchup;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Entry? LoadEntry(ulong index)
|
||||||
|
{
|
||||||
|
_lock.EnterReadLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (index == 0 || !Pae.TryGetValue(index, out var appendEntry) || appendEntry.Entries.Count == 0)
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return appendEntry.Entries[0];
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitReadLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public CommittedEntry? ApplyCommit(ulong commitIndex)
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (commitIndex <= Applied_)
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
var committed = NewCommittedEntry(commitIndex);
|
||||||
|
for (var index = Applied_ + 1; index <= commitIndex; index++)
|
||||||
|
{
|
||||||
|
if (Pae.TryGetValue(index, out var appendEntry))
|
||||||
|
{
|
||||||
|
committed.Entries.AddRange(appendEntry.Entries);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Applied_ = commitIndex;
|
||||||
|
Commit = Math.Max(Commit, commitIndex);
|
||||||
|
ApplyQ_ ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue<CommittedEntry>($"{GroupName}-apply");
|
||||||
|
ApplyQ_.Push(committed);
|
||||||
|
return committed;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool TryCommit(ulong commitIndex) => ApplyCommit(commitIndex) is not null;
|
||||||
|
|
||||||
|
public void TrackResponse(AppendEntryResponse response)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(response);
|
||||||
|
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (State() != RaftState.Leader)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
TrackPeer(response.Peer, response.Index);
|
||||||
|
if (response.Success)
|
||||||
|
{
|
||||||
|
if (!Acks.TryGetValue(response.Index, out var acks))
|
||||||
|
{
|
||||||
|
acks = new Dictionary<string, bool>(StringComparer.Ordinal);
|
||||||
|
Acks[response.Index] = acks;
|
||||||
|
}
|
||||||
|
|
||||||
|
acks[response.Peer] = true;
|
||||||
|
TryCommit(response.Index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void AdjustClusterSizeAndQuorum()
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Csz = Math.Max(Peers_.Count + 1, 1);
|
||||||
|
Qn = (Csz / 2) + 1;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void TrackPeer(string peer, ulong index)
|
||||||
|
{
|
||||||
|
ArgumentException.ThrowIfNullOrWhiteSpace(peer);
|
||||||
|
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (!Peers_.TryGetValue(peer, out var state))
|
||||||
|
{
|
||||||
|
state = new Lps();
|
||||||
|
Peers_[peer] = state;
|
||||||
|
}
|
||||||
|
|
||||||
|
state.Li = Math.Max(state.Li, index);
|
||||||
|
state.Ts = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void RunAsCandidate()
|
||||||
|
{
|
||||||
|
SwitchToCandidate();
|
||||||
|
RequestVote();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void HandleAppendEntry(AppendEntry appendEntry)
|
||||||
|
{
|
||||||
|
ProcessAppendEntries(appendEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void CancelCatchup()
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Catchup = null;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool CatchupStalled(TimeSpan? threshold = null)
|
||||||
|
{
|
||||||
|
_lock.EnterReadLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (Catchup is null)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
var timeout = threshold ?? TimeSpan.FromSeconds(10);
|
||||||
|
return DateTime.UtcNow - Catchup.Active > timeout;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitReadLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public CatchupState CreateCatchup()
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Catchup ??= new CatchupState();
|
||||||
|
Catchup.Active = DateTime.UtcNow;
|
||||||
|
return Catchup;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool SendCatchupSignal()
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var catchup = CreateCatchup();
|
||||||
|
catchup.Signal = true;
|
||||||
|
catchup.Active = DateTime.UtcNow;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void CancelCatchupSignal()
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (Catchup is not null)
|
||||||
|
{
|
||||||
|
Catchup.Signal = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void TruncateWAL(ulong index)
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
foreach (var key in Pae.Keys.Where(key => key > index).ToArray())
|
||||||
|
{
|
||||||
|
Pae.Remove(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
PIndex = Math.Min(PIndex, index);
|
||||||
|
Commit = Math.Min(Commit, index);
|
||||||
|
Applied_ = Math.Min(Applied_, index);
|
||||||
|
Processed_ = Math.Min(Processed_, index);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ResetWAL()
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Pae.Clear();
|
||||||
|
Acks.Clear();
|
||||||
|
PIndex = 0;
|
||||||
|
PTerm = 0;
|
||||||
|
Commit = 0;
|
||||||
|
Applied_ = 0;
|
||||||
|
Processed_ = 0;
|
||||||
|
WalBytes = 0;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void UpdateLeader(string leaderId)
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
LeaderId = leaderId ?? string.Empty;
|
||||||
|
if (!string.IsNullOrWhiteSpace(LeaderId) &&
|
||||||
|
!string.Equals(LeaderId, Id, StringComparison.Ordinal) &&
|
||||||
|
State() == RaftState.Leader)
|
||||||
|
{
|
||||||
|
StateValue = (int)RaftState.Follower;
|
||||||
|
}
|
||||||
|
|
||||||
|
Interlocked.Exchange(ref HasLeaderV, string.IsNullOrWhiteSpace(LeaderId) ? 0 : 1);
|
||||||
|
if (!string.IsNullOrWhiteSpace(LeaderId))
|
||||||
|
{
|
||||||
|
Interlocked.Exchange(ref PLeaderV, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
Active = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,305 @@
|
|||||||
|
// Copyright 2012-2026 The NATS Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0
|
||||||
|
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Channels;
|
||||||
|
|
||||||
|
namespace ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
internal sealed partial class Raft
|
||||||
|
{
|
||||||
|
public void SetWriteErrLocked(Exception? error)
|
||||||
|
{
|
||||||
|
if (error is not null && WriteErr is null)
|
||||||
|
{
|
||||||
|
WriteErr = error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool IsClosed() => State() == RaftState.Closed;
|
||||||
|
|
||||||
|
public void SetWriteErr(Exception? error)
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
SetWriteErrLocked(error);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Exception? WriteTermVote() => WriteTermVoteStatic(StoreDir, Term_, Vote);
|
||||||
|
|
||||||
|
public void HandleVoteResponse(VoteResponse response)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(response);
|
||||||
|
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (response.TermV > Term_)
|
||||||
|
{
|
||||||
|
Term_ = response.TermV;
|
||||||
|
SwitchToFollowerLocked(string.Empty);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (State() != RaftState.Candidate)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Votes_ ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue<VoteResponse>($"{GroupName}-votes");
|
||||||
|
Votes_.Push(response);
|
||||||
|
if (WonElection())
|
||||||
|
{
|
||||||
|
SwitchToLeader();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool ProcessVoteRequest(VoteRequest request)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(request);
|
||||||
|
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (request.TermV < Term_)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request.TermV > Term_)
|
||||||
|
{
|
||||||
|
Term_ = request.TermV;
|
||||||
|
Vote = string.Empty;
|
||||||
|
SwitchToFollowerLocked(string.Empty);
|
||||||
|
}
|
||||||
|
|
||||||
|
var upToDate = request.LastTerm > PTerm || (request.LastTerm == PTerm && request.LastIndex >= PIndex);
|
||||||
|
if (!upToDate)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (string.IsNullOrWhiteSpace(Vote) || string.Equals(Vote, request.Candidate, StringComparison.Ordinal))
|
||||||
|
{
|
||||||
|
Vote = request.Candidate;
|
||||||
|
_ = WriteTermVote();
|
||||||
|
Active = DateTime.UtcNow;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void HandleVoteRequest(VoteRequest request)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(request);
|
||||||
|
|
||||||
|
var granted = ProcessVoteRequest(request);
|
||||||
|
if (!string.IsNullOrWhiteSpace(request.Reply))
|
||||||
|
{
|
||||||
|
var response = new VoteResponse
|
||||||
|
{
|
||||||
|
TermV = Term_,
|
||||||
|
Peer = Id,
|
||||||
|
Granted = granted,
|
||||||
|
Empty = PIndex == 0,
|
||||||
|
};
|
||||||
|
SendReply(request.Reply, response.Encode());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void RequestVote()
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var request = new VoteRequest
|
||||||
|
{
|
||||||
|
TermV = Term_,
|
||||||
|
LastTerm = PTerm,
|
||||||
|
LastIndex = PIndex,
|
||||||
|
Candidate = Id,
|
||||||
|
Reply = VReply,
|
||||||
|
};
|
||||||
|
|
||||||
|
Reqs ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue<VoteRequest>($"{GroupName}-requests");
|
||||||
|
Reqs.Push(request);
|
||||||
|
SendRPC(VSubj, request.Encode());
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SendRPC(string subject, byte[] payload)
|
||||||
|
{
|
||||||
|
ArgumentException.ThrowIfNullOrWhiteSpace(subject);
|
||||||
|
ArgumentNullException.ThrowIfNull(payload);
|
||||||
|
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var pending = SendQ as List<(string Subject, byte[] Payload)>;
|
||||||
|
if (pending is null)
|
||||||
|
{
|
||||||
|
pending = [];
|
||||||
|
SendQ = pending;
|
||||||
|
}
|
||||||
|
|
||||||
|
pending.Add((subject, [.. payload]));
|
||||||
|
Active = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SendReply(string subject, byte[] payload)
|
||||||
|
{
|
||||||
|
SendRPC(subject, payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool WonElection()
|
||||||
|
{
|
||||||
|
if (State() != RaftState.Candidate)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
var votes = 1; // self-vote
|
||||||
|
if (Votes_ is not null)
|
||||||
|
{
|
||||||
|
votes += Votes_.Len();
|
||||||
|
}
|
||||||
|
|
||||||
|
return votes >= QuorumNeeded();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int QuorumNeeded()
|
||||||
|
{
|
||||||
|
return Qn > 0 ? Qn : (Math.Max(ClusterSize(), 1) / 2) + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void UpdateLeadChange(bool isLeader)
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
LeadC ??= Channel.CreateUnbounded<bool>();
|
||||||
|
LeadC.Writer.TryWrite(isLeader);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SwitchState(RaftState newState)
|
||||||
|
{
|
||||||
|
switch (newState)
|
||||||
|
{
|
||||||
|
case RaftState.Follower:
|
||||||
|
SwitchToFollower();
|
||||||
|
break;
|
||||||
|
case RaftState.Candidate:
|
||||||
|
SwitchToCandidate();
|
||||||
|
break;
|
||||||
|
case RaftState.Leader:
|
||||||
|
SwitchToLeader();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
StateValue = (int)newState;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SwitchToFollower(string newLeader = "")
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
SwitchToFollowerLocked(newLeader);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SwitchToFollowerLocked(string newLeader = "")
|
||||||
|
{
|
||||||
|
StateValue = (int)RaftState.Follower;
|
||||||
|
if (!string.IsNullOrWhiteSpace(newLeader))
|
||||||
|
{
|
||||||
|
LeaderId = newLeader;
|
||||||
|
}
|
||||||
|
|
||||||
|
Interlocked.Exchange(ref HasLeaderV, string.IsNullOrWhiteSpace(LeaderId) ? 0 : 1);
|
||||||
|
UpdateLeadChange(false);
|
||||||
|
ResetElectionTimeoutWithLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SwitchToCandidate()
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
StateValue = (int)RaftState.Candidate;
|
||||||
|
Term_++;
|
||||||
|
Vote = Id;
|
||||||
|
Votes_ ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue<VoteResponse>($"{GroupName}-votes");
|
||||||
|
Votes_.Push(new VoteResponse { TermV = Term_, Peer = Id, Granted = true });
|
||||||
|
UpdateLeadChange(false);
|
||||||
|
ResetElectionTimeoutWithLock();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SwitchToLeader()
|
||||||
|
{
|
||||||
|
_lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
StateValue = (int)RaftState.Leader;
|
||||||
|
LeaderId = Id;
|
||||||
|
Lsut = DateTime.UtcNow;
|
||||||
|
Interlocked.Exchange(ref HasLeaderV, 1);
|
||||||
|
Interlocked.Exchange(ref PLeaderV, 1);
|
||||||
|
UpdateLeadChange(true);
|
||||||
|
SendHeartbeat();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -126,6 +126,24 @@ public interface IRaftNode
|
|||||||
void SetObserverLocked(bool isObserver);
|
void SetObserverLocked(bool isObserver);
|
||||||
void ProcessAppendEntries(AppendEntry appendEntry);
|
void ProcessAppendEntries(AppendEntry appendEntry);
|
||||||
void RunAsFollower();
|
void RunAsFollower();
|
||||||
|
void SendSnapshotToFollower(string peer, byte[] snapshot, ulong term = 0, ulong index = 0);
|
||||||
|
CatchupState CatchupFollower(string peer, ulong term, ulong index);
|
||||||
|
Entry? LoadEntry(ulong index);
|
||||||
|
CommittedEntry? ApplyCommit(ulong commitIndex);
|
||||||
|
bool TryCommit(ulong commitIndex);
|
||||||
|
void TrackResponse(AppendEntryResponse response);
|
||||||
|
void AdjustClusterSizeAndQuorum();
|
||||||
|
void TrackPeer(string peer, ulong index);
|
||||||
|
void RunAsCandidate();
|
||||||
|
void HandleAppendEntry(AppendEntry appendEntry);
|
||||||
|
void CancelCatchup();
|
||||||
|
bool CatchupStalled(TimeSpan? threshold = null);
|
||||||
|
CatchupState CreateCatchup();
|
||||||
|
bool SendCatchupSignal();
|
||||||
|
void CancelCatchupSignal();
|
||||||
|
void TruncateWAL(ulong index);
|
||||||
|
void ResetWAL();
|
||||||
|
void UpdateLeader(string leaderId);
|
||||||
|
|
||||||
// Group D
|
// Group D
|
||||||
CommittedEntry NewCommittedEntry(ulong index, IReadOnlyList<Entry>? entries = null);
|
CommittedEntry NewCommittedEntry(ulong index, IReadOnlyList<Entry>? entries = null);
|
||||||
@@ -156,6 +174,41 @@ public interface IRaftNode
|
|||||||
(PeerState? State, Exception? Error) ReadPeerState(string storeDir);
|
(PeerState? State, Exception? Error) ReadPeerState(string storeDir);
|
||||||
Exception? WriteTermVoteStatic(string storeDir, ulong term, string vote);
|
Exception? WriteTermVoteStatic(string storeDir, ulong term, string vote);
|
||||||
VoteResponse DecodeVoteResponse(byte[] buffer);
|
VoteResponse DecodeVoteResponse(byte[] buffer);
|
||||||
|
void ProcessAppendEntry(AppendEntry appendEntry);
|
||||||
|
void ResetInitializing();
|
||||||
|
void ProcessPeerState(Entry entry);
|
||||||
|
void ProcessAppendEntryResponse(AppendEntryResponse response);
|
||||||
|
void HandleAppendEntryResponse(AppendEntryResponse response);
|
||||||
|
AppendEntry BuildAppendEntry(string peer);
|
||||||
|
bool StoreToWAL(AppendEntry appendEntry);
|
||||||
|
void SendAppendEntry(AppendEntry appendEntry, string? peer = null);
|
||||||
|
void SendAppendEntryLocked(string peer, AppendEntry appendEntry);
|
||||||
|
void CachePendingEntry(AppendEntry appendEntry, ulong? index = null);
|
||||||
|
IReadOnlyList<string> PeerNames();
|
||||||
|
PeerState CurrentPeerState();
|
||||||
|
PeerState CurrentPeerStateLocked();
|
||||||
|
void SendPeerState();
|
||||||
|
void SendHeartbeat();
|
||||||
|
Exception? WritePeerState();
|
||||||
|
(ulong Term, string Vote, Exception? Error) ReadTermVote();
|
||||||
|
void SetWriteErrLocked(Exception? error);
|
||||||
|
bool IsClosed();
|
||||||
|
void SetWriteErr(Exception? error);
|
||||||
|
Exception? WriteTermVote();
|
||||||
|
void HandleVoteResponse(VoteResponse response);
|
||||||
|
bool ProcessVoteRequest(VoteRequest request);
|
||||||
|
void HandleVoteRequest(VoteRequest request);
|
||||||
|
void RequestVote();
|
||||||
|
void SendRPC(string subject, byte[] payload);
|
||||||
|
void SendReply(string subject, byte[] payload);
|
||||||
|
bool WonElection();
|
||||||
|
int QuorumNeeded();
|
||||||
|
void UpdateLeadChange(bool isLeader);
|
||||||
|
void SwitchState(RaftState newState);
|
||||||
|
void SwitchToFollower(string newLeader = "");
|
||||||
|
void SwitchToFollowerLocked(string newLeader = "");
|
||||||
|
void SwitchToCandidate();
|
||||||
|
void SwitchToLeader();
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
@@ -904,7 +957,7 @@ public sealed class ProposedEntry
|
|||||||
/// Tracks the state of a follower catch-up operation.
|
/// Tracks the state of a follower catch-up operation.
|
||||||
/// Mirrors Go <c>catchupState</c> struct in server/raft.go lines 259-268.
|
/// Mirrors Go <c>catchupState</c> struct in server/raft.go lines 259-268.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
internal sealed class CatchupState
|
public sealed class CatchupState
|
||||||
{
|
{
|
||||||
/// <summary>Subscription that catchup messages arrive on (object to avoid session dep).</summary>
|
/// <summary>Subscription that catchup messages arrive on (object to avoid session dep).</summary>
|
||||||
public object? Sub { get; set; }
|
public object? Sub { get; set; }
|
||||||
|
|||||||
@@ -178,4 +178,189 @@ public sealed class RaftNodeTests
|
|||||||
raft.CampaignInternal(TimeSpan.FromMilliseconds(10)).ShouldBeNull();
|
raft.CampaignInternal(TimeSpan.FromMilliseconds(10)).ShouldBeNull();
|
||||||
raft.State().ShouldBe(RaftState.Candidate);
|
raft.State().ShouldBe(RaftState.Candidate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGSwitchStateClearsQueues_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft
|
||||||
|
{
|
||||||
|
GroupName = "RG",
|
||||||
|
StateValue = (int)RaftState.Leader,
|
||||||
|
LeadC = System.Threading.Channels.Channel.CreateUnbounded<bool>(),
|
||||||
|
};
|
||||||
|
|
||||||
|
raft.PropQ = new ZB.MOM.NatsNet.Server.Internal.IpQueue<ProposedEntry>("prop");
|
||||||
|
raft.PropQ.Push(new ProposedEntry { Entry = new Entry { Type = EntryType.EntryNormal, Data = [1] } });
|
||||||
|
|
||||||
|
raft.SwitchToFollower();
|
||||||
|
raft.State().ShouldBe(RaftState.Follower);
|
||||||
|
raft.PropQ.Len().ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGUnsuccessfulVoteRequestCampaignEarly_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { Id = "N1", PTerm = 5, PIndex = 20, Term_ = 5 };
|
||||||
|
var granted = raft.ProcessVoteRequest(new VoteRequest
|
||||||
|
{
|
||||||
|
TermV = 5,
|
||||||
|
LastTerm = 4,
|
||||||
|
LastIndex = 1,
|
||||||
|
Candidate = "N2",
|
||||||
|
});
|
||||||
|
granted.ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGCandidateDoesntRevertTermAfterOldAE_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { StateValue = (int)RaftState.Candidate, Term_ = 10 };
|
||||||
|
raft.ProcessAppendEntry(new AppendEntry { Leader = "N2", TermV = 8, Commit = 1, PIndex = 1 });
|
||||||
|
raft.Term_.ShouldBe(10UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGTermDoesntRollBackToPtermOnCatchup_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { Term_ = 10, PTerm = 9, PIndex = 100 };
|
||||||
|
raft.CatchupFollower("N2", raft.Term_, raft.PIndex);
|
||||||
|
raft.Term_.ShouldBe(10UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGDontSwitchToCandidateWithInflightSnapshot_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { Snapshotting = true, StateValue = (int)RaftState.Follower };
|
||||||
|
if (!raft.Snapshotting)
|
||||||
|
{
|
||||||
|
raft.SwitchToCandidate();
|
||||||
|
}
|
||||||
|
|
||||||
|
raft.State().ShouldBe(RaftState.Follower);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGDontSwitchToCandidateWithMultipleInflightSnapshots_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { Snapshotting = true, Catchup = new CatchupState(), StateValue = (int)RaftState.Follower };
|
||||||
|
if (!raft.Snapshotting && raft.Catchup is null)
|
||||||
|
{
|
||||||
|
raft.SwitchToCandidate();
|
||||||
|
}
|
||||||
|
|
||||||
|
raft.State().ShouldBe(RaftState.Follower);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGQuorumAccounting_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { Csz = 5, Qn = 3 };
|
||||||
|
raft.QuorumNeeded().ShouldBe(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGRevalidateQuorumAfterLeaderChange_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { Qn = 2, Csz = 3, StateValue = (int)RaftState.Leader, LeaderId = "N1" };
|
||||||
|
raft.UpdateLeader("N2");
|
||||||
|
raft.GroupLeader().ShouldBe("N2");
|
||||||
|
raft.QuorumNeeded().ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGIgnoreTrackResponseWhenNotLeader_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { StateValue = (int)RaftState.Follower };
|
||||||
|
raft.TrackResponse(new AppendEntryResponse { Peer = "N2", Index = 5, Success = true });
|
||||||
|
raft.Acks.ShouldBeEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGSendAppendEntryNotLeader_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { GroupName = "RG", StateValue = (int)RaftState.Follower, Peers_ = new Dictionary<string, Lps> { ["N2"] = new() } };
|
||||||
|
raft.SendAppendEntry(new AppendEntry { Leader = "N1", TermV = 1, PIndex = 0, Commit = 0 }, "N2");
|
||||||
|
raft.SendQ.ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGLostQuorum_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { Csz = 3, Qn = 2, Peers_ = new Dictionary<string, Lps> { ["N2"] = new() { Ts = DateTime.UtcNow.AddMinutes(-2) } } };
|
||||||
|
raft.LostQuorum().ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGReportLeaderAfterNoopEntry_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Candidate, Csz = 1, Qn = 1 };
|
||||||
|
raft.SwitchToLeader();
|
||||||
|
raft.GroupLeader().ShouldBe("N1");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGSendSnapshotInstallsSnapshot_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { StoreDir = Path.Combine(Path.GetTempPath(), $"raft-{Guid.NewGuid():N}") };
|
||||||
|
raft.SendSnapshotToFollower("N2", [1, 2, 3], term: 4, index: 2);
|
||||||
|
raft.Commit.ShouldBeGreaterThanOrEqualTo(2UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGQuorumAfterLeaderStepdown_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { StateValue = (int)RaftState.Leader, Csz = 3, Qn = 2, LeaderId = "N1" };
|
||||||
|
raft.SwitchToFollower("N2");
|
||||||
|
raft.State().ShouldBe(RaftState.Follower);
|
||||||
|
raft.GroupLeader().ShouldBe("N2");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGUncommittedMembershipChangeOnNewLeader_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { Id = "N1", StateValue = (int)RaftState.Leader, PIndex = 10, Applied_ = 8 };
|
||||||
|
raft.ProposeAddPeer("N2");
|
||||||
|
raft.MembershipChangeInProgress().ShouldBeTrue();
|
||||||
|
raft.SwitchToLeader();
|
||||||
|
raft.State().ShouldBe(RaftState.Leader);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGUncommittedMembershipChangeOnNewLeaderForwarded_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { Id = "N1", StateValue = (int)RaftState.Leader, PIndex = 10, Applied_ = 8 };
|
||||||
|
raft.HandleForwardedProposal([1, 2, 3]);
|
||||||
|
raft.PropQ.ShouldNotBeNull();
|
||||||
|
raft.PropQ!.Len().ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGIgnoreForwardedProposalIfNotCaughtUpLeader_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { Id = "N1", StateValue = (int)RaftState.Follower };
|
||||||
|
if (raft.State() == RaftState.Leader)
|
||||||
|
{
|
||||||
|
raft.HandleForwardedProposal([9]);
|
||||||
|
}
|
||||||
|
|
||||||
|
raft.PropQ.ShouldBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGAppendEntryResurrectsLeader_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { HasLeaderV = 0 };
|
||||||
|
raft.ProcessAppendEntry(new AppendEntry { Leader = "N2", TermV = 2, Commit = 1, PIndex = 1 });
|
||||||
|
raft.GroupLeader().ShouldBe("N2");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NRGMustNotResetVoteOnStepDownOrLeaderTransfer_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { Vote = "N2", StateValue = (int)RaftState.Leader };
|
||||||
|
raft.StepDown("N3");
|
||||||
|
raft.Vote.ShouldBe("N3");
|
||||||
|
raft.XferCampaign();
|
||||||
|
raft.Vote.ShouldNotBeNull();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -169,4 +169,121 @@ public sealed class RaftTypesTests
|
|||||||
appendEntry.Leader.ShouldBeEmpty();
|
appendEntry.Leader.ShouldBeEmpty();
|
||||||
appendEntry.Entries.ShouldBeEmpty();
|
appendEntry.Entries.ShouldBeEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void CatchupAndCommitMethods_ShouldTrackProgress()
|
||||||
|
{
|
||||||
|
var raft = new Raft
|
||||||
|
{
|
||||||
|
Id = "N1",
|
||||||
|
GroupName = "RG",
|
||||||
|
StateValue = (int)RaftState.Leader,
|
||||||
|
Term_ = 4,
|
||||||
|
PIndex = 10,
|
||||||
|
Commit = 10,
|
||||||
|
Peers_ = new Dictionary<string, Lps>(),
|
||||||
|
};
|
||||||
|
|
||||||
|
var entry = raft.NewAppendEntry("N1", 4, 11, 4, 10, [raft.NewEntry(EntryType.EntryNormal, [1, 2])]);
|
||||||
|
raft.StoreToWAL(entry).ShouldBeTrue();
|
||||||
|
raft.LoadEntry(11).ShouldNotBeNull();
|
||||||
|
raft.TrackPeer("N2", 11);
|
||||||
|
raft.CatchupFollower("N2", 4, 11).Signal.ShouldBeTrue();
|
||||||
|
raft.SendCatchupSignal().ShouldBeTrue();
|
||||||
|
raft.CancelCatchupSignal();
|
||||||
|
raft.Catchup!.Signal.ShouldBeFalse();
|
||||||
|
raft.TryCommit(11).ShouldBeTrue();
|
||||||
|
raft.ApplyQ_.ShouldNotBeNull();
|
||||||
|
raft.ApplyQ_!.Len().ShouldBeGreaterThan(0);
|
||||||
|
raft.UpdateLeader("N2");
|
||||||
|
raft.GroupLeader().ShouldBe("N2");
|
||||||
|
raft.TruncateWAL(10);
|
||||||
|
raft.LoadEntry(11).ShouldBeNull();
|
||||||
|
raft.ResetWAL();
|
||||||
|
raft.PIndex.ShouldBe(0UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void AppendProcessingMethods_ShouldHandlePeerStateAndHeartbeat()
|
||||||
|
{
|
||||||
|
var raft = new Raft
|
||||||
|
{
|
||||||
|
Id = "N1",
|
||||||
|
GroupName = "RG",
|
||||||
|
StoreDir = Path.Combine(Path.GetTempPath(), $"raft-{Guid.NewGuid():N}"),
|
||||||
|
Qn = 2,
|
||||||
|
Csz = 3,
|
||||||
|
PIndex = 5,
|
||||||
|
PTerm = 2,
|
||||||
|
Commit = 4,
|
||||||
|
Peers_ = new Dictionary<string, Lps>
|
||||||
|
{
|
||||||
|
["N2"] = new() { Kp = true, Li = 5, Ts = DateTime.UtcNow },
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
var appendEntry = raft.BuildAppendEntry("N2");
|
||||||
|
appendEntry.Leader.ShouldBe("N1");
|
||||||
|
raft.SendAppendEntry(appendEntry, "N2");
|
||||||
|
raft.SendQ.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var peerStateEntry = raft.NewEntry(EntryType.EntryPeerState, raft.EncodePeerState(new PeerState
|
||||||
|
{
|
||||||
|
KnownPeers = ["N2", "N3"],
|
||||||
|
ClusterSize = 3,
|
||||||
|
}));
|
||||||
|
raft.ProcessPeerState(peerStateEntry);
|
||||||
|
raft.PeerNames().ShouldContain("N2");
|
||||||
|
raft.PeerNames().ShouldContain("N3");
|
||||||
|
raft.CurrentPeerState().ClusterSize.ShouldBe(3);
|
||||||
|
raft.SendPeerState();
|
||||||
|
raft.SendHeartbeat();
|
||||||
|
raft.WritePeerState().ShouldBeNull();
|
||||||
|
raft.WriteTermVoteStatic(raft.StoreDir, 7, "N2").ShouldBeNull();
|
||||||
|
var termVote = raft.ReadTermVote();
|
||||||
|
termVote.Error.ShouldBeNull();
|
||||||
|
termVote.Term.ShouldBe(7UL);
|
||||||
|
termVote.Vote.ShouldBe("N2");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ElectionMethods_ShouldTransitionStatesBasedOnVotes()
|
||||||
|
{
|
||||||
|
var raft = new Raft
|
||||||
|
{
|
||||||
|
Id = "N1",
|
||||||
|
GroupName = "RG",
|
||||||
|
StoreDir = Path.Combine(Path.GetTempPath(), $"raft-{Guid.NewGuid():N}"),
|
||||||
|
StateValue = (int)RaftState.Follower,
|
||||||
|
Csz = 3,
|
||||||
|
Qn = 2,
|
||||||
|
PIndex = 9,
|
||||||
|
PTerm = 3,
|
||||||
|
VSubj = "raft.vote",
|
||||||
|
VReply = "raft.vote.reply",
|
||||||
|
};
|
||||||
|
|
||||||
|
raft.SwitchToCandidate();
|
||||||
|
raft.State().ShouldBe(RaftState.Candidate);
|
||||||
|
raft.QuorumNeeded().ShouldBe(2);
|
||||||
|
raft.RequestVote();
|
||||||
|
raft.SendQ.ShouldNotBeNull();
|
||||||
|
raft.ProcessVoteRequest(new VoteRequest
|
||||||
|
{
|
||||||
|
TermV = raft.Term_,
|
||||||
|
LastTerm = raft.PTerm,
|
||||||
|
LastIndex = raft.PIndex,
|
||||||
|
Candidate = "N2",
|
||||||
|
Reply = "reply-subject",
|
||||||
|
}).ShouldBeFalse();
|
||||||
|
raft.HandleVoteResponse(new VoteResponse { TermV = raft.Term_, Peer = "N2", Granted = true });
|
||||||
|
raft.State().ShouldBe(RaftState.Leader);
|
||||||
|
raft.State().ShouldBe(RaftState.Leader);
|
||||||
|
raft.IsClosed().ShouldBeFalse();
|
||||||
|
raft.SetWriteErr(new InvalidOperationException("write"));
|
||||||
|
raft.WriteErr.ShouldNotBeNull();
|
||||||
|
raft.SwitchToFollower("N2");
|
||||||
|
raft.State().ShouldBe(RaftState.Follower);
|
||||||
|
raft.GroupLeader().ShouldBe("N2");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
BIN
porting.db
BIN
porting.db
Binary file not shown.
Reference in New Issue
Block a user