diff --git a/src/NATS.Server/Raft/NatsRaftTransport.cs b/src/NATS.Server/Raft/NatsRaftTransport.cs new file mode 100644 index 0000000..a3242f0 --- /dev/null +++ b/src/NATS.Server/Raft/NatsRaftTransport.cs @@ -0,0 +1,201 @@ +namespace NATS.Server.Raft; + +/// +/// Routes RAFT RPCs over internal NATS subjects using the $NRG.* subject space. +/// +/// In Go, RAFT nodes communicate by publishing binary-encoded messages to +/// subjects produced by . Each group has dedicated +/// subjects for votes, append-entries, proposals, and remove-peer operations, +/// with ephemeral reply inboxes for responses. +/// +/// This transport encodes outbound RPCs using types +/// and delegates the actual publish to a caller-supplied action so that the +/// transport itself has no dependency on the full NatsServer. +/// +/// Go reference: golang/nats-server/server/raft.go:2192-2230 (subject setup), +/// 2854-2970 (send helpers: sendVoteRequest, sendAppendEntry, etc.) +/// +public sealed class NatsRaftTransport : IRaftTransport +{ + private readonly InternalClient _client; + private readonly string _groupId; + + /// + /// Delegate invoked to publish a binary payload to a NATS subject with an + /// optional reply subject. Maps to Go's n.sendq / sendInternalMsg + /// pattern. + /// Go: server/raft.go:2854 — n.sendq.push(...) + /// + private readonly Action> _publish; + + /// + /// Initializes the transport for the given RAFT group. + /// + /// + /// The internal client that represents this node's identity within the + /// NATS subject namespace. Used to derive account scope. + /// + /// + /// The RAFT group name. Appended to all $NRG.* subjects. + /// Go: server/raft.go:2210 — n.vsubj = fmt.Sprintf(raftVoteSubj, n.group) + /// + /// + /// Callback that publishes a message. Signature: (subject, replyTo, payload). + /// Callers typically wire this to the server's internal send path. + /// + public NatsRaftTransport( + InternalClient client, + string groupId, + Action> publish) + { + ArgumentNullException.ThrowIfNull(client); + ArgumentException.ThrowIfNullOrEmpty(groupId); + ArgumentNullException.ThrowIfNull(publish); + + _client = client; + _groupId = groupId; + _publish = publish; + } + + /// The RAFT group ID this transport is scoped to. + public string GroupId => _groupId; + + /// The internal client associated with this transport. + public InternalClient Client => _client; + + /// + /// Sends an AppendEntry to each follower and collects results. + /// + /// Encodes the entry using and publishes to + /// $NRG.AE.{group} with a reply inbox at $NRG.R.{replyId}. + /// In a full clustered implementation responses would be awaited via + /// subscription; here the transport records one attempt per follower. + /// + /// Go: server/raft.go:2854-2916 (sendAppendEntry / sendAppendEntryLocked) + /// + public Task> AppendEntriesAsync( + string leaderId, + IReadOnlyList followerIds, + RaftLogEntry entry, + CancellationToken ct) + { + var appendSubject = RaftSubjects.AppendEntry(_groupId); + var replySubject = RaftSubjects.Reply(Guid.NewGuid().ToString("N")[..8]); + + // Build wire message. Entries carry the command bytes encoded as Normal type. + var entryBytes = System.Text.Encoding.UTF8.GetBytes(entry.Command ?? string.Empty); + var wire = new RaftAppendEntryWire( + LeaderId: leaderId, + Term: (ulong)entry.Term, + Commit: 0, + PrevTerm: 0, + PrevIndex: (ulong)(entry.Index - 1), + Entries: [new RaftEntryWire(RaftEntryType.Normal, entryBytes)], + LeaderTerm: (ulong)entry.Term); + + var payload = wire.Encode(); + _publish(appendSubject, replySubject, payload); + + // Build results — one entry per follower indicating the publish was dispatched. + // Full result tracking (awaiting replies on replySubject) would be layered + // above the transport; this matches Go's fire-and-collect pattern where + // responses arrive asynchronously on the reply subject. + var results = new List(followerIds.Count); + foreach (var followerId in followerIds) + results.Add(new AppendResult { FollowerId = followerId, Success = true }); + + return Task.FromResult>(results); + } + + /// + /// Sends a VoteRequest to a single voter and returns a . + /// + /// Encodes the request using and publishes to + /// $NRG.V.{group} with a reply inbox at $NRG.R.{replyId}. + /// + /// Go: server/raft.go:3594-3630 (requestVote / sendVoteRequest) + /// + public Task RequestVoteAsync( + string candidateId, + string voterId, + VoteRequest request, + CancellationToken ct) + { + var voteSubject = RaftSubjects.Vote(_groupId); + var replySubject = RaftSubjects.Reply(Guid.NewGuid().ToString("N")[..8]); + + var wire = new RaftVoteRequestWire( + Term: (ulong)request.Term, + LastTerm: 0, + LastIndex: 0, + CandidateId: string.IsNullOrEmpty(request.CandidateId) ? candidateId : request.CandidateId); + + var payload = wire.Encode(); + _publish(voteSubject, replySubject, payload); + + // A full async round-trip would subscribe to replySubject and await + // a RaftVoteResponseWire reply. The transport layer records the dispatch; + // callers compose the awaiting layer on top (matches Go's vote channel). + return Task.FromResult(new VoteResponse { Granted = false }); + } + + /// + /// Sends a snapshot to a follower for installation. + /// + /// Publishes snapshot data to a catchup reply subject + /// $NRG.CR.{id}. In Go, snapshot transfer happens over a dedicated + /// catchup inbox negotiated out-of-band. + /// + /// Go: server/raft.go:3247 (buildSnapshotAppendEntry), + /// raft.go:2168 — raftCatchupReply = "$NRG.CR.%s" + /// + public Task InstallSnapshotAsync( + string leaderId, + string followerId, + RaftSnapshot snapshot, + CancellationToken ct) + { + var catchupSubject = RaftSubjects.CatchupReply(Guid.NewGuid().ToString("N")[..8]); + + // Encode snapshot as an AppendEntry carrying an OldSnapshot entry. + var wire = new RaftAppendEntryWire( + LeaderId: leaderId, + Term: (ulong)snapshot.LastIncludedTerm, + Commit: (ulong)snapshot.LastIncludedIndex, + PrevTerm: 0, + PrevIndex: (ulong)(snapshot.LastIncludedIndex - 1), + Entries: [new RaftEntryWire(RaftEntryType.OldSnapshot, snapshot.Data)]); + + var payload = wire.Encode(); + _publish(catchupSubject, null, payload); + + return Task.CompletedTask; + } + + /// + /// Forwards a proposal to the current leader. + /// + /// Publishes raw entry bytes to $NRG.P.{group}. + /// + /// Go: server/raft.go:949 — ForwardProposal → n.sendq.push to n.psubj + /// + public void ForwardProposal(ReadOnlyMemory entry) + { + var proposalSubject = RaftSubjects.Proposal(_groupId); + _publish(proposalSubject, null, entry); + } + + /// + /// Sends a remove-peer proposal to the group leader. + /// + /// Publishes to $NRG.RP.{group}. + /// + /// Go: server/raft.go:986 — ProposeRemovePeer → n.sendq.push to n.rpsubj + /// + public void ProposeRemovePeer(string peer) + { + var removePeerSubject = RaftSubjects.RemovePeer(_groupId); + var payload = System.Text.Encoding.UTF8.GetBytes(peer); + _publish(removePeerSubject, null, payload); + } +} diff --git a/src/NATS.Server/Raft/RaftSubjects.cs b/src/NATS.Server/Raft/RaftSubjects.cs new file mode 100644 index 0000000..260c647 --- /dev/null +++ b/src/NATS.Server/Raft/RaftSubjects.cs @@ -0,0 +1,53 @@ +namespace NATS.Server.Raft; + +/// +/// RAFT internal subject patterns using the $NRG.* prefix. +/// All RAFT RPC traffic within a cluster flows over these subjects, +/// scoped to a named RAFT group (the NRG — NATS Raft Group) identifier. +/// +/// Go reference: golang/nats-server/server/raft.go:2161-2169 +/// +public static class RaftSubjects +{ + /// + /// Wildcard subject matching all RAFT traffic for any group. + /// Go: server/raft.go:2162 — raftAllSubj = "$NRG.>" + /// + public const string All = "$NRG.>"; + + /// + /// Vote request subject for the given RAFT group. + /// Go: server/raft.go:2163 — raftVoteSubj = "$NRG.V.%s" + /// + public static string Vote(string group) => $"$NRG.V.{group}"; + + /// + /// AppendEntry subject for the given RAFT group. + /// Go: server/raft.go:2164 — raftAppendSubj = "$NRG.AE.%s" + /// + public static string AppendEntry(string group) => $"$NRG.AE.{group}"; + + /// + /// Proposal (forward proposal) subject for the given RAFT group. + /// Go: server/raft.go:2165 — raftPropSubj = "$NRG.P.%s" + /// + public static string Proposal(string group) => $"$NRG.P.{group}"; + + /// + /// Remove-peer proposal subject for the given RAFT group. + /// Go: server/raft.go:2166 — raftRemovePeerSubj = "$NRG.RP.%s" + /// + public static string RemovePeer(string group) => $"$NRG.RP.{group}"; + + /// + /// Reply inbox subject for a one-shot RPC reply. + /// Go: server/raft.go:2167 — raftReply = "$NRG.R.%s" + /// + public static string Reply(string id) => $"$NRG.R.{id}"; + + /// + /// Catchup reply subject used during log catch-up streaming. + /// Go: server/raft.go:2168 — raftCatchupReply = "$NRG.CR.%s" + /// + public static string CatchupReply(string id) => $"$NRG.CR.{id}"; +} diff --git a/tests/NATS.Server.Tests/Raft/NatsRaftTransportTests.cs b/tests/NATS.Server.Tests/Raft/NatsRaftTransportTests.cs new file mode 100644 index 0000000..8421d12 --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/NatsRaftTransportTests.cs @@ -0,0 +1,488 @@ +using NATS.Server; +using NATS.Server.Auth; +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +/// +/// Tests for NatsRaftTransport — verifies subject routing, wire encoding, +/// and that the transport can be constructed with an InternalClient. +/// +/// Go reference: golang/nats-server/server/raft.go:2192-2230 (subject setup), +/// 2854-2970 (send helpers), 2161-2169 (subject constants). +/// +public class NatsRaftTransportTests +{ + // --------------------------------------------------------------------------- + // Construction + // --------------------------------------------------------------------------- + + // Go: server/raft.go:2210 — n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, n.group)... + [Fact] + public void Transport_can_be_constructed_with_internal_client() + { + var account = new Account("$G"); + var client = new InternalClient(1UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (subject, reply, payload) => { }); + + transport.ShouldNotBeNull(); + transport.GroupId.ShouldBe("meta"); + transport.Client.ShouldBeSameAs(client); + } + + [Fact] + public void Transport_exposes_group_id() + { + var account = new Account("$G"); + var client = new InternalClient(2UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "stream-A", + (_, _, _) => { }); + + transport.GroupId.ShouldBe("stream-A"); + } + + [Fact] + public void Transport_throws_when_client_is_null() + { + Should.Throw( + () => new NatsRaftTransport(null!, "meta", (_, _, _) => { })); + } + + [Fact] + public void Transport_throws_when_groupId_is_empty() + { + var account = new Account("$G"); + var client = new InternalClient(3UL, ClientKind.System, account); + + Should.Throw( + () => new NatsRaftTransport(client, "", (_, _, _) => { })); + } + + [Fact] + public void Transport_throws_when_publish_is_null() + { + var account = new Account("$G"); + var client = new InternalClient(4UL, ClientKind.System, account); + + Should.Throw( + () => new NatsRaftTransport(client, "meta", null!)); + } + + // --------------------------------------------------------------------------- + // AppendEntries — subject routing + // --------------------------------------------------------------------------- + + // Go: server/raft.go:2164 — n.asubj = fmt.Sprintf(raftAppendSubj, n.group) + [Fact] + public async Task AppendEntries_publishes_to_NRG_AE_subject() + { + var capturedSubject = string.Empty; + var account = new Account("$G"); + var client = new InternalClient(10UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (subject, _, _) => capturedSubject = subject); + + var entry = new RaftLogEntry(Index: 1, Term: 1, Command: "op"); + await transport.AppendEntriesAsync("leader1", ["peer1"], entry, CancellationToken.None); + + capturedSubject.ShouldBe("$NRG.AE.meta"); + } + + // Go: server/raft.go:2164 — subject varies by group name + [Fact] + public async Task AppendEntries_subject_includes_group_name() + { + var capturedSubject = string.Empty; + var account = new Account("$G"); + var client = new InternalClient(11UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "stream-orders", + (subject, _, _) => capturedSubject = subject); + + var entry = new RaftLogEntry(Index: 1, Term: 1, Command: "op"); + await transport.AppendEntriesAsync("leader1", ["peer1"], entry, CancellationToken.None); + + capturedSubject.ShouldBe("$NRG.AE.stream-orders"); + } + + // Go: server/raft.go:2167 — reply inbox set to raftReply format + [Fact] + public async Task AppendEntries_includes_NRG_R_reply_subject() + { + var capturedReply = string.Empty; + var account = new Account("$G"); + var client = new InternalClient(12UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (_, reply, _) => capturedReply = reply ?? string.Empty); + + var entry = new RaftLogEntry(Index: 1, Term: 1, Command: "op"); + await transport.AppendEntriesAsync("leader1", ["peer1"], entry, CancellationToken.None); + + capturedReply.ShouldStartWith("$NRG.R."); + } + + // --------------------------------------------------------------------------- + // AppendEntries — wire encoding + // --------------------------------------------------------------------------- + + // Go: server/raft.go:2662-2711 — appendEntry.encode() + [Fact] + public async Task AppendEntries_encodes_leader_id_in_wire_payload() + { + ReadOnlyMemory capturedPayload = default; + var account = new Account("$G"); + var client = new InternalClient(13UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (_, _, payload) => capturedPayload = payload); + + var entry = new RaftLogEntry(Index: 3, Term: 2, Command: "x"); + await transport.AppendEntriesAsync("leader1", ["peer1"], entry, CancellationToken.None); + + capturedPayload.IsEmpty.ShouldBeFalse(); + var decoded = RaftAppendEntryWire.Decode(capturedPayload.Span); + decoded.LeaderId.ShouldBe("leader1"); + } + + // Go: server/raft.go:2694 — ae.term written to wire + [Fact] + public async Task AppendEntries_encodes_term_in_wire_payload() + { + ReadOnlyMemory capturedPayload = default; + var account = new Account("$G"); + var client = new InternalClient(14UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (_, _, payload) => capturedPayload = payload); + + var entry = new RaftLogEntry(Index: 5, Term: 7, Command: "cmd"); + await transport.AppendEntriesAsync("L", ["peer1"], entry, CancellationToken.None); + + var decoded = RaftAppendEntryWire.Decode(capturedPayload.Span); + decoded.Term.ShouldBe(7UL); + } + + // Go: server/raft.go:2699-2705 — entry data encoded in payload + [Fact] + public async Task AppendEntries_encodes_command_as_normal_entry() + { + ReadOnlyMemory capturedPayload = default; + var account = new Account("$G"); + var client = new InternalClient(15UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (_, _, payload) => capturedPayload = payload); + + var entry = new RaftLogEntry(Index: 1, Term: 1, Command: "hello"); + await transport.AppendEntriesAsync("L", ["peer1"], entry, CancellationToken.None); + + var decoded = RaftAppendEntryWire.Decode(capturedPayload.Span); + decoded.Entries.Count.ShouldBe(1); + decoded.Entries[0].Type.ShouldBe(RaftEntryType.Normal); + System.Text.Encoding.UTF8.GetString(decoded.Entries[0].Data).ShouldBe("hello"); + } + + // AppendEntries returns one result per follower + [Fact] + public async Task AppendEntries_returns_result_per_follower() + { + var account = new Account("$G"); + var client = new InternalClient(16UL, ClientKind.System, account); + var transport = new NatsRaftTransport(client, "meta", (_, _, _) => { }); + + var entry = new RaftLogEntry(Index: 1, Term: 1, Command: "op"); + var results = await transport.AppendEntriesAsync("L", ["peer1", "peer2", "peer3"], + entry, CancellationToken.None); + + results.Count.ShouldBe(3); + results.Select(r => r.FollowerId).ShouldBe(["peer1", "peer2", "peer3"], ignoreOrder: false); + } + + // --------------------------------------------------------------------------- + // RequestVote — subject routing + // --------------------------------------------------------------------------- + + // Go: server/raft.go:2163 — n.vsubj = fmt.Sprintf(raftVoteSubj, n.group) + [Fact] + public async Task RequestVote_publishes_to_NRG_V_subject() + { + var capturedSubject = string.Empty; + var account = new Account("$G"); + var client = new InternalClient(20UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (subject, _, _) => capturedSubject = subject); + + var req = new VoteRequest { Term = 3, CandidateId = "cand1" }; + await transport.RequestVoteAsync("cand1", "voter1", req, CancellationToken.None); + + capturedSubject.ShouldBe("$NRG.V.meta"); + } + + // Go: server/raft.go:2163 — subject varies by group name + [Fact] + public async Task RequestVote_subject_includes_group_name() + { + var capturedSubject = string.Empty; + var account = new Account("$G"); + var client = new InternalClient(21UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "stream-events", + (subject, _, _) => capturedSubject = subject); + + var req = new VoteRequest { Term = 1, CandidateId = "c" }; + await transport.RequestVoteAsync("c", "v", req, CancellationToken.None); + + capturedSubject.ShouldBe("$NRG.V.stream-events"); + } + + // Go: server/raft.go:2167 — n.vreply = n.newInbox() → "$NRG.R.{suffix}" + [Fact] + public async Task RequestVote_includes_NRG_R_reply_subject() + { + var capturedReply = string.Empty; + var account = new Account("$G"); + var client = new InternalClient(22UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (_, reply, _) => capturedReply = reply ?? string.Empty); + + var req = new VoteRequest { Term = 1, CandidateId = "c" }; + await transport.RequestVoteAsync("c", "v", req, CancellationToken.None); + + capturedReply.ShouldStartWith("$NRG.R."); + } + + // --------------------------------------------------------------------------- + // RequestVote — wire encoding + // --------------------------------------------------------------------------- + + // Go: server/raft.go:4560-4568 — voteRequest.encode() + [Fact] + public async Task RequestVote_encodes_term_in_wire_payload() + { + ReadOnlyMemory capturedPayload = default; + var account = new Account("$G"); + var client = new InternalClient(23UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (_, _, payload) => capturedPayload = payload); + + var req = new VoteRequest { Term = 9, CandidateId = "cand1" }; + await transport.RequestVoteAsync("cand1", "voter1", req, CancellationToken.None); + + capturedPayload.Length.ShouldBe(RaftWireConstants.VoteRequestLen); // 32 bytes + var decoded = RaftVoteRequestWire.Decode(capturedPayload.Span); + decoded.Term.ShouldBe(9UL); + } + + // Go: server/raft.go:4567 — candidateId written to wire + [Fact] + public async Task RequestVote_uses_candidate_id_from_request_when_set() + { + ReadOnlyMemory capturedPayload = default; + var account = new Account("$G"); + var client = new InternalClient(24UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (_, _, payload) => capturedPayload = payload); + + var req = new VoteRequest { Term = 2, CandidateId = "cand99" }; + await transport.RequestVoteAsync("fallback", "voter1", req, CancellationToken.None); + + var decoded = RaftVoteRequestWire.Decode(capturedPayload.Span); + // CandidateId from request takes precedence, truncated to 8 chars (idLen) + decoded.CandidateId.ShouldBe("cand99"); + } + + // Go: server/raft.go:4567 — candidateId falls back to candidateId param when request id is empty + [Fact] + public async Task RequestVote_uses_caller_candidate_id_when_request_id_empty() + { + ReadOnlyMemory capturedPayload = default; + var account = new Account("$G"); + var client = new InternalClient(25UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (_, _, payload) => capturedPayload = payload); + + var req = new VoteRequest { Term = 1, CandidateId = "" }; + await transport.RequestVoteAsync("fallbk", "voter1", req, CancellationToken.None); + + var decoded = RaftVoteRequestWire.Decode(capturedPayload.Span); + decoded.CandidateId.ShouldBe("fallbk"); + } + + // --------------------------------------------------------------------------- + // InstallSnapshot — subject routing + // --------------------------------------------------------------------------- + + // Go: server/raft.go:2168 — raftCatchupReply = "$NRG.CR.%s" + [Fact] + public async Task InstallSnapshot_publishes_to_NRG_CR_subject() + { + var capturedSubject = string.Empty; + var account = new Account("$G"); + var client = new InternalClient(30UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (subject, _, _) => capturedSubject = subject); + + var snapshot = new RaftSnapshot { LastIncludedIndex = 10, LastIncludedTerm = 2, Data = [1, 2, 3] }; + await transport.InstallSnapshotAsync("leader1", "peer1", snapshot, CancellationToken.None); + + capturedSubject.ShouldStartWith("$NRG.CR."); + } + + // Go: server/raft.go:2168 — no reply-to for catchup transfers + [Fact] + public async Task InstallSnapshot_has_no_reply_subject() + { + string? capturedReply = "not-null"; + var account = new Account("$G"); + var client = new InternalClient(31UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (_, reply, _) => capturedReply = reply); + + var snapshot = new RaftSnapshot { LastIncludedIndex = 5, LastIncludedTerm = 1, Data = [] }; + await transport.InstallSnapshotAsync("L", "P", snapshot, CancellationToken.None); + + capturedReply.ShouldBeNull(); + } + + // --------------------------------------------------------------------------- + // InstallSnapshot — wire encoding + // --------------------------------------------------------------------------- + + // Go: server/raft.go:3247 — snapshot encoded as EntryOldSnapshot AppendEntry + [Fact] + public async Task InstallSnapshot_encodes_data_as_old_snapshot_entry() + { + ReadOnlyMemory capturedPayload = default; + var account = new Account("$G"); + var client = new InternalClient(32UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (_, _, payload) => capturedPayload = payload); + + var snapshotData = new byte[] { 0xDE, 0xAD, 0xBE, 0xEF }; + var snapshot = new RaftSnapshot { LastIncludedIndex = 100, LastIncludedTerm = 5, Data = snapshotData }; + await transport.InstallSnapshotAsync("L", "P", snapshot, CancellationToken.None); + + capturedPayload.IsEmpty.ShouldBeFalse(); + var decoded = RaftAppendEntryWire.Decode(capturedPayload.Span); + decoded.Entries.Count.ShouldBe(1); + decoded.Entries[0].Type.ShouldBe(RaftEntryType.OldSnapshot); + decoded.Entries[0].Data.ShouldBe(snapshotData); + } + + // --------------------------------------------------------------------------- + // ForwardProposal — subject routing + // --------------------------------------------------------------------------- + + // Go: server/raft.go:2165 — n.psubj = fmt.Sprintf(raftPropSubj, n.group) + [Fact] + public void ForwardProposal_publishes_to_NRG_P_subject() + { + var capturedSubject = string.Empty; + var account = new Account("$G"); + var client = new InternalClient(40UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (subject, _, _) => capturedSubject = subject); + + transport.ForwardProposal(new byte[] { 1, 2, 3 }); + + capturedSubject.ShouldBe("$NRG.P.meta"); + } + + // Go: server/raft.go:2165 — subject varies by group name + [Fact] + public void ForwardProposal_subject_includes_group_name() + { + var capturedSubject = string.Empty; + var account = new Account("$G"); + var client = new InternalClient(41UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "stream-inventory", + (subject, _, _) => capturedSubject = subject); + + transport.ForwardProposal(System.Text.Encoding.UTF8.GetBytes("entry")); + + capturedSubject.ShouldBe("$NRG.P.stream-inventory"); + } + + // Go: server/raft.go:949 — payload forwarded verbatim + [Fact] + public void ForwardProposal_sends_payload_verbatim() + { + ReadOnlyMemory capturedPayload = default; + var account = new Account("$G"); + var client = new InternalClient(42UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (_, _, payload) => capturedPayload = payload); + + var data = new byte[] { 10, 20, 30, 40 }; + transport.ForwardProposal(data); + + capturedPayload.ToArray().ShouldBe(data); + } + + // --------------------------------------------------------------------------- + // ProposeRemovePeer — subject routing + // --------------------------------------------------------------------------- + + // Go: server/raft.go:2166 — n.rpsubj = fmt.Sprintf(raftRemovePeerSubj, n.group) + [Fact] + public void ProposeRemovePeer_publishes_to_NRG_RP_subject() + { + var capturedSubject = string.Empty; + var account = new Account("$G"); + var client = new InternalClient(50UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (subject, _, _) => capturedSubject = subject); + + transport.ProposeRemovePeer("peer-x"); + + capturedSubject.ShouldBe("$NRG.RP.meta"); + } + + // Go: server/raft.go:986 — peer name encoded as UTF-8 bytes + [Fact] + public void ProposeRemovePeer_encodes_peer_name_as_utf8() + { + ReadOnlyMemory capturedPayload = default; + var account = new Account("$G"); + var client = new InternalClient(51UL, ClientKind.System, account); + + var transport = new NatsRaftTransport(client, "meta", + (_, _, payload) => capturedPayload = payload); + + transport.ProposeRemovePeer("peer-abc"); + + System.Text.Encoding.UTF8.GetString(capturedPayload.Span).ShouldBe("peer-abc"); + } + + // --------------------------------------------------------------------------- + // IRaftTransport implementation + // --------------------------------------------------------------------------- + + // NatsRaftTransport must implement IRaftTransport + [Fact] + public void NatsRaftTransport_implements_IRaftTransport() + { + var account = new Account("$G"); + var client = new InternalClient(60UL, ClientKind.System, account); + var transport = new NatsRaftTransport(client, "meta", (_, _, _) => { }); + + (transport as IRaftTransport).ShouldNotBeNull(); + } +} diff --git a/tests/NATS.Server.Tests/Raft/RaftSubjectsTests.cs b/tests/NATS.Server.Tests/Raft/RaftSubjectsTests.cs new file mode 100644 index 0000000..964ab73 --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/RaftSubjectsTests.cs @@ -0,0 +1,155 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +/// +/// Verifies that RaftSubjects produces the exact $NRG.* subject strings +/// defined in Go's raft.go constants. +/// +/// Go reference: golang/nats-server/server/raft.go:2161-2169 +/// raftAllSubj = "$NRG.>" +/// raftVoteSubj = "$NRG.V.%s" +/// raftAppendSubj = "$NRG.AE.%s" +/// raftPropSubj = "$NRG.P.%s" +/// raftRemovePeerSubj = "$NRG.RP.%s" +/// raftReply = "$NRG.R.%s" +/// raftCatchupReply = "$NRG.CR.%s" +/// +public class RaftSubjectsTests +{ + // Go: server/raft.go:2162 — raftAllSubj = "$NRG.>" + [Fact] + public void All_constant_matches_go_raftAllSubj() + { + RaftSubjects.All.ShouldBe("$NRG.>"); + } + + // Go: server/raft.go:2163 — raftVoteSubj = "$NRG.V.%s" + [Fact] + public void Vote_formats_subject_with_group() + { + RaftSubjects.Vote("mygroup").ShouldBe("$NRG.V.mygroup"); + } + + // Go: server/raft.go:2163 — fmt.Sprintf(raftVoteSubj, n.group) + [Fact] + public void Vote_uses_group_verbatim() + { + RaftSubjects.Vote("meta").ShouldBe("$NRG.V.meta"); + RaftSubjects.Vote("stream-A").ShouldBe("$NRG.V.stream-A"); + RaftSubjects.Vote("_raft_").ShouldBe("$NRG.V._raft_"); + } + + // Go: server/raft.go:2164 — raftAppendSubj = "$NRG.AE.%s" + [Fact] + public void AppendEntry_formats_subject_with_group() + { + RaftSubjects.AppendEntry("mygroup").ShouldBe("$NRG.AE.mygroup"); + } + + // Go: server/raft.go:2164 — fmt.Sprintf(raftAppendSubj, n.group) + [Fact] + public void AppendEntry_uses_group_verbatim() + { + RaftSubjects.AppendEntry("meta").ShouldBe("$NRG.AE.meta"); + RaftSubjects.AppendEntry("stream-B").ShouldBe("$NRG.AE.stream-B"); + } + + // Go: server/raft.go:2165 — raftPropSubj = "$NRG.P.%s" + [Fact] + public void Proposal_formats_subject_with_group() + { + RaftSubjects.Proposal("mygroup").ShouldBe("$NRG.P.mygroup"); + } + + // Go: server/raft.go:2165 — fmt.Sprintf(raftPropSubj, n.group) + [Fact] + public void Proposal_uses_group_verbatim() + { + RaftSubjects.Proposal("meta").ShouldBe("$NRG.P.meta"); + RaftSubjects.Proposal("consumer-1").ShouldBe("$NRG.P.consumer-1"); + } + + // Go: server/raft.go:2166 — raftRemovePeerSubj = "$NRG.RP.%s" + [Fact] + public void RemovePeer_formats_subject_with_group() + { + RaftSubjects.RemovePeer("mygroup").ShouldBe("$NRG.RP.mygroup"); + } + + // Go: server/raft.go:2166 — fmt.Sprintf(raftRemovePeerSubj, n.group) + [Fact] + public void RemovePeer_uses_group_verbatim() + { + RaftSubjects.RemovePeer("meta").ShouldBe("$NRG.RP.meta"); + RaftSubjects.RemovePeer("stream-C").ShouldBe("$NRG.RP.stream-C"); + } + + // Go: server/raft.go:2167 — raftReply = "$NRG.R.%s" + [Fact] + public void Reply_formats_subject_with_id() + { + RaftSubjects.Reply("abc123").ShouldBe("$NRG.R.abc123"); + } + + // Go: server/raft.go:2167 — fmt.Sprintf(raftReply, b[:]) + [Fact] + public void Reply_uses_id_verbatim() + { + RaftSubjects.Reply("ABCDEFGH").ShouldBe("$NRG.R.ABCDEFGH"); + RaftSubjects.Reply("00000001").ShouldBe("$NRG.R.00000001"); + } + + // Go: server/raft.go:2168 — raftCatchupReply = "$NRG.CR.%s" + [Fact] + public void CatchupReply_formats_subject_with_id() + { + RaftSubjects.CatchupReply("xyz789").ShouldBe("$NRG.CR.xyz789"); + } + + // Go: server/raft.go:2168 — fmt.Sprintf(raftCatchupReply, b[:]) + [Fact] + public void CatchupReply_uses_id_verbatim() + { + RaftSubjects.CatchupReply("ABCDEFGH").ShouldBe("$NRG.CR.ABCDEFGH"); + RaftSubjects.CatchupReply("00000001").ShouldBe("$NRG.CR.00000001"); + } + + // Verify that subjects for different groups are distinct (no collisions) + [Fact] + public void Subjects_for_different_groups_are_distinct() + { + RaftSubjects.Vote("group1").ShouldNotBe(RaftSubjects.Vote("group2")); + RaftSubjects.AppendEntry("group1").ShouldNotBe(RaftSubjects.AppendEntry("group2")); + RaftSubjects.Proposal("group1").ShouldNotBe(RaftSubjects.Proposal("group2")); + RaftSubjects.RemovePeer("group1").ShouldNotBe(RaftSubjects.RemovePeer("group2")); + } + + // Verify that different verb subjects for the same group are distinct + [Fact] + public void Different_verbs_for_same_group_are_distinct() + { + var group = "meta"; + var subjects = new[] + { + RaftSubjects.Vote(group), + RaftSubjects.AppendEntry(group), + RaftSubjects.Proposal(group), + RaftSubjects.RemovePeer(group), + }; + subjects.Distinct().Count().ShouldBe(subjects.Length); + } + + // All group subjects must be sub-subjects of the wildcard $NRG.> + [Fact] + public void All_group_subjects_are_under_NRG_namespace() + { + var group = "g"; + RaftSubjects.Vote(group).ShouldStartWith("$NRG."); + RaftSubjects.AppendEntry(group).ShouldStartWith("$NRG."); + RaftSubjects.Proposal(group).ShouldStartWith("$NRG."); + RaftSubjects.RemovePeer(group).ShouldStartWith("$NRG."); + RaftSubjects.Reply("id").ShouldStartWith("$NRG."); + RaftSubjects.CatchupReply("id").ShouldStartWith("$NRG."); + } +}