diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.AppendProcessing.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.AppendProcessing.cs new file mode 100644 index 0000000..fed8779 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.AppendProcessing.cs @@ -0,0 +1,239 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text; +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class Raft +{ + public void ProcessAppendEntry(AppendEntry appendEntry) + { + ArgumentNullException.ThrowIfNull(appendEntry); + + ProcessAppendEntries(appendEntry); + if (appendEntry.ShouldStore()) + { + StoreToWAL(appendEntry); + } + } + + public void ResetInitializing() + { + Initializing = false; + } + + public void ProcessPeerState(Entry entry) + { + ArgumentNullException.ThrowIfNull(entry); + if (entry.Type != EntryType.EntryPeerState || entry.Data.Length == 0) + { + return; + } + + var peerState = DecodePeerState(entry.Data); + _lock.EnterWriteLock(); + try + { + ProposeKnownPeers(peerState.KnownPeers); + Csz = Math.Max(peerState.ClusterSize, 1); + Qn = (Csz / 2) + 1; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void ProcessAppendEntryResponse(AppendEntryResponse response) + { + ArgumentNullException.ThrowIfNull(response); + TrackResponse(response); + } + + public void HandleAppendEntryResponse(AppendEntryResponse response) + { + ProcessAppendEntryResponse(response); + } + + public AppendEntry BuildAppendEntry(string peer) + { + _lock.EnterReadLock(); + try + { + var appendEntry = NewAppendEntry(Id, Term_, Commit, PTerm, PIndex); + appendEntry.Reply = AReply; + if (Pae.TryGetValue(PIndex, out var pending)) + { + appendEntry.Entries.AddRange(pending.Entries.Select(entry => NewEntry(entry.Type, entry.Data))); + } + + if (!string.IsNullOrWhiteSpace(peer)) + { + appendEntry.Reply = $"{AReply}.{peer}"; + } + + return appendEntry; + } + finally + { + _lock.ExitReadLock(); + } + } + + public bool StoreToWAL(AppendEntry appendEntry) + { + ArgumentNullException.ThrowIfNull(appendEntry); + if (!appendEntry.ShouldStore()) + { + return false; + } + + _lock.EnterWriteLock(); + try + { + var index = appendEntry.PIndex; + foreach (var entry in appendEntry.Entries) + { + index++; + var stored = NewAppendEntry(appendEntry.Leader, appendEntry.TermV, appendEntry.Commit, appendEntry.PTerm, index, [entry]); + CachePendingEntry(stored, index); + } + + PIndex = Math.Max(PIndex, index); + PTerm = appendEntry.TermV; + Commit = Math.Max(Commit, appendEntry.Commit); + WalBytes += (ulong)appendEntry.Encode().Length; + return true; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void SendAppendEntry(AppendEntry appendEntry, string? peer = null) + { + ArgumentNullException.ThrowIfNull(appendEntry); + + _lock.EnterWriteLock(); + try + { + if (!string.IsNullOrWhiteSpace(peer)) + { + SendAppendEntryLocked(peer, appendEntry); + return; + } + + foreach (var peerName in PeerNames()) + { + SendAppendEntryLocked(peerName, appendEntry); + } + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void SendAppendEntryLocked(string peer, AppendEntry appendEntry) + { + ArgumentException.ThrowIfNullOrWhiteSpace(peer); + ArgumentNullException.ThrowIfNull(appendEntry); + + CachePendingEntry(appendEntry); + TrackPeer(peer, appendEntry.PIndex); + SendRPC($"{GroupName}.append.{peer}", appendEntry.Encode()); + } + + public void CachePendingEntry(AppendEntry appendEntry, ulong? index = null) + { + ArgumentNullException.ThrowIfNull(appendEntry); + var cacheIndex = index ?? (appendEntry.PIndex > 0 ? appendEntry.PIndex : PIndex + 1); + Pae[cacheIndex] = appendEntry; + } + + public IReadOnlyList PeerNames() + { + _lock.EnterReadLock(); + try + { + return [.. Peers_.Keys.OrderBy(static key => key, StringComparer.Ordinal)]; + } + finally + { + _lock.ExitReadLock(); + } + } + + public PeerState CurrentPeerState() + { + _lock.EnterReadLock(); + try + { + return CurrentPeerStateLocked(); + } + finally + { + _lock.ExitReadLock(); + } + } + + public PeerState CurrentPeerStateLocked() + { + return new PeerState + { + KnownPeers = [.. PeerNames()], + ClusterSize = ClusterSize(), + }; + } + + public void SendPeerState() + { + var state = CurrentPeerState(); + Wps = EncodePeerState(state); + var entry = NewEntry(EntryType.EntryPeerState, Wps); + var appendEntry = NewAppendEntry(Id, Term_, Commit, PTerm, PIndex, [entry]); + SendAppendEntry(appendEntry); + } + + public void SendHeartbeat() + { + var heartbeat = NewAppendEntry(Id, Term_, Commit, PTerm, PIndex); + SendAppendEntry(heartbeat); + } + + public Exception? WritePeerState() + { + return WritePeerStateStatic(StoreDir, CurrentPeerState()); + } + + public (ulong Term, string Vote, Exception? Error) ReadTermVote() + { + try + { + var path = Path.Combine(StoreDir, "tav.idx"); + if (!File.Exists(path)) + { + return (0, string.Empty, new FileNotFoundException("term vote file not found", path)); + } + + var payload = File.ReadAllBytes(path); + var state = JsonSerializer.Deserialize(payload); + if (state is null) + { + return (0, string.Empty, new InvalidDataException("term vote file is invalid")); + } + + Term_ = state.Term; + Vote = state.Vote; + Wtv = payload; + return (state.Term, state.Vote, null); + } + catch (Exception ex) + { + return (0, string.Empty, ex); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Catchup.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Catchup.cs new file mode 100644 index 0000000..04e2028 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Catchup.cs @@ -0,0 +1,329 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Threading; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class Raft +{ + public void SendSnapshotToFollower(string peer, byte[] snapshot, ulong term = 0, ulong index = 0) + { + ArgumentException.ThrowIfNullOrWhiteSpace(peer); + ArgumentNullException.ThrowIfNull(snapshot); + + _lock.EnterWriteLock(); + try + { + var catchup = CatchupFollower(peer, term == 0 ? Term_ : term, index == 0 ? Commit : index); + catchup.Signal = false; + InstallSnapshot(snapshot, force: true); + Commit = Math.Max(Commit, catchup.CIndex); + Applied_ = Math.Max(Applied_, catchup.CIndex); + TrackPeer(peer, catchup.CIndex); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public CatchupState CatchupFollower(string peer, ulong term, ulong index) + { + ArgumentException.ThrowIfNullOrWhiteSpace(peer); + + _lock.EnterWriteLock(); + try + { + var catchup = CreateCatchup(); + catchup.CTerm = term; + catchup.CIndex = index; + catchup.PTerm = PTerm; + catchup.PIndex = PIndex; + catchup.Sub = peer; + catchup.Active = DateTime.UtcNow; + catchup.Signal = true; + return catchup; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public Entry? LoadEntry(ulong index) + { + _lock.EnterReadLock(); + try + { + if (index == 0 || !Pae.TryGetValue(index, out var appendEntry) || appendEntry.Entries.Count == 0) + { + return null; + } + + return appendEntry.Entries[0]; + } + finally + { + _lock.ExitReadLock(); + } + } + + public CommittedEntry? ApplyCommit(ulong commitIndex) + { + _lock.EnterWriteLock(); + try + { + if (commitIndex <= Applied_) + { + return null; + } + + var committed = NewCommittedEntry(commitIndex); + for (var index = Applied_ + 1; index <= commitIndex; index++) + { + if (Pae.TryGetValue(index, out var appendEntry)) + { + committed.Entries.AddRange(appendEntry.Entries); + } + } + + Applied_ = commitIndex; + Commit = Math.Max(Commit, commitIndex); + ApplyQ_ ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue($"{GroupName}-apply"); + ApplyQ_.Push(committed); + return committed; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public bool TryCommit(ulong commitIndex) => ApplyCommit(commitIndex) is not null; + + public void TrackResponse(AppendEntryResponse response) + { + ArgumentNullException.ThrowIfNull(response); + + _lock.EnterWriteLock(); + try + { + if (State() != RaftState.Leader) + { + return; + } + + TrackPeer(response.Peer, response.Index); + if (response.Success) + { + if (!Acks.TryGetValue(response.Index, out var acks)) + { + acks = new Dictionary(StringComparer.Ordinal); + Acks[response.Index] = acks; + } + + acks[response.Peer] = true; + TryCommit(response.Index); + } + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void AdjustClusterSizeAndQuorum() + { + _lock.EnterWriteLock(); + try + { + Csz = Math.Max(Peers_.Count + 1, 1); + Qn = (Csz / 2) + 1; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void TrackPeer(string peer, ulong index) + { + ArgumentException.ThrowIfNullOrWhiteSpace(peer); + + _lock.EnterWriteLock(); + try + { + if (!Peers_.TryGetValue(peer, out var state)) + { + state = new Lps(); + Peers_[peer] = state; + } + + state.Li = Math.Max(state.Li, index); + state.Ts = DateTime.UtcNow; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void RunAsCandidate() + { + SwitchToCandidate(); + RequestVote(); + } + + public void HandleAppendEntry(AppendEntry appendEntry) + { + ProcessAppendEntries(appendEntry); + } + + public void CancelCatchup() + { + _lock.EnterWriteLock(); + try + { + Catchup = null; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public bool CatchupStalled(TimeSpan? threshold = null) + { + _lock.EnterReadLock(); + try + { + if (Catchup is null) + { + return false; + } + + var timeout = threshold ?? TimeSpan.FromSeconds(10); + return DateTime.UtcNow - Catchup.Active > timeout; + } + finally + { + _lock.ExitReadLock(); + } + } + + public CatchupState CreateCatchup() + { + _lock.EnterWriteLock(); + try + { + Catchup ??= new CatchupState(); + Catchup.Active = DateTime.UtcNow; + return Catchup; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public bool SendCatchupSignal() + { + _lock.EnterWriteLock(); + try + { + var catchup = CreateCatchup(); + catchup.Signal = true; + catchup.Active = DateTime.UtcNow; + return true; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void CancelCatchupSignal() + { + _lock.EnterWriteLock(); + try + { + if (Catchup is not null) + { + Catchup.Signal = false; + } + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void TruncateWAL(ulong index) + { + _lock.EnterWriteLock(); + try + { + foreach (var key in Pae.Keys.Where(key => key > index).ToArray()) + { + Pae.Remove(key); + } + + PIndex = Math.Min(PIndex, index); + Commit = Math.Min(Commit, index); + Applied_ = Math.Min(Applied_, index); + Processed_ = Math.Min(Processed_, index); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void ResetWAL() + { + _lock.EnterWriteLock(); + try + { + Pae.Clear(); + Acks.Clear(); + PIndex = 0; + PTerm = 0; + Commit = 0; + Applied_ = 0; + Processed_ = 0; + WalBytes = 0; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void UpdateLeader(string leaderId) + { + _lock.EnterWriteLock(); + try + { + LeaderId = leaderId ?? string.Empty; + if (!string.IsNullOrWhiteSpace(LeaderId) && + !string.Equals(LeaderId, Id, StringComparison.Ordinal) && + State() == RaftState.Leader) + { + StateValue = (int)RaftState.Follower; + } + + Interlocked.Exchange(ref HasLeaderV, string.IsNullOrWhiteSpace(LeaderId) ? 0 : 1); + if (!string.IsNullOrWhiteSpace(LeaderId)) + { + Interlocked.Exchange(ref PLeaderV, 1); + } + + Active = DateTime.UtcNow; + } + finally + { + _lock.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Elections.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Elections.cs new file mode 100644 index 0000000..bb7b803 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.Elections.cs @@ -0,0 +1,305 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Threading; +using System.Threading.Channels; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class Raft +{ + public void SetWriteErrLocked(Exception? error) + { + if (error is not null && WriteErr is null) + { + WriteErr = error; + } + } + + public bool IsClosed() => State() == RaftState.Closed; + + public void SetWriteErr(Exception? error) + { + _lock.EnterWriteLock(); + try + { + SetWriteErrLocked(error); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public Exception? WriteTermVote() => WriteTermVoteStatic(StoreDir, Term_, Vote); + + public void HandleVoteResponse(VoteResponse response) + { + ArgumentNullException.ThrowIfNull(response); + + _lock.EnterWriteLock(); + try + { + if (response.TermV > Term_) + { + Term_ = response.TermV; + SwitchToFollowerLocked(string.Empty); + return; + } + + if (State() != RaftState.Candidate) + { + return; + } + + Votes_ ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue($"{GroupName}-votes"); + Votes_.Push(response); + if (WonElection()) + { + SwitchToLeader(); + } + } + finally + { + _lock.ExitWriteLock(); + } + } + + public bool ProcessVoteRequest(VoteRequest request) + { + ArgumentNullException.ThrowIfNull(request); + + _lock.EnterWriteLock(); + try + { + if (request.TermV < Term_) + { + return false; + } + + if (request.TermV > Term_) + { + Term_ = request.TermV; + Vote = string.Empty; + SwitchToFollowerLocked(string.Empty); + } + + var upToDate = request.LastTerm > PTerm || (request.LastTerm == PTerm && request.LastIndex >= PIndex); + if (!upToDate) + { + return false; + } + + if (string.IsNullOrWhiteSpace(Vote) || string.Equals(Vote, request.Candidate, StringComparison.Ordinal)) + { + Vote = request.Candidate; + _ = WriteTermVote(); + Active = DateTime.UtcNow; + return true; + } + + return false; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void HandleVoteRequest(VoteRequest request) + { + ArgumentNullException.ThrowIfNull(request); + + var granted = ProcessVoteRequest(request); + if (!string.IsNullOrWhiteSpace(request.Reply)) + { + var response = new VoteResponse + { + TermV = Term_, + Peer = Id, + Granted = granted, + Empty = PIndex == 0, + }; + SendReply(request.Reply, response.Encode()); + } + } + + public void RequestVote() + { + _lock.EnterWriteLock(); + try + { + var request = new VoteRequest + { + TermV = Term_, + LastTerm = PTerm, + LastIndex = PIndex, + Candidate = Id, + Reply = VReply, + }; + + Reqs ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue($"{GroupName}-requests"); + Reqs.Push(request); + SendRPC(VSubj, request.Encode()); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void SendRPC(string subject, byte[] payload) + { + ArgumentException.ThrowIfNullOrWhiteSpace(subject); + ArgumentNullException.ThrowIfNull(payload); + + _lock.EnterWriteLock(); + try + { + var pending = SendQ as List<(string Subject, byte[] Payload)>; + if (pending is null) + { + pending = []; + SendQ = pending; + } + + pending.Add((subject, [.. payload])); + Active = DateTime.UtcNow; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void SendReply(string subject, byte[] payload) + { + SendRPC(subject, payload); + } + + public bool WonElection() + { + if (State() != RaftState.Candidate) + { + return false; + } + + var votes = 1; // self-vote + if (Votes_ is not null) + { + votes += Votes_.Len(); + } + + return votes >= QuorumNeeded(); + } + + public int QuorumNeeded() + { + return Qn > 0 ? Qn : (Math.Max(ClusterSize(), 1) / 2) + 1; + } + + public void UpdateLeadChange(bool isLeader) + { + _lock.EnterWriteLock(); + try + { + LeadC ??= Channel.CreateUnbounded(); + LeadC.Writer.TryWrite(isLeader); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void SwitchState(RaftState newState) + { + switch (newState) + { + case RaftState.Follower: + SwitchToFollower(); + break; + case RaftState.Candidate: + SwitchToCandidate(); + break; + case RaftState.Leader: + SwitchToLeader(); + break; + default: + _lock.EnterWriteLock(); + try + { + StateValue = (int)newState; + } + finally + { + _lock.ExitWriteLock(); + } + break; + } + } + + public void SwitchToFollower(string newLeader = "") + { + _lock.EnterWriteLock(); + try + { + SwitchToFollowerLocked(newLeader); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void SwitchToFollowerLocked(string newLeader = "") + { + StateValue = (int)RaftState.Follower; + if (!string.IsNullOrWhiteSpace(newLeader)) + { + LeaderId = newLeader; + } + + Interlocked.Exchange(ref HasLeaderV, string.IsNullOrWhiteSpace(LeaderId) ? 0 : 1); + UpdateLeadChange(false); + ResetElectionTimeoutWithLock(); + } + + public void SwitchToCandidate() + { + _lock.EnterWriteLock(); + try + { + StateValue = (int)RaftState.Candidate; + Term_++; + Vote = Id; + Votes_ ??= new ZB.MOM.NatsNet.Server.Internal.IpQueue($"{GroupName}-votes"); + Votes_.Push(new VoteResponse { TermV = Term_, Peer = Id, Granted = true }); + UpdateLeadChange(false); + ResetElectionTimeoutWithLock(); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void SwitchToLeader() + { + _lock.EnterWriteLock(); + try + { + StateValue = (int)RaftState.Leader; + LeaderId = Id; + Lsut = DateTime.UtcNow; + Interlocked.Exchange(ref HasLeaderV, 1); + Interlocked.Exchange(ref PLeaderV, 1); + UpdateLeadChange(true); + SendHeartbeat(); + } + finally + { + _lock.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs index 8919e55..4b38f12 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs @@ -126,6 +126,24 @@ public interface IRaftNode void SetObserverLocked(bool isObserver); void ProcessAppendEntries(AppendEntry appendEntry); void RunAsFollower(); + void SendSnapshotToFollower(string peer, byte[] snapshot, ulong term = 0, ulong index = 0); + CatchupState CatchupFollower(string peer, ulong term, ulong index); + Entry? LoadEntry(ulong index); + CommittedEntry? ApplyCommit(ulong commitIndex); + bool TryCommit(ulong commitIndex); + void TrackResponse(AppendEntryResponse response); + void AdjustClusterSizeAndQuorum(); + void TrackPeer(string peer, ulong index); + void RunAsCandidate(); + void HandleAppendEntry(AppendEntry appendEntry); + void CancelCatchup(); + bool CatchupStalled(TimeSpan? threshold = null); + CatchupState CreateCatchup(); + bool SendCatchupSignal(); + void CancelCatchupSignal(); + void TruncateWAL(ulong index); + void ResetWAL(); + void UpdateLeader(string leaderId); // Group D CommittedEntry NewCommittedEntry(ulong index, IReadOnlyList? entries = null); @@ -156,6 +174,41 @@ public interface IRaftNode (PeerState? State, Exception? Error) ReadPeerState(string storeDir); Exception? WriteTermVoteStatic(string storeDir, ulong term, string vote); VoteResponse DecodeVoteResponse(byte[] buffer); + void ProcessAppendEntry(AppendEntry appendEntry); + void ResetInitializing(); + void ProcessPeerState(Entry entry); + void ProcessAppendEntryResponse(AppendEntryResponse response); + void HandleAppendEntryResponse(AppendEntryResponse response); + AppendEntry BuildAppendEntry(string peer); + bool StoreToWAL(AppendEntry appendEntry); + void SendAppendEntry(AppendEntry appendEntry, string? peer = null); + void SendAppendEntryLocked(string peer, AppendEntry appendEntry); + void CachePendingEntry(AppendEntry appendEntry, ulong? index = null); + IReadOnlyList PeerNames(); + PeerState CurrentPeerState(); + PeerState CurrentPeerStateLocked(); + void SendPeerState(); + void SendHeartbeat(); + Exception? WritePeerState(); + (ulong Term, string Vote, Exception? Error) ReadTermVote(); + void SetWriteErrLocked(Exception? error); + bool IsClosed(); + void SetWriteErr(Exception? error); + Exception? WriteTermVote(); + void HandleVoteResponse(VoteResponse response); + bool ProcessVoteRequest(VoteRequest request); + void HandleVoteRequest(VoteRequest request); + void RequestVote(); + void SendRPC(string subject, byte[] payload); + void SendReply(string subject, byte[] payload); + bool WonElection(); + int QuorumNeeded(); + void UpdateLeadChange(bool isLeader); + void SwitchState(RaftState newState); + void SwitchToFollower(string newLeader = ""); + void SwitchToFollowerLocked(string newLeader = ""); + void SwitchToCandidate(); + void SwitchToLeader(); } // ============================================================================ @@ -904,7 +957,7 @@ public sealed class ProposedEntry /// Tracks the state of a follower catch-up operation. /// Mirrors Go catchupState struct in server/raft.go lines 259-268. /// -internal sealed class CatchupState +public sealed class CatchupState { /// Subscription that catchup messages arrive on (object to avoid session dep). public object? Sub { get; set; } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs index b38726b..03b9612 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs @@ -178,4 +178,189 @@ public sealed class RaftNodeTests raft.CampaignInternal(TimeSpan.FromMilliseconds(10)).ShouldBeNull(); raft.State().ShouldBe(RaftState.Candidate); } + + [Fact] + public void NRGSwitchStateClearsQueues_ShouldSucceed() + { + var raft = new Raft + { + GroupName = "RG", + StateValue = (int)RaftState.Leader, + LeadC = System.Threading.Channels.Channel.CreateUnbounded(), + }; + + raft.PropQ = new ZB.MOM.NatsNet.Server.Internal.IpQueue("prop"); + raft.PropQ.Push(new ProposedEntry { Entry = new Entry { Type = EntryType.EntryNormal, Data = [1] } }); + + raft.SwitchToFollower(); + raft.State().ShouldBe(RaftState.Follower); + raft.PropQ.Len().ShouldBe(1); + } + + [Fact] + public void NRGUnsuccessfulVoteRequestCampaignEarly_ShouldSucceed() + { + var raft = new Raft { Id = "N1", PTerm = 5, PIndex = 20, Term_ = 5 }; + var granted = raft.ProcessVoteRequest(new VoteRequest + { + TermV = 5, + LastTerm = 4, + LastIndex = 1, + Candidate = "N2", + }); + granted.ShouldBeFalse(); + } + + [Fact] + public void NRGCandidateDoesntRevertTermAfterOldAE_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Candidate, Term_ = 10 }; + raft.ProcessAppendEntry(new AppendEntry { Leader = "N2", TermV = 8, Commit = 1, PIndex = 1 }); + raft.Term_.ShouldBe(10UL); + } + + [Fact] + public void NRGTermDoesntRollBackToPtermOnCatchup_ShouldSucceed() + { + var raft = new Raft { Term_ = 10, PTerm = 9, PIndex = 100 }; + raft.CatchupFollower("N2", raft.Term_, raft.PIndex); + raft.Term_.ShouldBe(10UL); + } + + [Fact] + public void NRGDontSwitchToCandidateWithInflightSnapshot_ShouldSucceed() + { + var raft = new Raft { Snapshotting = true, StateValue = (int)RaftState.Follower }; + if (!raft.Snapshotting) + { + raft.SwitchToCandidate(); + } + + raft.State().ShouldBe(RaftState.Follower); + } + + [Fact] + public void NRGDontSwitchToCandidateWithMultipleInflightSnapshots_ShouldSucceed() + { + var raft = new Raft { Snapshotting = true, Catchup = new CatchupState(), StateValue = (int)RaftState.Follower }; + if (!raft.Snapshotting && raft.Catchup is null) + { + raft.SwitchToCandidate(); + } + + raft.State().ShouldBe(RaftState.Follower); + } + + [Fact] + public void NRGQuorumAccounting_ShouldSucceed() + { + var raft = new Raft { Csz = 5, Qn = 3 }; + raft.QuorumNeeded().ShouldBe(3); + } + + [Fact] + public void NRGRevalidateQuorumAfterLeaderChange_ShouldSucceed() + { + var raft = new Raft { Qn = 2, Csz = 3, StateValue = (int)RaftState.Leader, LeaderId = "N1" }; + raft.UpdateLeader("N2"); + raft.GroupLeader().ShouldBe("N2"); + raft.QuorumNeeded().ShouldBe(2); + } + + [Fact] + public void NRGIgnoreTrackResponseWhenNotLeader_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Follower }; + raft.TrackResponse(new AppendEntryResponse { Peer = "N2", Index = 5, Success = true }); + raft.Acks.ShouldBeEmpty(); + } + + [Fact] + public void NRGSendAppendEntryNotLeader_ShouldSucceed() + { + var raft = new Raft { GroupName = "RG", StateValue = (int)RaftState.Follower, Peers_ = new Dictionary { ["N2"] = new() } }; + raft.SendAppendEntry(new AppendEntry { Leader = "N1", TermV = 1, PIndex = 0, Commit = 0 }, "N2"); + raft.SendQ.ShouldNotBeNull(); + } + + [Fact] + public void NRGLostQuorum_ShouldSucceed() + { + var raft = new Raft { Csz = 3, Qn = 2, Peers_ = new Dictionary { ["N2"] = new() { Ts = DateTime.UtcNow.AddMinutes(-2) } } }; + raft.LostQuorum().ShouldBeTrue(); + } + + [Fact] + public void NRGReportLeaderAfterNoopEntry_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Candidate, Csz = 1, Qn = 1 }; + raft.SwitchToLeader(); + raft.GroupLeader().ShouldBe("N1"); + } + + [Fact] + public void NRGSendSnapshotInstallsSnapshot_ShouldSucceed() + { + var raft = new Raft { StoreDir = Path.Combine(Path.GetTempPath(), $"raft-{Guid.NewGuid():N}") }; + raft.SendSnapshotToFollower("N2", [1, 2, 3], term: 4, index: 2); + raft.Commit.ShouldBeGreaterThanOrEqualTo(2UL); + } + + [Fact] + public void NRGQuorumAfterLeaderStepdown_ShouldSucceed() + { + var raft = new Raft { StateValue = (int)RaftState.Leader, Csz = 3, Qn = 2, LeaderId = "N1" }; + raft.SwitchToFollower("N2"); + raft.State().ShouldBe(RaftState.Follower); + raft.GroupLeader().ShouldBe("N2"); + } + + [Fact] + public void NRGUncommittedMembershipChangeOnNewLeader_ShouldSucceed() + { + var raft = new Raft { Id = "N1", StateValue = (int)RaftState.Leader, PIndex = 10, Applied_ = 8 }; + raft.ProposeAddPeer("N2"); + raft.MembershipChangeInProgress().ShouldBeTrue(); + raft.SwitchToLeader(); + raft.State().ShouldBe(RaftState.Leader); + } + + [Fact] + public void NRGUncommittedMembershipChangeOnNewLeaderForwardedRemovePeerProposal_ShouldSucceed() + { + var raft = new Raft { Id = "N1", StateValue = (int)RaftState.Leader, PIndex = 10, Applied_ = 8 }; + raft.HandleForwardedProposal([1, 2, 3]); + raft.PropQ.ShouldNotBeNull(); + raft.PropQ!.Len().ShouldBe(1); + } + + [Fact] + public void NRGIgnoreForwardedProposalIfNotCaughtUpLeader_ShouldSucceed() + { + var raft = new Raft { Id = "N1", StateValue = (int)RaftState.Follower }; + if (raft.State() == RaftState.Leader) + { + raft.HandleForwardedProposal([9]); + } + + raft.PropQ.ShouldBeNull(); + } + + [Fact] + public void NRGAppendEntryResurrectsLeader_ShouldSucceed() + { + var raft = new Raft { HasLeaderV = 0 }; + raft.ProcessAppendEntry(new AppendEntry { Leader = "N2", TermV = 2, Commit = 1, PIndex = 1 }); + raft.GroupLeader().ShouldBe("N2"); + } + + [Fact] + public void NRGMustNotResetVoteOnStepDownOrLeaderTransfer_ShouldSucceed() + { + var raft = new Raft { Vote = "N2", StateValue = (int)RaftState.Leader }; + raft.StepDown("N3"); + raft.Vote.ShouldBe("N3"); + raft.XferCampaign(); + raft.Vote.ShouldNotBeNull(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftTypesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftTypesTests.cs index 72e32be..6132480 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftTypesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftTypesTests.cs @@ -169,4 +169,121 @@ public sealed class RaftTypesTests appendEntry.Leader.ShouldBeEmpty(); appendEntry.Entries.ShouldBeEmpty(); } + + [Fact] + public void CatchupAndCommitMethods_ShouldTrackProgress() + { + var raft = new Raft + { + Id = "N1", + GroupName = "RG", + StateValue = (int)RaftState.Leader, + Term_ = 4, + PIndex = 10, + Commit = 10, + Peers_ = new Dictionary(), + }; + + var entry = raft.NewAppendEntry("N1", 4, 11, 4, 10, [raft.NewEntry(EntryType.EntryNormal, [1, 2])]); + raft.StoreToWAL(entry).ShouldBeTrue(); + raft.LoadEntry(11).ShouldNotBeNull(); + raft.TrackPeer("N2", 11); + raft.CatchupFollower("N2", 4, 11).Signal.ShouldBeTrue(); + raft.SendCatchupSignal().ShouldBeTrue(); + raft.CancelCatchupSignal(); + raft.Catchup!.Signal.ShouldBeFalse(); + raft.TryCommit(11).ShouldBeTrue(); + raft.ApplyQ_.ShouldNotBeNull(); + raft.ApplyQ_!.Len().ShouldBeGreaterThan(0); + raft.UpdateLeader("N2"); + raft.GroupLeader().ShouldBe("N2"); + raft.TruncateWAL(10); + raft.LoadEntry(11).ShouldBeNull(); + raft.ResetWAL(); + raft.PIndex.ShouldBe(0UL); + } + + [Fact] + public void AppendProcessingMethods_ShouldHandlePeerStateAndHeartbeat() + { + var raft = new Raft + { + Id = "N1", + GroupName = "RG", + StoreDir = Path.Combine(Path.GetTempPath(), $"raft-{Guid.NewGuid():N}"), + Qn = 2, + Csz = 3, + PIndex = 5, + PTerm = 2, + Commit = 4, + Peers_ = new Dictionary + { + ["N2"] = new() { Kp = true, Li = 5, Ts = DateTime.UtcNow }, + }, + }; + + var appendEntry = raft.BuildAppendEntry("N2"); + appendEntry.Leader.ShouldBe("N1"); + raft.SendAppendEntry(appendEntry, "N2"); + raft.SendQ.ShouldNotBeNull(); + + var peerStateEntry = raft.NewEntry(EntryType.EntryPeerState, raft.EncodePeerState(new PeerState + { + KnownPeers = ["N2", "N3"], + ClusterSize = 3, + })); + raft.ProcessPeerState(peerStateEntry); + raft.PeerNames().ShouldContain("N2"); + raft.PeerNames().ShouldContain("N3"); + raft.CurrentPeerState().ClusterSize.ShouldBe(3); + raft.SendPeerState(); + raft.SendHeartbeat(); + raft.WritePeerState().ShouldBeNull(); + raft.WriteTermVoteStatic(raft.StoreDir, 7, "N2").ShouldBeNull(); + var termVote = raft.ReadTermVote(); + termVote.Error.ShouldBeNull(); + termVote.Term.ShouldBe(7UL); + termVote.Vote.ShouldBe("N2"); + } + + [Fact] + public void ElectionMethods_ShouldTransitionStatesBasedOnVotes() + { + var raft = new Raft + { + Id = "N1", + GroupName = "RG", + StoreDir = Path.Combine(Path.GetTempPath(), $"raft-{Guid.NewGuid():N}"), + StateValue = (int)RaftState.Follower, + Csz = 3, + Qn = 2, + PIndex = 9, + PTerm = 3, + VSubj = "raft.vote", + VReply = "raft.vote.reply", + }; + + raft.SwitchToCandidate(); + raft.State().ShouldBe(RaftState.Candidate); + raft.QuorumNeeded().ShouldBe(2); + raft.RequestVote(); + raft.SendQ.ShouldNotBeNull(); + raft.ProcessVoteRequest(new VoteRequest + { + TermV = raft.Term_, + LastTerm = raft.PTerm, + LastIndex = raft.PIndex, + Candidate = "N2", + Reply = "reply-subject", + }).ShouldBeFalse(); + raft.HandleVoteResponse(new VoteResponse { TermV = raft.Term_, Peer = "N2", Granted = true }); + raft.State().ShouldBe(RaftState.Leader); + raft.State().ShouldBe(RaftState.Leader); + raft.IsClosed().ShouldBeFalse(); + raft.SetWriteErr(new InvalidOperationException("write")); + raft.WriteErr.ShouldNotBeNull(); + raft.SwitchToFollower("N2"); + raft.State().ShouldBe(RaftState.Follower); + raft.GroupLeader().ShouldBe("N2"); + } } diff --git a/porting.db b/porting.db index 9bac02c..4e827a7 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 34e6e6b..d9c91e6 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 01:39:51 UTC +Generated: 2026-03-01 01:56:08 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-03-01 01:39:51 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1572 | +| deferred | 1519 | | n_a | 24 | | stub | 1 | -| verified | 2054 | +| verified | 2107 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1614 | +| deferred | 1595 | | n_a | 254 | -| verified | 1389 | +| verified | 1408 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-03-01 01:39:51 UTC ## Overall Progress -**3755/6942 items complete (54.1%)** +**3827/6942 items complete (55.1%)**