namespace NATS.Server.Raft;

/// <summary>
/// A single RAFT participant. Tracks term/vote state, the replicated log, commit and
/// applied indices, cluster membership (including joint consensus), per-peer replication
/// state, snapshots, and a randomized election timer with an optional pre-vote round.
/// </summary>
public sealed class RaftNode : IDisposable
{
    private int _votesReceived;
    private readonly List<RaftNode> _cluster = [];
    private readonly RaftReplicator _replicator = new();
    private readonly RaftSnapshotStore _snapshotStore = new();
    private readonly IRaftTransport? _transport;
    private readonly string? _persistDirectory;
    private readonly HashSet<string> _members = new(StringComparer.Ordinal);

    // B2: Election timer fields
    // Go reference: raft.go:1400-1450 (resetElectionTimeout), raft.go:1500-1550 (campaign logic)
    private Timer? _electionTimer;
    private CancellationTokenSource? _electionTimerCts;

    // B3: Peer state tracking
    // Go reference: raft.go peer tracking (nextIndex, matchIndex, last contact)
    private readonly Dictionary<string, RaftPeerState> _peerStates = new(StringComparer.Ordinal);

    // B4: In-flight membership change tracking — only one at a time is permitted.
    // Go reference: raft.go:961-1019 (proposeAddPeer / proposeRemovePeer, single-change invariant)
    private long _membershipChangeIndex;

    // Joint consensus (two-phase membership change) per Raft paper Section 4.
    // During the joint phase both the old config (Cold) and new config (Cnew) are stored.
    // A quorum decision requires majority from BOTH configurations simultaneously.
    // Go reference: raft.go joint consensus / two-phase membership transitions.
    private HashSet<string>? _jointOldMembers;
    private HashSet<string>? _jointNewMembers;

    // Pre-vote (Ongaro, Raft dissertation §9.6) is implemented below (see B6) and is
    // gated by PreVoteEnabled.
    // NOTE(review): an earlier note here claimed pre-vote was skipped for parity with the
    // Go server; that contradicts the B6 implementation and its default of true — confirm
    // which default is intended.

    /// <summary>Identifier of this node, as passed to the constructor.</summary>
    public string Id { get; }

    /// <summary>Current term, read from <see cref="TermState"/>.</summary>
    public int Term => TermState.CurrentTerm;

    /// <summary>True when <see cref="Role"/> is <see cref="RaftRole.Leader"/>.</summary>
    public bool IsLeader => Role == RaftRole.Leader;

    /// <summary>Current role of this node; starts as follower.</summary>
    public RaftRole Role { get; private set; } = RaftRole.Follower;

    /// <summary>Ids of the members in the active configuration.</summary>
    public IReadOnlyCollection<string> Members => _members;

    /// <summary>Current term and voted-for state.</summary>
    public RaftTermState TermState { get; } = new();

    /// <summary>Index of the last entry applied by this node.</summary>
    public long AppliedIndex { get; set; }

    /// <summary>The node's log; replaced when persisted state is loaded.</summary>
    public RaftLog Log { get; private set; } = new();

    // B1: Commit tracking
    // Go reference: raft.go:150-160 (applied/processed fields), raft.go:2100-2150 (ApplyQ)

    /// <summary>Index of the last entry known to have reached quorum.</summary>
    public long CommitIndex { get; private set; }

    /// <summary>Highest index reported through <see cref="MarkProcessed"/>.</summary>
    public long ProcessedIndex { get; private set; }

    /// <summary>Queue of committed entries awaiting state machine application.</summary>
    public CommitQueue<RaftLogEntry> CommitQueue { get; } = new();

    // B2: Election timeout configuration (milliseconds)

    /// <summary>Inclusive lower bound of the randomized election timeout, in milliseconds.</summary>
    public int ElectionTimeoutMinMs { get; set; } = 150;

    /// <summary>Inclusive upper bound of the randomized election timeout, in milliseconds.</summary>
    public int ElectionTimeoutMaxMs { get; set; } = 300;

    // B6: Pre-vote protocol
    // Go reference: raft.go:1600-1700 (pre-vote logic)
    // When enabled, a node first conducts a pre-vote round before starting a real election.
    // This prevents partitioned nodes from disrupting the cluster by incrementing terms.

    /// <summary>Whether timer-driven campaigns run a pre-vote round first.</summary>
    public bool PreVoteEnabled { get; set; } = true;

    // B4: True while a membership change log entry is pending quorum.
    // Go reference: raft.go:961-1019 single-change invariant.

    /// <summary>True while a membership change proposal is in flight.</summary>
    public bool MembershipChangeInProgress => Interlocked.Read(ref _membershipChangeIndex) > 0;

    /// <summary>
    /// True when this node is in the joint consensus phase (transitioning between
    /// two membership configurations).
    /// Go reference: raft.go joint consensus / two-phase membership transitions.
    /// </summary>
    public bool InJointConsensus => _jointNewMembers != null;

    /// <summary>
    /// The new (Cnew) member set stored during a joint configuration transition,
    /// or null when not in joint consensus. Exposed for testing.
    /// </summary>
    public IReadOnlyCollection<string>? JointNewMembers => _jointNewMembers;

    /// <summary>
    /// Creates a node that initially knows only itself as a member.
    /// </summary>
    /// <param name="id">Identifier of this node.</param>
    /// <param name="transport">Optional transport handed to the replicator.</param>
    /// <param name="persistDirectory">
    /// Optional directory for persisted state; when null, persistence falls back to a
    /// per-node directory under the system temp path.
    /// </param>
    public RaftNode(string id, IRaftTransport? transport = null, string? persistDirectory = null)
    {
        Id = id;
        _transport = transport;
        _persistDirectory = persistDirectory;
        _members.Add(id);
    }

    /// <summary>
    /// Replaces the known cluster with <paramref name="peers"/>, rebuilding the member
    /// set and the per-peer replication state (which excludes this node).
    /// </summary>
    public void ConfigureCluster(IEnumerable<RaftNode> peers)
    {
        _cluster.Clear();
        _cluster.AddRange(peers);
        _members.Clear();
        _peerStates.Clear();

        // Walk the materialized list so a lazy `peers` sequence is enumerated only once.
        foreach (var peer in _cluster)
        {
            _members.Add(peer.Id);

            // B3: Initialize peer state for all peers except self
            if (!string.Equals(peer.Id, Id, StringComparison.Ordinal))
            {
                _peerStates[peer.Id] = new RaftPeerState { PeerId = peer.Id };
            }
        }
    }

    /// <summary>Adds a member id to the active configuration without a log entry.</summary>
    public void AddMember(string memberId) => _members.Add(memberId);

    /// <summary>Removes a member id from the active configuration without a log entry.</summary>
    public void RemoveMember(string memberId) => _members.Remove(memberId);

    /// <summary>
    /// Becomes a candidate for the next term, votes for itself, and becomes leader
    /// immediately if one vote is already a majority of <paramref name="clusterSize"/>.
    /// </summary>
    public void StartElection(int clusterSize)
    {
        Role = RaftRole.Candidate;
        TermState.CurrentTerm++;
        TermState.VotedFor = Id;
        _votesReceived = 1;
        TryBecomeLeader(clusterSize);
    }

    /// <summary>
    /// Handles a vote request. Rejects stale terms, adopts a newer term (clearing any
    /// previous vote), and grants the vote unless a different candidate already holds it.
    /// </summary>
    public VoteResponse GrantVote(int term, string candidateId = "")
    {
        if (term < TermState.CurrentTerm)
            return new VoteResponse { Granted = false };

        if (term > TermState.CurrentTerm)
        {
            // NOTE(review): Role is left untouched here; a leader/candidate that sees a
            // higher term would normally step down — confirm whether that is intended.
            TermState.CurrentTerm = term;
            TermState.VotedFor = null;
        }

        if (!string.IsNullOrEmpty(TermState.VotedFor)
            && !string.Equals(TermState.VotedFor, candidateId, StringComparison.Ordinal))
        {
            return new VoteResponse { Granted = false };
        }

        TermState.VotedFor = candidateId;
        return new VoteResponse { Granted = true };
    }

    /// <summary>
    /// Handles a leader heartbeat: ignores stale terms, otherwise adopts the term,
    /// becomes a follower, resets the election timer, and records contact from the peer.
    /// </summary>
    public void ReceiveHeartbeat(int term, string? fromPeerId = null)
    {
        if (term < TermState.CurrentTerm)
            return;

        if (term > TermState.CurrentTerm)
        {
            // A vote belongs to the term it was cast in; carrying it into a newer term
            // would make GrantVote refuse every candidate of that term.
            TermState.VotedFor = null;
        }

        TermState.CurrentTerm = term;
        Role = RaftRole.Follower;

        // B2: Reset election timer on valid heartbeat
        ResetElectionTimeout();

        // B3: Update peer contact time
        if (fromPeerId != null && _peerStates.TryGetValue(fromPeerId, out var peerState))
        {
            peerState.LastContact = DateTime.UtcNow;
        }
    }

    /// <summary>
    /// Counts a granted vote and becomes leader once a majority of
    /// <paramref name="clusterSize"/> has been collected. Denied votes are ignored.
    /// </summary>
    public void ReceiveVote(VoteResponse response, int clusterSize = 3)
    {
        if (!response.Granted)
            return;

        _votesReceived++;
        TryBecomeLeader(clusterSize);
    }

    /// <summary>
    /// Appends <paramref name="command"/> to the log and replicates it to the followers.
    /// On quorum the entry is committed, enqueued for application, and peer indices are updated.
    /// </summary>
    /// <returns>The index of the appended entry, whether or not quorum was reached.</returns>
    /// <exception cref="InvalidOperationException">This node is not the leader.</exception>
    public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
    {
        if (Role != RaftRole.Leader)
            throw new InvalidOperationException("Only leader can propose entries.");

        var entry = Log.Append(TermState.CurrentTerm, command);
        var followers = _cluster.Where(n => n.Id != Id).ToList();
        var results = await _replicator.ReplicateAsync(Id, entry, followers, _transport, ct);
        var acknowledgements = results.Count(r => r.Success);
        var quorum = (_cluster.Count / 2) + 1;

        // +1 counts the leader's own copy of the entry.
        if (acknowledgements + 1 >= quorum)
        {
            AppliedIndex = entry.Index;
            foreach (var node in _cluster)
                node.AppliedIndex = Math.Max(node.AppliedIndex, entry.Index);

            // B1: Update commit index and enqueue for state machine application
            CommitIndex = entry.Index;
            await CommitQueue.EnqueueAsync(entry, ct);

            // B3: Update peer match/next indices for successful replications
            foreach (var result in results.Where(r => r.Success))
            {
                if (_peerStates.TryGetValue(result.FollowerId, out var peerState))
                {
                    peerState.MatchIndex = Math.Max(peerState.MatchIndex, entry.Index);
                    peerState.NextIndex = entry.Index + 1;
                    peerState.LastContact = DateTime.UtcNow;
                }
            }

            foreach (var node in _cluster.Where(n => n._persistDirectory != null))
                await node.PersistAsync(ct);
        }

        if (_persistDirectory != null)
            await PersistAsync(ct);

        return entry.Index;
    }

    // B4: Membership change proposals
    // Go reference: raft.go:961-1019 (proposeAddPeer, proposeRemovePeer)

    /// <summary>
    /// Proposes adding a new peer to the cluster as a RAFT log entry.
    /// Only the leader may propose; only one membership change may be in flight at a time.
    /// After the entry reaches quorum the peer is added to _members.
    /// Go reference: raft.go:961-990 (proposeAddPeer).
    /// </summary>
    /// <returns>The index of the appended entry, whether or not quorum was reached.</returns>
    /// <exception cref="InvalidOperationException">
    /// This node is not the leader, or another membership change is in progress.
    /// </exception>
    public async ValueTask<long> ProposeAddPeerAsync(string peerId, CancellationToken ct)
    {
        if (Role != RaftRole.Leader)
            throw new InvalidOperationException("Only the leader can propose membership changes.");

        if (Interlocked.Read(ref _membershipChangeIndex) > 0)
            throw new InvalidOperationException("A membership change is already in progress.");

        var command = $"+peer:{peerId}";
        var entry = Log.Append(TermState.CurrentTerm, command);
        Interlocked.Exchange(ref _membershipChangeIndex, entry.Index);

        try
        {
            var followers = _cluster.Where(n => n.Id != Id).ToList();
            var results = await _replicator.ReplicateAsync(Id, entry, followers, _transport, ct);
            var acknowledgements = results.Count(r => r.Success);
            var quorum = (_cluster.Count / 2) + 1;

            if (acknowledgements + 1 >= quorum)
            {
                CommitIndex = entry.Index;
                AppliedIndex = entry.Index;
                await CommitQueue.EnqueueAsync(entry, ct);

                // Apply the membership change: add the peer and track its state
                _members.Add(peerId);
                if (!string.Equals(peerId, Id, StringComparison.Ordinal)
                    && !_peerStates.ContainsKey(peerId))
                {
                    _peerStates[peerId] = new RaftPeerState { PeerId = peerId };
                }
            }
        }
        finally
        {
            // Clear the in-flight tracking regardless of outcome. Doing this in `finally`
            // keeps a cancelled or failed replication from blocking every later change.
            Interlocked.Exchange(ref _membershipChangeIndex, 0);
        }

        return entry.Index;
    }

    /// <summary>
    /// Proposes removing a peer from the cluster as a RAFT log entry.
    /// Refuses to remove the last remaining member.
    /// Only the leader may propose; only one membership change may be in flight at a time.
    /// Go reference: raft.go:992-1019 (proposeRemovePeer).
    /// </summary>
    /// <returns>The index of the appended entry, whether or not quorum was reached.</returns>
    /// <exception cref="InvalidOperationException">
    /// This node is not the leader, another membership change is in progress, the peer
    /// is this node itself, or only one member remains.
    /// </exception>
    public async ValueTask<long> ProposeRemovePeerAsync(string peerId, CancellationToken ct)
    {
        if (Role != RaftRole.Leader)
            throw new InvalidOperationException("Only the leader can propose membership changes.");

        if (Interlocked.Read(ref _membershipChangeIndex) > 0)
            throw new InvalidOperationException("A membership change is already in progress.");

        if (string.Equals(peerId, Id, StringComparison.Ordinal))
            throw new InvalidOperationException("Leader cannot remove itself. Step down first.");

        if (_members.Count <= 1)
            throw new InvalidOperationException("Cannot remove the last member from the cluster.");

        var command = $"-peer:{peerId}";
        var entry = Log.Append(TermState.CurrentTerm, command);
        Interlocked.Exchange(ref _membershipChangeIndex, entry.Index);

        try
        {
            var followers = _cluster.Where(n => n.Id != Id).ToList();
            var results = await _replicator.ReplicateAsync(Id, entry, followers, _transport, ct);
            var acknowledgements = results.Count(r => r.Success);
            var quorum = (_cluster.Count / 2) + 1;

            if (acknowledgements + 1 >= quorum)
            {
                CommitIndex = entry.Index;
                AppliedIndex = entry.Index;
                await CommitQueue.EnqueueAsync(entry, ct);

                // Apply the membership change: remove the peer and its state
                _members.Remove(peerId);
                _peerStates.Remove(peerId);
            }
        }
        finally
        {
            // Clear the in-flight tracking regardless of outcome. Doing this in `finally`
            // keeps a cancelled or failed replication from blocking every later change.
            Interlocked.Exchange(ref _membershipChangeIndex, 0);
        }

        return entry.Index;
    }

    // Joint consensus (Raft paper Section 4) — two-phase membership transitions.
    // Go reference: raft.go joint consensus / two-phase membership transitions.

    /// <summary>
    /// Enters the joint consensus phase with the given old and new configurations.
    /// During this phase quorum decisions require majority from BOTH configurations.
    /// Every member of Cnew is added to the active member set; members of Cold are not
    /// added here (presumably they are already active — verify against callers).
    /// Go reference: raft.go Section 4 (joint consensus).
    /// </summary>
    public void BeginJointConsensus(IReadOnlyCollection<string> cold, IReadOnlyCollection<string> cnew)
    {
        _jointOldMembers = new HashSet<string>(cold, StringComparer.Ordinal);
        _jointNewMembers = new HashSet<string>(cnew, StringComparer.Ordinal);

        // Extend the active member set with the new configuration
        foreach (var member in cnew)
            _members.Add(member);
    }

    /// <summary>
    /// Commits the joint configuration by finalizing Cnew as the active member set.
    /// Clears both Cold and Cnew, leaving only the new configuration.
    /// Call this once the Cnew log entry has reached quorum in both configs.
    /// Does nothing when not in joint consensus.
    /// Go reference: raft.go joint consensus commit.
    /// </summary>
    public void CommitJointConsensus()
    {
        if (_jointNewMembers == null)
            return;

        _members.Clear();
        foreach (var m in _jointNewMembers)
            _members.Add(m);

        _jointOldMembers = null;
        _jointNewMembers = null;
    }

    /// <summary>
    /// During joint consensus, checks whether a set of acknowledging voters satisfies
    /// a majority in BOTH the old configuration (Cold) and the new configuration (Cnew).
    /// Only the sizes of the voter collections are compared; membership of the
    /// individual voters is not checked here.
    /// Returns false when not in joint consensus.
    /// Go reference: raft.go Section 4 — joint config quorum calculation.
    /// </summary>
    public bool CalculateJointQuorum(
        IReadOnlyCollection<string> coldVoters,
        IReadOnlyCollection<string> cnewVoters)
    {
        if (_jointOldMembers == null || _jointNewMembers == null)
            return false;

        var oldQuorum = (_jointOldMembers.Count / 2) + 1;
        var newQuorum = (_jointNewMembers.Count / 2) + 1;
        return coldVoters.Count >= oldQuorum && cnewVoters.Count >= newQuorum;
    }

    // B5: Snapshot checkpoints and log compaction
    // Go reference: raft.go CreateSnapshotCheckpoint, DrainAndReplaySnapshot

    /// <summary>
    /// Creates a snapshot at the current applied index and compacts the log up to that point.
    /// This combines snapshot creation with log truncation so that snapshotted entries
    /// do not need to be replayed on restart.
    /// Go reference: raft.go CreateSnapshotCheckpoint.
    /// </summary>
    /// <returns>The snapshot that was saved.</returns>
    public async Task<RaftSnapshot> CreateSnapshotCheckpointAsync(CancellationToken ct)
    {
        var snapshot = new RaftSnapshot
        {
            LastIncludedIndex = AppliedIndex,
            LastIncludedTerm = Term,
        };
        await _snapshotStore.SaveAsync(snapshot, ct);
        Log.Compact(snapshot.LastIncludedIndex);
        return snapshot;
    }

    /// <summary>
    /// Drains the commit queue, installs the given snapshot, and updates the commit index.
    /// Used when a leader sends a snapshot to a lagging follower: the follower pauses its
    /// apply pipeline, discards pending entries, then fast-forwards to the snapshot state.
    /// Go reference: raft.go DrainAndReplaySnapshot.
    /// </summary>
    public async Task DrainAndReplaySnapshotAsync(RaftSnapshot snapshot, CancellationToken ct)
    {
        // Drain any pending commit-queue entries that are now superseded by the snapshot
        while (CommitQueue.TryDequeue(out _))
        {
            // discard — snapshot covers these
        }

        // Install the snapshot: replaces the log and advances applied state
        Log.ReplaceWithSnapshot(snapshot);
        AppliedIndex = snapshot.LastIncludedIndex;
        CommitIndex = snapshot.LastIncludedIndex;
        await _snapshotStore.SaveAsync(snapshot, ct);
    }

    /// <summary>
    /// Compacts the in-memory log up to the current applied index; does nothing when
    /// nothing has been applied yet. Completes synchronously.
    /// This is typically called after a snapshot has been persisted.
    /// Go reference: raft.go WAL compact.
    /// </summary>
    public Task CompactLogAsync(CancellationToken ct)
    {
        _ = ct;

        // Compact up to the applied index (which is the snapshot point)
        if (AppliedIndex > 0)
            Log.Compact(AppliedIndex);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Installs a snapshot assembled from streaming chunks.
    /// Used for large snapshot transfers where the entire snapshot is sent in pieces.
    /// Go reference: raft.go:3500-3700 (installSnapshot with chunked transfer).
    /// </summary>
    /// <param name="chunks">Snapshot pieces, in the order they should be assembled.</param>
    /// <param name="snapshotIndex">Last log index covered by the snapshot.</param>
    /// <param name="snapshotTerm">Term of the last log entry covered by the snapshot.</param>
    /// <param name="ct">Cancellation token passed to the snapshot store.</param>
    public async Task InstallSnapshotFromChunksAsync(
        IEnumerable<byte[]> chunks,
        long snapshotIndex,
        int snapshotTerm,
        CancellationToken ct)
    {
        var checkpoint = new RaftSnapshotCheckpoint
        {
            SnapshotIndex = snapshotIndex,
            SnapshotTerm = snapshotTerm,
        };

        foreach (var chunk in chunks)
            checkpoint.AddChunk(chunk);

        var data = checkpoint.Assemble();
        var snapshot = new RaftSnapshot
        {
            LastIncludedIndex = snapshotIndex,
            LastIncludedTerm = snapshotTerm,
            Data = data,
        };

        Log.ReplaceWithSnapshot(snapshot);
        AppliedIndex = snapshotIndex;
        CommitIndex = snapshotIndex;
        await _snapshotStore.SaveAsync(snapshot, ct);
    }

    /// <summary>
    /// Marks the given index as processed by the state machine. The processed index
    /// only moves forward; lower or equal values are ignored.
    /// Go reference: raft.go applied/processed tracking.
    /// </summary>
    public void MarkProcessed(long index)
    {
        if (index > ProcessedIndex)
            ProcessedIndex = index;
    }

    /// <summary>Appends an entry replicated from the leader to the local log.</summary>
    public void ReceiveReplicatedEntry(RaftLogEntry entry)
    {
        Log.AppendReplicated(entry);
    }

    /// <summary>
    /// Accepts an entry from the leader: resets the election timer and appends the entry.
    /// Completes synchronously.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The entry's term is older than the current term.
    /// </exception>
    public Task TryAppendFromLeaderAsync(RaftLogEntry entry, CancellationToken ct)
    {
        _ = ct;

        if (entry.Term < TermState.CurrentTerm)
            throw new InvalidOperationException("stale term append rejected");

        // B2: Reset election timer when receiving append from leader
        ResetElectionTimeout();
        ReceiveReplicatedEntry(entry);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Saves a snapshot at the current applied index and term without compacting the log.
    /// </summary>
    /// <returns>The snapshot that was saved.</returns>
    public async Task<RaftSnapshot> CreateSnapshotAsync(CancellationToken ct)
    {
        var snapshot = new RaftSnapshot
        {
            LastIncludedIndex = AppliedIndex,
            LastIncludedTerm = Term,
        };
        await _snapshotStore.SaveAsync(snapshot, ct);
        return snapshot;
    }

    /// <summary>
    /// Replaces the log with the snapshot, advances the applied index, and saves the snapshot.
    /// </summary>
    public Task InstallSnapshotAsync(RaftSnapshot snapshot, CancellationToken ct)
    {
        // NOTE(review): unlike the other snapshot install paths in this class, CommitIndex
        // is not advanced here — confirm whether that difference is intentional.
        Log.ReplaceWithSnapshot(snapshot);
        AppliedIndex = snapshot.LastIncludedIndex;
        return _snapshotStore.SaveAsync(snapshot, ct);
    }

    /// <summary>Returns to follower state, discarding collected votes and the current vote.</summary>
    public void RequestStepDown()
    {
        Role = RaftRole.Follower;
        _votesReceived = 0;
        TermState.VotedFor = null;
    }

    // B2: Election timer management
    // Go reference: raft.go:1400-1450 (resetElectionTimeout)

    /// <summary>
    /// Resets the election timeout timer with a new randomized interval.
    /// Called on heartbeat receipt and append entries from leader.
    /// Has no effect when the timer has not been started or has been stopped.
    /// </summary>
    public void ResetElectionTimeout()
    {
        var timeout = NextElectionTimeoutMs();

        // Work on a local copy: StopElectionTimer may null the field from another thread.
        var timer = _electionTimer;
        if (timer is null)
            return;

        try
        {
            timer.Change(timeout, Timeout.Infinite);
        }
        catch (ObjectDisposedException)
        {
            // The timer was stopped between the read above and Change; a stopped timer
            // has nothing to re-arm, and throwing here would surface on a pool thread
            // when called from the timer callback.
        }
    }

    /// <summary>
    /// Starts the background election timer. When it fires and this node is a Follower,
    /// an election campaign is triggered automatically. Calling this again replaces any
    /// timer started earlier.
    /// Go reference: raft.go:1500-1550 (campaign logic).
    /// </summary>
    public void StartElectionTimer(CancellationToken ct = default)
    {
        // Tear down a timer left over from a previous call; otherwise it would be
        // leaked and keep firing alongside the new one.
        StopElectionTimer();

        _electionTimerCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        _electionTimer = new Timer(ElectionTimerCallback, null, NextElectionTimeoutMs(), Timeout.Infinite);
    }

    /// <summary>
    /// Stops and disposes the election timer.
    /// </summary>
    public void StopElectionTimer()
    {
        _electionTimer?.Dispose();
        _electionTimer = null;
        _electionTimerCts?.Cancel();
        _electionTimerCts?.Dispose();
        _electionTimerCts = null;
    }

    /// <summary>
    /// Bypasses the election timer and immediately starts an election campaign.
    /// Useful for testing.
    /// </summary>
    public void CampaignImmediately()
    {
        var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count;
        StartElection(clusterSize);
    }

    /// <summary>Picks a random election timeout within the configured inclusive range.</summary>
    private int NextElectionTimeoutMs() =>
        Random.Shared.Next(ElectionTimeoutMinMs, ElectionTimeoutMaxMs + 1);

    /// <summary>
    /// Timer callback: campaigns when this node is a follower, then re-arms the timer.
    /// </summary>
    private void ElectionTimerCallback(object? state)
    {
        if (_electionTimerCts?.IsCancellationRequested == true)
            return;

        if (Role == RaftRole.Follower)
        {
            // B6: Use pre-vote when enabled to avoid disrupting the cluster
            CampaignWithPreVote();
        }

        // The timer is one-shot, so re-arm it on every path. Without this a follower
        // whose pre-vote failed, or a node that later steps down, would never campaign again.
        ResetElectionTimeout();
    }

    // B3: Peer state accessors

    /// <summary>
    /// Returns a read-only view of all tracked peer states.
    /// </summary>
    public IReadOnlyDictionary<string, RaftPeerState> GetPeerStates() => _peerStates;

    /// <summary>
    /// Checks if this node is current: a leader always is; otherwise at least one
    /// tracked peer must report itself current for the given election timeout.
    /// Go reference: raft.go isCurrent check.
    /// </summary>
    public bool IsCurrent(TimeSpan electionTimeout)
    {
        // A leader is always current
        if (Role == RaftRole.Leader)
            return true;

        // Check if any peer (which could be the leader) has contacted us recently
        return _peerStates.Values.Any(p => p.IsCurrent(electionTimeout));
    }

    /// <summary>
    /// Overall health check: node is active and peers are responsive.
    /// </summary>
    public bool IsHealthy(TimeSpan healthThreshold)
    {
        if (Role == RaftRole.Leader)
        {
            // Leader is healthy if a majority of peers are responsive
            var healthyPeers = _peerStates.Values.Count(p => p.IsHealthy(healthThreshold));
            var quorum = (_peerStates.Count + 1) / 2; // +1 for self
            return healthyPeers >= quorum;
        }

        // Follower/candidate: healthy if at least one peer (the leader) is responsive
        return _peerStates.Values.Any(p => p.IsHealthy(healthThreshold));
    }

    // B6: Pre-vote protocol implementation
    // Go reference: raft.go:1600-1700 (pre-vote logic)

    /// <summary>
    /// Evaluates a pre-vote request from a candidate. Grants the pre-vote if the
    /// candidate's log is at least as up-to-date as this node's log and the candidate's
    /// term is at least as high as the current term.
    /// Pre-votes do NOT change any persistent state (no term increment, no votedFor change).
    /// Go reference: raft.go:1600-1700 (pre-vote logic).
    /// </summary>
    public bool RequestPreVote(ulong term, ulong lastTerm, ulong lastIndex, string candidateId)
    {
        _ = candidateId; // used for logging in production; not needed for correctness

        // Deny if candidate's term is behind ours. Compare as ulong: narrowing `term`
        // to int would wrap values above int.MaxValue and deny them incorrectly.
        var currentTerm = TermState.CurrentTerm;
        if (currentTerm > 0 && term < (ulong)currentTerm)
            return false;

        // Check if candidate's log is at least as up-to-date as ours
        var ourLastTerm = Log.Entries.Count > 0 ? (ulong)Log.Entries[^1].Term : 0UL;
        var ourLastIndex = Log.Entries.Count > 0 ? (ulong)Log.Entries[^1].Index : 0UL;

        // Candidate's log is at least as up-to-date if:
        // (1) candidate's last term > our last term, OR
        // (2) candidate's last term == our last term AND candidate's last index >= our last index
        if (lastTerm > ourLastTerm)
            return true;

        if (lastTerm == ourLastTerm && lastIndex >= ourLastIndex)
            return true;

        return false;
    }

    /// <summary>
    /// Conducts a pre-vote round among cluster peers without incrementing the term.
    /// Returns true if a majority of peers granted the pre-vote, meaning this node
    /// should proceed to a real election.
    /// Go reference: raft.go:1600-1700 (pre-vote logic).
    /// </summary>
    public bool StartPreVote()
    {
        var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count;
        var preVotesGranted = 1; // vote for self

        var ourLastTerm = Log.Entries.Count > 0 ? (ulong)Log.Entries[^1].Term : 0UL;
        var ourLastIndex = Log.Entries.Count > 0 ? (ulong)Log.Entries[^1].Index : 0UL;

        // Send pre-vote requests to all peers (without incrementing our term)
        foreach (var peer in _cluster.Where(n => !string.Equals(n.Id, Id, StringComparison.Ordinal)))
        {
            if (peer.RequestPreVote((ulong)TermState.CurrentTerm, ourLastTerm, ourLastIndex, Id))
                preVotesGranted++;
        }

        var quorum = (clusterSize / 2) + 1;
        return preVotesGranted >= quorum;
    }

    /// <summary>
    /// Starts an election campaign, optionally preceded by a pre-vote round.
    /// When PreVoteEnabled is true and a cluster is configured, the node first conducts
    /// a pre-vote round. If the pre-vote fails, the node stays as a follower without
    /// incrementing its term.
    /// Go reference: raft.go:1600-1700 (pre-vote), raft.go:1500-1550 (campaign).
    /// </summary>
    public void CampaignWithPreVote()
    {
        var clusterSize = _cluster.Count > 0 ? _cluster.Count : _members.Count;

        if (PreVoteEnabled && _cluster.Count > 0)
        {
            // Pre-vote round: test if we would win without incrementing term
            if (!StartPreVote())
                return; // Pre-vote failed, stay as follower — don't disrupt cluster
        }

        // Pre-vote succeeded (or disabled), proceed to real election
        StartElection(clusterSize);
    }

    /// <summary>Becomes leader once the collected votes reach a majority of the cluster.</summary>
    private void TryBecomeLeader(int clusterSize)
    {
        var quorum = (clusterSize / 2) + 1;
        if (_votesReceived >= quorum)
            Role = RaftRole.Leader;
    }

    /// <summary>
    /// Writes the log (log.json), the applied index (applied.txt), and the term and
    /// vote (meta.json) to the persist directory, creating it if needed.
    /// </summary>
    public async Task PersistAsync(CancellationToken ct)
    {
        var dir = _persistDirectory ?? Path.Combine(Path.GetTempPath(), "natsdotnet-raft", Id);
        Directory.CreateDirectory(dir);
        await Log.PersistAsync(Path.Combine(dir, "log.json"), ct);

        // Invariant culture keeps the file readable regardless of the machine's locale.
        await File.WriteAllTextAsync(
            Path.Combine(dir, "applied.txt"),
            AppliedIndex.ToString(System.Globalization.CultureInfo.InvariantCulture),
            ct);

        // Persist term and VotedFor together in meta.json so they are stored as one unit.
        // Go reference: raft.go storeMeta / writeTermVote (term + votedFor written atomically)
        var meta = new RaftMetaState
        {
            CurrentTerm = TermState.CurrentTerm,
            VotedFor = TermState.VotedFor,
        };
        await File.WriteAllTextAsync(
            Path.Combine(dir, "meta.json"),
            System.Text.Json.JsonSerializer.Serialize(meta),
            ct);
    }

    /// <summary>
    /// Loads the log, term/vote metadata, and applied index from the persist directory.
    /// When applied.txt is missing or unparsable, the applied index falls back to the
    /// index of the last log entry, if any.
    /// </summary>
    public async Task LoadPersistedStateAsync(CancellationToken ct)
    {
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        var integerStyle = System.Globalization.NumberStyles.Integer;

        var dir = _persistDirectory ?? Path.Combine(Path.GetTempPath(), "natsdotnet-raft", Id);
        Log = await RaftLog.LoadAsync(Path.Combine(dir, "log.json"), ct);

        // Load from meta.json first (includes VotedFor); fall back to legacy term.txt
        var metaPath = Path.Combine(dir, "meta.json");
        if (File.Exists(metaPath))
        {
            var json = await File.ReadAllTextAsync(metaPath, ct);
            var meta = System.Text.Json.JsonSerializer.Deserialize<RaftMetaState>(json);
            if (meta is not null)
            {
                TermState.CurrentTerm = meta.CurrentTerm;
                TermState.VotedFor = meta.VotedFor;
            }
        }
        else
        {
            // Legacy: term.txt only (no VotedFor)
            var termPath = Path.Combine(dir, "term.txt");
            if (File.Exists(termPath)
                && int.TryParse(await File.ReadAllTextAsync(termPath, ct), integerStyle, invariant, out var term))
            {
                TermState.CurrentTerm = term;
            }
        }

        var appliedPath = Path.Combine(dir, "applied.txt");
        if (File.Exists(appliedPath)
            && long.TryParse(await File.ReadAllTextAsync(appliedPath, ct), integerStyle, invariant, out var applied))
        {
            AppliedIndex = applied;
        }
        else if (Log.Entries.Count > 0)
        {
            AppliedIndex = Log.Entries[^1].Index;
        }
    }

    /// <summary>Durable term + vote metadata written alongside the log.</summary>
    private sealed class RaftMetaState
    {
        public int CurrentTerm { get; set; }
        public string? VotedFor { get; set; }
    }

    /// <summary>Stops the election timer and releases its resources.</summary>
    public void Dispose()
    {
        StopElectionTimer();
    }
}