30-task plan across 4 gated phases:
- Phase A (Foundation): Client, Parser, SubList, Server, Routes, Gateways, Leaf Nodes, Accounts
- Phase B (Distributed Substrate): RAFT, Storage, Config/Reload, Monitoring
- Phase C (JetStream Depth): Core, Clustering
- Phase D (Protocol Surfaces): MQTT, JWT

Includes concrete test code, TDD steps, and task dependency tracking.
Go-to-.NET Systematic Test Parity Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.
Goal: Fill ~2,058 missing test gaps between the Go NATS server (3,451 tests) and the .NET port (454 tests) to achieve behavioral test parity across all 18 subsystems.
Architecture: Hybrid dependency-first in 4 gated phases: Foundation (client/routing/sublist) → Distributed Substrate (RAFT/storage) → JetStream Depth → Protocol Surfaces. Behavioral equivalence, not 1:1 count parity. Every Go test intent mapped or marked N/A. Subsystems within a phase are independent and parallelizable via subagents.
Tech Stack: .NET 10, C# 14, xUnit 3, Shouldly, System.IO.Pipelines, NSubstitute, NATS.Client.Core (integration tests), socket-based test fixtures.
Execution guardrails
- Use @test-driven-development in every task.
- Switch to @systematic-debugging when tests fail unexpectedly.
- One commit per task.
- Run @verification-before-completion before phase gate claims.
- Use model: "sonnet" for straightforward test porting tasks. Use default (opus) for complex protocol/concurrency tasks.
- Parallel subagents within phases where subsystems don't share files.
Phase A: Foundation (~440 tests)
Gate: dotnet test passes with all new foundation tests. Client pub/sub, routing, gateway, leaf node basics verified.
Task 1: Port Client Basic Pub/Sub Tests
Files:
- Create: tests/NATS.Server.Tests/ClientPubSubTests.cs
- Reference: golang/nats-server/server/client_test.go (TestClientSimplePubSub, TestClientPubSubNoEcho, TestClientSimplePubSubWithReply, TestClientNoBodyPubSubWithReply, TestClientPubWithQueueSub, TestClientPubWithQueueSubNoEcho)
Step 1: Write the failing tests
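using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.RegularExpressions;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;
public class ClientPubSubTests : IAsyncLifetime
{
private NatsServer _server = null!;
private int _port;
private readonly CancellationTokenSource _cts = new();
// xUnit v3: IAsyncLifetime.InitializeAsync returns ValueTask
public async ValueTask InitializeAsync()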
{
_port = TestHelpers.GetFreePort();
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
_ = _server.StartAsync(_cts.Token);
await _server.WaitForReadyAsync();
}
// xUnit v3: IAsyncLifetime extends IAsyncDisposable, so DisposeAsync returns ValueTask
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
_server.Dispose();
}
private async Task<Socket> ConnectRawAsync()
{
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, _port);
// Read INFO
await ReadUntilAsync(sock, "\r\n");
return sock;
}
private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
{
using var cts = new CancellationTokenSource(timeoutMs);
var sb = new StringBuilder();
var buf = new byte[4096];
while (!sb.ToString().Contains(expected))
{
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
if (n == 0) break;
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
}
return sb.ToString();
}
[Fact]
public async Task Simple_pub_sub_delivers_message()
{
using var sock = await ConnectRawAsync();
await sock.SendAsync("CONNECT {}\r\nSUB foo 1\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sock, "PONG\r\n");
await sock.SendAsync("PUB foo 5\r\nhello\r\nPING\r\n"u8.ToArray());
var response = await ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("MSG foo 1 5\r\n");
response.ShouldContain("hello");
}
[Fact]
public async Task Pub_sub_no_echo_suppresses_own_messages()
{
using var sock = await ConnectRawAsync();
await sock.SendAsync(Encoding.ASCII.GetBytes(
"CONNECT {\"echo\":false}\r\nSUB foo 1\r\nPING\r\n"));
await ReadUntilAsync(sock, "PONG\r\n");
await sock.SendAsync("PUB foo 5\r\nhello\r\nPING\r\n"u8.ToArray());
var response = await ReadUntilAsync(sock, "PONG\r\n");
response.ShouldNotContain("MSG ");
}
[Fact]
public async Task Pub_sub_with_reply_subject()
{
using var sock = await ConnectRawAsync();
await sock.SendAsync("CONNECT {}\r\nSUB foo 1\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sock, "PONG\r\n");
await sock.SendAsync("PUB foo bar 5\r\nhello\r\nPING\r\n"u8.ToArray());
var response = await ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("MSG foo 1 bar 5\r\n");
}
[Fact]
public async Task Queue_sub_distributes_messages()
{
using var sock = await ConnectRawAsync();
await sock.SendAsync("CONNECT {}\r\nSUB foo g1 1\r\nSUB foo g1 2\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sock, "PONG\r\n");
for (var i = 0; i < 100; i++)
{
await sock.SendAsync("PUB foo 5\r\nhello\r\n"u8.ToArray());
}
await sock.SendAsync("PING\r\n"u8.ToArray());
var response = await ReadUntilAsync(sock, "PONG\r\n");
var sid1Count = Regex.Matches(response, @"MSG foo 1 5").Count;
var sid2Count = Regex.Matches(response, @"MSG foo 2 5").Count;
(sid1Count + sid2Count).ShouldBe(100);
sid1Count.ShouldBeGreaterThan(20);
sid2Count.ShouldBeGreaterThan(20);
}
[Fact]
public async Task Empty_body_pub_sub_with_reply()
{
using var sock = await ConnectRawAsync();
await sock.SendAsync("CONNECT {}\r\nSUB foo 1\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sock, "PONG\r\n");
await sock.SendAsync("PUB foo bar 0\r\n\r\nPING\r\n"u8.ToArray());
var response = await ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("MSG foo 1 bar 0\r\n");
}
}
Step 2: Run tests to verify they fail
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClientPubSubTests" -v minimal
Expected: FAIL — file does not exist yet, or tests fail if server behavior is incomplete.
Step 3: Create the test file and fix any server-side gaps
If any test fails due to missing server behavior (not just test infrastructure), port the minimal Go logic to the .NET server. The server implementation already covers basic pub/sub, so tests should mostly pass once created.
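The fixtures above call TestHelpers.GetFreePort(), which this plan references but never shows. If that helper is missing, one common way to write it is sketched below. The class name TestHelpersSketch is hypothetical and the body is an assumption about what the helper does, not the project's actual implementation.

```csharp
using System.Net;
using System.Net.Sockets;

// Hypothetical stand-in for the TestHelpers class the tests reference.
public static class TestHelpersSketch
{
    // Asks the OS for an ephemeral port by binding to port 0, then releases it.
    // Another process could claim the port before the server binds it, so this
    // is a best-effort helper rather than a guarantee.
    public static int GetFreePort()
    {
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        try
        {
            return ((IPEndPoint)listener.LocalEndpoint).Port;
        }
        finally
        {
            listener.Stop();
        }
    }
}
```

Because of the small window between releasing the port and the server binding it, starting the server with Port = 0 and reading the resolved port back (as Task 7 does) avoids the race where the server supports it.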
Step 4: Run tests to verify they pass
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClientPubSubTests" -v minimal
Expected: PASS (5 tests)
Step 5: Commit
git add tests/NATS.Server.Tests/ClientPubSubTests.cs
git commit -m "test: port client basic pub/sub tests from Go (5 scenarios)"
Task 2: Port Client UNSUB and Auto-Unsub Tests
Files:
- Create: tests/NATS.Server.Tests/ClientUnsubTests.cs
- Reference: golang/nats-server/server/client_test.go (TestClientUnSub, TestClientUnSubMax, TestClientAutoUnsubExactReceived, TestClientUnsubAfterAutoUnsub, TestClientRemoveSubsOnDisconnect)
Step 1: Write the failing tests
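using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.RegularExpressions;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;
public class ClientUnsubTests : IAsyncLifetime
{
private NatsServer _server = null!;
private int _port;
private readonly CancellationTokenSource _cts = new();
// xUnit v3: IAsyncLifetime.InitializeAsync returns ValueTask
public async ValueTask InitializeAsync()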
{
_port = TestHelpers.GetFreePort();
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
_ = _server.StartAsync(_cts.Token);
await _server.WaitForReadyAsync();
}
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
_server.Dispose();
}
private async Task<Socket> ConnectRawAsync()
{
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, _port);
await ReadUntilAsync(sock, "\r\n");
return sock;
}
private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
{
using var cts = new CancellationTokenSource(timeoutMs);
var sb = new StringBuilder();
var buf = new byte[4096];
while (!sb.ToString().Contains(expected))
{
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
if (n == 0) break;
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
}
return sb.ToString();
}
[Fact]
public async Task Unsub_removes_subscription()
{
using var sock = await ConnectRawAsync();
await sock.SendAsync("CONNECT {}\r\nSUB foo 1\r\nSUB foo 2\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sock, "PONG\r\n");
await sock.SendAsync("UNSUB 1\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sock, "PONG\r\n");
await sock.SendAsync("PUB foo 5\r\nhello\r\nPING\r\n"u8.ToArray());
var response = await ReadUntilAsync(sock, "PONG\r\n");
var matches = Regex.Matches(response, @"MSG foo (\d+) 5");
matches.Count.ShouldBe(1);
matches[0].Groups[1].Value.ShouldBe("2");
}
[Fact]
public async Task Unsub_max_auto_removes_after_n_messages()
{
using var sock = await ConnectRawAsync();
await sock.SendAsync("CONNECT {}\r\nSUB foo 1\r\nUNSUB 1 5\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sock, "PONG\r\n");
for (var i = 0; i < 10; i++)
await sock.SendAsync("PUB foo 5\r\nhello\r\n"u8.ToArray());
await sock.SendAsync("PING\r\n"u8.ToArray());
var response = await ReadUntilAsync(sock, "PONG\r\n");
Regex.Matches(response, @"MSG foo 1 5").Count.ShouldBe(5);
}
[Fact]
public async Task Unsub_after_auto_unsub_removes_immediately()
{
using var sock = await ConnectRawAsync();
await sock.SendAsync("CONNECT {}\r\nSUB foo 1\r\nUNSUB 1 100\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sock, "PONG\r\n");
await sock.SendAsync("UNSUB 1\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sock, "PONG\r\n");
await sock.SendAsync("PUB foo 5\r\nhello\r\nPING\r\n"u8.ToArray());
var response = await ReadUntilAsync(sock, "PONG\r\n");
response.ShouldNotContain("MSG ");
}
[Fact]
public async Task Disconnect_removes_all_subscriptions()
{
using var sub = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sub.ConnectAsync(IPAddress.Loopback, _port);
await ReadUntilAsync(sub, "\r\n");
await sub.SendAsync("CONNECT {}\r\nSUB foo 1\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sub, "PONG\r\n");
sub.Shutdown(SocketShutdown.Both);
sub.Close();
await Task.Delay(200);
_server.Stats.CurrentSubscriptions.ShouldBe(0);
}
}
Step 2: Run tests to verify they fail
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClientUnsubTests" -v minimal
Expected: FAIL
Step 3: Create the test file and fix any server-side gaps
The server already implements UNSUB and auto-unsub. If Stats.CurrentSubscriptions doesn't exist, add it.
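Disconnect_removes_all_subscriptions waits a fixed 200 ms before reading Stats.CurrentSubscriptions, which can be flaky on a loaded CI machine. A polling helper is one alternative. The sketch below is an illustration only: PollingSketch and WaitUntilAsync are hypothetical names, not part of the plan's TestHelpers.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical helper; not part of the plan's TestHelpers as shown.
public static class PollingSketch
{
    // Re-checks the condition every pollMs until it holds or the timeout elapses.
    // Returns true if the condition became true in time.
    public static async Task<bool> WaitUntilAsync(Func<bool> condition, TimeSpan timeout, int pollMs = 25)
    {
        var sw = Stopwatch.StartNew();
        while (sw.Elapsed < timeout)
        {
            if (condition()) return true;
            await Task.Delay(pollMs);
        }
        return condition(); // one last check at the deadline
    }
}
```

In the test, the delay and assertion could then be replaced by awaiting WaitUntilAsync(() => _server.Stats.CurrentSubscriptions == 0, TimeSpan.FromSeconds(2)) and asserting the result with ShouldBeTrue().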
Step 4: Run tests to verify they pass
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClientUnsubTests" -v minimal
Expected: PASS (4 tests)
Step 5: Commit
git add tests/NATS.Server.Tests/ClientUnsubTests.cs
git commit -m "test: port client unsub and auto-unsub tests from Go (4 scenarios)"
Task 3: Port Client Header Tests
Files:
- Create: tests/NATS.Server.Tests/ClientHeaderTests.cs
- Reference: golang/nats-server/server/client_test.go (TestClientHeaderDeliverMsg, TestClientHeaderDeliverStrippedMsg, TestClientHeaderDeliverQueueSubStrippedMsg, TestServerHeaderSupport, TestClientHeaderSupport)
Step 1: Write the failing tests
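using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;
public class ClientHeaderTests : IAsyncLifetime
{
private NatsServer _server = null!;
private int _port;
private readonly CancellationTokenSource _cts = new();
// xUnit v3: IAsyncLifetime.InitializeAsync returns ValueTask
public async ValueTask InitializeAsync()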
{
_port = TestHelpers.GetFreePort();
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
_ = _server.StartAsync(_cts.Token);
await _server.WaitForReadyAsync();
}
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
_server.Dispose();
}
private async Task<Socket> ConnectWithHeadersAsync()
{
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, _port);
await ReadUntilAsync(sock, "\r\n");
await sock.SendAsync(Encoding.ASCII.GetBytes(
"CONNECT {\"headers\":true,\"no_responders\":true}\r\n"));
return sock;
}
private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
{
using var cts = new CancellationTokenSource(timeoutMs);
var sb = new StringBuilder();
var buf = new byte[4096];
while (!sb.ToString().Contains(expected))
{
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
if (n == 0) break;
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
}
return sb.ToString();
}
[Fact]
public async Task Hpub_delivers_hmsg_with_headers()
{
using var sock = await ConnectWithHeadersAsync();
await sock.SendAsync("SUB foo 1\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sock, "PONG\r\n");
var hdr = "NATS/1.0\r\nX-Test: value\r\n\r\n";
var payload = "hello";
var hdrLen = hdr.Length;
var totalLen = hdrLen + payload.Length;
await sock.SendAsync(Encoding.ASCII.GetBytes(
$"HPUB foo {hdrLen} {totalLen}\r\n{hdr}{payload}\r\nPING\r\n"));
var response = await ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain($"HMSG foo 1 {hdrLen} {totalLen}\r\n");
response.ShouldContain("X-Test: value");
response.ShouldContain("hello");
}
[Fact]
public async Task Server_info_advertises_headers_true()
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, _port);
var info = await ReadUntilAsync(sock, "\r\n");
info.ShouldContain("\"headers\":true");
}
[Fact]
public async Task No_responders_sends_503_hmsg_when_no_subscribers()
{
using var sock = await ConnectWithHeadersAsync();
await sock.SendAsync("SUB reply.inbox 1\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sock, "PONG\r\n");
await sock.SendAsync("PUB no.listeners reply.inbox 0\r\n\r\nPING\r\n"u8.ToArray());
var response = await ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("HMSG reply.inbox 1");
response.ShouldContain("503");
}
}
Step 2: Run tests to verify they fail
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClientHeaderTests" -v minimal
Expected: FAIL
Step 3: Create the test file and fix any server-side gaps
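The HPUB test computes its two length fields from string.Length, which equals the byte count only while headers and payload are pure ASCII. The protocol counts bytes: the first number is the size of the header block including its terminating blank line, and the second is header block plus payload. A helper along the following lines keeps that arithmetic in one place. HpubSketch and Build are hypothetical names introduced for illustration.

```csharp
using System.Text;

// Hypothetical test helper; name and signature are illustrative.
public static class HpubSketch
{
    // Builds an HPUB frame. Both lengths are byte counts: the header block
    // (including its terminating blank line) and header block plus payload.
    public static byte[] Build(string subject, string headerBlock, string payload)
    {
        var hdrBytes = Encoding.UTF8.GetBytes(headerBlock);
        var payloadBytes = Encoding.UTF8.GetBytes(payload);
        var control = Encoding.ASCII.GetBytes(
            $"HPUB {subject} {hdrBytes.Length} {hdrBytes.Length + payloadBytes.Length}\r\n");

        // control line + headers + payload + trailing CRLF
        var frame = new byte[control.Length + hdrBytes.Length + payloadBytes.Length + 2];
        var offset = 0;
        control.CopyTo(frame, offset); offset += control.Length;
        hdrBytes.CopyTo(frame, offset); offset += hdrBytes.Length;
        payloadBytes.CopyTo(frame, offset); offset += payloadBytes.Length;
        frame[offset] = (byte)'\r';
        frame[offset + 1] = (byte)'\n';
        return frame;
    }
}
```

For the header block and payload used in Hpub_delivers_hmsg_with_headers, this yields a control line of "HPUB foo 27 32", matching the values the test derives by hand.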
Step 4: Run tests to verify they pass
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClientHeaderTests" -v minimal
Expected: PASS (3 tests)
Step 5: Commit
git add tests/NATS.Server.Tests/ClientHeaderTests.cs
git commit -m "test: port client header and no-responders tests from Go (3 scenarios)"
Task 4: Port Client Lifecycle and Slow Consumer Tests
Files:
- Create: tests/NATS.Server.Tests/ClientLifecycleTests.cs
- Create: tests/NATS.Server.Tests/ClientSlowConsumerTests.cs
- Reference: golang/nats-server/server/client_test.go (TestClientConnect, TestClientConnectProto, TestAuthorizationTimeout, TestClientConnectionName, TestClientLimits, TestClientMaxPending, TestNoClientLeakOnSlowConsumer, TestClientSlowConsumerWithoutConnect)
Step 1: Write the failing tests
// ClientLifecycleTests.cs
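using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;
public class ClientLifecycleTests : IAsyncLifetime
{
private NatsServer _server = null!;
private int _port;
private readonly CancellationTokenSource _cts = new();
// xUnit v3: IAsyncLifetime.InitializeAsync returns ValueTask
public async ValueTask InitializeAsync()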
{
_port = TestHelpers.GetFreePort();
_server = new NatsServer(new NatsOptions
{
Port = _port,
MaxSubscriptions = 10,
PingInterval = TimeSpan.FromSeconds(1),
MaxPingsOut = 2,
}, NullLoggerFactory.Instance);
_ = _server.StartAsync(_cts.Token);
await _server.WaitForReadyAsync();
}
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
_server.Dispose();
}
private async Task<Socket> ConnectRawAsync()
{
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, _port);
await ReadUntilAsync(sock, "\r\n");
return sock;
}
private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
{
using var cts = new CancellationTokenSource(timeoutMs);
var sb = new StringBuilder();
var buf = new byte[4096];
while (!sb.ToString().Contains(expected))
{
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
if (n == 0) break;
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
}
return sb.ToString();
}
[Fact]
public async Task Connect_proto_accepted()
{
using var sock = await ConnectRawAsync();
await sock.SendAsync(Encoding.ASCII.GetBytes(
"CONNECT {\"verbose\":false,\"pedantic\":false,\"name\":\"test-client\"}\r\nPING\r\n"));
var response = await ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("PONG");
}
[Fact]
public async Task Max_subscriptions_enforced()
{
using var sock = await ConnectRawAsync();
await sock.SendAsync("CONNECT {}\r\n"u8.ToArray());
for (var i = 1; i <= 11; i++)
await sock.SendAsync(Encoding.ASCII.GetBytes($"SUB foo.{i} {i}\r\n"));
await sock.SendAsync("PING\r\n"u8.ToArray());
var response = await ReadUntilAsync(sock, "\r\n", 3000);
response.ShouldContain("-ERR");
}
[Fact]
public async Task Auth_timeout_closes_connection_if_no_connect()
{
var authPort = TestHelpers.GetFreePort();
using var authCts = new CancellationTokenSource();
var authServer = new NatsServer(new NatsOptions
{
Port = authPort,
Username = "user",
Password = "pass",
AuthTimeout = TimeSpan.FromMilliseconds(500),
}, NullLoggerFactory.Instance);
_ = authServer.StartAsync(authCts.Token);
await authServer.WaitForReadyAsync();
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, authPort);
await ReadUntilAsync(sock, "\r\n");
// Don't send CONNECT — wait for timeout
await Task.Delay(1500);
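// The server may write an -ERR line before closing, so drain until end of stream
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var buf = new byte[256];
int n;
do { n = await sock.ReceiveAsync(buf, SocketFlags.None, readCts.Token); } while (n > 0);
n.ShouldBe(0); // Connection closed by server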
}
finally
{
await authCts.CancelAsync();
authServer.Dispose();
}
}
}
// ClientSlowConsumerTests.cs
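using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;
public class ClientSlowConsumerTests : IAsyncLifetime
{
private NatsServer _server = null!;
private int _port;
private readonly CancellationTokenSource _cts = new();
// xUnit v3: IAsyncLifetime.InitializeAsync returns ValueTask
public async ValueTask InitializeAsync()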
{
_port = TestHelpers.GetFreePort();
_server = new NatsServer(new NatsOptions
{
Port = _port,
MaxPendingBytes = 1024, // Very small for testing
}, NullLoggerFactory.Instance);
_ = _server.StartAsync(_cts.Token);
await _server.WaitForReadyAsync();
}
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
_server.Dispose();
}
[Fact]
public async Task Slow_consumer_detected_when_pending_exceeds_limit()
{
using var sub = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sub.ConnectAsync(IPAddress.Loopback, _port);
await ReadUntilAsync(sub, "\r\n");
await sub.SendAsync("CONNECT {}\r\nSUB foo 1\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sub, "PONG\r\n");
// Stop reading from subscriber to simulate slow consumer
using var pub = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await pub.ConnectAsync(IPAddress.Loopback, _port);
await ReadUntilAsync(pub, "\r\n");
await pub.SendAsync("CONNECT {}\r\n"u8.ToArray());
var bigPayload = new string('X', 512);
for (var i = 0; i < 100; i++)
await pub.SendAsync(Encoding.ASCII.GetBytes($"PUB foo {bigPayload.Length}\r\n{bigPayload}\r\n"));
await pub.SendAsync("PING\r\n"u8.ToArray());
await Task.Delay(1000);
_server.Stats.SlowConsumers.ShouldBeGreaterThan(0);
}
private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
{
using var cts = new CancellationTokenSource(timeoutMs);
var sb = new StringBuilder();
var buf = new byte[4096];
while (!sb.ToString().Contains(expected))
{
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
if (n == 0) break;
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
}
return sb.ToString();
}
}
Step 2: Run tests to verify they fail
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClientLifecycleTests|FullyQualifiedName~ClientSlowConsumerTests" -v minimal
Expected: FAIL
Step 3: Create the test files and fix any server-side gaps
May need to add Stats.CurrentSubscriptions and Stats.SlowConsumers if not already exposed.
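If those counters do need to be added, they will be updated from many client tasks at once, so plain increments are not safe. The sketch below shows one possible shape using Interlocked. ServerStatsSketch and its method names are hypothetical, and the real Stats type in the .NET port may already differ.

```csharp
using System.Threading;

// Hypothetical shape; the real Stats type in the .NET port may differ.
public sealed class ServerStatsSketch
{
    private long _currentSubscriptions;
    private long _slowConsumers;

    // Reads are atomic even on 32-bit platforms.
    public long CurrentSubscriptions => Interlocked.Read(ref _currentSubscriptions);
    public long SlowConsumers => Interlocked.Read(ref _slowConsumers);

    // Called when a SUB is accepted.
    public void SubscriptionAdded() => Interlocked.Increment(ref _currentSubscriptions);

    // Called on UNSUB, or with the client's full count when it disconnects.
    public void SubscriptionsRemoved(int count) => Interlocked.Add(ref _currentSubscriptions, -count);

    // Called when a client's pending bytes exceed the configured limit.
    public void SlowConsumerDetected() => Interlocked.Increment(ref _slowConsumers);
}
```

The tests only read the two properties, so the mutator names are free to follow whatever convention the server already uses.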
Step 4: Run tests to verify they pass
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClientLifecycleTests|FullyQualifiedName~ClientSlowConsumerTests" -v minimal
Expected: PASS (4 tests)
Step 5: Commit
git add tests/NATS.Server.Tests/ClientLifecycleTests.cs tests/NATS.Server.Tests/ClientSlowConsumerTests.cs
git commit -m "test: port client lifecycle and slow consumer tests from Go (4 scenarios)"
Task 5: Port Parser Edge Case Tests
Files:
- Modify: tests/NATS.Server.Tests/ParserTests.cs
- Reference: golang/nats-server/server/parser_test.go (TestParsePubArg, TestParsePubBadSize, TestParsePubSizeOverflow, TestParseHeaderPubArg, TestShouldFail, TestMaxControlLine)
Step 1: Write the failing tests
Add to existing ParserTests.cs:
[Theory]
[InlineData("PUB a 2\r\nok\r\n", "a", "", 2)]
[InlineData("PUB foo bar 22\r\n", "foo", "bar", 22)]
[InlineData("PUB foo 22\r\n", "foo", "", 22)]
[InlineData("PUB\tfoo\tbar\t22\r\n", "foo", "bar", 22)]
public async Task Parse_PUB_argument_variations(string input, string expectedSubject, string expectedReply, int expectedSize)
{
var cmds = await ParseAsync(input.EndsWith("\r\n") && !input.Contains("\r\nok\r\n")
? input + new string('X', expectedSize) + "\r\n"
: input);
cmds.ShouldNotBeEmpty();
cmds[0].Type.ShouldBe(CommandType.Pub);
cmds[0].Subject.ShouldBe(expectedSubject);
if (!string.IsNullOrEmpty(expectedReply))
cmds[0].ReplyTo.ShouldBe(expectedReply);
}
[Theory]
[InlineData("Px\r\n")]
[InlineData("PIx\r\n")]
[InlineData("PINx\r\n")]
[InlineData(" PING\r\n")]
[InlineData("SUB\r\n")]
[InlineData("PUB\r\n")]
[InlineData("PUB \r\n")]
[InlineData("UNSUB_2\r\n")]
public async Task Parse_malformed_protocol_fails(string input)
{
await Should.ThrowAsync<Exception>(async () => await ParseAsync(input));
}
[Fact]
public async Task Parse_exceeding_max_control_line_fails()
{
var longSubject = new string('a', 4097);
var input = $"PUB {longSubject} 0\r\n\r\n";
var parser = new NatsParser(maxPayload: NatsProtocol.MaxPayloadSize);
var pipe = new Pipe();
await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes(input));
pipe.Writer.Complete();
var result = await pipe.Reader.ReadAsync();
var buffer = result.Buffer;
var threw = false;
try { parser.TryParse(ref buffer, out _); }
catch { threw = true; }
threw.ShouldBeTrue();
}
Step 2: Run tests to verify they fail
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ParserTests" -v minimal
Expected: FAIL for new tests
Step 3: Fix any parser behavior gaps
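If Parse_exceeding_max_control_line_fails exposes a gap, the missing piece is probably a bound on how many bytes the parser will scan while looking for the end of a control line. The following is a minimal sketch of such a guard, assuming a 4096-byte limit like the Go server's default. ControlLineSketch and FindControlLineEnd are hypothetical names, and NatsParser may structure this check differently.

```csharp
using System;

// Hypothetical guard; the port's NatsParser may enforce this differently.
public static class ControlLineSketch
{
    public const int MaxControlLine = 4096; // assumed to match the Go server default

    // Returns the length of the control line (excluding CRLF), or -1 if no
    // complete line is buffered yet. Throws once the line cannot fit the limit.
    public static int FindControlLineEnd(ReadOnlySpan<byte> buffer)
    {
        var idx = buffer.IndexOf("\r\n"u8);
        var lineLength = idx >= 0 ? idx : buffer.Length;
        if (lineLength > MaxControlLine)
            throw new InvalidOperationException("Maximum control line exceeded");
        return idx;
    }
}
```

Checking the length even when no CRLF has arrived yet means an oversized line is rejected as soon as it passes the limit, instead of being buffered without bound.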
Step 4: Run tests to verify they pass
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ParserTests" -v minimal
Expected: PASS
Step 5: Commit
git add tests/NATS.Server.Tests/ParserTests.cs
git commit -m "test: port parser edge case and negative tests from Go (13 scenarios)"
Task 6: Port SubList Concurrency and Edge Case Tests
Files:
- Modify: tests/NATS.Server.Tests/SubListTests.cs
- Modify: tests/NATS.Server.Tests/SubjectMatchTests.cs
- Reference: golang/nats-server/server/sublist_test.go (TestSublistRaceOnRemove, TestSublistRaceOnInsert, TestSublistRaceOnMatch, TestSublistRemoveWithLargeSubs, TestSublistInvalidSubjectsInsert, TestSublistInsertWithWildcardsAsLiterals, TestSublistMatchWithEmptyTokens)
Step 1: Write the failing tests
Add to SubListTests.cs:
[Fact]
public async Task Race_on_remove_does_not_corrupt_cache()
{
var sl = new SubList();
var subs = Enumerable.Range(0, 100)
.Select(i => new Subscription { Subject = "foo", Sid = i.ToString() })
.ToArray();
foreach (var s in subs) sl.Insert(s);
var r = sl.Match("foo");
r.PlainSubs.Length.ShouldBe(100);
var removeTask = Task.Run(() =>
{
foreach (var s in subs) sl.Remove(s);
});
for (var i = 0; i < 1000; i++)
{
var result = sl.Match("foo");
foreach (var ps in result.PlainSubs)
ps.Subject.ShouldBe("foo");
}
await removeTask;
}
[Fact]
public async Task Race_on_insert_does_not_corrupt_cache()
{
var sl = new SubList();
var subs = Enumerable.Range(0, 100)
.Select(i => new Subscription { Subject = "foo", Sid = i.ToString() })
.ToArray();
var insertTask = Task.Run(() =>
{
foreach (var s in subs) sl.Insert(s);
});
for (var i = 0; i < 1000; i++)
{
var result = sl.Match("foo");
foreach (var ps in result.PlainSubs)
ps.Subject.ShouldBe("foo");
}
await insertTask;
}
[Fact]
public void Remove_from_large_subscription_list()
{
var sl = new SubList();
var subs = Enumerable.Range(0, 200)
.Select(i => new Subscription { Subject = "foo", Sid = i.ToString() })
.ToArray();
foreach (var s in subs) sl.Insert(s);
var r = sl.Match("foo");
r.PlainSubs.Length.ShouldBe(200);
sl.Remove(subs[100]);
sl.Remove(subs[0]);
sl.Remove(subs[199]);
r = sl.Match("foo");
r.PlainSubs.Length.ShouldBe(197);
}
[Theory]
[InlineData(".foo")]
[InlineData("foo.")]
[InlineData("foo..bar")]
[InlineData("foo.bar..baz")]
[InlineData("foo.>.bar")]
public void Insert_invalid_subject_is_rejected(string subject)
{
var sl = new SubList();
var sub = new Subscription { Subject = subject, Sid = "1" };
Should.Throw<Exception>(() => sl.Insert(sub));
}
Step 2: Run tests to verify they fail
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SubListTests" -v minimal
Expected: FAIL for new tests
Step 3: Fix any SubList behavior gaps
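The Insert_invalid_subject_is_rejected cases all come down to two rules: no empty tokens, and '>' is only allowed as the final token. If SubList.Insert does not enforce these yet, a validator along these lines could back it. SubjectSketch and IsValidSubject are hypothetical names for illustration. The sketch deliberately leaves tokens such as "foo*" alone, since wildcards embedded in a longer token are treated as literals.

```csharp
// Hypothetical validator; the port's SubList may expose this differently.
public static class SubjectSketch
{
    // A subscription subject is rejected if it is empty, has an empty token
    // (leading, trailing, or doubled '.'), contains whitespace, or uses '>'
    // as a whole token anywhere other than the last position.
    public static bool IsValidSubject(string subject)
    {
        if (string.IsNullOrEmpty(subject)) return false;
        var tokens = subject.Split('.');
        for (var i = 0; i < tokens.Length; i++)
        {
            var t = tokens[i];
            if (t.Length == 0) return false;
            foreach (var c in t)
                if (char.IsWhiteSpace(c)) return false;
            if (t == ">" && i != tokens.Length - 1) return false;
        }
        return true;
    }
}
```

Insert could then throw when IsValidSubject returns false, which is the behavior the theory above asserts.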
Step 4: Run tests to verify they pass
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SubListTests" -v minimal
Expected: PASS
Step 5: Commit
git add tests/NATS.Server.Tests/SubListTests.cs tests/NATS.Server.Tests/SubjectMatchTests.cs
git commit -m "test: port sublist concurrency and edge case tests from Go (7 scenarios)"
Task 7: Port Server Configuration and Lifecycle Edge Case Tests
Files:
- Create: tests/NATS.Server.Tests/ServerConfigTests.cs
- Reference: golang/nats-server/server/server_test.go (TestMaxSubscriptions, TestRandomPorts, TestLameDuckModeInfo, TestAcceptError, TestServerShutdownDuringStart, TestGetConnectURLs, TestInfoServerNameDefaultsToPK, TestInfoServerNameIsSettable)
Step 1: Write the failing tests
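using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;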
public class ServerConfigTests
{
private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
{
using var cts = new CancellationTokenSource(timeoutMs);
var sb = new StringBuilder();
var buf = new byte[4096];
while (!sb.ToString().Contains(expected))
{
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
if (n == 0) break;
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
}
return sb.ToString();
}
[Fact]
public async Task Server_resolves_ephemeral_port_when_zero()
{
using var cts = new CancellationTokenSource();
var server = new NatsServer(new NatsOptions { Port = 0 }, NullLoggerFactory.Instance);
_ = server.StartAsync(cts.Token);
await server.WaitForReadyAsync();
server.Port.ShouldBeGreaterThan(0);
await cts.CancelAsync();
server.Dispose();
}
[Fact]
public async Task Server_info_contains_server_name()
{
var port = TestHelpers.GetFreePort();
using var cts = new CancellationTokenSource();
var server = new NatsServer(new NatsOptions
{
Port = port,
ServerName = "my-test-server",
}, NullLoggerFactory.Instance);
_ = server.StartAsync(cts.Token);
await server.WaitForReadyAsync();
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await ReadUntilAsync(sock, "\r\n");
info.ShouldContain("\"server_name\":\"my-test-server\"");
await cts.CancelAsync();
server.Dispose();
}
[Fact]
public async Task Server_info_defaults_name_to_server_id()
{
var port = TestHelpers.GetFreePort();
using var cts = new CancellationTokenSource();
var server = new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
_ = server.StartAsync(cts.Token);
await server.WaitForReadyAsync();
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await ReadUntilAsync(sock, "\r\n");
info.ShouldContain("\"server_name\":");
info.ShouldContain("\"server_id\":");
await cts.CancelAsync();
server.Dispose();
}
[Fact]
public async Task Lame_duck_mode_sends_updated_info()
{
var port = TestHelpers.GetFreePort();
using var cts = new CancellationTokenSource();
var server = new NatsServer(new NatsOptions
{
Port = port,
LameDuckDuration = TimeSpan.FromSeconds(5),
}, NullLoggerFactory.Instance);
_ = server.StartAsync(cts.Token);
await server.WaitForReadyAsync();
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await ReadUntilAsync(sock, "\r\n");
await sock.SendAsync("CONNECT {}\r\nPING\r\n"u8.ToArray());
await ReadUntilAsync(sock, "PONG\r\n");
_ = server.LameDuckShutdownAsync(cts.Token);
await Task.Delay(500);
var ldInfo = await ReadUntilAsync(sock, "\r\n", 3000);
ldInfo.ShouldContain("\"ldm\":true");
await cts.CancelAsync();
server.Dispose();
}
}
Step 2: Run tests to verify they fail
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ServerConfigTests" -v minimal
Expected: FAIL
Step 3: Create the test file and fix any server-side gaps
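The INFO assertions above match raw substrings such as "server_name":"my-test-server", so they depend on the serializer emitting no whitespace and a particular escaping. Parsing the JSON is a less brittle option. The sketch below assumes the line has the usual "INFO {json}" shape. InfoLineSketch is a hypothetical helper, not something the plan defines.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper for tests; not part of the plan as written.
public static class InfoLineSketch
{
    // Parses a line of the form "INFO {json}\r\n" and returns the JSON root.
    public static JsonElement Parse(string line)
    {
        const string prefix = "INFO ";
        if (!line.StartsWith(prefix, StringComparison.Ordinal))
            throw new FormatException("Not an INFO line");
        var json = line.Substring(prefix.Length).TrimEnd('\r', '\n');
        using var doc = JsonDocument.Parse(json);
        return doc.RootElement.Clone(); // Clone so the element outlives the document
    }
}
```

A test could then assert on InfoLineSketch.Parse(info).GetProperty("server_name").GetString() instead of on the raw text.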
Step 4: Run tests to verify they pass
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ServerConfigTests" -v minimal
Expected: PASS (4 tests)
Step 5: Commit
git add tests/NATS.Server.Tests/ServerConfigTests.cs
git commit -m "test: port server config and lifecycle edge case tests from Go (4 scenarios)"
Task 8: Port Route Tests — Config, Auth, Compression, Per-Account
Files:
- Create: tests/NATS.Server.Tests/Routes/RouteConfigTests.cs
- Create: tests/NATS.Server.Tests/Routes/RouteAuthTests.cs
- Create: tests/NATS.Server.Tests/Routes/RouteCompressionParityTests.cs
- Create: tests/NATS.Server.Tests/Routes/RoutePerAccountTests.cs
- Reference: golang/nats-server/server/routes_test.go (TestRouteConfig, TestRouteCompression*, TestRoutePerAccount, TestRoutePool*, TestServerRoutesWithAuthAndBCrypt, TestSeedSolicitWorks, TestTLSSeedSolicitWorks)
Step 1: Write the failing tests
// Routes/RouteConfigTests.cs
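using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests.Routes;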
public class RouteConfigTests
{
[Fact]
public async Task Two_servers_form_full_mesh_cluster()
{
using var cts1 = new CancellationTokenSource();
var s1 = new NatsServer(new NatsOptions
{
Port = 0,
Cluster = new ClusterOptions { Name = "test", Host = "127.0.0.1", Port = 0 },
}, NullLoggerFactory.Instance);
_ = s1.StartAsync(cts1.Token);
await s1.WaitForReadyAsync();
using var cts2 = new CancellationTokenSource();
var s2 = new NatsServer(new NatsOptions
{
Port = 0,
Cluster = new ClusterOptions
{
Name = "test",
Host = "127.0.0.1",
Port = 0,
Routes = [s1.ClusterListen!],
},
}, NullLoggerFactory.Instance);
_ = s2.StartAsync(cts2.Token);
await s2.WaitForReadyAsync();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
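// Wait for both sides to register the route; s2 is asserted below as well
while (!timeout.IsCancellationRequested && (s1.Stats.Routes == 0 || s2.Stats.Routes == 0))
await Task.Delay(50);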
s1.Stats.Routes.ShouldBeGreaterThan(0);
s2.Stats.Routes.ShouldBeGreaterThan(0);
await cts1.CancelAsync(); await cts2.CancelAsync();
s1.Dispose(); s2.Dispose();
}
[Fact]
public async Task Route_forwards_messages_between_clusters()
{
using var cts1 = new CancellationTokenSource();
var s1 = new NatsServer(new NatsOptions
{
Port = 0,
Cluster = new ClusterOptions { Name = "test", Host = "127.0.0.1", Port = 0 },
}, NullLoggerFactory.Instance);
_ = s1.StartAsync(cts1.Token);
await s1.WaitForReadyAsync();
using var cts2 = new CancellationTokenSource();
var s2 = new NatsServer(new NatsOptions
{
Port = 0,
Cluster = new ClusterOptions
{
Name = "test", Host = "127.0.0.1", Port = 0,
Routes = [s1.ClusterListen!],
},
}, NullLoggerFactory.Instance);
_ = s2.StartAsync(cts2.Token);
await s2.WaitForReadyAsync();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
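// Wait for both sides to register the route before subscribing and publishing
while (!timeout.IsCancellationRequested && (s1.Stats.Routes == 0 || s2.Stats.Routes == 0))
await Task.Delay(50);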
// Subscribe on s1
await using var sub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{s1.Port}" });
await sub.ConnectAsync();
await using var subscription = await sub.SubscribeCoreAsync<string>("cross.cluster");
await sub.PingAsync();
await Task.Delay(200); // Let subscription propagate
// Publish on s2
await using var pub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{s2.Port}" });
await pub.ConnectAsync();
await pub.PublishAsync("cross.cluster", "hello-from-s2");
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var msg = await subscription.Msgs.ReadAsync(readCts.Token);
msg.Data.ShouldBe("hello-from-s2");
await cts1.CancelAsync(); await cts2.CancelAsync();
s1.Dispose(); s2.Dispose();
}
}
Step 2–5: Standard TDD cycle
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RouteConfigTests" -v minimal
git add tests/NATS.Server.Tests/Routes/RouteConfigTests.cs
git commit -m "test: port route config and cross-cluster messaging tests from Go (2+ scenarios)"
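The route tests above, and the gateway and leaf node tests that follow, all repeat the same poll-until-connected loop. A minimal sketch of a shared helper, assuming nothing about the server API: `Poll.UntilAsync` is a hypothetical name, not an existing member of the test project.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical helper; the name and signature are assumptions.
public static class Poll
{
    // Re-evaluates `condition` every `interval` (default 50 ms) until it
    // returns true or `timeout` has elapsed. Returns true if the condition
    // was met, false on timeout, so callers can assert on the result.
    public static async Task<bool> UntilAsync(
        Func<bool> condition,
        TimeSpan timeout,
        TimeSpan? interval = null)
    {
        var delay = interval ?? TimeSpan.FromMilliseconds(50);
        var elapsed = Stopwatch.StartNew();
        while (!condition())
        {
            if (elapsed.Elapsed >= timeout)
                return false;
            await Task.Delay(delay);
        }
        return true;
    }
}

// Possible usage inside a test (s1 and s2 as in the tests above):
//   var connected = await Poll.UntilAsync(
//       () => s1.Stats.Routes > 0 && s2.Stats.Routes > 0,
//       TimeSpan.FromSeconds(5));
//   connected.ShouldBeTrue();
```

Returning a bool instead of throwing keeps the failure message in the test's own assertion.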
Task 9: Port Gateway Tests — Basic, Interest Tracking, Service Import/Export
Files:
- Create: tests/NATS.Server.Tests/Gateways/GatewayBasicTests.cs
- Create: tests/NATS.Server.Tests/Gateways/GatewayInterestTests.cs
- Create: tests/NATS.Server.Tests/Gateways/GatewayServiceImportTests.cs
- Reference: golang/nats-server/server/gateway_test.go (TestGatewayBasic, TestGatewaySubjectInterest, TestGatewayAccountInterest, TestGatewayServiceImport, TestGatewayQueueSub, TestGatewayDoesntSendBackToItself)
Step 1: Write the failing tests
// Gateways/GatewayBasicTests.cs
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core;
namespace NATS.Server.Tests.Gateways;
public class GatewayBasicTests
{
[Fact]
public async Task Gateway_forwards_messages_between_clusters()
{
using var localCts = new CancellationTokenSource();
var local = new NatsServer(new NatsOptions
{
Port = 0,
Gateway = new GatewayOptions { Name = "LOCAL", Host = "127.0.0.1", Port = 0 },
}, NullLoggerFactory.Instance);
_ = local.StartAsync(localCts.Token);
await local.WaitForReadyAsync();
using var remoteCts = new CancellationTokenSource();
var remote = new NatsServer(new NatsOptions
{
Port = 0,
Gateway = new GatewayOptions
{
Name = "REMOTE", Host = "127.0.0.1", Port = 0,
Remotes = [local.GatewayListen!],
},
}, NullLoggerFactory.Instance);
_ = remote.StartAsync(remoteCts.Token);
await remote.WaitForReadyAsync();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
while (!timeout.IsCancellationRequested &&
(local.Stats.Gateways == 0 || remote.Stats.Gateways == 0))
await Task.Delay(50);
await using var sub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{local.Port}" });
await sub.ConnectAsync();
await using var subscription = await sub.SubscribeCoreAsync<string>("gw.test");
await sub.PingAsync();
await Task.Delay(500);
await using var pub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{remote.Port}" });
await pub.ConnectAsync();
await pub.PublishAsync("gw.test", "hello-from-remote");
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var msg = await subscription.Msgs.ReadAsync(readCts.Token);
msg.Data.ShouldBe("hello-from-remote");
await localCts.CancelAsync(); await remoteCts.CancelAsync();
local.Dispose(); remote.Dispose();
}
[Fact]
public async Task Gateway_does_not_echo_back_to_origin()
{
// Setup same as above, but verify message from local doesn't loop back
// via remote gateway
using var localCts = new CancellationTokenSource();
var local = new NatsServer(new NatsOptions
{
Port = 0,
Gateway = new GatewayOptions { Name = "LOCAL", Host = "127.0.0.1", Port = 0 },
}, NullLoggerFactory.Instance);
_ = local.StartAsync(localCts.Token);
await local.WaitForReadyAsync();
using var remoteCts = new CancellationTokenSource();
var remote = new NatsServer(new NatsOptions
{
Port = 0,
Gateway = new GatewayOptions
{
Name = "REMOTE", Host = "127.0.0.1", Port = 0,
Remotes = [local.GatewayListen!],
},
}, NullLoggerFactory.Instance);
_ = remote.StartAsync(remoteCts.Token);
await remote.WaitForReadyAsync();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
while (!timeout.IsCancellationRequested &&
(local.Stats.Gateways == 0 || remote.Stats.Gateways == 0))
await Task.Delay(50);
await using var sub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{local.Port}" });
await sub.ConnectAsync();
await using var subscription = await sub.SubscribeCoreAsync<string>("echo.test");
await sub.PingAsync();
await Task.Delay(500);
await using var pub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{local.Port}" });
await pub.ConnectAsync();
await pub.PublishAsync("echo.test", "local-msg");
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
var msg = await subscription.Msgs.ReadAsync(readCts.Token);
msg.Data.ShouldBe("local-msg");
// Verify only 1 message (no echo from gateway)
var gotSecond = false;
try
{
using var cts2 = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));
await subscription.Msgs.ReadAsync(cts2.Token);
gotSecond = true;
}
catch (OperationCanceledException) { }
gotSecond.ShouldBeFalse();
await localCts.CancelAsync(); await remoteCts.CancelAsync();
local.Dispose(); remote.Dispose();
}
}
Step 2–5: Standard TDD cycle
Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~GatewayBasicTests" -v minimal
git add tests/NATS.Server.Tests/Gateways/GatewayBasicTests.cs
git commit -m "test: port gateway basic and echo-prevention tests from Go (2+ scenarios)"
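The echo-prevention test asserts that nothing arrives by catching a cancellation, a pattern the account isolation test in Task 11 repeats. A minimal sketch of a reusable check, assuming the subscription's Msgs property is a System.Threading.Channels ChannelReader (as it is in NATS.Client.Core, to the best of our knowledge); `ChannelAssert` is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper; not part of the existing test project.
public static class ChannelAssert
{
    // Returns true if no item could be read from `reader` within `window`,
    // false if an item arrived. A completed channel surfaces as
    // ChannelClosedException, which is left to propagate.
    public static async Task<bool> StaysEmptyAsync<T>(
        ChannelReader<T> reader,
        TimeSpan window)
    {
        using var cts = new CancellationTokenSource(window);
        try
        {
            await reader.ReadAsync(cts.Token);
            return false; // an unexpected item arrived
        }
        catch (OperationCanceledException)
        {
            return true; // the window elapsed with nothing to read
        }
    }
}

// Possible usage inside a test:
//   (await ChannelAssert.StaysEmptyAsync(
//       subscription.Msgs, TimeSpan.FromMilliseconds(500))).ShouldBeTrue();
```

A negative assertion like this can only show that nothing arrived within the window, so the window length is a trade-off between suite speed and confidence.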
Task 10: Port Leaf Node Tests — Auth, Loop Detection, Queue Distribution
Files:
- Create: tests/NATS.Server.Tests/LeafNodes/LeafBasicTests.cs
- Create: tests/NATS.Server.Tests/LeafNodes/LeafLoopDetectionTests.cs
- Create: tests/NATS.Server.Tests/LeafNodes/LeafQueueDistributionTests.cs
- Reference: golang/nats-server/server/leafnode_test.go (TestLeafNodeBasicAuth*, TestLeafNodeLoop*, TestLeafNodeQueueGroupDistribution*, TestLeafNodePermissions)
Step 1: Write the failing tests
// LeafNodes/LeafBasicTests.cs
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core;
namespace NATS.Server.Tests.LeafNodes;
public class LeafBasicTests
{
[Fact]
public async Task Leaf_node_forwards_subscriptions_to_hub()
{
using var hubCts = new CancellationTokenSource();
var hub = new NatsServer(new NatsOptions
{
Port = 0,
LeafNode = new LeafNodeOptions { Host = "127.0.0.1", Port = 0 },
}, NullLoggerFactory.Instance);
_ = hub.StartAsync(hubCts.Token);
await hub.WaitForReadyAsync();
using var leafCts = new CancellationTokenSource();
var leaf = new NatsServer(new NatsOptions
{
Port = 0,
LeafNode = new LeafNodeOptions
{
Remotes = [$"nats://127.0.0.1:{hub.LeafNodePort}"],
},
}, NullLoggerFactory.Instance);
_ = leaf.StartAsync(leafCts.Token);
await leaf.WaitForReadyAsync();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
while (!timeout.IsCancellationRequested && hub.Stats.LeafNodes == 0)
await Task.Delay(50);
// Subscribe on leaf
await using var sub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{leaf.Port}" });
await sub.ConnectAsync();
await using var subscription = await sub.SubscribeCoreAsync<string>("leaf.test");
await sub.PingAsync();
await Task.Delay(500);
// Publish on hub
await using var pub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{hub.Port}" });
await pub.ConnectAsync();
await pub.PublishAsync("leaf.test", "from-hub");
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var msg = await subscription.Msgs.ReadAsync(readCts.Token);
msg.Data.ShouldBe("from-hub");
await hubCts.CancelAsync(); await leafCts.CancelAsync();
hub.Dispose(); leaf.Dispose();
}
}
Step 2–5: Standard TDD cycle
git add tests/NATS.Server.Tests/LeafNodes/LeafBasicTests.cs
git commit -m "test: port leaf node basic forwarding tests from Go"
Task 11: Port Account Isolation and Import/Export Tests
Files:
- Create: tests/NATS.Server.Tests/Accounts/AccountConfigTests.cs
- Create: tests/NATS.Server.Tests/Accounts/AccountImportExportTests.cs
- Modify: tests/NATS.Server.Tests/AccountIsolationTests.cs
- Reference: golang/nats-server/server/accounts_test.go (TestAccountIsolation, TestAccountIsolationExportImport, TestMultiAccountsIsolation, TestAccountSimpleConfig, TestAddServiceExport, TestAddStreamExport, TestCrossAccountRequestReply, TestAccountMaxConnectionsDisconnectsNewestFirst)
Step 1: Write the failing tests
// Accounts/AccountImportExportTests.cs
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core;
namespace NATS.Server.Tests.Accounts;
public class AccountImportExportTests
{
[Fact]
public async Task Stream_export_import_delivers_cross_account()
{
var port = TestHelpers.GetFreePort();
using var cts = new CancellationTokenSource();
var server = new NatsServer(new NatsOptions
{
Port = port,
Accounts =
[
new AccountConfig
{
Name = "A",
Users = [new User { Username = "userA", Password = "passA" }],
Exports = [new StreamExport { Subject = "events.>" }],
},
new AccountConfig
{
Name = "B",
Users = [new User { Username = "userB", Password = "passB" }],
Imports = [new StreamImport { Account = "A", Subject = "events.>" }],
},
],
}, NullLoggerFactory.Instance);
_ = server.StartAsync(cts.Token);
await server.WaitForReadyAsync();
// Subscribe as account B
await using var sub = new NatsConnection(new NatsOpts
{
Url = $"nats://userB:passB@127.0.0.1:{port}",
});
await sub.ConnectAsync();
await using var subscription = await sub.SubscribeCoreAsync<string>("events.test");
await sub.PingAsync();
// Publish as account A
await using var pub = new NatsConnection(new NatsOpts
{
Url = $"nats://userA:passA@127.0.0.1:{port}",
});
await pub.ConnectAsync();
await pub.PublishAsync("events.test", "cross-account");
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var msg = await subscription.Msgs.ReadAsync(readCts.Token);
msg.Data.ShouldBe("cross-account");
await cts.CancelAsync();
server.Dispose();
}
[Fact]
public async Task Account_isolation_prevents_cross_account_delivery()
{
var port = TestHelpers.GetFreePort();
using var cts = new CancellationTokenSource();
var server = new NatsServer(new NatsOptions
{
Port = port,
Accounts =
[
new AccountConfig
{
Name = "A",
Users = [new User { Username = "userA", Password = "passA" }],
},
new AccountConfig
{
Name = "B",
Users = [new User { Username = "userB", Password = "passB" }],
},
],
}, NullLoggerFactory.Instance);
_ = server.StartAsync(cts.Token);
await server.WaitForReadyAsync();
// Subscribe as account B
await using var sub = new NatsConnection(new NatsOpts
{
Url = $"nats://userB:passB@127.0.0.1:{port}",
});
await sub.ConnectAsync();
await using var subscription = await sub.SubscribeCoreAsync<string>("secret.data");
await sub.PingAsync();
// Publish as account A (no export)
await using var pub = new NatsConnection(new NatsOpts
{
Url = $"nats://userA:passA@127.0.0.1:{port}",
});
await pub.ConnectAsync();
await pub.PublishAsync("secret.data", "should-not-arrive");
await pub.PingAsync();
var received = false;
try
{
using var readCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));
await subscription.Msgs.ReadAsync(readCts.Token);
received = true;
}
catch (OperationCanceledException) { }
received.ShouldBeFalse();
await cts.CancelAsync();
server.Dispose();
}
}
Step 2–5: Standard TDD cycle
git add tests/NATS.Server.Tests/Accounts/AccountImportExportTests.cs
git commit -m "test: port account isolation and import/export tests from Go (2+ scenarios)"
Task 12: Phase A Gate — Full Suite Verification
Files:
- None created — verification only
Step 1: Run full test suite
Run: dotnet test -v minimal
Expected: All tests pass (existing + new Phase A tests).
Step 2: Count new tests
Run: dotnet test --list-tests 2>/dev/null | wc -l
Verify: Test count has increased by ~40+ from Phase A work.
Step 3: Commit verification marker
git commit --allow-empty -m "gate: phase A foundation tests complete — full suite passes"
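Piping `--list-tests` into `wc -l` also counts banner lines, so the Step 2 figure is an upper bound. A minimal sketch of a stricter count, assuming the listing prints each test name on its own indented line below an unindented header (the layout `dotnet test --list-tests` has used with VSTest; verify against the actual output before relying on it). `TestListing` is a hypothetical name.

```csharp
using System;
using System.Linq;

// Hypothetical helper; the indented-line layout is an assumption.
public static class TestListing
{
    // Counts non-blank lines that start with whitespace, which under the
    // assumed layout are the test names. Unindented banner lines are skipped.
    public static int CountTests(string listTestsOutput) =>
        listTestsOutput
            .Split('\n')
            .Select(line => line.TrimEnd('\r'))
            .Count(line => line.Length > 0
                           && char.IsWhiteSpace(line[0])
                           && line.Trim().Length > 0);
}
```

Recording this count before and after each phase gives a comparable delta for the gate commit message.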
Phase B: Distributed Substrate (~594 tests)
Gate: RAFT election/append/snapshot pass. FileStore append/read/recovery pass. Monitor endpoints return valid JSON. Config reload works.
Task 13: Port Storage Contract Tests — FileStore Basics
Files:
- Create: tests/NATS.Server.Tests/JetStream/Storage/FileStoreBasicTests.cs
- Reference: golang/nats-server/server/filestore_test.go (TestFileStoreBasics, TestFileStoreMsgHeaders, TestFileStoreBasicWriteMsgsAndRestore)
Step 1: Write the failing tests
using System.Text;
namespace NATS.Server.Tests.JetStream.Storage;
public class FileStoreBasicTests : IDisposable
{
private readonly string _storeDir;
public FileStoreBasicTests()
{
_storeDir = Path.Combine(Path.GetTempPath(), $"nats-test-{Guid.NewGuid():N}");
Directory.CreateDirectory(_storeDir);
}
public void Dispose()
{
if (Directory.Exists(_storeDir))
Directory.Delete(_storeDir, true);
}
[Fact]
public async Task Store_and_load_messages()
{
var config = new StreamConfig { Name = "test", Storage = StorageType.File };
await using var fs = FileStore.Create(_storeDir, config);
for (var i = 0; i < 5; i++)
{
var seq = await fs.StoreMsgAsync("foo", null, Encoding.UTF8.GetBytes($"msg-{i}"));
seq.ShouldBe((ulong)(i + 1));
}
var state = fs.State();
state.Msgs.ShouldBe(5UL);
var msg = await fs.LoadMsgAsync(2);
msg.ShouldNotBeNull();
Encoding.UTF8.GetString(msg.Data).ShouldBe("msg-1");
}
[Fact]
public async Task Store_message_with_headers()
{
var config = new StreamConfig { Name = "test", Storage = StorageType.File };
await using var fs = FileStore.Create(_storeDir, config);
var hdr = Encoding.UTF8.GetBytes("NATS/1.0\r\nname:derek\r\n\r\n");
var body = Encoding.UTF8.GetBytes("Hello World");
var seq = await fs.StoreMsgAsync("foo", hdr, body);
seq.ShouldBe(1UL);
var loaded = await fs.LoadMsgAsync(1);
loaded.ShouldNotBeNull();
loaded.Header.ShouldBe(hdr);
loaded.Data.ShouldBe(body);
}
[Fact]
public async Task Stop_and_restart_preserves_messages()
{
var config = new StreamConfig { Name = "test", Storage = StorageType.File };
// Write 100 messages
{
await using var fs = FileStore.Create(_storeDir, config);
for (var i = 0; i < 100; i++)
await fs.StoreMsgAsync("foo", null, Encoding.UTF8.GetBytes($"msg-{i}"));
var state = fs.State();
state.Msgs.ShouldBe(100UL);
}
// Restart and verify
{
await using var fs = FileStore.Create(_storeDir, config);
var state = fs.State();
state.Msgs.ShouldBe(100UL);
// Write 100 more
for (var i = 100; i < 200; i++)
await fs.StoreMsgAsync("foo", null, Encoding.UTF8.GetBytes($"msg-{i}"));
}
// Final verify
{
await using var fs = FileStore.Create(_storeDir, config);
var state = fs.State();
state.Msgs.ShouldBe(200UL);
}
}
[Fact]
public async Task Remove_messages_updates_state()
{
var config = new StreamConfig { Name = "test", Storage = StorageType.File };
await using var fs = FileStore.Create(_storeDir, config);
for (var i = 0; i < 5; i++)
await fs.StoreMsgAsync("foo", null, Encoding.UTF8.GetBytes($"msg-{i}"));
await fs.RemoveMsgAsync(1);
await fs.RemoveMsgAsync(5);
await fs.RemoveMsgAsync(3);
var state = fs.State();
state.Msgs.ShouldBe(2UL);
}
}
Step 2–5: Standard TDD cycle
git add tests/NATS.Server.Tests/JetStream/Storage/FileStoreBasicTests.cs
git commit -m "test: port filestore basic contract tests from Go (4 scenarios)"
Task 14: Port Storage Contract Tests — MemStore and Retention
Files:
- Create: tests/NATS.Server.Tests/JetStream/Storage/MemStoreBasicTests.cs
- Create: tests/NATS.Server.Tests/JetStream/Storage/StorageRetentionTests.cs
- Reference: golang/nats-server/server/memstore_test.go (first 10 tests), filestore_test.go (retention tests)
Step 1–5: Standard TDD cycle — mirror FileStore patterns for MemStore.
git commit -m "test: port memstore basic and retention tests from Go"
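Mirroring the FileStore patterns means MemStore has to satisfy the same sequence and count rules that Task 13 asserts. A minimal in-memory model of those rules, as a sketch only: `SeqStoreModel` is a hypothetical type and not the port's MemStore or its API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model; it encodes only the rules the contract tests check.
public sealed class SeqStoreModel
{
    private readonly SortedDictionary<ulong, byte[]> _msgs = new();
    private ulong _lastSeq;

    // Number of messages currently held.
    public ulong Count => (ulong)_msgs.Count;

    // Assigns the next sequence. Sequences start at 1 and are not reused
    // after a removal.
    public ulong Store(byte[] data)
    {
        _msgs[++_lastSeq] = data;
        return _lastSeq;
    }

    // Returns the payload stored at `seq`, or null if absent or removed.
    public byte[]? Load(ulong seq) =>
        _msgs.TryGetValue(seq, out var data) ? data : null;

    // Removes the message at `seq`; returns false if it was not present.
    public bool Remove(ulong seq) => _msgs.Remove(seq);
}
```

With five stored messages, removing sequences 1, 5, and 3 leaves a count of 2, matching Remove_messages_updates_state in Task 13.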
Task 15: Port RAFT Consensus Tests — Election, Append, Snapshot
Files:
- Create: tests/NATS.Server.Tests/Raft/RaftElectionBasicTests.cs
- Create: tests/NATS.Server.Tests/Raft/RaftAppendEntryTests.cs
- Create: tests/NATS.Server.Tests/Raft/RaftSnapshotTests.cs
- Reference: golang/nats-server/server/raft_test.go (TestNRGSimple, TestNRGSnapshotAndRestart, TestNRGAppendEntryEncode)
Step 1: Write the failing tests
// Raft/RaftElectionBasicTests.cs
using System.Text;
namespace NATS.Server.Tests.Raft;
public class RaftElectionBasicTests
{
[Fact]
public async Task Three_node_group_elects_leader()
{
await using var cluster = await RaftTestCluster.CreateAsync(3);
await cluster.WaitForLeaderAsync(TimeSpan.FromSeconds(10));
cluster.Leader.ShouldNotBeNull();
cluster.Followers.Length.ShouldBe(2);
}
[Fact]
public async Task State_converges_after_proposals()
{
await using var cluster = await RaftTestCluster.CreateAsync(3);
await cluster.WaitForLeaderAsync(TimeSpan.FromSeconds(10));
await cluster.ProposeAsync(Encoding.UTF8.GetBytes("delta:22"));
await cluster.ProposeAsync(Encoding.UTF8.GetBytes("delta:-11"));
await cluster.ProposeAsync(Encoding.UTF8.GetBytes("delta:-10"));
await cluster.WaitForConvergenceAsync(TimeSpan.FromSeconds(5));
// All nodes should have applied same entries
cluster.AllNodesConverged.ShouldBeTrue();
}
}
Step 2–5: Standard TDD cycle
git commit -m "test: port raft election and convergence tests from Go (2+ scenarios)"
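The convergence test proposes three "delta:N" payloads and then checks that every node agrees. A minimal sketch of what "converged" could mean, assuming each node applies committed entries as signed deltas to a running total (22 - 11 - 10 = 1). `AdderReplica` and `Convergence` are hypothetical names, not the RaftTestCluster API.

```csharp
using System;
using System.Globalization;
using System.Linq;
using System.Text;

// Hypothetical state machine; the "delta:<n>" semantics are an assumption.
public sealed class AdderReplica
{
    public long Total { get; private set; }
    public int Applied { get; private set; }

    // Applies one committed entry of the form "delta:<signed integer>".
    public void Apply(byte[] entry)
    {
        const string prefix = "delta:";
        var text = Encoding.UTF8.GetString(entry);
        if (!text.StartsWith(prefix, StringComparison.Ordinal))
            throw new FormatException($"Unexpected entry: {text}");
        Total += long.Parse(text.Substring(prefix.Length), CultureInfo.InvariantCulture);
        Applied++;
    }
}

public static class Convergence
{
    // True when every replica has applied the same number of entries
    // and reached the same total as the first replica.
    public static bool AllConverged(params AdderReplica[] replicas) =>
        replicas.All(r => r.Applied == replicas[0].Applied
                          && r.Total == replicas[0].Total);
}
```

Comparing both the applied count and the total avoids a false positive where two nodes reach the same total after different numbers of entries.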
Task 16: Port Config Reload Tests
Files:
- Create: tests/NATS.Server.Tests/Configuration/ConfigReloadParityTests.cs
- Reference: golang/nats-server/server/reload_test.go (TestReloadAuth, TestReloadTLS, TestReloadMaxConnections, TestReloadLameDuck)
Step 1–5: Standard TDD cycle
git commit -m "test: port config hot reload tests from Go"
Task 17: Port Monitoring Endpoint Tests
Files:
- Create: tests/NATS.Server.Tests/Monitoring/VarzTests.cs
- Create: tests/NATS.Server.Tests/Monitoring/ConnzTests.cs
- Create: tests/NATS.Server.Tests/Monitoring/HealthzTests.cs
- Reference: golang/nats-server/server/monitor_test.go (TestHTTPHost, TestVarz, TestConnz, TestHealthz)
Step 1: Write the failing tests
// Monitoring/VarzTests.cs
using Microsoft.Extensions.Logging.Abstractions;
namespace NATS.Server.Tests.Monitoring;
public class VarzTests
{
[Fact]
public async Task Varz_returns_server_info()
{
var port = TestHelpers.GetFreePort();
var monitorPort = TestHelpers.GetFreePort();
using var cts = new CancellationTokenSource();
var server = new NatsServer(new NatsOptions
{
Port = port,
MonitorPort = monitorPort,
}, NullLoggerFactory.Instance);
_ = server.StartAsync(cts.Token);
await server.WaitForReadyAsync();
using var http = new HttpClient();
var response = await http.GetStringAsync($"http://127.0.0.1:{monitorPort}/varz");
response.ShouldContain("\"server_id\"");
response.ShouldContain("\"version\"");
response.ShouldContain("\"now\"");
await cts.CancelAsync();
server.Dispose();
}
[Fact]
public async Task Healthz_returns_ok()
{
var port = TestHelpers.GetFreePort();
var monitorPort = TestHelpers.GetFreePort();
using var cts = new CancellationTokenSource();
var server = new NatsServer(new NatsOptions
{
Port = port,
MonitorPort = monitorPort,
}, NullLoggerFactory.Instance);
_ = server.StartAsync(cts.Token);
await server.WaitForReadyAsync();
using var http = new HttpClient();
var response = await http.GetAsync($"http://127.0.0.1:{monitorPort}/healthz");
response.IsSuccessStatusCode.ShouldBeTrue();
await cts.CancelAsync();
server.Dispose();
}
}
Step 2–5: Standard TDD cycle
git commit -m "test: port monitoring endpoint tests from Go (varz, healthz)"
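The Phase B gate asks for valid JSON, while the Varz test above only checks for substrings. A minimal sketch of a stricter check that parses the body with System.Text.Json and reports missing top-level fields; `MonitorJson` is a hypothetical helper name.

```csharp
using System;
using System.Linq;
using System.Text.Json;

// Hypothetical helper; not part of the existing test project.
public static class MonitorJson
{
    // Parses `json` (throws JsonException if it is malformed) and returns
    // the names from `required` that are absent from the top-level object.
    public static string[] MissingFields(string json, params string[] required)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;
        return required
            .Where(name => !root.TryGetProperty(name, out _))
            .ToArray();
    }
}

// Possible usage inside the Varz test:
//   MonitorJson.MissingFields(response, "server_id", "version", "now")
//       .ShouldBeEmpty();
```

Parsing first means a truncated or malformed response fails the test even when the expected field names happen to appear in it.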
Task 18: Phase B Gate — Full Suite Verification
Step 1: Run full test suite
Run: dotnet test -v minimal
Expected: All tests pass.
git commit --allow-empty -m "gate: phase B distributed substrate tests complete — full suite passes"
Phase C: JetStream Depth (~889 tests)
Gate: All JetStream API endpoints work. Stream/consumer lifecycle correct. Cluster failover preserves data.
Task 19: Port JetStream Stream Lifecycle Tests
Files:
- Create: tests/NATS.Server.Tests/JetStream/StreamLifecycleTests.cs
- Reference: golang/nats-server/server/jetstream_test.go (TestJetStreamCreateStream*, TestJetStreamDeleteStream, TestJetStreamUpdateStream, TestJetStreamPurge*)
Step 1–5: Standard TDD cycle. Test stream create/update/delete/purge/info through $JS.API.* request/reply calls issued with NATS.Client.Core.
git commit -m "test: port jetstream stream lifecycle tests from Go"
Task 20: Port JetStream Publish and Ack Tests
Files:
- Create: tests/NATS.Server.Tests/JetStream/PublishAckTests.cs
- Reference: golang/nats-server/server/jetstream_test.go (TestJetStreamPublish*, TestJetStreamDedup*, TestJetStreamPubAck*)
Step 1–5: Standard TDD cycle
git commit -m "test: port jetstream publish and ack tests from Go"
Task 21: Port JetStream Consumer Delivery Tests
Files:
- Create: tests/NATS.Server.Tests/JetStream/ConsumerDeliveryTests.cs
- Create: tests/NATS.Server.Tests/JetStream/ConsumerAckPolicyTests.cs
- Reference: golang/nats-server/server/jetstream_consumer_test.go (TestJetStreamConsumerCreate*, deliver policy, ack policy tests)
Step 1–5: Standard TDD cycle
git commit -m "test: port jetstream consumer delivery and ack policy tests from Go"
Task 22: Port JetStream Retention Policy Tests
Files:
- Create: tests/NATS.Server.Tests/JetStream/RetentionPolicyTests.cs
- Reference: golang/nats-server/server/jetstream_test.go (retention-related: Limits, Interest, WorkQueue)
Step 1–5: Standard TDD cycle
git commit -m "test: port jetstream retention policy tests from Go"
Task 23: Port JetStream API Endpoint Tests
Files:
- Create: tests/NATS.Server.Tests/JetStream/Api/StreamApiTests.cs
- Create: tests/NATS.Server.Tests/JetStream/Api/ConsumerApiTests.cs
- Reference: golang/nats-server/server/jetstream_test.go ($JS.API.* handler tests)
Step 1–5: Standard TDD cycle
git commit -m "test: port jetstream API endpoint tests from Go"
Task 24: Port JetStream Cluster Formation and Replica Tests
Files:
- Create: tests/NATS.Server.Tests/JetStream/Cluster/ClusterFormationTests.cs
- Create: tests/NATS.Server.Tests/JetStream/Cluster/ReplicaStreamTests.cs
- Reference: golang/nats-server/server/jetstream_cluster_1_test.go (TestJetStreamClusterConfig, TestJetStreamClusterMultiReplicaStreams)
Step 1–5: Standard TDD cycle — use in-process 3-server JetStream cluster fixture.
git commit -m "test: port jetstream cluster formation and replica tests from Go"
Task 25: Port JetStream Cluster Leader Failover Tests
Files:
- Create: tests/NATS.Server.Tests/JetStream/Cluster/LeaderFailoverTests.cs
- Reference: golang/nats-server/server/jetstream_cluster_1_test.go (leader election, failover scenarios)
Step 1–5: Standard TDD cycle
git commit -m "test: port jetstream cluster leader failover tests from Go"
Task 26: Phase C Gate — Full Suite Verification
Step 1: Run full test suite
Run: dotnet test -v minimal
Expected: All tests pass.
git commit --allow-empty -m "gate: phase C jetstream depth tests complete — full suite passes"
Phase D: Protocol Surfaces (~135 tests)
Gate: MQTT pub/sub through .NET server works. JWT full claim validation passes.
Task 27: Port MQTT Packet Parsing Tests
Files:
- Create: tests/NATS.Server.Tests/Mqtt/MqttPacketParsingParityTests.cs
- Reference: golang/nats-server/server/mqtt_test.go (CONNECT, PUBLISH, SUBSCRIBE, UNSUBSCRIBE, DISCONNECT packet tests)
Step 1–5: Standard TDD cycle
git commit -m "test: port mqtt packet parsing tests from Go"
Task 28: Port MQTT QoS and Session Tests
Files:
- Create: tests/NATS.Server.Tests/Mqtt/MqttQosDeliveryTests.cs
- Create: tests/NATS.Server.Tests/Mqtt/MqttSessionTests.cs
- Reference: golang/nats-server/server/mqtt_test.go (QoS 0/1/2, session management, clean session flag)
Step 1–5: Standard TDD cycle
git commit -m "test: port mqtt qos and session tests from Go"
Task 29: Port JWT Claim Edge Case Tests
Files:
- Modify: tests/NATS.Server.Tests/JwtTests.cs
- Reference: golang/nats-server/server/jwt_test.go (remaining ~27 edge cases)
Step 1–5: Standard TDD cycle
git commit -m "test: port jwt claim edge case tests from Go"
Task 30: Phase D Gate and Final Verification
Files:
- Modify: differences.md (synchronize with verified behavior)
Step 1: Run full test suite
Run: dotnet test -v minimal
Expected: All tests pass with zero failures.
Step 2: Count total tests
Run: dotnet test --list-tests 2>/dev/null | wc -l
Verify: Significant increase from baseline 867.
Step 3: Commit
git add differences.md
git commit -m "gate: phase D protocol surfaces complete — all phases pass"
Plan Notes
- Each task is parallelizable within its phase. Use @dispatching-parallel-agents for independent subsystems.
- If a test reveals missing server functionality, port the minimal Go logic. Tag the commit feat: not test:.
- If a Go test is N/A for .NET (e.g., Windows-specific, Go-runtime-specific), mark it N/A in the mapping file with rationale.
- For very large subsystems (JetStream Clustering), break into sub-batches of ~20 tests per commit.
- Always run full suite (dotnet test) before phase gate commits.