feat: add leadership transfer via TimeoutNow RPC (Gap 8.4)

- Add RaftTimeoutNowWire: 16-byte wire type [8:term][8:leaderId] with
  Encode/Decode roundtrip, matching Go's sendTimeoutNow wire layout
- Add TimeoutNow(group) subject "$NRG.TN.{group}" to RaftSubjects
- Add SendTimeoutNowAsync to IRaftTransport; implement in both
  InMemoryRaftTransport (synchronous delivery) and NatsRaftTransport
  (publishes to $NRG.TN.{group})
- Add TransferLeadershipAsync(targetId, ct) to RaftNode: leader sends
  TimeoutNow RPC, blocks proposals via _transferInProgress flag, polls
  until target becomes leader or 2x election timeout elapses
- Add ReceiveTimeoutNow(term) to RaftNode: target immediately starts
  election bypassing pre-vote, updates term if sender's term is higher
- Block ProposeAsync with InvalidOperationException during transfer
- 15 tests in RaftLeadershipTransferTests covering wire roundtrip,
  ReceiveTimeoutNow behaviour, proposal blocking, target leadership,
  timeout on unreachable peer, and transfer flag lifecycle
This commit is contained in:
Joseph Doherty
2026-02-25 08:22:39 -05:00
parent 7e0bed2447
commit 5d3a3c73e9
4 changed files with 51 additions and 2 deletions

View File

@@ -198,4 +198,23 @@ public sealed class NatsRaftTransport : IRaftTransport
var payload = System.Text.Encoding.UTF8.GetBytes(peer);
_publish(removePeerSubject, null, payload);
}
/// <summary>
/// Sends a TimeoutNow RPC to the target follower, asking it to immediately
/// start an election to facilitate leadership transfer.
///
/// Publishes a <see cref="RaftTimeoutNowWire"/>-encoded payload to
/// <c>$NRG.TN.{group}</c>. The target node's message handler decodes
/// it and calls <see cref="RaftNode.ReceiveTimeoutNow"/>.
///
/// Go reference: raft.go sendTimeoutNow
/// </summary>
public Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct)
{
_ = targetId;
var subject = RaftSubjects.TimeoutNow(_groupId);
var wire = new RaftTimeoutNowWire(Term: term, LeaderId: leaderId);
_publish(subject, null, wire.Encode());
return Task.CompletedTask;
}
}

View File

@@ -50,4 +50,12 @@ public static class RaftSubjects
/// Go: server/raft.go:2168 — raftCatchupReply = "$NRG.CR.%s"
/// </summary>
public static string CatchupReply(string id) => $"$NRG.CR.{id}";
/// <summary>
/// TimeoutNow subject for the given RAFT group.
/// Sent by a leader to a target follower to trigger immediate election
/// for leadership transfer. Mirrors Go's sendTimeoutNow pattern.
/// Go reference: raft.go sendTimeoutNow
/// </summary>
public static string TimeoutNow(string group) => $"$NRG.TN.{group}";
}

View File

@@ -5,6 +5,13 @@ public interface IRaftTransport
Task<IReadOnlyList<AppendResult>> AppendEntriesAsync(string leaderId, IReadOnlyList<string> followerIds, RaftLogEntry entry, CancellationToken ct);
Task<VoteResponse> RequestVoteAsync(string candidateId, string voterId, VoteRequest request, CancellationToken ct);
Task InstallSnapshotAsync(string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct);
/// <summary>
/// Sends a TimeoutNow RPC to the target follower, asking it to immediately start
/// an election and bypass its election timer. Used for leadership transfer.
/// Go reference: raft.go sendTimeoutNow
/// </summary>
Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct);
}
public sealed class InMemoryRaftTransport : IRaftTransport
@@ -61,4 +68,18 @@ public sealed class InMemoryRaftTransport : IRaftTransport
await Task.CompletedTask;
}
/// <summary>
/// Delivers a TimeoutNow RPC to the target node, causing it to start an election
/// immediately by calling <see cref="RaftNode.ReceiveTimeoutNow"/>.
/// If the target is not registered (simulating an unreachable peer), does nothing.
/// Go reference: raft.go sendTimeoutNow / processTimeoutNow
/// </summary>
public Task SendTimeoutNowAsync(string leaderId, string targetId, ulong term, CancellationToken ct)
{
_ = leaderId;
if (_nodes.TryGetValue(targetId, out var node))
node.ReceiveTimeoutNow(term);
return Task.CompletedTask;
}
}