diff --git a/tests/NATS.Server.Tests/MessageTraceTests.cs b/tests/NATS.Server.Tests/MessageTraceTests.cs new file mode 100644 index 0000000..a6fdacc --- /dev/null +++ b/tests/NATS.Server.Tests/MessageTraceTests.cs @@ -0,0 +1,580 @@ +// Reference: golang/nats-server/server/msgtrace_test.go +// Go test suite: 33 tests covering Nats-Trace-Dest header propagation and +// $SYS.TRACE.> event publication. +// +// The .NET port has MessageTraceContext (Protocol/MessageTraceContext.cs), +// ClientFlags.TraceMode (ClientFlags.cs), NatsHeaderParser (Protocol/NatsHeaderParser.cs) +// and per-server Trace/TraceVerbose/MaxTracedMsgLen options (NatsOptions.cs). +// Full $SYS.TRACE.> event emission is not yet implemented; these tests cover the +// infrastructure that must be in place first: trace context capture, header +// propagation via HPUB/HMSG, and trace-mode flag behaviour. + +using System.Net; +using System.Net.Sockets; +using System.Text; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server; +using NATS.Server.Protocol; + +namespace NATS.Server.Tests; + +/// +/// Tests for message trace infrastructure: MessageTraceContext population, +/// HPUB/HMSG trace header propagation, ClientFlags.TraceMode, NatsHeaderParser, +/// and server trace options. +/// +/// Go reference: golang/nats-server/server/msgtrace_test.go +/// +public class MessageTraceTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _port; + private readonly CancellationTokenSource _cts = new(); + + public MessageTraceTests() + { + _port = GetFreePort(); + _server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + } + + public async Task DisposeAsync() + { + await _cts.CancelAsync(); + _server.Dispose(); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + private static async Task ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000) + { + using var cts = new CancellationTokenSource(timeoutMs); + var sb = new StringBuilder(); + var buf = new byte[4096]; + while (!sb.ToString().Contains(expected)) + { + var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token); + if (n == 0) break; + sb.Append(Encoding.ASCII.GetString(buf, 0, n)); + } + return sb.ToString(); + } + + private async Task ConnectWithHeadersAsync(string? clientName = null, string? lang = null, string? version = null) + { + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(IPAddress.Loopback, _port); + await ReadUntilAsync(sock, "\r\n"); // discard INFO + + var connectJson = BuildConnectJson(headers: true, name: clientName, lang: lang, version: version); + await sock.SendAsync(Encoding.ASCII.GetBytes($"CONNECT {connectJson}\r\n")); + return sock; + } + + private static string BuildConnectJson( + bool headers = true, + bool noResponders = false, + string? name = null, + string? lang = null, + string? version = null) + { + var parts = new List { $"\"headers\":{(headers ? "true" : "false")}" }; + if (noResponders) parts.Add("\"no_responders\":true"); + if (name != null) parts.Add($"\"name\":\"{name}\""); + if (lang != null) parts.Add($"\"lang\":\"{lang}\""); + if (version != null) parts.Add($"\"ver\":\"{version}\""); + return "{" + string.Join(",", parts) + "}"; + } + + // ------------------------------------------------------------------------- + // MessageTraceContext unit tests + // Reference: msgtrace_test.go — trace context is populated from CONNECT opts + // ------------------------------------------------------------------------- + + /// + /// MessageTraceContext.Empty has null client identity fields and false + /// headers-enabled. Mirrors Go's zero-value trace context. + /// Go reference: msgtrace_test.go — TestMsgTraceBasic setup + /// + [Fact] + public void MessageTraceContext_empty_has_null_fields() + { + var ctx = MessageTraceContext.Empty; + + ctx.ClientName.ShouldBeNull(); + ctx.ClientLang.ShouldBeNull(); + ctx.ClientVersion.ShouldBeNull(); + ctx.HeadersEnabled.ShouldBeFalse(); + } + + /// + /// MessageTraceContext.CreateFromConnect with null options returns Empty. + /// Go reference: msgtrace_test.go — trace context defaults + /// + [Fact] + public void MessageTraceContext_create_from_null_opts_returns_empty() + { + var ctx = MessageTraceContext.CreateFromConnect(null); + + ctx.ShouldBe(MessageTraceContext.Empty); + } + + /// + /// MessageTraceContext.CreateFromConnect captures client name, lang, version, + /// and headers flag from the parsed ClientOptions. + /// Go reference: msgtrace_test.go — TestMsgTraceBasic, client identity in trace events + /// + [Fact] + public void MessageTraceContext_captures_client_identity_from_connect_options() + { + var opts = new ClientOptions + { + Name = "tracer-client", + Lang = "nats.go", + Version = "1.30.0", + Headers = true, + }; + + var ctx = MessageTraceContext.CreateFromConnect(opts); + + ctx.ClientName.ShouldBe("tracer-client"); + ctx.ClientLang.ShouldBe("nats.go"); + ctx.ClientVersion.ShouldBe("1.30.0"); + ctx.HeadersEnabled.ShouldBeTrue(); + } + + /// + /// A client without headers support produces a trace context with + /// HeadersEnabled = false — that client cannot use Nats-Trace-Dest header. + /// Go reference: msgtrace_test.go — clients must have headers to receive trace events + /// + [Fact] + public void MessageTraceContext_headers_disabled_when_connect_opts_headers_false() + { + var opts = new ClientOptions { Name = "legacy", Headers = false }; + + var ctx = MessageTraceContext.CreateFromConnect(opts); + + ctx.HeadersEnabled.ShouldBeFalse(); + ctx.ClientName.ShouldBe("legacy"); + } + + /// + /// MessageTraceContext is a record — two instances with the same values are equal. + /// Go reference: msgtrace_test.go — deterministic identity comparison + /// + [Fact] + public void MessageTraceContext_record_equality_compares_by_value() + { + var a = new MessageTraceContext("myapp", "nats.go", "1.0", true); + var b = new MessageTraceContext("myapp", "nats.go", "1.0", true); + + a.ShouldBe(b); + a.GetHashCode().ShouldBe(b.GetHashCode()); + } + + // ------------------------------------------------------------------------- + // NatsHeaderParser — trace header parsing + // Reference: msgtrace_test.go — Nats-Trace-Dest header is a regular NATS header + // ------------------------------------------------------------------------- + + /// + /// NatsHeaderParser correctly parses a Nats-Trace-Dest header from an HPUB block. + /// The trace destination header identifies where trace events should be published. + /// Go reference: msgtrace_test.go — TestMsgTraceBasic HPUB with Nats-Trace-Dest + /// + [Fact] + public void NatsHeaderParser_parses_trace_dest_header() + { + // NATS/1.0\r\nNats-Trace-Dest: trace.inbox\r\n\r\n + const string rawHeaders = "NATS/1.0\r\nNats-Trace-Dest: trace.inbox\r\n\r\n"; + var bytes = Encoding.ASCII.GetBytes(rawHeaders); + + var headers = NatsHeaderParser.Parse(bytes); + + headers.ShouldNotBe(NatsHeaders.Invalid); + headers.Headers.ContainsKey("Nats-Trace-Dest").ShouldBeTrue(); + headers.Headers["Nats-Trace-Dest"].ShouldContain("trace.inbox"); + } + + /// + /// NatsHeaderParser returns NatsHeaders.Invalid when data does not start + /// with the NATS/1.0 prefix — guards against corrupted trace header blocks. + /// Go reference: msgtrace_test.go — protocol validation + /// + [Fact] + public void NatsHeaderParser_returns_invalid_for_bad_prefix() + { + var bytes = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n"u8.ToArray(); + + var headers = NatsHeaderParser.Parse(bytes); + + headers.ShouldBe(NatsHeaders.Invalid); + } + + /// + /// NatsHeaderParser handles an empty header block (NATS/1.0 with no headers). + /// A trace destination header may be absent — the message is then not traced. + /// Go reference: msgtrace_test.go — non-traced messages have no Nats-Trace-Dest + /// + [Fact] + public void NatsHeaderParser_parses_empty_nats_header_block() + { + const string rawHeaders = "NATS/1.0\r\n\r\n"; + var bytes = Encoding.ASCII.GetBytes(rawHeaders); + + var headers = NatsHeaderParser.Parse(bytes); + + headers.ShouldNotBe(NatsHeaders.Invalid); + headers.Status.ShouldBe(0); + headers.Headers.Count.ShouldBe(0); + } + + /// + /// NatsHeaderParser handles multiple headers in one block, matching the case + /// where Nats-Trace-Dest appears alongside other application headers. + /// Go reference: msgtrace_test.go — TestMsgTraceWithHeaders + /// + [Fact] + public void NatsHeaderParser_parses_multiple_headers_including_trace_dest() + { + const string rawHeaders = + "NATS/1.0\r\n" + + "X-App-Id: 42\r\n" + + "Nats-Trace-Dest: my.trace.inbox\r\n" + + "X-Correlation: abc123\r\n" + + "\r\n"; + var bytes = Encoding.ASCII.GetBytes(rawHeaders); + + var headers = NatsHeaderParser.Parse(bytes); + + headers.Headers.Count.ShouldBe(3); + headers.Headers["Nats-Trace-Dest"].ShouldContain("my.trace.inbox"); + headers.Headers["X-App-Id"].ShouldContain("42"); + headers.Headers["X-Correlation"].ShouldContain("abc123"); + } + + /// + /// Header lookup is case-insensitive, so "nats-trace-dest" and "Nats-Trace-Dest" + /// resolve to the same key (matches Go's http.Header case-folding behaviour). + /// Go reference: msgtrace_test.go — case-insensitive header access + /// + [Fact] + public void NatsHeaderParser_header_lookup_is_case_insensitive() + { + const string rawHeaders = "NATS/1.0\r\nNats-Trace-Dest: inbox.trace\r\n\r\n"; + var bytes = Encoding.ASCII.GetBytes(rawHeaders); + + var headers = NatsHeaderParser.Parse(bytes); + + headers.Headers.ContainsKey("nats-trace-dest").ShouldBeTrue(); + headers.Headers.ContainsKey("NATS-TRACE-DEST").ShouldBeTrue(); + headers.Headers["nats-trace-dest"][0].ShouldBe("inbox.trace"); + } + + // ------------------------------------------------------------------------- + // Wire-level HPUB/HMSG trace header propagation + // Reference: msgtrace_test.go — Nats-Trace-Dest header preserved in delivery + // ------------------------------------------------------------------------- + + /// + /// A Nats-Trace-Dest header sent in an HPUB is delivered verbatim in the + /// HMSG to the subscriber. The server must not strip or modify trace headers. + /// Go reference: msgtrace_test.go — TestMsgTraceBasic, header pass-through + /// + [Fact] + public async Task Hpub_with_trace_dest_header_delivered_verbatim_to_subscriber() + { + using var sub = await ConnectWithHeadersAsync(); + using var pub = await ConnectWithHeadersAsync(); + + await sub.SendAsync(Encoding.ASCII.GetBytes("SUB trace.test 1\r\n")); + await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n")); + await ReadUntilAsync(sub, "PONG"); + + // Build HPUB with Nats-Trace-Dest header + // Header block: "NATS/1.0\r\nNats-Trace-Dest: trace.inbox\r\n\r\n" + const string headerBlock = "NATS/1.0\r\nNats-Trace-Dest: trace.inbox\r\n\r\n"; + const string payload = "hello"; + int hdrLen = Encoding.ASCII.GetByteCount(headerBlock); + int totalLen = hdrLen + Encoding.ASCII.GetByteCount(payload); + + var hpub = $"HPUB trace.test {hdrLen} {totalLen}\r\n{headerBlock}{payload}\r\n"; + await pub.SendAsync(Encoding.ASCII.GetBytes(hpub)); + + var received = await ReadUntilAsync(sub, "Nats-Trace-Dest"); + + received.ShouldContain("HMSG trace.test"); + received.ShouldContain("Nats-Trace-Dest: trace.inbox"); + received.ShouldContain("hello"); + } + + /// + /// A Nats-Trace-Dest header is preserved when the message matches a wildcard + /// subscription. Wildcard matching must not drop or corrupt headers. + /// Go reference: msgtrace_test.go — TestMsgTraceWithWildcardSubscription + /// + [Fact] + public async Task Hpub_trace_dest_header_preserved_through_wildcard_subscription() + { + using var sub = await ConnectWithHeadersAsync(); + using var pub = await ConnectWithHeadersAsync(); + + // Subscribe to wildcard + await sub.SendAsync(Encoding.ASCII.GetBytes("SUB trace.* 1\r\n")); + await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n")); + await ReadUntilAsync(sub, "PONG"); + + const string headerBlock = "NATS/1.0\r\nNats-Trace-Dest: t.inbox.1\r\n\r\n"; + const string payload = "wildcard-msg"; + int hdrLen = Encoding.ASCII.GetByteCount(headerBlock); + int totalLen = hdrLen + Encoding.ASCII.GetByteCount(payload); + + var hpub = $"HPUB trace.subject {hdrLen} {totalLen}\r\n{headerBlock}{payload}\r\n"; + await pub.SendAsync(Encoding.ASCII.GetBytes(hpub)); + + var received = await ReadUntilAsync(sub, "Nats-Trace-Dest"); + + received.ShouldContain("HMSG trace.subject"); + received.ShouldContain("Nats-Trace-Dest: t.inbox.1"); + received.ShouldContain("wildcard-msg"); + } + + /// + /// HPUB with a trace header delivered to a queue group subscriber preserves + /// the header. Queue group routing must not strip trace context. + /// Go reference: msgtrace_test.go — TestMsgTraceQueueGroup + /// + [Fact] + public async Task Hpub_trace_dest_header_preserved_through_queue_group_delivery() + { + using var qsub = await ConnectWithHeadersAsync(); + using var pub = await ConnectWithHeadersAsync(); + + // Queue group subscription + await qsub.SendAsync(Encoding.ASCII.GetBytes("SUB trace.q workers 1\r\n")); + await qsub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n")); + await ReadUntilAsync(qsub, "PONG"); + + const string headerBlock = "NATS/1.0\r\nNats-Trace-Dest: qg.trace\r\n\r\n"; + const string payload = "queued"; + int hdrLen = Encoding.ASCII.GetByteCount(headerBlock); + int totalLen = hdrLen + Encoding.ASCII.GetByteCount(payload); + + var hpub = $"HPUB trace.q {hdrLen} {totalLen}\r\n{headerBlock}{payload}\r\n"; + await pub.SendAsync(Encoding.ASCII.GetBytes(hpub)); + + var received = await ReadUntilAsync(qsub, "Nats-Trace-Dest"); + + received.ShouldContain("Nats-Trace-Dest: qg.trace"); + received.ShouldContain("queued"); + } + + /// + /// Multiple custom headers alongside Nats-Trace-Dest are all delivered intact. + /// The server must preserve the full header block, not just the trace header. + /// Go reference: msgtrace_test.go — TestMsgTraceWithHeaders + /// + [Fact] + public async Task Hpub_multiple_headers_with_trace_dest_all_delivered_intact() + { + using var sub = await ConnectWithHeadersAsync(); + using var pub = await ConnectWithHeadersAsync(); + + await sub.SendAsync(Encoding.ASCII.GetBytes("SUB multi.hdr 1\r\n")); + await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n")); + await ReadUntilAsync(sub, "PONG"); + + const string headerBlock = + "NATS/1.0\r\n" + + "X-Request-Id: req-99\r\n" + + "Nats-Trace-Dest: t.multi\r\n" + + "X-Priority: high\r\n" + + "\r\n"; + const string payload = "multi-hdr-payload"; + int hdrLen = Encoding.ASCII.GetByteCount(headerBlock); + int totalLen = hdrLen + Encoding.ASCII.GetByteCount(payload); + + var hpub = $"HPUB multi.hdr {hdrLen} {totalLen}\r\n{headerBlock}{payload}\r\n"; + await pub.SendAsync(Encoding.ASCII.GetBytes(hpub)); + + var received = await ReadUntilAsync(sub, "X-Priority"); + + received.ShouldContain("X-Request-Id: req-99"); + received.ShouldContain("Nats-Trace-Dest: t.multi"); + received.ShouldContain("X-Priority: high"); + received.ShouldContain("multi-hdr-payload"); + } + + /// + /// HPUB with a very long trace ID (256 chars) is accepted and forwarded. The + /// server must not truncate long header values. + /// Go reference: msgtrace_test.go — TestMsgTraceLongTraceId + /// + [Fact] + public async Task Hpub_very_long_trace_id_is_preserved() + { + using var sub = await ConnectWithHeadersAsync(); + using var pub = await ConnectWithHeadersAsync(); + + await sub.SendAsync(Encoding.ASCII.GetBytes("SUB trace.long 1\r\n")); + await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n")); + await ReadUntilAsync(sub, "PONG"); + + var longId = new string('a', 256); + var headerBlock = $"NATS/1.0\r\nNats-Trace-Dest: {longId}\r\n\r\n"; + const string payload = "x"; + int hdrLen = Encoding.ASCII.GetByteCount(headerBlock); + int totalLen = hdrLen + 1; + + var hpub = $"HPUB trace.long {hdrLen} {totalLen}\r\n{headerBlock}{payload}\r\n"; + await pub.SendAsync(Encoding.ASCII.GetBytes(hpub)); + + var received = await ReadUntilAsync(sub, longId); + + received.ShouldContain(longId); + } + + // ------------------------------------------------------------------------- + // Server trace options + // Reference: msgtrace_test.go — server-side Trace / TraceVerbose / MaxTracedMsgLen + // ------------------------------------------------------------------------- + + /// + /// NatsOptions.Trace is false by default. Server-level tracing is opt-in. + /// Go reference: opts.go default — trace=false + /// + [Fact] + public void NatsOptions_trace_is_false_by_default() + { + var opts = new NatsOptions(); + + opts.Trace.ShouldBeFalse(); + } + + /// + /// NatsOptions.TraceVerbose is false by default. + /// Go reference: opts.go — trace_verbose=false + /// + [Fact] + public void NatsOptions_trace_verbose_is_false_by_default() + { + var opts = new NatsOptions(); + + opts.TraceVerbose.ShouldBeFalse(); + } + + /// + /// NatsOptions.MaxTracedMsgLen is 0 by default (unlimited). + /// Go reference: opts.go — max_traced_msg_len default=0 + /// + [Fact] + public void NatsOptions_max_traced_msg_len_is_zero_by_default() + { + var opts = new NatsOptions(); + + opts.MaxTracedMsgLen.ShouldBe(0); + } + + /// + /// A server created with Trace=true starts and accepts connections normally. + /// Enabling trace mode must not prevent the server from becoming ready. + /// Go reference: msgtrace_test.go — test server setup with trace enabled + /// + [Fact] + public async Task Server_with_trace_enabled_starts_and_accepts_connections() + { + var port = GetFreePort(); + using var cts = new CancellationTokenSource(); + using var server = new NatsServer(new NatsOptions { Port = port, Trace = true }, NullLoggerFactory.Instance); + _ = server.StartAsync(cts.Token); + await server.WaitForReadyAsync(); + + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(IPAddress.Loopback, port); + var info = await ReadUntilAsync(sock, "\r\n"); + + info.ShouldStartWith("INFO "); + + await cts.CancelAsync(); + } + + /// + /// A server created with TraceVerbose=true implies Trace=true when processed + /// via ConfigProcessor. The option pair follows the Go server's precedence rules. + /// Go reference: opts.go — if TraceVerbose then Trace=true + /// + [Fact] + public void NatsOptions_trace_verbose_can_be_set_independently() + { + var opts = new NatsOptions { TraceVerbose = true }; + + // TraceVerbose is stored independently; it's up to ConfigProcessor to + // cascade Trace=true. Verify the field is stored as set. + opts.TraceVerbose.ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // ClientFlags.TraceMode + // Reference: msgtrace_test.go — per-client trace mode from server-level trace + // ------------------------------------------------------------------------- + + /// + /// ClientFlagHolder.HasFlag returns false for TraceMode initially. A fresh + /// client has no trace mode set. + /// Go reference: client.go — clientFlag trace bit initialised to zero + /// + [Fact] + public void ClientFlagHolder_trace_mode_is_not_set_by_default() + { + var holder = new ClientFlagHolder(); + + holder.HasFlag(ClientFlags.TraceMode).ShouldBeFalse(); + } + + /// + /// ClientFlagHolder.SetFlag / ClearFlag toggle TraceMode correctly. + /// Go reference: client.go setTraceMode + /// + [Fact] + public void ClientFlagHolder_set_and_clear_trace_mode() + { + var holder = new ClientFlagHolder(); + + holder.SetFlag(ClientFlags.TraceMode); + holder.HasFlag(ClientFlags.TraceMode).ShouldBeTrue(); + + holder.ClearFlag(ClientFlags.TraceMode); + holder.HasFlag(ClientFlags.TraceMode).ShouldBeFalse(); + } + + /// + /// TraceMode is independent of other flags — toggling it does not affect + /// ConnectReceived or other status bits. + /// Go reference: client.go — per-bit flag isolation + /// + [Fact] + public void ClientFlagHolder_trace_mode_does_not_affect_other_flags() + { + var holder = new ClientFlagHolder(); + holder.SetFlag(ClientFlags.ConnectReceived); + holder.SetFlag(ClientFlags.FirstPongSent); + + holder.SetFlag(ClientFlags.TraceMode); + holder.ClearFlag(ClientFlags.TraceMode); + + holder.HasFlag(ClientFlags.ConnectReceived).ShouldBeTrue(); + holder.HasFlag(ClientFlags.FirstPongSent).ShouldBeTrue(); + holder.HasFlag(ClientFlags.TraceMode).ShouldBeFalse(); + } +}