Port Go NATS server test behaviors to .NET:
- Client pub/sub (5 tests): simple, no-echo, reply, queue distribution, empty body
- Client UNSUB (4 tests): unsub, auto-unsub max, unsub after auto, disconnect cleanup
- Client headers (3 tests): HPUB/HMSG, server info headers, no-responders 503
- Client lifecycle (3 tests): connect proto, max subscriptions, auth timeout
- Client slow consumer (1 test): pending limit detection and disconnect
- Parser edge cases (3 tests + 2 bug fixes): PUB arg variations, malformed protocol, max control line
- SubList concurrency (13 tests): race on remove/insert/match, large lists, invalid subjects, wildcards
- Server config (4 tests): ephemeral port, server name, name defaults, lame duck
- Route config (3 tests): cluster formation, cross-cluster messaging, reconnect
- Gateway basic (2 tests): cross-cluster forwarding, no echo to origin
- Leaf node basic (2 tests): hub-to-spoke and spoke-to-hub forwarding
- Account import/export (2 tests): stream export/import delivery, isolation

Also fixes NatsParser.ParseSub/ParseUnsub to throw ProtocolViolationException for short command lines instead of ArgumentOutOfRangeException.

Full suite: 933 passed, 0 failed (up from 869).
198 lines
7.7 KiB
C#
198 lines
7.7 KiB
C#
// Reference: golang/nats-server/server/client_test.go — TestClientHeaderDeliverMsg,
|
||
// TestServerHeaderSupport, TestClientHeaderSupport
|
||
|
||
using System.Net;
|
||
using System.Net.Sockets;
|
||
using System.Text;
|
||
using Microsoft.Extensions.Logging.Abstractions;
|
||
using NATS.Server;
|
||
|
||
namespace NATS.Server.Tests;
|
||
|
||
/// <summary>
/// Raw-socket tests for NATS header support (HPUB/HMSG), ported from the Go
/// reference tests TestClientHeaderDeliverMsg, TestServerHeaderSupport and
/// TestClientNoResponderSupport.
///
/// Go reference: golang/nats-server/server/client_test.go
/// </summary>
public class ClientHeaderTests : IAsyncLifetime
{
    private readonly NatsServer _server;
    private readonly int _port;
    private readonly CancellationTokenSource _cts = new();

    /// <summary>
    /// Picks a free loopback port and builds (but does not start) the server under test.
    /// </summary>
    public ClientHeaderTests()
    {
        _port = GetFreePort();
        _server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
    }

    /// <summary>
    /// Starts the server and waits until it reports ready.
    /// </summary>
    public async Task InitializeAsync()
    {
        // The start task is deliberately not awaited; it is tied to _cts, which
        // DisposeAsync cancels.
        _ = _server.StartAsync(_cts.Token);
        await _server.WaitForReadyAsync();
    }

    /// <summary>
    /// Cancels the server's token, disposes the server, and releases the
    /// cancellation source.
    /// </summary>
    public async Task DisposeAsync()
    {
        try
        {
            await _cts.CancelAsync();
            _server.Dispose();
        }
        finally
        {
            // Release the token source even if cancelling or disposing the server throws.
            _cts.Dispose();
        }
    }

    /// <summary>
    /// Asks the OS for an unused loopback TCP port by binding to port 0 and
    /// reading back the assigned port. The probe socket is closed on return.
    /// </summary>
    private static int GetFreePort()
    {
        using var probe = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        probe.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)probe.LocalEndPoint!).Port;
    }

    /// <summary>
    /// Encodes <paramref name="text"/> as ASCII and sends it on <paramref name="sock"/>.
    /// </summary>
    private static async Task SendAsciiAsync(Socket sock, string text)
    {
        await sock.SendAsync(Encoding.ASCII.GetBytes(text));
    }

    /// <summary>
    /// Reads from the socket, accumulating ASCII-decoded data, until the
    /// accumulated text contains <paramref name="expected"/> or the peer closes
    /// the connection.
    /// </summary>
    /// <param name="sock">Connected socket to read from.</param>
    /// <param name="expected">Substring that ends the read once it has been received.</param>
    /// <param name="timeoutMs">Overall time budget for the read, in milliseconds.</param>
    /// <returns>
    /// Everything received so far. If the peer closed the connection first, this
    /// may not contain <paramref name="expected"/>.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// The timeout elapsed while a receive was still pending.
    /// </exception>
    private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
    {
        using var timeout = new CancellationTokenSource(timeoutMs);
        var received = new StringBuilder();
        var buffer = new byte[4096];
        var text = string.Empty;

        while (!text.Contains(expected))
        {
            var count = await sock.ReceiveAsync(buffer, SocketFlags.None, timeout.Token);
            if (count == 0)
            {
                // Zero bytes means the peer closed the connection.
                break;
            }

            received.Append(Encoding.ASCII.GetString(buffer, 0, count));
            text = received.ToString();
        }

        return text;
    }

    /// <summary>
    /// Connects a raw TCP socket, reads and discards the INFO line, and sends a
    /// CONNECT with headers:true and no_responders:true.
    /// </summary>
    /// <returns>The connected socket; the caller owns and must dispose it.</returns>
    private async Task<Socket> ConnectWithHeadersAsync()
    {
        var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        try
        {
            await sock.ConnectAsync(IPAddress.Loopback, _port);
            await ReadUntilAsync(sock, "\r\n"); // discard INFO
            await SendAsciiAsync(sock, "CONNECT {\"headers\":true,\"no_responders\":true}\r\n");
            return sock;
        }
        catch
        {
            // The caller never receives the socket on failure, so release it here.
            sock.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Port of TestClientHeaderDeliverMsg (client_test.go:330).
    ///
    /// A client that advertises headers:true sends an HPUB message with a custom
    /// header block. A subscriber should receive the message as HMSG with the
    /// header block and payload intact.
    ///
    /// HPUB format: HPUB subject hdr_len total_len\r\n{headers}{payload}\r\n
    /// HMSG format: HMSG subject sid hdr_len total_len\r\n{headers}{payload}\r\n
    ///
    /// Matches Go reference: HPUB foo 12 14\r\nName:Derek\r\nOK\r\n
    /// hdrLen=12 ("Name:Derek\r\n"), totalLen=14 (headers + "OK")
    /// </summary>
    [Fact]
    public async Task Hpub_delivers_hmsg_with_headers()
    {
        // Use two separate connections: subscriber and publisher.
        // The Go reference uses a single connection for both, but two connections
        // make the test clearer and avoid echo-suppression edge cases.
        using var sub = await ConnectWithHeadersAsync();
        using var pub = await ConnectWithHeadersAsync();

        // Subscribe on 'foo' with SID 1, then flush via PING/PONG so the
        // subscription is registered before publishing.
        await SendAsciiAsync(sub, "SUB foo 1\r\n");
        await SendAsciiAsync(sub, "PING\r\n");
        await ReadUntilAsync(sub, "PONG");

        // Same message as the Go reference test. The lengths are derived from the
        // strings instead of hand-counted: header block = 12 bytes, total = 14 bytes.
        const string headerBlock = "Name:Derek\r\n";
        const string payload = "OK";
        var hdrLen = Encoding.ASCII.GetByteCount(headerBlock);
        var totalLen = hdrLen + Encoding.ASCII.GetByteCount(payload);

        await SendAsciiAsync(pub, $"HPUB foo {hdrLen} {totalLen}\r\n{headerBlock}{payload}\r\n");

        // The complete wire message ends with the payload followed by \r\n.
        var received = await ReadUntilAsync(sub, payload + "\r\n", timeoutMs: 5000);

        // Verify HMSG control line: HMSG foo 1 <hdrLen> <totalLen>
        received.ShouldContain($"HMSG foo 1 {hdrLen} {totalLen}\r\n");
        // Verify the header block and payload arrive verbatim, in order, and terminated.
        received.ShouldContain($"{headerBlock}{payload}\r\n");
    }

    /// <summary>
    /// Port of TestServerHeaderSupport (client_test.go:259).
    ///
    /// By default the server advertises "headers":true in the INFO response.
    /// </summary>
    [Fact]
    public async Task Server_info_advertises_headers_true()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await sock.ConnectAsync(IPAddress.Loopback, _port);

        // The server speaks first: read the INFO line.
        var infoLine = await ReadUntilAsync(sock, "\r\n");
        infoLine.ShouldStartWith("INFO ");

        // Extract the JSON blob after "INFO ".
        var jsonStart = infoLine.IndexOf('{');
        var jsonEnd = infoLine.LastIndexOf('}');
        jsonStart.ShouldBeGreaterThanOrEqualTo(0);
        jsonEnd.ShouldBeGreaterThan(jsonStart);

        var json = infoLine[jsonStart..(jsonEnd + 1)];

        // The JSON must contain "headers":true
        json.ShouldContain("\"headers\":true");
    }

    /// <summary>
    /// Port of TestClientNoResponderSupport (client_test.go:230) — specifically
    /// the branch that sends a PUB to a subject with no subscribers when the
    /// client has opted in with headers:true + no_responders:true.
    ///
    /// The server must send an HMSG on the reply subject with the 503 status
    /// header "NATS/1.0 503\r\n\r\n".
    ///
    /// Wire sequence:
    /// Client -> CONNECT {headers:true, no_responders:true}
    /// Client -> SUB reply.inbox 1
    /// Client -> PUB no.listeners reply.inbox 0 (0-byte payload, no subscribers)
    /// Server -> HMSG reply.inbox 1 {hdrLen} {hdrLen}\r\nNATS/1.0 503\r\n\r\n\r\n
    /// </summary>
    [Fact]
    public async Task No_responders_sends_503_hmsg_when_no_subscribers()
    {
        using var sock = await ConnectWithHeadersAsync();

        // Subscribe to the reply inbox, then flush via PING/PONG so the SUB is registered.
        await SendAsciiAsync(sock, "SUB reply.inbox 1\r\n");
        await SendAsciiAsync(sock, "PING\r\n");
        await ReadUntilAsync(sock, "PONG");

        // Publish to a subject with no subscribers, using reply.inbox as reply-to.
        await SendAsciiAsync(sock, "PUB no.listeners reply.inbox 0\r\n\r\n");

        // The server should send back an HMSG on reply.inbox with status 503.
        var received = await ReadUntilAsync(sock, "NATS/1.0 503", timeoutMs: 5000);

        // Must be an HMSG (header message) on the reply subject
        received.ShouldContain("HMSG reply.inbox");
        // Must carry the 503 status header
        received.ShouldContain("NATS/1.0 503");
    }
}
|