diff --git a/src/NATS.Server/Internal/MessageTraceContext.cs b/src/NATS.Server/Internal/MessageTraceContext.cs index 97d763a..26f8793 100644 --- a/src/NATS.Server/Internal/MessageTraceContext.cs +++ b/src/NATS.Server/Internal/MessageTraceContext.cs @@ -671,6 +671,196 @@ public sealed class MsgTraceContext } } +/// +/// Immutable trace context for a single server hop. +/// Go reference: server/msgtrace.go — trace context embedding and extraction. +/// +public sealed record TraceContext( + string TraceId, + string SpanId, + string? Destination, + bool TraceOnly, + DateTime CreatedAt); + +/// +/// Manages message trace context propagation across server hops. +/// Handles the Nats-Trace-Parent header used to correlate trace spans +/// as a message travels through a NATS cluster. +/// Go reference: server/msgtrace.go — trace context embedding and extraction. +/// +public sealed class TraceContextPropagator +{ + /// Header carrying the traceparent in the form {traceId}-{spanId}. + public const string TraceParentHeader = "Nats-Trace-Parent"; + + /// Header carrying the trace destination subject. + public const string TraceDestHeader = "Nats-Trace-Dest"; + + /// Header that, when present, suppresses message delivery. + public const string TraceOnlyHeader = "Nats-Trace-Only"; + + private static readonly byte[] CrLf = "\r\n"u8.ToArray(); + private static readonly byte[] HeaderSep = ": "u8.ToArray(); + + /// + /// Creates a new trace context for an origin message. + /// + public static TraceContext CreateTrace(string traceId, string spanId, string? destination = null) + => new(traceId, spanId, destination, TraceOnly: false, DateTime.UtcNow); + + /// + /// Extracts a trace context from raw NATS message headers. + /// Parses "Nats-Trace-Parent: {traceId}-{spanId}" from the header block. + /// Returns null if the header is absent or malformed. + /// + public static TraceContext? ExtractTrace(ReadOnlySpan headers) + { + if (headers.IsEmpty) + return null; + + // Skip the NATS/1.0 status line + var crlf = CrLf.AsSpan(); + int firstCrlf = headers.IndexOf(crlf); + if (firstCrlf < 0) + return null; + + int i = firstCrlf + 2; + string? traceParent = null; + string? destination = null; + bool traceOnly = false; + + while (i < headers.Length) + { + // Find colon + int colonIdx = -1; + for (int j = i; j < headers.Length; j++) + { + if (headers[j] == (byte)':') + { + colonIdx = j; + break; + } + if (headers[j] == (byte)'\r' || headers[j] == (byte)'\n') + break; + } + + if (colonIdx < 0) + { + int nextCrlf = headers[i..].IndexOf(crlf); + if (nextCrlf < 0) break; + i += nextCrlf + 2; + continue; + } + + var keySpan = headers[i..colonIdx]; + i = colonIdx + 1; + + // Skip whitespace + while (i < headers.Length && (headers[i] == (byte)' ' || headers[i] == (byte)'\t')) + i++; + + int valStart = i; + int valCrlf = headers[valStart..].IndexOf(crlf); + if (valCrlf < 0) break; + + int valEnd = valStart + valCrlf; + while (valEnd > valStart && (headers[valEnd - 1] == (byte)' ' || headers[valEnd - 1] == (byte)'\t')) + valEnd--; + + if (keySpan.Length > 0) + { + var key = Encoding.ASCII.GetString(keySpan); + var val = Encoding.ASCII.GetString(headers[valStart..valEnd]); + + if (key.Equals(TraceParentHeader, StringComparison.OrdinalIgnoreCase)) + traceParent = val; + else if (key.Equals(TraceDestHeader, StringComparison.OrdinalIgnoreCase)) + destination = val; + else if (key.Equals(TraceOnlyHeader, StringComparison.OrdinalIgnoreCase)) + traceOnly = val is "1" or "true" or "on"; + } + + i = valStart + valCrlf + 2; + } + + if (traceParent == null) + return null; + + // Parse "{traceId}-{spanId}": split on the first dash. Callers must use + // dash-free identifiers (e.g. hex-encoded bytes) to avoid ambiguity. + int dash = traceParent.IndexOf('-'); + if (dash < 0 || dash == traceParent.Length - 1) + return null; + + var traceId = traceParent[..dash]; + var spanId = traceParent[(dash + 1)..]; + + return new TraceContext(traceId, spanId, destination, traceOnly, DateTime.UtcNow); + } + + /// + /// Injects a trace context into a header block, appending + /// "Nats-Trace-Parent: {traceId}-{spanId}\r\n". + /// If existingHeaders is empty a minimal NATS/1.0 header block is created. + /// + public static byte[] InjectTrace(TraceContext context, ReadOnlySpan existingHeaders) + { + var headerLine = $"{TraceParentHeader}: {context.TraceId}-{context.SpanId}\r\n"; + var headerBytes = Encoding.ASCII.GetBytes(headerLine); + + if (existingHeaders.IsEmpty) + { + // Build minimal NATS/1.0 block + var preamble = Encoding.ASCII.GetBytes("NATS/1.0\r\n"); + var result = new byte[preamble.Length + headerBytes.Length + 2]; + preamble.CopyTo(result, 0); + headerBytes.CopyTo(result, preamble.Length); + "\r\n"u8.CopyTo(result.AsSpan(preamble.Length + headerBytes.Length)); + return result; + } + + // Append before the terminal \r\n (if present) + var existing = existingHeaders.ToArray(); + + // Strip trailing \r\n if present so we can append cleanly + int insertAt = existing.Length; + if (insertAt >= 2 + && existing[insertAt - 2] == (byte)'\r' + && existing[insertAt - 1] == (byte)'\n') + { + insertAt -= 2; + } + + var final = new byte[insertAt + headerBytes.Length + 2]; + existing.AsSpan(0, insertAt).CopyTo(final); + headerBytes.CopyTo(final, insertAt); + final[insertAt + headerBytes.Length] = (byte)'\r'; + final[insertAt + headerBytes.Length + 1] = (byte)'\n'; + return final; + } + + /// + /// Creates a child span that preserves the parent TraceId but + /// uses a new SpanId for this hop. + /// + public static TraceContext CreateChildSpan(TraceContext parent, string newSpanId) + => new(parent.TraceId, newSpanId, parent.Destination, parent.TraceOnly, DateTime.UtcNow); + + /// + /// Returns true if the header block contains a Nats-Trace-Parent header, + /// indicating the message should be traced. + /// + public static bool ShouldTrace(ReadOnlySpan headers) + { + if (headers.IsEmpty) + return false; + + // Fast path: search for the header name as an ASCII byte sequence + var needle = Encoding.ASCII.GetBytes(TraceParentHeader + ":"); + return headers.IndexOf(needle.AsSpan()) >= 0; + } +} + /// /// JSON serialization context for message trace types. /// diff --git a/src/NATS.Server/Monitoring/Connz.cs b/src/NATS.Server/Monitoring/Connz.cs index 2f0f214..958b4ed 100644 --- a/src/NATS.Server/Monitoring/Connz.cs +++ b/src/NATS.Server/Monitoring/Connz.cs @@ -188,6 +188,26 @@ public sealed record ConnzFilterResult( int Offset, int Limit); +/// +/// Sort options for the /connz endpoint. +/// Go reference: monitor.go ConnzSortOpt constants. +/// +public enum ConnzSortOption +{ + ConnectionId, + Start, + Subs, + Pending, + MsgsTo, + MsgsFrom, + BytesTo, + BytesFrom, + LastActivity, + Uptime, + Idle, + RTT, +} + /// /// Query-string options for the account-scoped filter API. /// Parses the ?acc=, ?state=, ?offset=, and ?limit= parameters that the Go server @@ -206,7 +226,20 @@ public sealed class ConnzFilterOptions public int Limit { get; init; } = 1024; /// - /// Parses a raw query string (e.g. "?acc=ACCOUNT&state=open&offset=0&limit=100") + /// Sort field for connection listing. Default: . + /// Go reference: monitor.go ConnzOptions.SortBy. + /// + public ConnzSortOption SortBy { get; init; } = ConnzSortOption.ConnectionId; + + /// + /// When , reverses the natural sort direction for the chosen + /// option. + /// Go reference: monitor.go ConnzOptions.SortBy (descending variant). + /// + public bool SortDescending { get; init; } + + /// + /// Parses a raw query string (e.g. "?acc=ACCOUNT&state=open&offset=0&limit=100&sort=bytes_to") /// into a instance. /// public static ConnzFilterOptions Parse(string? queryString) @@ -221,6 +254,8 @@ public sealed class ConnzFilterOptions string? stateFilter = null; int offset = 0; int limit = 1024; + var sortBy = ConnzSortOption.ConnectionId; + bool sortDescending = false; foreach (var pair in qs.Split('&', StringSplitOptions.RemoveEmptyEntries)) { @@ -244,6 +279,12 @@ public sealed class ConnzFilterOptions case "limit" when int.TryParse(value, out var l): limit = l; break; + case "sort": + sortBy = ConnzSorter.Parse(value); + break; + case "desc" when value is "1" or "true": + sortDescending = true; + break; } } @@ -253,6 +294,8 @@ public sealed class ConnzFilterOptions StateFilter = stateFilter, Offset = offset, Limit = limit, + SortBy = sortBy, + SortDescending = sortDescending, }; } } @@ -371,3 +414,24 @@ public sealed class ConnzOptions public int Limit { get; set; } = 1024; } + +/// +/// Parses sort query-string values for the /connz endpoint. +/// Go reference: monitor.go ConnzSortOpt string constants. +/// +public static class ConnzSorter +{ + public static ConnzSortOption Parse(string value) => value.ToLowerInvariant() switch + { + "start" or "start_time" => ConnzSortOption.Start, + "subs" => ConnzSortOption.Subs, + "pending" => ConnzSortOption.Pending, + "msgs_to" or "msgs_from" => ConnzSortOption.MsgsTo, + "bytes_to" or "bytes_from" => ConnzSortOption.BytesTo, + "last" or "last_activity" => ConnzSortOption.LastActivity, + "uptime" => ConnzSortOption.Uptime, + "idle" => ConnzSortOption.Idle, + "rtt" => ConnzSortOption.RTT, + _ => ConnzSortOption.ConnectionId, + }; +} diff --git a/tests/NATS.Server.Tests/Internal/TraceContextPropagationTests.cs b/tests/NATS.Server.Tests/Internal/TraceContextPropagationTests.cs new file mode 100644 index 0000000..7db8ba9 --- /dev/null +++ b/tests/NATS.Server.Tests/Internal/TraceContextPropagationTests.cs @@ -0,0 +1,136 @@ +using System.Text; +using NATS.Server.Internal; + +namespace NATS.Server.Tests.Internal; + +/// +/// Tests for TraceContextPropagator: trace creation, header injection/extraction, +/// child span creation, round-trip fidelity, and ShouldTrace detection. +/// Go reference: server/msgtrace.go — trace context embedding and extraction. +/// +public class TraceContextPropagationTests +{ + // Helper: build a minimal NATS/1.0 header block with the given headers. + private static byte[] BuildNatsHeaders(params (string key, string value)[] headers) + { + var sb = new StringBuilder("NATS/1.0\r\n"); + foreach (var (key, value) in headers) + sb.Append($"{key}: {value}\r\n"); + sb.Append("\r\n"); + return Encoding.ASCII.GetBytes(sb.ToString()); + } + + [Fact] + public void CreateTrace_GeneratesValidContext() + { + var ctx = TraceContextPropagator.CreateTrace("abc123", "span456", destination: "trace.dest"); + + ctx.TraceId.ShouldBe("abc123"); + ctx.SpanId.ShouldBe("span456"); + ctx.Destination.ShouldBe("trace.dest"); + ctx.TraceOnly.ShouldBeFalse(); + ctx.CreatedAt.ShouldBeInRange(DateTime.UtcNow.AddSeconds(-5), DateTime.UtcNow.AddSeconds(1)); + } + + [Fact] + public void ExtractTrace_ValidHeaders_ReturnsContext() + { + var headers = BuildNatsHeaders((TraceContextPropagator.TraceParentHeader, "trace1-span1")); + + var ctx = TraceContextPropagator.ExtractTrace(headers); + + ctx.ShouldNotBeNull(); + ctx.TraceId.ShouldBe("trace1"); + ctx.SpanId.ShouldBe("span1"); + } + + [Fact] + public void ExtractTrace_NoTraceHeader_ReturnsNull() + { + var headers = BuildNatsHeaders(("Content-Type", "text/plain")); + + var ctx = TraceContextPropagator.ExtractTrace(headers); + + ctx.ShouldBeNull(); + } + + [Fact] + public void InjectTrace_AppendsToHeaders() + { + var existing = BuildNatsHeaders(("Content-Type", "text/plain")); + var ctx = TraceContextPropagator.CreateTrace("tid", "sid"); + + var result = TraceContextPropagator.InjectTrace(ctx, existing); + + var text = Encoding.ASCII.GetString(result); + text.ShouldContain($"{TraceContextPropagator.TraceParentHeader}: tid-sid"); + text.ShouldContain("Content-Type: text/plain"); + } + + [Fact] + public void InjectTrace_EmptyHeaders_CreatesNew() + { + var ctx = TraceContextPropagator.CreateTrace("newtrace", "newspan"); + + var result = TraceContextPropagator.InjectTrace(ctx, ReadOnlySpan.Empty); + + var text = Encoding.ASCII.GetString(result); + text.ShouldStartWith("NATS/1.0\r\n"); + text.ShouldContain($"{TraceContextPropagator.TraceParentHeader}: newtrace-newspan"); + } + + [Fact] + public void CreateChildSpan_PreservesTraceId() + { + var parent = TraceContextPropagator.CreateTrace("parentTrace", "parentSpan"); + + var child = TraceContextPropagator.CreateChildSpan(parent, "childSpan"); + + child.TraceId.ShouldBe("parentTrace"); + } + + [Fact] + public void CreateChildSpan_NewSpanId() + { + var parent = TraceContextPropagator.CreateTrace("parentTrace", "parentSpan"); + + var child = TraceContextPropagator.CreateChildSpan(parent, "childSpan"); + + child.SpanId.ShouldBe("childSpan"); + child.SpanId.ShouldNotBe(parent.SpanId); + } + + [Fact] + public void ShouldTrace_WithHeader_ReturnsTrue() + { + var headers = BuildNatsHeaders((TraceContextPropagator.TraceParentHeader, "trace1-span1")); + + TraceContextPropagator.ShouldTrace(headers).ShouldBeTrue(); + } + + [Fact] + public void ShouldTrace_WithoutHeader_ReturnsFalse() + { + var headers = BuildNatsHeaders(("Content-Type", "text/plain")); + + TraceContextPropagator.ShouldTrace(headers).ShouldBeFalse(); + } + + [Fact] + public void RoundTrip_CreateInjectExtract_Matches() + { + // Use hex-style IDs (no dashes) so the "{traceId}-{spanId}" wire format + // can be unambiguously split on the single separator dash. + var original = TraceContextPropagator.CreateTrace("0af7651916cd43dd8448eb211c80319c", "b7ad6b7169203331", destination: "trace.dest"); + + // Inject into empty headers + var injected = TraceContextPropagator.InjectTrace(original, ReadOnlySpan.Empty); + + // Extract back from the injected headers + var extracted = TraceContextPropagator.ExtractTrace(injected); + + extracted.ShouldNotBeNull(); + extracted.TraceId.ShouldBe(original.TraceId); + extracted.SpanId.ShouldBe(original.SpanId); + } +}