Files
natsdotnet/tests/NATS.Server.Core.Tests/ClientHeaderTests.cs
Joseph Doherty 7fbffffd05 refactor: rename remaining tests to NATS.Server.Core.Tests
- Rename tests/NATS.Server.Tests -> tests/NATS.Server.Core.Tests
- Update solution file, InternalsVisibleTo, and csproj references
- Remove JETSTREAM_INTEGRATION_MATRIX and NATS.NKeys from csproj (moved to JetStream.Tests and Auth.Tests)
- Update all namespaces from NATS.Server.Tests.* to NATS.Server.Core.Tests.*
- Replace private GetFreePort/ReadUntilAsync helpers with TestUtilities calls
- Fix stale namespace in Transport.Tests/NetworkingGoParityTests.cs
2026-03-12 16:14:02 -04:00

180 lines
7.1 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Reference: golang/nats-server/server/client_test.go — TestClientHeaderDeliverMsg,
// TestServerHeaderSupport, TestClientHeaderSupport
using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server;
using NATS.Server.TestUtilities;
namespace NATS.Server.Core.Tests;
/// <summary>
/// Tests for HPUB/HMSG header support, mirroring the Go reference tests
/// TestClientHeaderDeliverMsg, TestServerHeaderSupport and
/// TestClientNoResponderSupport.
///
/// Go reference: golang/nats-server/server/client_test.go:259-368
/// </summary>
public class ClientHeaderTests : IAsyncLifetime
{
    private readonly NatsServer _server;
    private readonly int _port;
    private readonly CancellationTokenSource _cts = new();

    /// <summary>
    /// Allocates a free port and builds (but does not start) a server bound to it.
    /// </summary>
    public ClientHeaderTests()
    {
        _port = TestPortAllocator.GetFreePort();
        _server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
    }

    /// <summary>
    /// Starts the server in the background and waits until it reports ready.
    /// </summary>
    public async Task InitializeAsync()
    {
        // StartAsync is deliberately not awaited here; readiness is observed
        // through WaitForReadyAsync instead.
        _ = _server.StartAsync(_cts.Token);
        await _server.WaitForReadyAsync();
    }

    /// <summary>
    /// Cancels the token handed to the server, disposes the server, and then
    /// releases the cancellation source itself.
    /// </summary>
    public async Task DisposeAsync()
    {
        await _cts.CancelAsync();
        _server.Dispose();
        _cts.Dispose();
    }

    /// <summary>
    /// Connect a raw TCP socket, read the INFO line, and send a CONNECT with
    /// headers:true and no_responders:true.
    /// </summary>
    /// <returns>
    /// The connected socket. Ownership passes to the caller, who must dispose it.
    /// If any handshake step throws, the socket is disposed before the exception
    /// propagates.
    /// </returns>
    private async Task<Socket> ConnectWithHeadersAsync()
    {
        var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        try
        {
            await sock.ConnectAsync(IPAddress.Loopback, _port);
            await SocketTestHelper.ReadUntilAsync(sock, "\r\n"); // discard INFO
            await sock.SendAsync(Encoding.ASCII.GetBytes(
                "CONNECT {\"headers\":true,\"no_responders\":true}\r\n"));
            return sock;
        }
        catch
        {
            // The caller never receives the socket on failure, so release it here.
            sock.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Port of TestClientHeaderDeliverMsg (client_test.go:330).
    ///
    /// A client that advertises headers:true sends an HPUB message with a custom
    /// header block. A subscriber should receive the message as HMSG with the
    /// header block and payload intact.
    ///
    /// HPUB format: HPUB subject hdr_len total_len\r\n{headers}{payload}\r\n
    /// HMSG format: HMSG subject sid hdr_len total_len\r\n{headers}{payload}\r\n
    ///
    /// Matches Go reference: HPUB foo 12 14\r\nName:Derek\r\nOK\r\n
    /// hdrLen=12 ("Name:Derek\r\n"), totalLen=14 (headers + "OK")
    /// </summary>
    [Fact]
    public async Task Hpub_delivers_hmsg_with_headers()
    {
        // Use two separate connections: subscriber and publisher.
        // The Go reference uses a single connection for both, but two connections
        // make the test clearer and avoid echo-suppression edge cases.
        using var sub = await ConnectWithHeadersAsync();
        using var pub = await ConnectWithHeadersAsync();

        // Subscribe on 'foo' with SID 1
        await sub.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\n"));

        // Flush via PING/PONG to ensure the subscription is registered before publishing
        await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
        await SocketTestHelper.ReadUntilAsync(sub, "PONG");

        // Match Go reference test exactly:
        //   Header block: "Name:Derek\r\n" = 12 bytes
        //   Payload:      "OK"             = 2 bytes -> total = 14 bytes
        // The lengths are derived from the strings so they cannot drift apart.
        const string headerBlock = "Name:Derek\r\n";
        const string payload = "OK";
        var hdrLen = Encoding.ASCII.GetByteCount(headerBlock);
        var totalLen = hdrLen + Encoding.ASCII.GetByteCount(payload);

        var hpub = $"HPUB foo {hdrLen} {totalLen}\r\n{headerBlock}{payload}\r\n";
        await pub.SendAsync(Encoding.ASCII.GetBytes(hpub));

        // Read the full HMSG on the subscriber socket (control line + header + payload + trailing CRLF).
        // The complete wire message ends with the payload followed by \r\n.
        var received = await SocketTestHelper.ReadUntilAsync(sub, payload + "\r\n", timeoutMs: 5000);

        // Verify HMSG control line: HMSG foo 1 <hdrLen> <totalLen>
        received.ShouldContain($"HMSG foo 1 {hdrLen} {totalLen}\r\n");

        // Verify the header block and payload follow the control line verbatim,
        // terminated by the trailing CRLF.
        received.ShouldContain($"HMSG foo 1 {hdrLen} {totalLen}\r\n{headerBlock}{payload}\r\n");
    }

    /// <summary>
    /// Port of TestServerHeaderSupport (client_test.go:259).
    ///
    /// By default the server advertises "headers":true in the INFO response.
    /// </summary>
    [Fact]
    public async Task Server_info_advertises_headers_true()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await sock.ConnectAsync(IPAddress.Loopback, _port);

        // Read the INFO line
        var infoLine = await SocketTestHelper.ReadUntilAsync(sock, "\r\n");

        // INFO must start with "INFO "
        infoLine.ShouldStartWith("INFO ");

        // Extract the JSON blob after "INFO " (first '{' through last '}')
        var jsonStart = infoLine.IndexOf('{');
        var jsonEnd = infoLine.LastIndexOf('}');
        jsonStart.ShouldBeGreaterThanOrEqualTo(0);
        jsonEnd.ShouldBeGreaterThan(jsonStart);
        var json = infoLine[jsonStart..(jsonEnd + 1)];

        // The JSON must contain "headers":true
        json.ShouldContain("\"headers\":true");
    }

    /// <summary>
    /// Port of TestClientNoResponderSupport (client_test.go:230) — specifically
    /// the branch that sends a PUB to a subject with no subscribers when the
    /// client has opted in with headers:true + no_responders:true.
    ///
    /// The server must send an HMSG on the reply subject with the 503 status
    /// header "NATS/1.0 503\r\n\r\n".
    ///
    /// Wire sequence:
    ///   Client -> CONNECT {headers:true, no_responders:true}
    ///   Client -> SUB reply.inbox 1
    ///   Client -> PUB no.listeners reply.inbox 0   (0-byte payload, no subscribers)
    ///   Server -> HMSG reply.inbox 1 {hdrLen} {hdrLen}\r\nNATS/1.0 503\r\n\r\n\r\n
    /// </summary>
    [Fact]
    public async Task No_responders_sends_503_hmsg_when_no_subscribers()
    {
        using var sock = await ConnectWithHeadersAsync();

        // Subscribe to the reply inbox
        await sock.SendAsync(Encoding.ASCII.GetBytes("SUB reply.inbox 1\r\n"));

        // Flush via PING/PONG to ensure SUB is registered
        await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
        await SocketTestHelper.ReadUntilAsync(sock, "PONG");

        // Publish to a subject with no subscribers, using reply.inbox as reply-to
        await sock.SendAsync(Encoding.ASCII.GetBytes("PUB no.listeners reply.inbox 0\r\n\r\n"));

        // The server should send back an HMSG on reply.inbox with status 503
        var received = await SocketTestHelper.ReadUntilAsync(sock, "NATS/1.0 503", timeoutMs: 5000);

        // Must be an HMSG (header message) on the reply subject
        received.ShouldContain("HMSG reply.inbox");

        // Must carry the 503 status header
        received.ShouldContain("NATS/1.0 503");
    }
}