Files
natsdotnet/tests/NATS.Server.Tests/ClientUnsubTests.cs
Joseph Doherty 7ffee8741f feat: phase A foundation test parity — 64 new tests across 11 subsystems
Port Go NATS server test behaviors to .NET:
- Client pub/sub (5 tests): simple, no-echo, reply, queue distribution, empty body
- Client UNSUB (4 tests): unsub, auto-unsub max, unsub after auto, disconnect cleanup
- Client headers (3 tests): HPUB/HMSG, server info headers, no-responders 503
- Client lifecycle (3 tests): connect proto, max subscriptions, auth timeout
- Client slow consumer (1 test): pending limit detection and disconnect
- Parser edge cases (3 tests + 2 bug fixes): PUB arg variations, malformed protocol, max control line
- SubList concurrency (13 tests): race on remove/insert/match, large lists, invalid subjects, wildcards
- Server config (4 tests): ephemeral port, server name, name defaults, lame duck
- Route config (3 tests): cluster formation, cross-cluster messaging, reconnect
- Gateway basic (2 tests): cross-cluster forwarding, no echo to origin
- Leaf node basic (2 tests): hub-to-spoke and spoke-to-hub forwarding
- Account import/export (2 tests): stream export/import delivery, isolation

Also fixes NatsParser.ParseSub/ParseUnsub to throw ProtocolViolationException
for short command lines instead of ArgumentOutOfRangeException.

Full suite: 933 passed, 0 failed (up from 869).
2026-02-23 19:26:30 -05:00

225 lines
7.9 KiB
C#

// Reference: golang/nats-server/server/client_test.go
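// Functions mirrored below: TestClientUnSub, TestClientUnSubMax,
// TestClientUnsubAfterAutoUnsub, TestClientRemoveSubsOnDisconnect
// NOTE(review): TestClientAutoUnsubExactReceived is also cited as a reference, but no test in
// this file names it as its source — confirm whether it still needs a dedicated port.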
using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server;
namespace NATS.Server.Tests;
/// <summary>
/// Protocol-level UNSUB tests that talk to a real <see cref="NatsServer"/> over raw TCP sockets.
/// Each test instance starts its own server on a freshly probed loopback port.
/// </summary>
public class ClientUnsubTests : IAsyncLifetime
{
    /// <summary>Default time budget, in milliseconds, for bounded reads and polls.</summary>
    private const int DefaultTimeoutMs = 5000;

    private readonly NatsServer _server;
    private readonly int _port;
    private readonly CancellationTokenSource _cts = new();

    public ClientUnsubTests()
    {
        _port = GetFreePort();
        _server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
    }

    /// <summary>Starts the server and waits until it reports ready.</summary>
    public async Task InitializeAsync()
    {
        // The task returned by StartAsync is deliberately discarded; readiness is
        // observed through WaitForReadyAsync instead.
        _ = _server.StartAsync(_cts.Token);
        await _server.WaitForReadyAsync();
    }

    /// <summary>Cancels the server token, disposes the server, then releases the token source.</summary>
    public async Task DisposeAsync()
    {
        await _cts.CancelAsync();
        _server.Dispose();
        _cts.Dispose();
    }

    /// <summary>
    /// Asks the OS for an unused loopback port by binding a throwaway socket to port 0.
    /// The probe socket is closed on return, so the port is only "free" at the moment of the call.
    /// </summary>
    private static int GetFreePort()
    {
        using var probe = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        probe.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)probe.LocalEndPoint!).Port;
    }

    /// <summary>Sends <paramref name="protocol"/> to the server as ASCII bytes.</summary>
    private static async Task SendProtocolAsync(Socket sock, string protocol)
    {
        await sock.SendAsync(Encoding.ASCII.GetBytes(protocol));
    }

    /// <summary>
    /// Opens a TCP connection to the test server, consumes the server's first line (INFO),
    /// and sends an empty CONNECT. The caller owns the returned socket.
    /// </summary>
    private async Task<Socket> ConnectAndHandshakeAsync()
    {
        var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        try
        {
            await sock.ConnectAsync(IPAddress.Loopback, _port);

            // Drain INFO: read up to the end of the first line, with a time limit, so a
            // partial first read or a silent server cannot stall the test indefinitely.
            await ReadUntilAsync(sock, "\r\n");

            await SendProtocolAsync(sock, "CONNECT {}\r\n");
            return sock;
        }
        catch
        {
            // Do not leak the socket when connect or handshake fails.
            sock.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Reads from <paramref name="sock"/> until the accumulated text contains
    /// <paramref name="expected"/> or the peer closes the connection.
    /// </summary>
    /// <returns>Everything received so far, decoded as ASCII.</returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="timeoutMs"/> elapsed before <paramref name="expected"/> was seen.
    /// </exception>
    private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = DefaultTimeoutMs)
    {
        using var cts = new CancellationTokenSource(timeoutMs);
        var received = new StringBuilder();
        var buf = new byte[4096];
        var text = string.Empty;

        while (!text.Contains(expected, StringComparison.Ordinal))
        {
            var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
            if (n == 0)
            {
                break;
            }

            received.Append(Encoding.ASCII.GetString(buf, 0, n));
            text = received.ToString();
        }

        return text;
    }

    /// <summary>
    /// Collects everything that arrives on <paramref name="sock"/> during a fixed window of
    /// <paramref name="windowMs"/> milliseconds (or until the peer closes the connection).
    /// Running out of time is the normal way for this method to finish.
    /// </summary>
    private static async Task<string> ReadForWindowAsync(Socket sock, int windowMs)
    {
        var received = new StringBuilder();
        var buf = new byte[4096];
        using var window = new CancellationTokenSource(windowMs);

        try
        {
            while (true)
            {
                var n = await sock.ReceiveAsync(buf, SocketFlags.None, window.Token);
                if (n == 0)
                {
                    break;
                }

                received.Append(Encoding.ASCII.GetString(buf, 0, n));
            }
        }
        catch (OperationCanceledException)
        {
            // Expected — the window elapsed, meaning no more data is coming.
        }

        return received.ToString();
    }

    /// <summary>
    /// Sends PING on <paramref name="sock"/> and waits for the PONG, returning everything read
    /// while waiting. Used as a barrier: data read before the PONG was sent ahead of it.
    /// </summary>
    private static async Task<string> PingAndReadUntilPongAsync(Socket sock)
    {
        await SendProtocolAsync(sock, "PING\r\n");
        return await ReadUntilAsync(sock, "PONG");
    }

    /// <summary>
    /// Mirrors TestClientUnSub: subscribe twice, unsubscribe one sid, publish,
    /// verify only the remaining sid gets the MSG.
    /// Reference: golang/nats-server/server/client_test.go TestClientUnSub
    /// </summary>
    [Fact]
    public async Task Unsub_removes_subscription()
    {
        using var pub = await ConnectAndHandshakeAsync();
        using var sub = await ConnectAndHandshakeAsync();

        // Subscribe to "foo" with sid 1 and sid 2
        await SendProtocolAsync(sub, "SUB foo 1\r\nSUB foo 2\r\nPING\r\n");
        await ReadUntilAsync(sub, "PONG");

        // Unsubscribe sid 1
        await SendProtocolAsync(sub, "UNSUB 1\r\nPING\r\n");
        await ReadUntilAsync(sub, "PONG");

        // Publish one message to "foo"
        await SendProtocolAsync(pub, "PUB foo 5\r\nHello\r\n");

        // Should receive exactly one MSG for sid 2; sid 1 is gone
        var response = await ReadUntilAsync(sub, "MSG foo 2 5");

        // Keep reading up to a PING/PONG barrier so that a stray delivery for sid 1 arriving
        // after the sid 2 message is still captured; otherwise the negative check is vacuous.
        response += await PingAndReadUntilPongAsync(sub);

        response.ShouldContain("MSG foo 2 5");
        response.ShouldNotContain("MSG foo 1 5");
    }

    /// <summary>
    /// Mirrors TestClientUnSubMax: UNSUB with a max-messages limit auto-removes
    /// the subscription after exactly N deliveries.
    /// Reference: golang/nats-server/server/client_test.go TestClientUnSubMax
    /// </summary>
    [Fact]
    public async Task Unsub_max_auto_removes_after_n_messages()
    {
        const int maxMessages = 5;
        const int totalPublishes = 10;

        using var pub = await ConnectAndHandshakeAsync();
        using var sub = await ConnectAndHandshakeAsync();

        // Subscribe to "foo" with sid 1, limit to 5 messages
        await SendProtocolAsync(sub, $"SUB foo 1\r\nUNSUB 1 {maxMessages}\r\nPING\r\n");
        await ReadUntilAsync(sub, "PONG");

        // Publish 10 messages in a single write
        var pubData = new StringBuilder();
        for (int i = 0; i < totalPublishes; i++)
        {
            pubData.Append("PUB foo 1\r\nx\r\n");
        }

        await SendProtocolAsync(pub, pubData.ToString());

        // Wait for the server to answer the publisher's PING so the collection window below
        // starts only after the publishes have been read by the server.
        await PingAndReadUntilPongAsync(pub);

        // Collect received messages within a short window, stopping when no more arrive
        var text = await ReadForWindowAsync(sub, 2000);

        // Count MSG occurrences
        var msgCount = CountOccurrences(text, "MSG foo 1");
        msgCount.ShouldBe(maxMessages);
    }

    /// <summary>
    /// Mirrors TestClientUnsubAfterAutoUnsub: after setting a max-messages limit,
    /// an explicit UNSUB removes the subscription immediately and no messages arrive.
    /// Reference: golang/nats-server/server/client_test.go TestClientUnsubAfterAutoUnsub
    /// </summary>
    [Fact]
    public async Task Unsub_after_auto_unsub_removes_immediately()
    {
        using var pub = await ConnectAndHandshakeAsync();
        using var sub = await ConnectAndHandshakeAsync();

        // Subscribe with a large max-messages limit, then immediately UNSUB without limit
        await SendProtocolAsync(sub, "SUB foo 1\r\nUNSUB 1 100\r\nUNSUB 1\r\nPING\r\n");
        await ReadUntilAsync(sub, "PONG");

        // Publish a message — subscription should already be gone
        await SendProtocolAsync(pub, "PUB foo 5\r\nHello\r\n");

        // Make sure the server has consumed the publish before the quiet window starts,
        // so an empty window really means "nothing was delivered".
        await PingAndReadUntilPongAsync(pub);

        // Wait briefly; no MSG should arrive
        var received = await ReadForWindowAsync(sub, 500);
        received.ShouldNotContain("MSG foo");
    }

    /// <summary>
    /// Mirrors TestClientRemoveSubsOnDisconnect: when a client disconnects the server
    /// removes all its subscriptions from the global SubList.
    /// Reference: golang/nats-server/server/client_test.go TestClientRemoveSubsOnDisconnect
    /// </summary>
    [Fact]
    public async Task Disconnect_removes_all_subscriptions()
    {
        using var client = await ConnectAndHandshakeAsync();

        // Subscribe to 3 distinct subjects
        await SendProtocolAsync(client, "SUB foo 1\r\nSUB bar 2\r\nSUB baz 3\r\nPING\r\n");
        await ReadUntilAsync(client, "PONG");

        // Confirm subscriptions are registered in the server's SubList
        _server.SubList.Count.ShouldBe(3u);

        // Close the TCP connection abruptly
        client.Shutdown(SocketShutdown.Both);
        client.Close();

        // Poll instead of sleeping a fixed amount: returns as soon as the server has cleaned up,
        // and tolerates slow machines up to the deadline.
        var deadline = Environment.TickCount64 + DefaultTimeoutMs;
        while (_server.SubList.Count != 0 && Environment.TickCount64 < deadline)
        {
            await Task.Delay(25);
        }

        // All 3 subscriptions should be removed
        _server.SubList.Count.ShouldBe(0u);
    }

    /// <summary>
    /// Counts non-overlapping ordinal occurrences of <paramref name="needle"/> in
    /// <paramref name="haystack"/>. An empty needle yields 0.
    /// </summary>
    private static int CountOccurrences(string haystack, string needle)
    {
        // An empty needle matches at every index without advancing, which would never terminate.
        if (string.IsNullOrEmpty(needle))
        {
            return 0;
        }

        int count = 0;
        int index = 0;
        while ((index = haystack.IndexOf(needle, index, StringComparison.Ordinal)) >= 0)
        {
            count++;
            index += needle.Length;
        }

        return count;
    }
}