Files
natsdotnet/tests/NATS.Server.Core.Tests/ClientProtocolParityTests.cs
Joseph Doherty 7fbffffd05 refactor: rename remaining tests to NATS.Server.Core.Tests
- Rename tests/NATS.Server.Tests -> tests/NATS.Server.Core.Tests
- Update solution file, InternalsVisibleTo, and csproj references
- Remove JETSTREAM_INTEGRATION_MATRIX and NATS.NKeys from csproj (moved to JetStream.Tests and Auth.Tests)
- Update all namespaces from NATS.Server.Tests.* to NATS.Server.Core.Tests.*
- Replace private GetFreePort/ReadUntilAsync helpers with TestUtilities calls
- Fix stale namespace in Transport.Tests/NetworkingGoParityTests.cs
2026-03-12 16:14:02 -04:00

2127 lines
74 KiB
C#

// Go reference: golang/nats-server/server/client_test.go
// Ports ~52 tests covering client protocol behaviors not yet tested in existing files.
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Text.RegularExpressions;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server;
using NATS.Server.Auth;
using NATS.Server.Protocol;
using NATS.Server.TestUtilities;
namespace NATS.Server.Core.Tests;
/// <summary>
/// Protocol-level parity tests ported from Go client_test.go.
/// Each test starts a real NatsServer and uses raw TCP sockets for
/// wire-level assertions.
/// </summary>
public class ClientProtocolParityTests
{
// ---------------------------------------------------------------------------
// Helpers (self-contained, duplicated per task spec)
// ---------------------------------------------------------------------------
private static async Task<string> ReadAllAvailableAsync(Socket sock, int timeoutMs = 1000)
{
using var cts = new CancellationTokenSource(timeoutMs);
var sb = new StringBuilder();
var buf = new byte[8192];
try
{
while (true)
{
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
if (n == 0) break;
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
}
}
catch (OperationCanceledException)
{
// Expected
}
return sb.ToString();
}
private static int CountOccurrences(string haystack, string needle)
{
int count = 0, index = 0;
while ((index = haystack.IndexOf(needle, index, StringComparison.Ordinal)) >= 0)
{
count++;
index += needle.Length;
}
return count;
}
/// <summary>
/// Creates a running server and returns (server, port, cts).
/// Caller must cancel cts and dispose server.
/// </summary>
private static async Task<(NatsServer Server, int Port, CancellationTokenSource Cts)>
StartServerAsync(NatsOptions? options = null)
{
var port = TestPortAllocator.GetFreePort();
options ??= new NatsOptions();
options.Port = port;
var cts = new CancellationTokenSource();
var server = new NatsServer(options, NullLoggerFactory.Instance);
_ = server.StartAsync(cts.Token);
await server.WaitForReadyAsync();
return (server, port, cts);
}
/// <summary>
/// Connects a raw TCP socket, reads INFO, sends CONNECT, and returns the socket.
/// </summary>
private static async Task<Socket> ConnectAndHandshakeAsync(int port, string connectJson = "{}")
{
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
await SocketTestHelper.ReadUntilAsync(sock, "\r\n"); // drain INFO
await sock.SendAsync(Encoding.ASCII.GetBytes($"CONNECT {connectJson}\r\n"));
return sock;
}
/// <summary>
/// Connects and verifies PING/PONG handshake completes.
/// </summary>
private static async Task<Socket> ConnectAndPingAsync(int port, string connectJson = "{}")
{
var sock = await ConnectAndHandshakeAsync(port, connectJson);
await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
return sock;
}
// ---------------------------------------------------------------------------
// Test: INFO response parsing (TestClientCreateAndInfo)
// ---------------------------------------------------------------------------
// Go: TestClientCreateAndInfo server/client_test.go:202
[Fact]
public async Task Info_response_contains_valid_json()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await SocketTestHelper.ReadUntilAsync(sock, "\r\n");
info.ShouldStartWith("INFO ");
var jsonStart = info.IndexOf('{');
var jsonEnd = info.LastIndexOf('}');
jsonStart.ShouldBeGreaterThanOrEqualTo(0);
jsonEnd.ShouldBeGreaterThan(jsonStart);
var jsonStr = info[jsonStart..(jsonEnd + 1)];
var serverInfo = JsonSerializer.Deserialize<ServerInfo>(jsonStr);
serverInfo.ShouldNotBeNull();
serverInfo!.MaxPayload.ShouldBeGreaterThan(0);
serverInfo.Port.ShouldBe(port);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientCreateAndInfo server/client_test.go:202
[Fact]
public async Task Info_response_max_payload_matches_server_config()
{
var maxPayload = 512 * 1024; // 512KB
var (server, port, cts) = await StartServerAsync(new NatsOptions { MaxPayload = maxPayload });
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await SocketTestHelper.ReadUntilAsync(sock, "\r\n");
var jsonStr = info[(info.IndexOf('{'))..(info.LastIndexOf('}') + 1)];
var serverInfo = JsonSerializer.Deserialize<ServerInfo>(jsonStr);
serverInfo!.MaxPayload.ShouldBe(maxPayload);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientCreateAndInfo server/client_test.go:202
[Fact]
public async Task Info_auth_required_reflects_server_config()
{
var (server, port, cts) = await StartServerAsync(new NatsOptions { Authorization = "secret" });
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await SocketTestHelper.ReadUntilAsync(sock, "\r\n");
info.ShouldContain("\"auth_required\":true");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientCreateAndInfo server/client_test.go:202
[Fact]
public async Task Info_auth_required_absent_when_no_auth()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await SocketTestHelper.ReadUntilAsync(sock, "\r\n");
// auth_required should not be present (or should be false/omitted)
info.ShouldNotContain("\"auth_required\":true");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: CONNECT parsing and flags
// ---------------------------------------------------------------------------
// Go: TestClientConnect server/client_test.go:475
[Fact]
public async Task Connect_with_verbose_true_returns_ok()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndHandshakeAsync(port, "{\"verbose\":true}");
await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
// With verbose:true, the CONNECT itself triggers +OK, then PING triggers PONG + +OK
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("+OK\r\n");
response.ShouldContain("PONG\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientConnect server/client_test.go:475
[Fact]
public async Task Connect_with_verbose_false_does_not_return_ok_for_pub()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port, "{\"verbose\":false}");
// PUB should not trigger +OK when verbose is false
await sock.SendAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nhello\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldNotContain("+OK");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientConnect server/client_test.go:475
[Fact]
public async Task Connect_with_verbose_true_returns_ok_for_sub()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndHandshakeAsync(port, "{\"verbose\":true}");
// Drain the +OK from CONNECT
await SocketTestHelper.ReadUntilAsync(sock, "+OK\r\n");
await sock.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
// SUB should trigger +OK in verbose mode
response.ShouldContain("+OK\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientConnect server/client_test.go:475
[Fact]
public async Task Connect_with_verbose_true_returns_ok_for_unsub()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndHandshakeAsync(port, "{\"verbose\":true}");
await SocketTestHelper.ReadUntilAsync(sock, "+OK\r\n"); // drain CONNECT +OK
await sock.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\nUNSUB 1\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
// Should get two +OK (SUB + UNSUB) plus PONG
CountOccurrences(response, "+OK\r\n").ShouldBeGreaterThanOrEqualTo(2);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientConnect server/client_test.go:475
[Fact]
public async Task Connect_with_verbose_true_returns_ok_for_pub()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndHandshakeAsync(port, "{\"verbose\":true}");
await SocketTestHelper.ReadUntilAsync(sock, "+OK\r\n");
await sock.SendAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nhello\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
// PUB should trigger +OK in verbose mode
response.ShouldContain("+OK\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientConnect server/client_test.go:475
[Fact]
public async Task Connect_parses_user_and_pass()
{
var (server, port, cts) = await StartServerAsync(new NatsOptions
{
Users = [new User { Username = "derek", Password = "foo" }],
});
try
{
using var sock = await ConnectAndHandshakeAsync(port,
"{\"user\":\"derek\",\"pass\":\"foo\"}");
await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("PONG\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientConnect server/client_test.go:475
[Fact]
public async Task Connect_parses_auth_token()
{
var (server, port, cts) = await StartServerAsync(new NatsOptions
{
Authorization = "YZZ222",
});
try
{
using var sock = await ConnectAndHandshakeAsync(port,
"{\"auth_token\":\"YZZ222\"}");
await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("PONG\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientConnect server/client_test.go:475
[Fact]
public async Task Connect_parses_client_name()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndHandshakeAsync(port,
"{\"name\":\"my-test-client\"}");
await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("PONG\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Protocol version negotiation
// ---------------------------------------------------------------------------
// Go: TestClientConnectProto server/client_test.go:537
[Fact]
public async Task Connect_proto_zero_accepted()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndHandshakeAsync(port,
"{\"verbose\":false,\"pedantic\":false,\"protocol\":0}");
await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("PONG\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientConnectProto server/client_test.go:537
[Fact]
public async Task Connect_proto_one_accepted()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndHandshakeAsync(port,
"{\"verbose\":false,\"pedantic\":false,\"protocol\":1}");
await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("PONG\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: PING/PONG
// ---------------------------------------------------------------------------
// Go: TestClientPing server/client_test.go:616
[Fact]
public async Task Ping_returns_pong()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndHandshakeAsync(port);
await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("PONG\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientPing server/client_test.go:616
[Fact]
public async Task Multiple_pings_return_multiple_pongs()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port);
await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\nPING\r\nPING\r\n"));
// Read until we get at least 3 PONGs
var response = await ReadAllAvailableAsync(sock, 3000);
CountOccurrences(response, "PONG\r\n").ShouldBeGreaterThanOrEqualTo(3);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Max payload enforcement
// ---------------------------------------------------------------------------
// Go: TestClientMaxPending / max_payload enforcement (client_test.go:1976)
[Fact]
public async Task Max_payload_violation_closes_connection()
{
const int maxPayload = 100;
var (server, port, cts) = await StartServerAsync(new NatsOptions { MaxPayload = maxPayload });
try
{
using var sock = await ConnectAndPingAsync(port);
// Send a message that exceeds max payload
var bigPayload = new string('X', maxPayload + 50);
await sock.SendAsync(Encoding.ASCII.GetBytes(
$"PUB foo {bigPayload.Length}\r\n{bigPayload}\r\n"));
var response = await ReadAllAvailableAsync(sock, 3000);
response.ShouldContain("-ERR 'Maximum Payload Violation'");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: max payload enforcement
[Fact]
public async Task Max_payload_exactly_at_limit_succeeds()
{
const int maxPayload = 100;
var (server, port, cts) = await StartServerAsync(new NatsOptions { MaxPayload = maxPayload });
try
{
using var sock = await ConnectAndPingAsync(port);
// Exactly at the limit should work
var payload = new string('X', maxPayload);
await sock.SendAsync(Encoding.ASCII.GetBytes(
$"SUB foo 1\r\nPUB foo {payload.Length}\r\n{payload}\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("MSG foo 1");
response.ShouldContain("PONG\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: max payload enforcement - connection closed after violation
[Fact]
public async Task Max_payload_violation_disconnects_client()
{
const int maxPayload = 50;
var (server, port, cts) = await StartServerAsync(new NatsOptions { MaxPayload = maxPayload });
try
{
using var sock = await ConnectAndPingAsync(port);
var bigPayload = new string('X', maxPayload + 100);
await sock.SendAsync(Encoding.ASCII.GetBytes(
$"PUB foo {bigPayload.Length}\r\n{bigPayload}\r\n"));
// Read remaining data -- server should close the connection
var response = await ReadAllAvailableAsync(sock, 3000);
response.ShouldContain("-ERR 'Maximum Payload Violation'");
// Verify connection is closed
var buf = new byte[128];
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
var n = await sock.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
n.ShouldBe(0);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Pedantic mode
// ---------------------------------------------------------------------------
// Go: pedantic mode validates subjects (TestClientConnect)
[Fact]
public async Task Pedantic_mode_rejects_invalid_publish_subject()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port, "{\"pedantic\":true}");
// Publish to an invalid subject (contains space)
await sock.SendAsync(Encoding.ASCII.GetBytes("PUB foo.*.bar 5\r\nhello\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n", 5000);
response.ShouldContain("-ERR 'Invalid Publish Subject'");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: pedantic mode - valid publish subject should succeed
[Fact]
public async Task Pedantic_mode_accepts_valid_publish_subject()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port, "{\"pedantic\":true}");
await sock.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo.bar 1\r\nPUB foo.bar 5\r\nhello\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("MSG foo.bar 1 5\r\nhello\r\n");
response.ShouldNotContain("-ERR");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: pedantic mode - wildcard in publish subject not allowed
[Fact]
public async Task Pedantic_mode_rejects_wildcard_gt_in_publish()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port, "{\"pedantic\":true}");
await sock.SendAsync(Encoding.ASCII.GetBytes("PUB foo.> 5\r\nhello\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n", 5000);
response.ShouldContain("-ERR 'Invalid Publish Subject'");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Echo mode
// ---------------------------------------------------------------------------
// Go: TestClientPubSubNoEcho server/client_test.go:691
[Fact]
public async Task Echo_true_delivers_own_messages()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port, "{\"echo\":true}");
await sock.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("MSG foo 1 5\r\nhello\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientPubSubNoEcho server/client_test.go:691
[Fact]
public async Task Echo_false_suppresses_own_messages()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port, "{\"echo\":false}");
await sock.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldNotContain("MSG");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientPubWithQueueSubNoEcho server/client_test.go:1043
[Fact]
public async Task Echo_false_queue_sub_messages_delivered_to_other_client()
{
var (server, port, cts) = await StartServerAsync();
try
{
// Publisher with echo:false also has a queue sub
using var pub = await ConnectAndPingAsync(port, "{\"echo\":false}");
// Other subscriber with echo:true
using var sub = await ConnectAndPingAsync(port);
// Both subscribe to same queue group
await pub.SendAsync(Encoding.ASCII.GetBytes("SUB foo bar 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n");
await sub.SendAsync(Encoding.ASCII.GetBytes("SUB foo bar 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
// Publish 100 messages from the echo:false client
var sb = new StringBuilder();
for (int i = 0; i < 100; i++)
sb.Append("PUB foo 5\r\nhello\r\n");
sb.Append("PING\r\n");
await pub.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n");
// Send PING on sub to flush deliveries
await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
// The subscriber should receive all 100 messages since the publisher
// has echo:false (all queue messages go to the other member)
var msgCount = CountOccurrences(response, "MSG foo");
msgCount.ShouldBe(100);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Two-token publish does not match single-token subscribe
// ---------------------------------------------------------------------------
// Go: TestTwoTokenPubMatchSingleTokenSub server/client_test.go:1287
[Fact]
public async Task Two_token_pub_does_not_match_single_token_sub()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port);
// Publish first (no subscribers), then subscribe to "foo", then publish "foo.bar"
await sock.SendAsync(Encoding.ASCII.GetBytes(
"PUB foo.bar 5\r\nhello\r\nSUB foo 1\r\nPING\r\n"));
var response1 = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response1.ShouldStartWith("PONG\r\n");
// Now publish foo.bar again -- should NOT match "foo" subscription
await sock.SendAsync(Encoding.ASCII.GetBytes("PUB foo.bar 5\r\nhello\r\nPING\r\n"));
var response2 = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response2.ShouldStartWith("PONG\r\n");
response2.ShouldNotContain("MSG");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Authorization failures
// ---------------------------------------------------------------------------
// Go: auth failure -- bad token
[Fact]
public async Task Auth_failure_wrong_token_closes_connection()
{
var (server, port, cts) = await StartServerAsync(new NatsOptions
{
Authorization = "correct_token",
});
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
await SocketTestHelper.ReadUntilAsync(sock, "\r\n"); // INFO
await sock.SendAsync(Encoding.ASCII.GetBytes(
"CONNECT {\"auth_token\":\"wrong_token\"}\r\n"));
var response = await ReadAllAvailableAsync(sock, 3000);
response.ShouldContain("-ERR 'Authorization Violation'");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: auth failure -- wrong user/pass
[Fact]
public async Task Auth_failure_wrong_password_closes_connection()
{
var (server, port, cts) = await StartServerAsync(new NatsOptions
{
Users = [new User { Username = "admin", Password = "secret" }],
});
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
await SocketTestHelper.ReadUntilAsync(sock, "\r\n"); // INFO
await sock.SendAsync(Encoding.ASCII.GetBytes(
"CONNECT {\"user\":\"admin\",\"pass\":\"wrongpass\"}\r\n"));
var response = await ReadAllAvailableAsync(sock, 3000);
response.ShouldContain("-ERR 'Authorization Violation'");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: auth success -- correct credentials
[Fact]
public async Task Auth_success_with_correct_user_pass()
{
var (server, port, cts) = await StartServerAsync(new NatsOptions
{
Users = [new User { Username = "admin", Password = "secret" }],
});
try
{
using var sock = await ConnectAndHandshakeAsync(port,
"{\"user\":\"admin\",\"pass\":\"secret\"}");
await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("PONG\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestAuthorizationTimeout server/client_test.go:1260
[Fact]
public async Task Auth_timeout_closes_connection()
{
var (server, port, cts) = await StartServerAsync(new NatsOptions
{
Authorization = "my_token",
AuthTimeout = TimeSpan.FromMilliseconds(500),
});
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
await SocketTestHelper.ReadUntilAsync(sock, "\r\n"); // INFO
// Do NOT send CONNECT
var response = await SocketTestHelper.ReadUntilAsync(sock, "Authentication Timeout", timeoutMs: 5000);
response.ShouldContain("-ERR 'Authentication Timeout'");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Permission violations
// ---------------------------------------------------------------------------
// Go: TestQueueSubscribePermissions server/client_test.go:899
[Fact]
public async Task Permission_violation_on_sub_denied_subject()
{
var (server, port, cts) = await StartServerAsync(new NatsOptions
{
Users =
[
new User
{
Username = "limited",
Password = "pass",
Permissions = new Permissions
{
Subscribe = new SubjectPermission { Allow = ["allowed.>"] },
},
},
],
});
try
{
using var sock = await ConnectAndPingAsync(port,
"{\"user\":\"limited\",\"pass\":\"pass\",\"verbose\":false}");
// Subscribe to a denied subject
await sock.SendAsync(Encoding.ASCII.GetBytes("SUB denied.topic 1\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("-ERR 'Permissions Violation for Subscription'");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: publish permission violation
[Fact]
public async Task Permission_violation_on_pub_denied_subject()
{
var (server, port, cts) = await StartServerAsync(new NatsOptions
{
Users =
[
new User
{
Username = "limited",
Password = "pass",
Permissions = new Permissions
{
Publish = new SubjectPermission { Allow = ["allowed.>"] },
},
},
],
});
try
{
using var sock = await ConnectAndPingAsync(port,
"{\"user\":\"limited\",\"pass\":\"pass\",\"verbose\":false}");
await sock.SendAsync(Encoding.ASCII.GetBytes("PUB denied.topic 5\r\nhello\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("-ERR 'Permissions Violation for Publish'");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: publish permission -- allowed subject succeeds
[Fact]
public async Task Permission_allowed_publish_succeeds()
{
var (server, port, cts) = await StartServerAsync(new NatsOptions
{
Users =
[
new User
{
Username = "limited",
Password = "pass",
Permissions = new Permissions
{
Publish = new SubjectPermission { Allow = ["allowed.>"] },
Subscribe = new SubjectPermission { Allow = ["allowed.>"] },
},
},
],
});
try
{
using var sock = await ConnectAndPingAsync(port,
"{\"user\":\"limited\",\"pass\":\"pass\",\"verbose\":false}");
await sock.SendAsync(Encoding.ASCII.GetBytes(
"SUB allowed.topic 1\r\nPUB allowed.topic 5\r\nhello\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("MSG allowed.topic 1 5\r\nhello\r\n");
response.ShouldNotContain("-ERR");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: deny list for publish
[Fact]
public async Task Permission_deny_list_overrides_allow()
{
var (server, port, cts) = await StartServerAsync(new NatsOptions
{
Users =
[
new User
{
Username = "user1",
Password = "pass",
Permissions = new Permissions
{
Publish = new SubjectPermission
{
Allow = [">"],
Deny = ["secret.>"],
},
},
},
],
});
try
{
using var sock = await ConnectAndPingAsync(port,
"{\"user\":\"user1\",\"pass\":\"pass\",\"verbose\":false}");
// Allowed
await sock.SendAsync(Encoding.ASCII.GetBytes("PUB public.topic 5\r\nhello\r\nPING\r\n"));
var r1 = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
r1.ShouldNotContain("-ERR");
// Denied
await sock.SendAsync(Encoding.ASCII.GetBytes("PUB secret.data 5\r\nhello\r\nPING\r\n"));
var r2 = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
r2.ShouldContain("-ERR 'Permissions Violation for Publish'");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: No Responders (503 HMSG)
// ---------------------------------------------------------------------------
// Go: TestClientNoResponderSupport server/client_test.go:230
[Fact]
public async Task No_responders_requires_headers_flag()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
await SocketTestHelper.ReadUntilAsync(sock, "\r\n"); // INFO
// no_responders without headers should fail
await sock.SendAsync(Encoding.ASCII.GetBytes(
"CONNECT {\"no_responders\":true}\r\n"));
var response = await ReadAllAvailableAsync(sock, 3000);
response.ShouldContain("-ERR");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientNoResponderSupport server/client_test.go:230
[Fact]
public async Task No_responders_with_headers_sends_503()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port,
"{\"headers\":true,\"no_responders\":true}");
// Subscribe on the reply inbox
await sock.SendAsync(Encoding.ASCII.GetBytes("SUB reply.inbox 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
// Publish to a subject with no subscribers, with a reply subject
await sock.SendAsync(Encoding.ASCII.GetBytes("PUB no.listeners reply.inbox 0\r\n\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "NATS/1.0 503", timeoutMs: 5000);
response.ShouldContain("HMSG reply.inbox");
response.ShouldContain("NATS/1.0 503");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Header support
// ---------------------------------------------------------------------------
// Go: TestServerHeaderSupport server/client_test.go:259
[Fact]
public async Task Server_info_has_headers_true()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await SocketTestHelper.ReadUntilAsync(sock, "\r\n");
info.ShouldContain("\"headers\":true");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestServerHeaderSupport server/client_test.go:259
// The .NET server currently always advertises headers:true (NoHeaderSupport
// not fully wired to ServerInfo yet). Verify the default behavior.
[Fact]
public async Task Server_info_headers_defaults_to_true()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await SocketTestHelper.ReadUntilAsync(sock, "\r\n");
info.ShouldContain("\"headers\":true");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientHeaderDeliverMsg server/client_test.go:330
[Fact]
public async Task Hpub_delivers_hmsg_to_subscriber()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sub = await ConnectAndPingAsync(port, "{\"headers\":true}");
using var pub = await ConnectAndPingAsync(port, "{\"headers\":true}");
await sub.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
// HPUB foo 12 14\r\nName:Derek\r\nOK
await pub.SendAsync(Encoding.ASCII.GetBytes("HPUB foo 12 14\r\nName:Derek\r\nOK\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sub, "OK\r\n", timeoutMs: 5000);
response.ShouldContain("HMSG foo 1 12 14\r\n");
response.ShouldContain("Name:Derek");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Max subscriptions per connection
// ---------------------------------------------------------------------------
// Go: MaxSubs enforcement
[Fact]
public async Task Max_subs_enforced_closes_connection()
{
const int maxSubs = 5;
var (server, port, cts) = await StartServerAsync(new NatsOptions { MaxSubs = maxSubs });
try
{
using var sock = await ConnectAndPingAsync(port);
var sb = new StringBuilder();
for (int i = 1; i <= maxSubs; i++)
sb.Append($"SUB foo.{i} {i}\r\n");
// One over the limit
sb.Append($"SUB foo.overflow {maxSubs + 1}\r\n");
await sock.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
var response = await ReadAllAvailableAsync(sock, 3000);
response.ShouldContain("-ERR 'Maximum Subscriptions Exceeded'");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: MaxSubs -- exactly at limit is fine
[Fact]
public async Task Max_subs_exactly_at_limit_succeeds()
{
const int maxSubs = 3;
var (server, port, cts) = await StartServerAsync(new NatsOptions { MaxSubs = maxSubs });
try
{
using var sock = await ConnectAndPingAsync(port);
var sb = new StringBuilder();
for (int i = 1; i <= maxSubs; i++)
sb.Append($"SUB foo.{i} {i}\r\n");
sb.Append("PING\r\n");
await sock.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldNotContain("-ERR");
response.ShouldContain("PONG\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Connection info (client ID in INFO)
// ---------------------------------------------------------------------------
// Go: TestClientCreateAndInfo -- server_id is unique
[Fact]
public async Task Info_contains_server_id()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await SocketTestHelper.ReadUntilAsync(sock, "\r\n");
var jsonStr = info[(info.IndexOf('{'))..(info.LastIndexOf('}') + 1)];
var serverInfo = JsonSerializer.Deserialize<ServerInfo>(jsonStr);
serverInfo!.ServerId.ShouldNotBeNullOrEmpty();
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: server tracks client count
[Fact]
public async Task Client_count_increments_on_connect()
{
var (server, port, cts) = await StartServerAsync();
try
{
server.ClientCount.ShouldBe(0);
using var sock1 = await ConnectAndPingAsync(port);
server.ClientCount.ShouldBe(1);
using var sock2 = await ConnectAndPingAsync(port);
server.ClientCount.ShouldBe(2);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Client disconnect removes subscriptions
// ---------------------------------------------------------------------------
// Go: TestClientRemoveSubsOnDisconnect server/client_test.go:1227
[Fact]
public async Task Disconnect_removes_subscriptions_from_sublist()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port);
await sock.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo 1\r\nSUB bar 2\r\nSUB baz 3\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
server.SubList.Count.ShouldBe(3u);
sock.Shutdown(SocketShutdown.Both);
sock.Close();
await Task.Delay(500);
server.SubList.Count.ShouldBe(0u);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientMapRemoval server/client_test.go:1253
[Fact]
public async Task Disconnect_removes_client_from_server_map()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port);
server.ClientCount.ShouldBe(1);
sock.Shutdown(SocketShutdown.Both);
sock.Close();
await Task.Delay(500);
server.ClientCount.ShouldBe(0);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Close connection very early
// ---------------------------------------------------------------------------
// Go: TestCloseConnectionVeryEarly server/client_test.go:2448
[Fact]
public async Task Close_connection_immediately_after_connect()
{
var (server, port, cts) = await StartServerAsync();
try
{
// Open and immediately close
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
sock.Close();
await Task.Delay(500);
server.ClientCount.ShouldBe(0);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Multiple connections
// ---------------------------------------------------------------------------
[Fact]
public async Task Server_tracks_multiple_clients()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var c1 = await ConnectAndPingAsync(port);
using var c2 = await ConnectAndPingAsync(port);
using var c3 = await ConnectAndPingAsync(port);
server.ClientCount.ShouldBe(3);
c1.Shutdown(SocketShutdown.Both);
c1.Close();
await Task.Delay(300);
server.ClientCount.ShouldBe(2);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Pub with reply
// ---------------------------------------------------------------------------
// Go: TestClientSimplePubSubWithReply server/client_test.go:712
[Fact]
public async Task Pub_with_reply_delivered_in_msg()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port);
await sock.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo 1\r\nPUB foo reply.to 5\r\nhello\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("MSG foo 1 reply.to 5\r\nhello\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientNoBodyPubSubWithReply server/client_test.go:740
[Fact]
public async Task Empty_payload_with_reply_subject()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port);
await sock.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo 1\r\nPUB foo reply.to 0\r\n\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("MSG foo 1 reply.to 0\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Unsub and auto-unsub
// ---------------------------------------------------------------------------
// Go: TestClientUnSub server/client_test.go:1110
[Fact]
public async Task Unsub_removes_subscription_only_matching_sid()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var pub = await ConnectAndPingAsync(port);
using var sub = await ConnectAndPingAsync(port);
await sub.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo 1\r\nSUB foo 2\r\nUNSUB 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
await pub.SendAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nhello\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n");
await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
response.ShouldContain("MSG foo 2 5");
response.ShouldNotContain("MSG foo 1 5");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientUnSubMax server/client_test.go:1145
[Fact]
public async Task Auto_unsub_max_delivers_exact_count()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var pub = await ConnectAndPingAsync(port);
using var sub = await ConnectAndPingAsync(port);
await sub.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo 1\r\nUNSUB 1 5\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
// Publish 10 messages
var sb = new StringBuilder();
for (int i = 0; i < 10; i++)
sb.Append("PUB foo 1\r\nx\r\n");
sb.Append("PING\r\n");
await pub.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n");
// Collect messages on subscriber
await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await ReadAllAvailableAsync(sub, 2000);
CountOccurrences(response, "MSG foo 1").ShouldBe(5);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// Go: TestClientUnsubAfterAutoUnsub server/client_test.go:1205
[Fact]
public async Task Explicit_unsub_after_auto_unsub_removes_immediately()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var pub = await ConnectAndPingAsync(port);
using var sub = await ConnectAndPingAsync(port);
await sub.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo 1\r\nUNSUB 1 100\r\nUNSUB 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
await pub.SendAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nhello\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n");
await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await ReadAllAvailableAsync(sub, 1000);
response.ShouldNotContain("MSG foo");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Queue sub distribution
// ---------------------------------------------------------------------------
// Go: TestClientPubWithQueueSub server/client_test.go:768
[Fact]
public async Task Queue_sub_distributes_messages_across_sids()
{
const int count = 100;
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port);
await sock.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo g1 1\r\nSUB foo g1 2\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
var sb = new StringBuilder();
for (int i = 0; i < count; i++)
sb.Append("PUB foo 5\r\nhello\r\n");
sb.Append("PING\r\n");
await sock.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
var n1 = CountOccurrences(response, "MSG foo 1 5");
var n2 = CountOccurrences(response, "MSG foo 2 5");
(n1 + n2).ShouldBe(count);
n1.ShouldBeGreaterThanOrEqualTo(20);
n2.ShouldBeGreaterThanOrEqualTo(20);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Stats tracking
// ---------------------------------------------------------------------------
[Fact]
public async Task Server_stats_track_in_msgs()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port);
await sock.SendAsync(Encoding.ASCII.GetBytes(
"PUB foo 5\r\nhello\r\nPUB foo 5\r\nhello\r\nPUB foo 5\r\nhello\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
Interlocked.Read(ref server.Stats.InMsgs).ShouldBeGreaterThanOrEqualTo(3);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
[Fact]
public async Task Server_stats_track_in_bytes()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port);
await sock.SendAsync(Encoding.ASCII.GetBytes(
"PUB foo 10\r\n0123456789\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
Interlocked.Read(ref server.Stats.InBytes).ShouldBeGreaterThanOrEqualTo(10);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
[Fact]
public async Task Server_stats_track_out_msgs()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port);
await sock.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
Interlocked.Read(ref server.Stats.OutMsgs).ShouldBeGreaterThanOrEqualTo(1);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Slow consumer detection
// ---------------------------------------------------------------------------
// Go: TestNoClientLeakOnSlowConsumer server/client_test.go:2181
[Fact]
public async Task Slow_consumer_closes_connection()
{
const long maxPendingBytes = 1024;
var (server, port, cts) = await StartServerAsync(new NatsOptions { MaxPending = maxPendingBytes });
try
{
using var slowSub = await ConnectAndPingAsync(port, "{\"verbose\":false}");
await slowSub.SendAsync(Encoding.ASCII.GetBytes("SUB flood 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(slowSub, "PONG\r\n");
using var pub = await ConnectAndPingAsync(port, "{\"verbose\":false}");
// Flood
var payload = new string('X', 512);
var sb = new StringBuilder();
for (int i = 0; i < 50; i++)
sb.Append($"PUB flood {payload.Length}\r\n{payload}\r\n");
sb.Append("PING\r\n");
await pub.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n");
await Task.Delay(500);
Interlocked.Read(ref server.Stats.SlowConsumers).ShouldBeGreaterThan(0);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Verbose mode on various operations
// ---------------------------------------------------------------------------
// Go: verbose mode -- PING gets +OK and PONG
[Fact]
public async Task Verbose_mode_ping_returns_ok_and_pong()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndHandshakeAsync(port, "{\"verbose\":true}");
await SocketTestHelper.ReadUntilAsync(sock, "+OK\r\n"); // drain CONNECT +OK
await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await ReadAllAvailableAsync(sock, 2000);
// Should get PONG and +OK for the PING
response.ShouldContain("PONG\r\n");
response.ShouldContain("+OK\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Cross-client message delivery
// ---------------------------------------------------------------------------
[Fact]
public async Task Message_delivered_across_two_clients()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sub = await ConnectAndPingAsync(port);
using var pub = await ConnectAndPingAsync(port);
await sub.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
await pub.SendAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nhello\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n");
await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
response.ShouldContain("MSG foo 1 5\r\nhello\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
[Fact]
public async Task Wildcard_sub_receives_matching_messages()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sub = await ConnectAndPingAsync(port);
using var pub = await ConnectAndPingAsync(port);
await sub.SendAsync(Encoding.ASCII.GetBytes("SUB foo.* 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
await pub.SendAsync(Encoding.ASCII.GetBytes(
"PUB foo.bar 5\r\nhello\r\nPUB foo.baz 5\r\nworld\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n");
await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
response.ShouldContain("MSG foo.bar 1 5\r\nhello\r\n");
response.ShouldContain("MSG foo.baz 1 5\r\nworld\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
[Fact]
public async Task Gt_wildcard_sub_receives_multi_token_messages()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sub = await ConnectAndPingAsync(port);
using var pub = await ConnectAndPingAsync(port);
await sub.SendAsync(Encoding.ASCII.GetBytes("SUB foo.> 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
await pub.SendAsync(Encoding.ASCII.GetBytes(
"PUB foo.bar.baz 5\r\nhello\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n");
await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
response.ShouldContain("MSG foo.bar.baz 1 5\r\nhello\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Auto-unsub exact message count
// ---------------------------------------------------------------------------
// Go: TestClientAutoUnsubExactReceived server/client_test.go:1183
[Fact]
public async Task Auto_unsub_with_max_1_delivers_exactly_one()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var pub = await ConnectAndPingAsync(port);
using var sub = await ConnectAndPingAsync(port);
await sub.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo 1\r\nUNSUB 1 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
var sb = new StringBuilder();
for (int i = 0; i < 5; i++)
sb.Append("PUB foo 2\r\nok\r\n");
sb.Append("PING\r\n");
await pub.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n");
await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await ReadAllAvailableAsync(sub, 2000);
CountOccurrences(response, "MSG foo 1").ShouldBe(1);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: CONNECT with no_responders:true but without headers:true should error
// ---------------------------------------------------------------------------
// Go: TestClientNoResponderSupport server/client_test.go:230
[Fact]
public async Task No_responders_without_headers_is_rejected()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
await SocketTestHelper.ReadUntilAsync(sock, "\r\n"); // INFO
await sock.SendAsync(Encoding.ASCII.GetBytes(
"CONNECT {\"no_responders\":true,\"headers\":false}\r\n"));
var response = await ReadAllAvailableAsync(sock, 3000);
response.ShouldContain("-ERR");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: HPUB without headers in CONNECT should fail
// ---------------------------------------------------------------------------
// Go: TestClientHeaderSupport server/client_test.go:295
// Verify that HPUB with headers:true in CONNECT works correctly
[Fact]
public async Task Hpub_with_headers_connect_succeeds()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var pub = await ConnectAndPingAsync(port, "{\"headers\":true}");
using var sub = await ConnectAndPingAsync(port, "{\"headers\":true}");
await sub.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
// HPUB with valid header block
await pub.SendAsync(Encoding.ASCII.GetBytes(
"HPUB foo 12 14\r\nName:Derek\r\nOK\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sub, "OK\r\n", timeoutMs: 5000);
response.ShouldContain("HMSG foo 1 12 14\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Empty message body
// ---------------------------------------------------------------------------
[Fact]
public async Task Zero_byte_payload_delivered_correctly()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndPingAsync(port);
await sock.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo 1\r\nPUB foo 0\r\n\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("MSG foo 1 0\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Maximum connections limit
// ---------------------------------------------------------------------------
[Fact]
public async Task Max_connections_enforced()
{
var (server, port, cts) = await StartServerAsync(new NatsOptions { MaxConnections = 2 });
try
{
using var c1 = await ConnectAndPingAsync(port);
using var c2 = await ConnectAndPingAsync(port);
// Third connection should be rejected
using var c3 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await c3.ConnectAsync(IPAddress.Loopback, port);
var response = await ReadAllAvailableAsync(c3, 3000);
// The server should send an error about maximum connections
response.ShouldContain("maximum connections exceeded");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Unsubscribe race (concurrent pub + unsub)
// ---------------------------------------------------------------------------
// Go: TestUnsubRace server/client_test.go:1306
[Fact]
public async Task Unsub_race_does_not_crash()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sub = await ConnectAndPingAsync(port);
using var pub = await ConnectAndPingAsync(port);
await sub.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
// Start publishing concurrently
var pubTask = Task.Run(async () =>
{
var sb = new StringBuilder();
for (int i = 0; i < 1000; i++)
sb.Append("PUB foo 5\r\nhello\r\n");
sb.Append("PING\r\n");
await pub.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
});
await Task.Delay(5);
// Unsubscribe while messages are flowing
await sub.SendAsync(Encoding.ASCII.GetBytes("UNSUB 1\r\nPING\r\n"));
await pubTask;
// As long as we don't crash, the test passes.
// Drain remaining data
await ReadAllAvailableAsync(sub, 2000);
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Verbose mode full lifecycle
// ---------------------------------------------------------------------------
[Fact]
public async Task Verbose_mode_full_lifecycle_returns_ok_for_each_operation()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = await ConnectAndHandshakeAsync(port, "{\"verbose\":true}");
// Drain +OK from CONNECT
await SocketTestHelper.ReadUntilAsync(sock, "+OK\r\n");
// SUB -> +OK, PUB -> +OK, UNSUB -> +OK, PING -> PONG + +OK
await sock.SendAsync(Encoding.ASCII.GetBytes(
"SUB foo 1\r\nPUB foo 5\r\nhello\r\nUNSUB 1\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
// At least 3 +OK (SUB, PUB, UNSUB) plus the one for PING
CountOccurrences(response, "+OK\r\n").ShouldBeGreaterThanOrEqualTo(3);
response.ShouldContain("PONG\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Multiple subscribers on same subject
// ---------------------------------------------------------------------------
[Fact]
public async Task Multiple_subs_on_same_subject_all_receive()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sub1 = await ConnectAndPingAsync(port);
using var sub2 = await ConnectAndPingAsync(port);
using var pub = await ConnectAndPingAsync(port);
await sub1.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub1, "PONG\r\n");
await sub2.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub2, "PONG\r\n");
await pub.SendAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nhello\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n");
await sub1.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var r1 = await SocketTestHelper.ReadUntilAsync(sub1, "PONG\r\n");
r1.ShouldContain("MSG foo 1 5\r\nhello\r\n");
await sub2.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var r2 = await SocketTestHelper.ReadUntilAsync(sub2, "PONG\r\n");
r2.ShouldContain("MSG foo 1 5\r\nhello\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Server info has server_id and version
// ---------------------------------------------------------------------------
[Fact]
public async Task Info_has_server_id_and_version()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await SocketTestHelper.ReadUntilAsync(sock, "\r\n");
var jsonStr = info[(info.IndexOf('{'))..(info.LastIndexOf('}') + 1)];
var si = JsonSerializer.Deserialize<ServerInfo>(jsonStr);
si!.ServerId.ShouldNotBeNullOrEmpty();
si.Version.ShouldNotBeNullOrEmpty();
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: Server info proto version
// ---------------------------------------------------------------------------
[Fact]
public async Task Info_has_proto_version()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await SocketTestHelper.ReadUntilAsync(sock, "\r\n");
info.ShouldContain("\"proto\":");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: INFO contains host field
// ---------------------------------------------------------------------------
[Fact]
public async Task Info_contains_host_field()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
var info = await SocketTestHelper.ReadUntilAsync(sock, "\r\n");
// host field should be present
info.ShouldContain("\"host\":");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ---------------------------------------------------------------------------
// Test: CONNECT with all fields
// ---------------------------------------------------------------------------
[Fact]
public async Task Connect_with_all_optional_fields_accepted()
{
var (server, port, cts) = await StartServerAsync();
try
{
var connect = """
CONNECT {"verbose":false,"pedantic":false,"echo":true,"name":"test","lang":"csharp","version":"1.0","protocol":1,"headers":true,"no_responders":true}
""".Trim();
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, port);
await SocketTestHelper.ReadUntilAsync(sock, "\r\n"); // INFO
await sock.SendAsync(Encoding.ASCII.GetBytes(connect + "\r\nPING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sock, "PONG\r\n");
response.ShouldContain("PONG\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
}