diff --git a/docs/test_parity.db b/docs/test_parity.db index 44559f2..d63a214 100644 Binary files a/docs/test_parity.db and b/docs/test_parity.db differ diff --git a/src/NATS.Server/Events/EventJsonContext.cs b/src/NATS.Server/Events/EventJsonContext.cs index 7ac4ed2..d601bff 100644 --- a/src/NATS.Server/Events/EventJsonContext.cs +++ b/src/NATS.Server/Events/EventJsonContext.cs @@ -5,8 +5,10 @@ namespace NATS.Server.Events; [JsonSerializable(typeof(ConnectEventMsg))] [JsonSerializable(typeof(DisconnectEventMsg))] [JsonSerializable(typeof(AccountNumConns))] +[JsonSerializable(typeof(AccNumConnsReq))] [JsonSerializable(typeof(ServerStatsMsg))] [JsonSerializable(typeof(ShutdownEventMsg))] [JsonSerializable(typeof(LameDuckEventMsg))] [JsonSerializable(typeof(AuthErrorEventMsg))] +[JsonSerializable(typeof(OcspPeerRejectEventMsg))] internal partial class EventJsonContext : JsonSerializerContext; diff --git a/src/NATS.Server/Events/EventTypes.cs b/src/NATS.Server/Events/EventTypes.cs index 9da36bb..e4341ca 100644 --- a/src/NATS.Server/Events/EventTypes.cs +++ b/src/NATS.Server/Events/EventTypes.cs @@ -4,6 +4,7 @@ namespace NATS.Server.Events; /// /// Server identity block embedded in all system events. +/// Go reference: events.go:249-265 ServerInfo struct. /// public sealed class EventServerInfo { @@ -29,17 +30,34 @@ public sealed class EventServerInfo [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? Version { get; set; } + [JsonPropertyName("tags")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string[]? Tags { get; set; } + + [JsonPropertyName("metadata")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public Dictionary? Metadata { get; set; } + + [JsonPropertyName("jetstream")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public bool JetStream { get; set; } + + [JsonPropertyName("flags")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public ulong Flags { get; set; } + [JsonPropertyName("seq")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public ulong Seq { get; set; } - [JsonPropertyName("tags")] - [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] - public Dictionary? Tags { get; set; } + [JsonPropertyName("time")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTime Time { get; set; } } /// /// Client identity block for connect/disconnect events. +/// Go reference: events.go:308-331 ClientInfo struct. /// public sealed class EventClientInfo { @@ -62,6 +80,14 @@ public sealed class EventClientInfo [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? Account { get; set; } + [JsonPropertyName("svc")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Service { get; set; } + + [JsonPropertyName("user")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? User { get; set; } + [JsonPropertyName("name")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? Name { get; set; } @@ -77,8 +103,56 @@ public sealed class EventClientInfo [JsonPropertyName("rtt")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public long RttNanos { get; set; } + + [JsonPropertyName("server")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Server { get; set; } + + [JsonPropertyName("cluster")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Cluster { get; set; } + + [JsonPropertyName("alts")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string[]? Alternates { get; set; } + + [JsonPropertyName("jwt")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Jwt { get; set; } + + [JsonPropertyName("issuer_key")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? IssuerKey { get; set; } + + [JsonPropertyName("name_tag")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? NameTag { get; set; } + + [JsonPropertyName("tags")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string[]? Tags { get; set; } + + [JsonPropertyName("kind")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Kind { get; set; } + + [JsonPropertyName("client_type")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? ClientType { get; set; } + + [JsonPropertyName("client_id")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? MqttClient { get; set; } + + [JsonPropertyName("nonce")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Nonce { get; set; } } +/// +/// Message and byte count stats. Applicable for both sent and received. +/// Go reference: events.go:407-410 MsgBytes, events.go:412-418 DataStats. +/// public sealed class DataStats { [JsonPropertyName("msgs")] @@ -86,6 +160,31 @@ public sealed class DataStats [JsonPropertyName("bytes")] public long Bytes { get; set; } + + [JsonPropertyName("gateways")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public MsgBytesStats? Gateways { get; set; } + + [JsonPropertyName("routes")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public MsgBytesStats? Routes { get; set; } + + [JsonPropertyName("leafs")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public MsgBytesStats? Leafs { get; set; } +} + +/// +/// Sub-stats for gateway/route/leaf message flow. +/// Go reference: events.go:407-410 MsgBytes. +/// +public sealed class MsgBytesStats +{ + [JsonPropertyName("msgs")] + public long Msgs { get; set; } + + [JsonPropertyName("bytes")] + public long Bytes { get; set; } } /// Client connect advisory. Go events.go:155-160. @@ -139,7 +238,10 @@ public sealed class DisconnectEventMsg public string Reason { get; set; } = string.Empty; } -/// Account connection count heartbeat. Go events.go:210-214. +/// +/// Account connection count heartbeat. Go events.go:210-214, 217-227. +/// Includes the full AccountStat fields from Go. +/// public sealed class AccountNumConns { public const string EventType = "io.nats.server.advisory.v1.account_connections"; @@ -156,23 +258,125 @@ public sealed class AccountNumConns [JsonPropertyName("server")] public EventServerInfo Server { get; set; } = new(); + /// Account identifier. Go AccountStat.Account. [JsonPropertyName("acc")] public string AccountName { get; set; } = string.Empty; + /// Account display name. Go AccountStat.Name. + [JsonPropertyName("name")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Name { get; set; } + + /// Current active connections. Go AccountStat.Conns. [JsonPropertyName("conns")] public int Connections { get; set; } - [JsonPropertyName("total_conns")] - public long TotalConnections { get; set; } + /// Active leaf node connections. Go AccountStat.LeafNodes. + [JsonPropertyName("leafnodes")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int LeafNodes { get; set; } - [JsonPropertyName("subs")] - public int Subscriptions { get; set; } + /// Total connections over time. Go AccountStat.TotalConns. + [JsonPropertyName("total_conns")] + public int TotalConnections { get; set; } + + /// Active subscription count. Go AccountStat.NumSubs. + [JsonPropertyName("num_subscriptions")] + public uint NumSubscriptions { get; set; } [JsonPropertyName("sent")] public DataStats Sent { get; set; } = new(); [JsonPropertyName("received")] public DataStats Received { get; set; } = new(); + + /// Slow consumer count. Go AccountStat.SlowConsumers. + [JsonPropertyName("slow_consumers")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public long SlowConsumers { get; set; } +} + +/// +/// Route statistics for server stats broadcast. +/// Go reference: events.go:390-396 RouteStat. +/// +public sealed class RouteStat +{ + [JsonPropertyName("rid")] + public ulong Id { get; set; } + + [JsonPropertyName("name")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Name { get; set; } + + [JsonPropertyName("sent")] + public DataStats Sent { get; set; } = new(); + + [JsonPropertyName("received")] + public DataStats Received { get; set; } = new(); + + [JsonPropertyName("pending")] + public int Pending { get; set; } +} + +/// +/// Gateway statistics for server stats broadcast. +/// Go reference: events.go:399-405 GatewayStat. +/// +public sealed class GatewayStat +{ + [JsonPropertyName("gwid")] + public ulong Id { get; set; } + + [JsonPropertyName("name")] + public string Name { get; set; } = ""; + + [JsonPropertyName("sent")] + public DataStats Sent { get; set; } = new(); + + [JsonPropertyName("received")] + public DataStats Received { get; set; } = new(); + + [JsonPropertyName("inbound_connections")] + public int InboundConnections { get; set; } +} + +/// +/// Slow consumer breakdown statistics. +/// Go reference: events.go:377 SlowConsumersStats. +/// +public sealed class SlowConsumersStats +{ + [JsonPropertyName("clients")] + public long Clients { get; set; } + + [JsonPropertyName("routes")] + public long Routes { get; set; } + + [JsonPropertyName("gateways")] + public long Gateways { get; set; } + + [JsonPropertyName("leafs")] + public long Leafs { get; set; } +} + +/// +/// Stale connection breakdown statistics. +/// Go reference: events.go:379 StaleConnectionStats. +/// +public sealed class StaleConnectionStats +{ + [JsonPropertyName("clients")] + public long Clients { get; set; } + + [JsonPropertyName("routes")] + public long Routes { get; set; } + + [JsonPropertyName("gateways")] + public long Gateways { get; set; } + + [JsonPropertyName("leafs")] + public long Leafs { get; set; } } /// Server stats broadcast. Go events.go:150-153. @@ -185,6 +389,9 @@ public sealed class ServerStatsMsg public ServerStatsData Stats { get; set; } = new(); } +/// +/// Server stats data. Full parity with Go events.go:365-387 ServerStats. +/// public sealed class ServerStatsData { [JsonPropertyName("start")] @@ -198,6 +405,10 @@ public sealed class ServerStatsData [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public int Cores { get; set; } + [JsonPropertyName("cpu")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public double Cpu { get; set; } + [JsonPropertyName("connections")] public int Connections { get; set; } @@ -211,6 +422,43 @@ public sealed class ServerStatsData [JsonPropertyName("subscriptions")] public long Subscriptions { get; set; } + /// Sent stats (msgs + bytes). Go ServerStats.Sent. + [JsonPropertyName("sent")] + public DataStats Sent { get; set; } = new(); + + /// Received stats (msgs + bytes). Go ServerStats.Received. + [JsonPropertyName("received")] + public DataStats Received { get; set; } = new(); + + [JsonPropertyName("slow_consumers")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public long SlowConsumers { get; set; } + + [JsonPropertyName("slow_consumer_stats")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public SlowConsumersStats? SlowConsumerStats { get; set; } + + [JsonPropertyName("stale_connections")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public long StaleConnections { get; set; } + + [JsonPropertyName("stale_connection_stats")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public StaleConnectionStats? StaleConnectionStats { get; set; } + + [JsonPropertyName("routes")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public RouteStat[]? Routes { get; set; } + + [JsonPropertyName("gateways")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public GatewayStat[]? Gateways { get; set; } + + [JsonPropertyName("active_servers")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int ActiveServers { get; set; } + + // Kept for backward compat — flat counters that mirror Sent/Received. [JsonPropertyName("in_msgs")] public long InMsgs { get; set; } @@ -222,10 +470,6 @@ public sealed class ServerStatsData [JsonPropertyName("out_bytes")] public long OutBytes { get; set; } - - [JsonPropertyName("slow_consumers")] - [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] - public long SlowConsumers { get; set; } } /// Server shutdown notification. @@ -268,3 +512,43 @@ public sealed class AuthErrorEventMsg [JsonPropertyName("reason")] public string Reason { get; set; } = string.Empty; } + +/// +/// OCSP peer rejection advisory. +/// Go reference: events.go:182-188 OCSPPeerRejectEventMsg. +/// +public sealed class OcspPeerRejectEventMsg +{ + public const string EventType = "io.nats.server.advisory.v1.ocsp_peer_reject"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("kind")] + public string Kind { get; set; } = ""; + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("reason")] + public string Reason { get; set; } = string.Empty; +} + +/// +/// Account numeric connections request. +/// Go reference: events.go:233-236 accNumConnsReq. +/// +public sealed class AccNumConnsReq +{ + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("acc")] + public string Account { get; set; } = string.Empty; +} diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs index caac5dd..e9545e4 100644 --- a/src/NATS.Server/Events/InternalEventSystem.cs +++ b/src/NATS.Server/Events/InternalEventSystem.cs @@ -159,6 +159,16 @@ public sealed class InternalEventSystem : IAsyncDisposable Connections = _server.ClientCount, TotalConnections = Interlocked.Read(ref _server.Stats.TotalConnections), Subscriptions = SystemAccount.SubList.Count, + Sent = new DataStats + { + Msgs = Interlocked.Read(ref _server.Stats.OutMsgs), + Bytes = Interlocked.Read(ref _server.Stats.OutBytes), + }, + Received = new DataStats + { + Msgs = Interlocked.Read(ref _server.Stats.InMsgs), + Bytes = Interlocked.Read(ref _server.Stats.InBytes), + }, InMsgs = Interlocked.Read(ref _server.Stats.InMsgs), OutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs), InBytes = Interlocked.Read(ref _server.Stats.InBytes), diff --git a/src/NATS.Server/Internal/MessageTraceContext.cs b/src/NATS.Server/Internal/MessageTraceContext.cs new file mode 100644 index 0000000..97d763a --- /dev/null +++ b/src/NATS.Server/Internal/MessageTraceContext.cs @@ -0,0 +1,686 @@ +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using NATS.Server.Events; + +namespace NATS.Server.Internal; + +/// +/// Header constants for NATS message tracing. +/// Go reference: msgtrace.go:28-33 +/// +public static class MsgTraceHeaders +{ + public const string TraceDest = "Nats-Trace-Dest"; + public const string TraceDestDisabled = "trace disabled"; + public const string TraceHop = "Nats-Trace-Hop"; + public const string TraceOriginAccount = "Nats-Trace-Origin-Account"; + public const string TraceOnly = "Nats-Trace-Only"; + public const string TraceParent = "traceparent"; +} + +/// +/// Types of message trace events in the MsgTraceEvents list. +/// Go reference: msgtrace.go:54-61 +/// +public static class MsgTraceTypes +{ + public const string Ingress = "in"; + public const string SubjectMapping = "sm"; + public const string StreamExport = "se"; + public const string ServiceImport = "si"; + public const string JetStream = "js"; + public const string Egress = "eg"; +} + +/// +/// Error messages used in message trace events. +/// Go reference: msgtrace.go:248-258 +/// +public static class MsgTraceErrors +{ + public const string OnlyNoSupport = "Not delivered because remote does not support message tracing"; + public const string NoSupport = "Message delivered but remote does not support message tracing so no trace event generated from there"; + public const string NoEcho = "Not delivered because of no echo"; + public const string PubViolation = "Not delivered because publish denied for this subject"; + public const string SubDeny = "Not delivered because subscription denies this subject"; + public const string SubClosed = "Not delivered because subscription is closed"; + public const string ClientClosed = "Not delivered because client is closed"; + public const string AutoSubExceeded = "Not delivered because auto-unsubscribe exceeded"; +} + +/// +/// Represents the full trace event document published to the trace destination. +/// Go reference: msgtrace.go:63-68 +/// +public sealed class MsgTraceEvent +{ + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("request")] + public MsgTraceRequest Request { get; set; } = new(); + + [JsonPropertyName("hops")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int Hops { get; set; } + + [JsonPropertyName("events")] + public List Events { get; set; } = []; +} + +/// +/// The original request information captured for the trace. +/// Go reference: msgtrace.go:70-74 +/// +public sealed class MsgTraceRequest +{ + [JsonPropertyName("header")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public Dictionary? Header { get; set; } + + [JsonPropertyName("msgsize")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int MsgSize { get; set; } +} + +/// +/// Base class for all trace event entries (ingress, egress, JS, etc.). +/// Go reference: msgtrace.go:83-86 +/// +[JsonDerivedType(typeof(MsgTraceIngress))] +[JsonDerivedType(typeof(MsgTraceSubjectMapping))] +[JsonDerivedType(typeof(MsgTraceStreamExport))] +[JsonDerivedType(typeof(MsgTraceServiceImport))] +[JsonDerivedType(typeof(MsgTraceJetStreamEntry))] +[JsonDerivedType(typeof(MsgTraceEgress))] +public class MsgTraceEntry +{ + [JsonPropertyName("type")] + public string Type { get; set; } = ""; + + [JsonPropertyName("ts")] + public DateTime Timestamp { get; set; } = DateTime.UtcNow; +} + +/// +/// Ingress trace event recorded when a message first enters the server. +/// Go reference: msgtrace.go:88-96 +/// +public sealed class MsgTraceIngress : MsgTraceEntry +{ + [JsonPropertyName("kind")] + public int Kind { get; set; } + + [JsonPropertyName("cid")] + public ulong Cid { get; set; } + + [JsonPropertyName("name")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Name { get; set; } + + [JsonPropertyName("acc")] + public string Account { get; set; } = ""; + + [JsonPropertyName("subj")] + public string Subject { get; set; } = ""; + + [JsonPropertyName("error")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Error { get; set; } +} + +/// +/// Subject mapping trace event. +/// Go reference: msgtrace.go:98-101 +/// +public sealed class MsgTraceSubjectMapping : MsgTraceEntry +{ + [JsonPropertyName("to")] + public string MappedTo { get; set; } = ""; +} + +/// +/// Stream export trace event. +/// Go reference: msgtrace.go:103-107 +/// +public sealed class MsgTraceStreamExport : MsgTraceEntry +{ + [JsonPropertyName("acc")] + public string Account { get; set; } = ""; + + [JsonPropertyName("to")] + public string To { get; set; } = ""; +} + +/// +/// Service import trace event. +/// Go reference: msgtrace.go:109-114 +/// +public sealed class MsgTraceServiceImport : MsgTraceEntry +{ + [JsonPropertyName("acc")] + public string Account { get; set; } = ""; + + [JsonPropertyName("from")] + public string From { get; set; } = ""; + + [JsonPropertyName("to")] + public string To { get; set; } = ""; +} + +/// +/// JetStream trace event. +/// Go reference: msgtrace.go:116-122 +/// +public sealed class MsgTraceJetStreamEntry : MsgTraceEntry +{ + [JsonPropertyName("stream")] + public string Stream { get; set; } = ""; + + [JsonPropertyName("subject")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Subject { get; set; } + + [JsonPropertyName("nointerest")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public bool NoInterest { get; set; } + + [JsonPropertyName("error")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Error { get; set; } +} + +/// +/// Egress trace event recorded for each delivery target. +/// Go reference: msgtrace.go:124-138 +/// +public sealed class MsgTraceEgress : MsgTraceEntry +{ + [JsonPropertyName("kind")] + public int Kind { get; set; } + + [JsonPropertyName("cid")] + public ulong Cid { get; set; } + + [JsonPropertyName("name")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Name { get; set; } + + [JsonPropertyName("hop")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Hop { get; set; } + + [JsonPropertyName("acc")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Account { get; set; } + + [JsonPropertyName("sub")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Subscription { get; set; } + + [JsonPropertyName("queue")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Queue { get; set; } + + [JsonPropertyName("error")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Error { get; set; } +} + +/// +/// Manages trace state as a message traverses the delivery pipeline. +/// Collects trace events and publishes the complete trace to the destination subject. +/// Go reference: msgtrace.go:260-273 +/// +public sealed class MsgTraceContext +{ + /// Kind constant for CLIENT connections. + public const int KindClient = 0; + /// Kind constant for ROUTER connections. + public const int KindRouter = 1; + /// Kind constant for GATEWAY connections. + public const int KindGateway = 2; + /// Kind constant for LEAF connections. + public const int KindLeaf = 3; + + private int _ready; + private MsgTraceJetStreamEntry? _js; + + /// + /// The destination subject where the trace event will be published. + /// + public string Destination { get; } + + /// + /// The accumulated trace event with all recorded entries. + /// + public MsgTraceEvent Event { get; } + + /// + /// Current hop identifier for this server. + /// + public string Hop { get; private set; } = ""; + + /// + /// Next hop identifier set before forwarding to routes/gateways/leafs. + /// + public string NextHop { get; private set; } = ""; + + /// + /// Whether to only trace the message without actually delivering it. + /// Go reference: msgtrace.go:271 + /// + public bool TraceOnly { get; } + + /// + /// Whether this trace context is active (non-null destination). + /// + public bool IsActive => !string.IsNullOrEmpty(Destination); + + /// + /// The account to use when publishing the trace event. + /// + public string? AccountName { get; } + + /// + /// Callback to publish the trace event. Set by the server. + /// + public Action? PublishCallback { get; set; } + + private MsgTraceContext(string destination, MsgTraceEvent evt, bool traceOnly, string? accountName, string hop) + { + Destination = destination; + Event = evt; + TraceOnly = traceOnly; + AccountName = accountName; + Hop = hop; + } + + /// + /// Creates a new trace context from inbound message headers. + /// Parses Nats-Trace-Dest, Nats-Trace-Only, and Nats-Trace-Hop headers. + /// Go reference: msgtrace.go:332-492 + /// + public static MsgTraceContext? Create( + ReadOnlyMemory headers, + ulong clientId, + string? clientName, + string accountName, + string subject, + int msgSize, + int clientKind = KindClient) + { + if (headers.Length == 0) + return null; + + var parsedHeaders = ParseTraceHeaders(headers.Span); + if (parsedHeaders == null || parsedHeaders.Count == 0) + return null; + + // Check for disabled trace + if (parsedHeaders.TryGetValue(MsgTraceHeaders.TraceDest, out var destValues) + && destValues.Length > 0 + && destValues[0] == MsgTraceHeaders.TraceDestDisabled) + { + return null; + } + + var dest = destValues?.Length > 0 ? destValues[0] : null; + if (string.IsNullOrEmpty(dest)) + return null; + + // Parse trace-only flag + bool traceOnly = false; + if (parsedHeaders.TryGetValue(MsgTraceHeaders.TraceOnly, out var onlyValues) && onlyValues.Length > 0) + { + var val = onlyValues[0].ToLowerInvariant(); + traceOnly = val is "1" or "true" or "on"; + } + + // Parse hop from non-CLIENT connections + string hop = ""; + if (clientKind != KindClient + && parsedHeaders.TryGetValue(MsgTraceHeaders.TraceHop, out var hopValues) + && hopValues.Length > 0) + { + hop = hopValues[0]; + } + + // Build ingress event + var evt = new MsgTraceEvent + { + Request = new MsgTraceRequest + { + Header = parsedHeaders, + MsgSize = msgSize, + }, + Events = + [ + new MsgTraceIngress + { + Type = MsgTraceTypes.Ingress, + Timestamp = DateTime.UtcNow, + Kind = clientKind, + Cid = clientId, + Name = clientName, + Account = accountName, + Subject = subject, + }, + ], + }; + + return new MsgTraceContext(dest, evt, traceOnly, accountName, hop); + } + + /// + /// Sets an error on the ingress event. + /// Go reference: msgtrace.go:657-661 + /// + public void SetIngressError(string error) + { + if (Event.Events.Count > 0 && Event.Events[0] is MsgTraceIngress ingress) + { + ingress.Error = error; + } + } + + /// + /// Adds a subject mapping trace event. + /// Go reference: msgtrace.go:663-674 + /// + public void AddSubjectMappingEvent(string mappedTo) + { + Event.Events.Add(new MsgTraceSubjectMapping + { + Type = MsgTraceTypes.SubjectMapping, + Timestamp = DateTime.UtcNow, + MappedTo = mappedTo, + }); + } + + /// + /// Adds an egress trace event for a delivery target. + /// Go reference: msgtrace.go:676-711 + /// + public void AddEgressEvent(ulong clientId, string? clientName, int clientKind, + string? subscriptionSubject = null, string? queue = null, string? account = null, string? error = null) + { + var egress = new MsgTraceEgress + { + Type = MsgTraceTypes.Egress, + Timestamp = DateTime.UtcNow, + Kind = clientKind, + Cid = clientId, + Name = clientName, + Hop = string.IsNullOrEmpty(NextHop) ? null : NextHop, + Error = error, + }; + + NextHop = ""; + + // Set subscription and queue for CLIENT connections + if (clientKind == KindClient) + { + egress.Subscription = subscriptionSubject; + egress.Queue = queue; + } + + // Set account if different from ingress account + if ((clientKind == KindClient || clientKind == KindLeaf) && account != null) + { + if (Event.Events.Count > 0 && Event.Events[0] is MsgTraceIngress ingress && account != ingress.Account) + { + egress.Account = account; + } + } + + Event.Events.Add(egress); + } + + /// + /// Adds a stream export trace event. + /// Go reference: msgtrace.go:713-728 + /// + public void AddStreamExportEvent(string accountName, string to) + { + Event.Events.Add(new MsgTraceStreamExport + { + Type = MsgTraceTypes.StreamExport, + Timestamp = DateTime.UtcNow, + Account = accountName, + To = to, + }); + } + + /// + /// Adds a service import trace event. + /// Go reference: msgtrace.go:730-743 + /// + public void AddServiceImportEvent(string accountName, string from, string to) + { + Event.Events.Add(new MsgTraceServiceImport + { + Type = MsgTraceTypes.ServiceImport, + Timestamp = DateTime.UtcNow, + Account = accountName, + From = from, + To = to, + }); + } + + /// + /// Adds a JetStream trace event for stream storage. + /// Go reference: msgtrace.go:745-757 + /// + public void AddJetStreamEvent(string streamName) + { + _js = new MsgTraceJetStreamEntry + { + Type = MsgTraceTypes.JetStream, + Timestamp = DateTime.UtcNow, + Stream = streamName, + }; + Event.Events.Add(_js); + } + + /// + /// Updates the JetStream trace event with subject and interest info. + /// Go reference: msgtrace.go:759-772 + /// + public void UpdateJetStreamEvent(string subject, bool noInterest) + { + if (_js == null) return; + _js.Subject = subject; + _js.NoInterest = noInterest; + _js.Timestamp = DateTime.UtcNow; + } + + /// + /// Sets the hop header for forwarding to routes/gateways/leafs. + /// Increments the hop counter and builds the next hop id. + /// Go reference: msgtrace.go:646-655 + /// + public void SetHopHeader() + { + Event.Hops++; + NextHop = string.IsNullOrEmpty(Hop) + ? Event.Hops.ToString() + : $"{Hop}.{Event.Hops}"; + } + + /// + /// Sends the accumulated trace event from the JetStream path. + /// Delegates to SendEvent for the two-phase ready logic. + /// Go reference: msgtrace.go:774-786 + /// + public void SendEventFromJetStream(string? error = null) + { + if (_js == null) return; + if (error != null) _js.Error = error; + + SendEvent(); + } + + /// + /// Sends the accumulated trace event to the destination subject. + /// For non-JetStream paths, sends immediately. For JetStream paths, + /// uses a two-phase ready check: both the message delivery path and + /// the JetStream storage path must call SendEvent before the event + /// is actually published. + /// Go reference: msgtrace.go:788-799 + /// + public void SendEvent() + { + if (_js != null) + { + var ready = Interlocked.Increment(ref _ready) == 2; + if (!ready) return; + } + + PublishCallback?.Invoke(Destination, null, Event); + } + + /// + /// Parses NATS headers looking for trace-related headers. + /// Returns null if no trace headers found. + /// Go reference: msgtrace.go:509-591 + /// + internal static Dictionary? ParseTraceHeaders(ReadOnlySpan hdr) + { + // Must start with NATS/1.0 header line + var hdrLine = "NATS/1.0 "u8; + if (hdr.Length < hdrLine.Length || !hdr[..hdrLine.Length].SequenceEqual(hdrLine)) + { + // Also try NATS/1.0\r\n (status line without status code) + var hdrLine2 = "NATS/1.0\r\n"u8; + if (hdr.Length < hdrLine2.Length || !hdr[..hdrLine2.Length].SequenceEqual(hdrLine2)) + return null; + } + + bool traceDestFound = false; + bool traceParentFound = false; + var keys = new List(); + var vals = new List(); + + // Skip the first line (status line) + int i = 0; + var crlf = "\r\n"u8; + var firstCrlf = hdr.IndexOf(crlf); + if (firstCrlf < 0) return null; + i = firstCrlf + 2; + + while (i < hdr.Length) + { + // Find the colon delimiter + int colonIdx = -1; + for (int j = i; j < hdr.Length; j++) + { + if (hdr[j] == (byte)':') + { + colonIdx = j; + break; + } + if (hdr[j] == (byte)'\r' || hdr[j] == (byte)'\n') + break; + } + + if (colonIdx < 0) + { + // Skip to next line + var nextCrlf = hdr[i..].IndexOf(crlf); + if (nextCrlf < 0) break; + i += nextCrlf + 2; + continue; + } + + var keySpan = hdr[i..colonIdx]; + i = colonIdx + 1; + + // Skip leading whitespace in value + while (i < hdr.Length && (hdr[i] == (byte)' ' || hdr[i] == (byte)'\t')) + i++; + + // Find end of value (CRLF) + int valStart = i; + var valCrlf = hdr[valStart..].IndexOf(crlf); + if (valCrlf < 0) break; + + int valEnd = valStart + valCrlf; + // Trim trailing whitespace + while (valEnd > valStart && (hdr[valEnd - 1] == (byte)' ' || hdr[valEnd - 1] == (byte)'\t')) + valEnd--; + + var valSpan = hdr[valStart..valEnd]; + + if (keySpan.Length > 0 && valSpan.Length > 0) + { + var key = Encoding.ASCII.GetString(keySpan); + var val = Encoding.ASCII.GetString(valSpan); + + // Check for trace-dest header + if (!traceDestFound && key == MsgTraceHeaders.TraceDest) + { + if (val == MsgTraceHeaders.TraceDestDisabled) + return null; // Tracing explicitly disabled + traceDestFound = true; + } + // Check for traceparent header (case-insensitive) + else if (!traceParentFound && key.Equals(MsgTraceHeaders.TraceParent, StringComparison.OrdinalIgnoreCase)) + { + // Parse W3C trace context: version-traceid-parentid-flags + var parts = val.Split('-'); + if (parts.Length == 4 && parts[3].Length == 2) + { + if (int.TryParse(parts[3], System.Globalization.NumberStyles.HexNumber, null, out var flags) + && (flags & 0x1) == 0x1) + { + traceParentFound = true; + } + } + } + + keys.Add(key); + vals.Add(val); + } + + i = valStart + valCrlf + 2; + } + + if (!traceDestFound && !traceParentFound) + return null; + + // Build the header map + var map = new Dictionary(keys.Count); + for (int k = 0; k < keys.Count; k++) + { + if (map.TryGetValue(keys[k], out var existing)) + { + var newArr = new string[existing.Length + 1]; + existing.CopyTo(newArr, 0); + newArr[^1] = vals[k]; + map[keys[k]] = newArr; + } + else + { + map[keys[k]] = [vals[k]]; + } + } + + return map; + } +} + +/// +/// JSON serialization context for message trace types. +/// +[JsonSerializable(typeof(MsgTraceEvent))] +[JsonSerializable(typeof(MsgTraceRequest))] +[JsonSerializable(typeof(MsgTraceEntry))] +[JsonSerializable(typeof(MsgTraceIngress))] +[JsonSerializable(typeof(MsgTraceSubjectMapping))] +[JsonSerializable(typeof(MsgTraceStreamExport))] +[JsonSerializable(typeof(MsgTraceServiceImport))] +[JsonSerializable(typeof(MsgTraceJetStreamEntry))] +[JsonSerializable(typeof(MsgTraceEgress))] +internal partial class MsgTraceJsonContext : JsonSerializerContext; diff --git a/src/NATS.Server/Monitoring/Connz.cs b/src/NATS.Server/Monitoring/Connz.cs index 7926dc1..0043484 100644 --- a/src/NATS.Server/Monitoring/Connz.cs +++ b/src/NATS.Server/Monitoring/Connz.cs @@ -218,6 +218,18 @@ public sealed class ConnzOptions public string MqttClient { get; set; } = ""; + /// + /// When non-zero, returns only the connection with this CID. + /// Go reference: monitor.go ConnzOptions.CID. + /// + public ulong Cid { get; set; } + + /// + /// Whether to include authorized user info. + /// Go reference: monitor.go ConnzOptions.Username. + /// + public bool Auth { get; set; } + public int Offset { get; set; } public int Limit { get; set; } = 1024; diff --git a/src/NATS.Server/Monitoring/ConnzHandler.cs b/src/NATS.Server/Monitoring/ConnzHandler.cs index b542f38..4ad791e 100644 --- a/src/NATS.Server/Monitoring/ConnzHandler.cs +++ b/src/NATS.Server/Monitoring/ConnzHandler.cs @@ -16,6 +16,13 @@ public sealed class ConnzHandler(NatsServer server) var connInfos = new List(); + // If a specific CID is requested, search for that single connection + // Go reference: monitor.go Connz() — CID fast path + if (opts.Cid > 0) + { + return HandleSingleCid(opts, now); + } + // Collect open connections if (opts.State is ConnState.Open or ConnState.All) { @@ -23,7 +30,7 @@ public sealed class ConnzHandler(NatsServer server) connInfos.AddRange(clients.Select(c => BuildConnInfo(c, now, opts))); } - // Collect closed connections + // Collect closed connections from the ring buffer if (opts.State is ConnState.Closed or ConnState.All) { connInfos.AddRange(server.GetClosedClients().Select(c => BuildClosedConnInfo(c, now, opts))); @@ -81,6 +88,59 @@ public sealed class ConnzHandler(NatsServer server) }; } + /// + /// Handles a request for a single connection by CID. + /// Go reference: monitor.go Connz() — CID-specific path. + /// + private Connz HandleSingleCid(ConnzOptions opts, DateTime now) + { + // Search open connections first + var client = server.GetClients().FirstOrDefault(c => c.Id == opts.Cid); + if (client != null) + { + var info = BuildConnInfo(client, now, opts); + return new Connz + { + Id = server.ServerId, + Now = now, + NumConns = 1, + Total = 1, + Offset = 0, + Limit = 1, + Conns = [info], + }; + } + + // Search closed connections ring buffer + var closed = server.GetClosedClients().FirstOrDefault(c => c.Cid == opts.Cid); + if (closed != null) + { + var info = BuildClosedConnInfo(closed, now, opts); + return new Connz + { + Id = server.ServerId, + Now = now, + NumConns = 1, + Total = 1, + Offset = 0, + Limit = 1, + Conns = [info], + }; + } + + // Not found — return empty result + return new Connz + { + Id = server.ServerId, + Now = now, + NumConns = 0, + Total = 0, + Offset = 0, + Limit = 0, + Conns = [], + }; + } + private static ConnInfo BuildConnInfo(NatsClient client, DateTime now, ConnzOptions opts) { var info = new ConnInfo @@ -228,6 +288,12 @@ public sealed class ConnzHandler(NatsServer server) if (q.TryGetValue("limit", out var limit) && int.TryParse(limit, out var l)) opts.Limit = l; + if (q.TryGetValue("cid", out var cid) && ulong.TryParse(cid, out var cidValue)) + opts.Cid = cidValue; + + if (q.TryGetValue("auth", out var auth)) + opts.Auth = auth.ToString().ToLowerInvariant() is "1" or "true"; + if (q.TryGetValue("mqtt_client", out var mqttClient)) opts.MqttClient = mqttClient.ToString(); @@ -243,10 +309,13 @@ public sealed class ConnzHandler(NatsServer server) private static bool MatchesSubjectFilter(ConnInfo info, string filterSubject) { - if (info.Subs.Any(s => SubjectMatch.MatchLiteral(s, filterSubject))) + // Go reference: monitor.go — matchLiteral(testSub, string(sub.subject)) + // The filter subject is the literal, the subscription subject is the pattern + // (subscriptions may contain wildcards like orders.> that match the filter orders.new) + if (info.Subs.Any(s => SubjectMatch.MatchLiteral(filterSubject, s))) return true; - return info.SubsDetail.Any(s => SubjectMatch.MatchLiteral(s.Subject, filterSubject)); + return info.SubsDetail.Any(s => SubjectMatch.MatchLiteral(filterSubject, s.Subject)); } private static string FormatRtt(TimeSpan rtt) diff --git a/tests/NATS.Server.Tests/Events/EventPayloadTests.cs b/tests/NATS.Server.Tests/Events/EventPayloadTests.cs new file mode 100644 index 0000000..240595f --- /dev/null +++ b/tests/NATS.Server.Tests/Events/EventPayloadTests.cs @@ -0,0 +1,469 @@ +using System.Text.Json; +using NATS.Server.Events; + +namespace NATS.Server.Tests.Events; + +/// +/// Tests that all event DTOs have complete JSON fields matching Go's output. +/// Go reference: events.go:100-300 — TypedEvent, ServerInfo, ClientInfo, +/// DataStats, ServerStats, ConnectEventMsg, DisconnectEventMsg, AccountNumConns. +/// +public class EventPayloadTests +{ + // --- EventServerInfo --- + + [Fact] + public void EventServerInfo_serializes_all_fields_matching_Go() + { + var info = new EventServerInfo + { + Name = "test-server", + Host = "127.0.0.1", + Id = "ABCDEF123456", + Cluster = "test-cluster", + Domain = "test-domain", + Version = "2.10.0", + Tags = ["tag1", "tag2"], + Metadata = new Dictionary { ["env"] = "test" }, + JetStream = true, + Flags = 1, + Seq = 42, + Time = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc), + }; + + var json = JsonSerializer.Serialize(info); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.GetProperty("name").GetString().ShouldBe("test-server"); + root.GetProperty("host").GetString().ShouldBe("127.0.0.1"); + root.GetProperty("id").GetString().ShouldBe("ABCDEF123456"); + root.GetProperty("cluster").GetString().ShouldBe("test-cluster"); + root.GetProperty("domain").GetString().ShouldBe("test-domain"); + root.GetProperty("ver").GetString().ShouldBe("2.10.0"); + root.GetProperty("tags").GetArrayLength().ShouldBe(2); + root.GetProperty("metadata").GetProperty("env").GetString().ShouldBe("test"); + root.GetProperty("jetstream").GetBoolean().ShouldBeTrue(); + root.GetProperty("flags").GetUInt64().ShouldBe(1UL); + root.GetProperty("seq").GetUInt64().ShouldBe(42UL); + root.GetProperty("time").GetDateTime().Year.ShouldBe(2025); + } + + [Fact] + public void EventServerInfo_omits_null_optional_fields() + { + var info = new EventServerInfo + { + Name = "s", + Id = "ID", + }; + + var json = JsonSerializer.Serialize(info); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.TryGetProperty("cluster", out _).ShouldBeFalse(); + root.TryGetProperty("domain", out _).ShouldBeFalse(); + root.TryGetProperty("tags", out _).ShouldBeFalse(); + root.TryGetProperty("metadata", out _).ShouldBeFalse(); + } + + // --- EventClientInfo --- + + [Fact] + public void EventClientInfo_serializes_all_fields_matching_Go() + { + var ci = new EventClientInfo + { + Start = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc), + Stop = new DateTime(2025, 1, 1, 1, 0, 0, DateTimeKind.Utc), + Host = "10.0.0.1", + Id = 99, + Account = "$G", + Service = "orders", + User = "admin", + Name = "my-client", + Lang = "go", + Version = "1.30.0", + RttNanos = 5_000_000, // 5ms + Server = "srv-1", + Cluster = "cluster-east", + Alternates = ["alt1", "alt2"], + Jwt = "eyJ...", + IssuerKey = "OABC...", + NameTag = "test-tag", + Tags = ["dev"], + Kind = "Client", + ClientType = "nats", + MqttClient = "mqtt-abc", + Nonce = "nonce123", + }; + + var json = JsonSerializer.Serialize(ci); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.GetProperty("host").GetString().ShouldBe("10.0.0.1"); + root.GetProperty("id").GetUInt64().ShouldBe(99UL); + root.GetProperty("acc").GetString().ShouldBe("$G"); + root.GetProperty("svc").GetString().ShouldBe("orders"); + root.GetProperty("user").GetString().ShouldBe("admin"); + root.GetProperty("name").GetString().ShouldBe("my-client"); + root.GetProperty("lang").GetString().ShouldBe("go"); + root.GetProperty("ver").GetString().ShouldBe("1.30.0"); + root.GetProperty("rtt").GetInt64().ShouldBe(5_000_000); + root.GetProperty("server").GetString().ShouldBe("srv-1"); + root.GetProperty("cluster").GetString().ShouldBe("cluster-east"); + root.GetProperty("alts").GetArrayLength().ShouldBe(2); + root.GetProperty("jwt").GetString().ShouldBe("eyJ..."); + root.GetProperty("issuer_key").GetString().ShouldBe("OABC..."); + root.GetProperty("name_tag").GetString().ShouldBe("test-tag"); + root.GetProperty("tags").GetArrayLength().ShouldBe(1); + root.GetProperty("kind").GetString().ShouldBe("Client"); + root.GetProperty("client_type").GetString().ShouldBe("nats"); + root.GetProperty("client_id").GetString().ShouldBe("mqtt-abc"); + root.GetProperty("nonce").GetString().ShouldBe("nonce123"); + } + + [Fact] + public void EventClientInfo_omits_null_optional_fields() + { + var ci = new EventClientInfo { Id = 1 }; + var json = JsonSerializer.Serialize(ci); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.TryGetProperty("svc", out _).ShouldBeFalse(); + root.TryGetProperty("user", out _).ShouldBeFalse(); + root.TryGetProperty("server", out _).ShouldBeFalse(); + root.TryGetProperty("cluster", out _).ShouldBeFalse(); + root.TryGetProperty("alts", out _).ShouldBeFalse(); + root.TryGetProperty("jwt", out _).ShouldBeFalse(); + root.TryGetProperty("issuer_key", out _).ShouldBeFalse(); + root.TryGetProperty("nonce", out _).ShouldBeFalse(); + } + + // --- DataStats --- + + [Fact] + public void DataStats_serializes_with_optional_sub_stats() + { + var ds = new DataStats + { + Msgs = 100, + Bytes = 2048, + Gateways = new MsgBytesStats { Msgs = 10, Bytes = 256 }, + Routes = new MsgBytesStats { Msgs = 50, Bytes = 1024 }, + Leafs = new MsgBytesStats { Msgs = 40, Bytes = 768 }, + }; + + var json = JsonSerializer.Serialize(ds); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.GetProperty("msgs").GetInt64().ShouldBe(100); + root.GetProperty("bytes").GetInt64().ShouldBe(2048); + root.GetProperty("gateways").GetProperty("msgs").GetInt64().ShouldBe(10); + root.GetProperty("routes").GetProperty("bytes").GetInt64().ShouldBe(1024); + root.GetProperty("leafs").GetProperty("msgs").GetInt64().ShouldBe(40); + } + + [Fact] + public void DataStats_omits_null_sub_stats() + { + var ds = new DataStats { Msgs = 5, Bytes = 50 }; + var json = JsonSerializer.Serialize(ds); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.TryGetProperty("gateways", out _).ShouldBeFalse(); + root.TryGetProperty("routes", out _).ShouldBeFalse(); + root.TryGetProperty("leafs", out _).ShouldBeFalse(); + } + + // --- ConnectEventMsg --- + + [Fact] + public void ConnectEventMsg_has_correct_type_and_required_fields() + { + var evt = new ConnectEventMsg + { + Id = "evt-1", + Time = DateTime.UtcNow, + Server = new EventServerInfo { Name = "s1", Id = "SRV1" }, + Client = new EventClientInfo { Id = 42, Name = "test-client" }, + }; + + var json = JsonSerializer.Serialize(evt); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.GetProperty("type").GetString().ShouldBe("io.nats.server.advisory.v1.client_connect"); + root.GetProperty("id").GetString().ShouldBe("evt-1"); + root.GetProperty("server").GetProperty("name").GetString().ShouldBe("s1"); + root.GetProperty("client").GetProperty("id").GetUInt64().ShouldBe(42UL); + } + + // --- DisconnectEventMsg --- + + [Fact] + public void DisconnectEventMsg_has_correct_type_and_data_stats() + { + var evt = new DisconnectEventMsg + { + Id = "evt-2", + Time = DateTime.UtcNow, + Server = new EventServerInfo { Name = "s1", Id = "SRV1" }, + Client = new EventClientInfo { Id = 42 }, + Sent = new DataStats { Msgs = 100, Bytes = 2000 }, + Received = new DataStats { Msgs = 50, Bytes = 1000 }, + Reason = "Client Closed", + }; + + var json = JsonSerializer.Serialize(evt); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.GetProperty("type").GetString().ShouldBe("io.nats.server.advisory.v1.client_disconnect"); + root.GetProperty("sent").GetProperty("msgs").GetInt64().ShouldBe(100); + root.GetProperty("received").GetProperty("bytes").GetInt64().ShouldBe(1000); + root.GetProperty("reason").GetString().ShouldBe("Client Closed"); + } + + // --- AccountNumConns --- + + [Fact] + public void AccountNumConns_serializes_all_Go_AccountStat_fields() + { + var evt = new AccountNumConns + { + Id = "evt-3", + Time = DateTime.UtcNow, + Server = new EventServerInfo { Name = "s1", Id = "SRV1" }, + AccountName = "$G", + Name = "Global", + Connections = 5, + LeafNodes = 2, + TotalConnections = 100, + NumSubscriptions = 42, + Sent = new DataStats { Msgs = 500, Bytes = 10_000 }, + Received = new DataStats { Msgs = 400, Bytes = 8_000 }, + SlowConsumers = 1, + }; + + var json = JsonSerializer.Serialize(evt); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.GetProperty("type").GetString().ShouldBe("io.nats.server.advisory.v1.account_connections"); + root.GetProperty("acc").GetString().ShouldBe("$G"); + root.GetProperty("name").GetString().ShouldBe("Global"); + root.GetProperty("conns").GetInt32().ShouldBe(5); + root.GetProperty("leafnodes").GetInt32().ShouldBe(2); + root.GetProperty("total_conns").GetInt32().ShouldBe(100); + root.GetProperty("num_subscriptions").GetUInt32().ShouldBe(42u); + root.GetProperty("sent").GetProperty("msgs").GetInt64().ShouldBe(500); + root.GetProperty("received").GetProperty("bytes").GetInt64().ShouldBe(8_000); + root.GetProperty("slow_consumers").GetInt64().ShouldBe(1); + } + + // --- ServerStatsMsg --- + + [Fact] + public void ServerStatsMsg_has_sent_received_and_breakdown_fields() + { + var msg = new ServerStatsMsg + { + Server = new EventServerInfo { Name = "s1", Id = "SRV1", Seq = 1 }, + Stats = new ServerStatsData + { + Start = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc), + Mem = 100_000_000, + Cores = 8, + Cpu = 12.5, + Connections = 10, + TotalConnections = 500, + ActiveAccounts = 3, + Subscriptions = 50, + Sent = new DataStats { Msgs = 1000, Bytes = 50_000 }, + Received = new DataStats { Msgs = 800, Bytes = 40_000 }, + InMsgs = 800, + OutMsgs = 1000, + InBytes = 40_000, + OutBytes = 50_000, + SlowConsumers = 2, + SlowConsumerStats = new SlowConsumersStats { Clients = 1, Routes = 1 }, + StaleConnections = 3, + StaleConnectionStats = new StaleConnectionStats { Clients = 2, Leafs = 1 }, + ActiveServers = 3, + Routes = [new RouteStat { Id = 1, Name = "r1", Sent = new DataStats { Msgs = 10 }, Received = new DataStats { Msgs = 5 }, Pending = 0 }], + Gateways = [new GatewayStat { Id = 1, Name = "gw1", Sent = new DataStats { Msgs = 20 }, Received = new DataStats { Msgs = 15 }, InboundConnections = 2 }], + }, + }; + + var json = JsonSerializer.Serialize(msg); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + var stats = root.GetProperty("statsz"); + + stats.GetProperty("mem").GetInt64().ShouldBe(100_000_000); + stats.GetProperty("cores").GetInt32().ShouldBe(8); + stats.GetProperty("cpu").GetDouble().ShouldBe(12.5); + stats.GetProperty("connections").GetInt32().ShouldBe(10); + stats.GetProperty("total_connections").GetInt64().ShouldBe(500); + stats.GetProperty("active_accounts").GetInt32().ShouldBe(3); + stats.GetProperty("subscriptions").GetInt64().ShouldBe(50); + stats.GetProperty("sent").GetProperty("msgs").GetInt64().ShouldBe(1000); + stats.GetProperty("received").GetProperty("bytes").GetInt64().ShouldBe(40_000); + stats.GetProperty("in_msgs").GetInt64().ShouldBe(800); + stats.GetProperty("out_msgs").GetInt64().ShouldBe(1000); + stats.GetProperty("slow_consumers").GetInt64().ShouldBe(2); + stats.GetProperty("slow_consumer_stats").GetProperty("clients").GetInt64().ShouldBe(1); + stats.GetProperty("stale_connections").GetInt64().ShouldBe(3); + stats.GetProperty("stale_connection_stats").GetProperty("leafs").GetInt64().ShouldBe(1); + stats.GetProperty("active_servers").GetInt32().ShouldBe(3); + stats.GetProperty("routes").GetArrayLength().ShouldBe(1); + stats.GetProperty("routes")[0].GetProperty("rid").GetUInt64().ShouldBe(1UL); + stats.GetProperty("gateways").GetArrayLength().ShouldBe(1); + stats.GetProperty("gateways")[0].GetProperty("name").GetString().ShouldBe("gw1"); + } + + // --- AuthErrorEventMsg --- + + [Fact] + public void AuthErrorEventMsg_has_correct_type() + { + var evt = new AuthErrorEventMsg + { + Id = "evt-4", + Time = DateTime.UtcNow, + Server = new EventServerInfo { Name = "s1", Id = "SRV1" }, + Client = new EventClientInfo { Id = 99, Host = "10.0.0.1" }, + Reason = "Authorization Violation", + }; + + var json = JsonSerializer.Serialize(evt); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.GetProperty("type").GetString().ShouldBe("io.nats.server.advisory.v1.client_auth"); + root.GetProperty("reason").GetString().ShouldBe("Authorization Violation"); + root.GetProperty("client").GetProperty("host").GetString().ShouldBe("10.0.0.1"); + } + + // --- OcspPeerRejectEventMsg --- + + [Fact] + public void OcspPeerRejectEventMsg_has_correct_type() + { + var evt = new OcspPeerRejectEventMsg + { + Id = "evt-5", + Time = DateTime.UtcNow, + Kind = "client", + Server = new EventServerInfo { Name = "s1", Id = "SRV1" }, + Reason = "OCSP revoked", + }; + + var json = JsonSerializer.Serialize(evt); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.GetProperty("type").GetString().ShouldBe("io.nats.server.advisory.v1.ocsp_peer_reject"); + root.GetProperty("kind").GetString().ShouldBe("client"); + root.GetProperty("reason").GetString().ShouldBe("OCSP revoked"); + } + + // --- ShutdownEventMsg --- + + [Fact] + public void ShutdownEventMsg_serializes_reason() + { + var evt = new ShutdownEventMsg + { + Server = new EventServerInfo { Name = "s1", Id = "SRV1" }, + Reason = "Server Shutdown", + }; + + var json = JsonSerializer.Serialize(evt); + var doc = JsonDocument.Parse(json); + doc.RootElement.GetProperty("reason").GetString().ShouldBe("Server Shutdown"); + } + + // --- AccNumConnsReq --- + + [Fact] + public void AccNumConnsReq_serializes_account() + { + var req = new AccNumConnsReq + { + Server = new EventServerInfo { Name = "s1", Id = "SRV1" }, + Account = "myAccount", + }; + + var json = JsonSerializer.Serialize(req); + var doc = JsonDocument.Parse(json); + doc.RootElement.GetProperty("acc").GetString().ShouldBe("myAccount"); + } + + // --- Round-trip deserialization --- + + [Fact] + public void ConnectEventMsg_roundtrips_through_json() + { + var original = new ConnectEventMsg + { + Id = "rt-1", + Time = new DateTime(2025, 6, 15, 12, 0, 0, DateTimeKind.Utc), + Server = new EventServerInfo { Name = "srv", Id = "SRV1", Version = "2.10.0", Seq = 5 }, + Client = new EventClientInfo + { + Id = 42, + Host = "10.0.0.1", + Account = "$G", + Name = "test", + Lang = "dotnet", + Version = "1.0.0", + RttNanos = 1_000_000, + Kind = "Client", + }, + }; + + var json = JsonSerializer.Serialize(original); + var deserialized = JsonSerializer.Deserialize(json); + + deserialized.ShouldNotBeNull(); + deserialized.Type.ShouldBe(ConnectEventMsg.EventType); + deserialized.Id.ShouldBe("rt-1"); + deserialized.Server.Name.ShouldBe("srv"); + deserialized.Server.Seq.ShouldBe(5UL); + deserialized.Client.Id.ShouldBe(42UL); + deserialized.Client.Kind.ShouldBe("Client"); + deserialized.Client.RttNanos.ShouldBe(1_000_000); + } + + [Fact] + public void ServerStatsMsg_roundtrips_through_json() + { + var original = new ServerStatsMsg + { + Server = new EventServerInfo { Name = "srv", Id = "SRV1" }, + Stats = new ServerStatsData + { + Connections = 10, + Sent = new DataStats { Msgs = 100, Bytes = 5000 }, + Received = new DataStats { Msgs = 80, Bytes = 4000 }, + InMsgs = 80, + OutMsgs = 100, + }, + }; + + var json = JsonSerializer.Serialize(original); + var deserialized = JsonSerializer.Deserialize(json); + + deserialized.ShouldNotBeNull(); + deserialized.Stats.Connections.ShouldBe(10); + deserialized.Stats.Sent.Msgs.ShouldBe(100); + deserialized.Stats.Received.Bytes.ShouldBe(4000); + } +} diff --git a/tests/NATS.Server.Tests/Internal/MessageTraceContextTests.cs b/tests/NATS.Server.Tests/Internal/MessageTraceContextTests.cs new file mode 100644 index 0000000..aa6ec17 --- /dev/null +++ b/tests/NATS.Server.Tests/Internal/MessageTraceContextTests.cs @@ -0,0 +1,628 @@ +using System.Text; +using System.Text.Json; +using NATS.Server.Events; +using NATS.Server.Internal; + +namespace NATS.Server.Tests.Internal; + +/// +/// Tests for MsgTraceContext: header parsing, event collection, trace propagation, +/// JetStream two-phase send, hop tracking, and JSON serialization. +/// Go reference: msgtrace.go — initMsgTrace, sendEvent, addEgressEvent, +/// addJetStreamEvent, genHeaderMapIfTraceHeadersPresent. +/// +public class MessageTraceContextTests +{ + private static ReadOnlyMemory BuildHeaders(params (string key, string value)[] headers) + { + var sb = new StringBuilder("NATS/1.0\r\n"); + foreach (var (key, value) in headers) + { + sb.Append($"{key}: {value}\r\n"); + } + sb.Append("\r\n"); + return Encoding.ASCII.GetBytes(sb.ToString()); + } + + // --- Header parsing --- + + [Fact] + public void ParseTraceHeaders_returns_null_for_no_trace_headers() + { + var headers = BuildHeaders(("Content-Type", "text/plain")); + var result = MsgTraceContext.ParseTraceHeaders(headers.Span); + result.ShouldBeNull(); + } + + [Fact] + public void ParseTraceHeaders_returns_map_when_trace_dest_present() + { + var headers = BuildHeaders( + (MsgTraceHeaders.TraceDest, "trace.subject"), + ("Content-Type", "text/plain")); + var result = MsgTraceContext.ParseTraceHeaders(headers.Span); + result.ShouldNotBeNull(); + result.ShouldContainKey(MsgTraceHeaders.TraceDest); + result[MsgTraceHeaders.TraceDest][0].ShouldBe("trace.subject"); + } + + [Fact] + public void ParseTraceHeaders_returns_null_when_trace_disabled() + { + var headers = BuildHeaders( + (MsgTraceHeaders.TraceDest, MsgTraceHeaders.TraceDestDisabled)); + var result = MsgTraceContext.ParseTraceHeaders(headers.Span); + result.ShouldBeNull(); + } + + [Fact] + public void ParseTraceHeaders_detects_traceparent_with_sampled_flag() + { + // W3C trace context: version-traceid-parentid-flags (01 = sampled) + var headers = BuildHeaders( + ("traceparent", "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")); + var result = MsgTraceContext.ParseTraceHeaders(headers.Span); + result.ShouldNotBeNull(); + result.ShouldContainKey("traceparent"); + } + + [Fact] + public void ParseTraceHeaders_ignores_traceparent_without_sampled_flag() + { + // flags=00 means not sampled + var headers = BuildHeaders( + ("traceparent", "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-00")); + var result = MsgTraceContext.ParseTraceHeaders(headers.Span); + result.ShouldBeNull(); + } + + [Fact] + public void ParseTraceHeaders_returns_null_for_empty_input() + { + var result = MsgTraceContext.ParseTraceHeaders(ReadOnlySpan.Empty); + result.ShouldBeNull(); + } + + [Fact] + public void ParseTraceHeaders_returns_null_for_non_nats_header() + { + var headers = Encoding.ASCII.GetBytes("HTTP/1.1 200 OK\r\nFoo: bar\r\n\r\n"); + var result = MsgTraceContext.ParseTraceHeaders(headers); + result.ShouldBeNull(); + } + + // --- Context creation --- + + [Fact] + public void Create_returns_null_for_empty_headers() + { + var ctx = MsgTraceContext.Create( + ReadOnlyMemory.Empty, + clientId: 1, + clientName: "test", + accountName: "$G", + subject: "test.sub", + msgSize: 10); + ctx.ShouldBeNull(); + } + + [Fact] + public void Create_returns_null_for_headers_without_trace() + { + var headers = BuildHeaders(("Content-Type", "text/plain")); + var ctx = MsgTraceContext.Create( + headers, + clientId: 1, + clientName: "test", + accountName: "$G", + subject: "test.sub", + msgSize: 10); + ctx.ShouldBeNull(); + } + + [Fact] + public void Create_builds_context_with_ingress_event() + { + var headers = BuildHeaders( + (MsgTraceHeaders.TraceDest, "trace.dest")); + + var ctx = MsgTraceContext.Create( + headers, + clientId: 42, + clientName: "my-publisher", + accountName: "$G", + subject: "orders.new", + msgSize: 128); + + ctx.ShouldNotBeNull(); + ctx.IsActive.ShouldBeTrue(); + ctx.Destination.ShouldBe("trace.dest"); + ctx.TraceOnly.ShouldBeFalse(); + ctx.AccountName.ShouldBe("$G"); + + // Check ingress event + ctx.Event.Events.Count.ShouldBe(1); + var ingress = ctx.Event.Events[0].ShouldBeOfType(); + ingress.Type.ShouldBe(MsgTraceTypes.Ingress); + ingress.Cid.ShouldBe(42UL); + ingress.Name.ShouldBe("my-publisher"); + ingress.Account.ShouldBe("$G"); + ingress.Subject.ShouldBe("orders.new"); + ingress.Error.ShouldBeNull(); + + // Check request info + ctx.Event.Request.MsgSize.ShouldBe(128); + ctx.Event.Request.Header.ShouldNotBeNull(); + ctx.Event.Request.Header.ShouldContainKey(MsgTraceHeaders.TraceDest); + } + + [Fact] + public void Create_with_trace_only_flag() + { + var headers = BuildHeaders( + (MsgTraceHeaders.TraceDest, "trace.dest"), + (MsgTraceHeaders.TraceOnly, "true")); + + var ctx = MsgTraceContext.Create( + headers, + clientId: 1, + clientName: "test", + accountName: "$G", + subject: "test", + msgSize: 0); + + ctx.ShouldNotBeNull(); + ctx.TraceOnly.ShouldBeTrue(); + } + + [Fact] + public void Create_with_trace_only_flag_numeric() + { + var headers = BuildHeaders( + (MsgTraceHeaders.TraceDest, "trace.dest"), + (MsgTraceHeaders.TraceOnly, "1")); + + var ctx = MsgTraceContext.Create( + headers, + clientId: 1, + clientName: "test", + accountName: "$G", + subject: "test", + msgSize: 0); + + ctx.ShouldNotBeNull(); + ctx.TraceOnly.ShouldBeTrue(); + } + + [Fact] + public void Create_without_trace_only_flag() + { + var headers = BuildHeaders( + (MsgTraceHeaders.TraceDest, "trace.dest"), + (MsgTraceHeaders.TraceOnly, "false")); + + var ctx = MsgTraceContext.Create( + headers, + clientId: 1, + clientName: "test", + accountName: "$G", + subject: "test", + msgSize: 0); + + ctx.ShouldNotBeNull(); + ctx.TraceOnly.ShouldBeFalse(); + } + + [Fact] + public void Create_captures_hop_from_non_client_kind() + { + var headers = BuildHeaders( + (MsgTraceHeaders.TraceDest, "trace.dest"), + (MsgTraceHeaders.TraceHop, "1.2")); + + var ctx = MsgTraceContext.Create( + headers, + clientId: 1, + clientName: "route-1", + accountName: "$G", + subject: "test", + msgSize: 0, + clientKind: MsgTraceContext.KindRouter); + + ctx.ShouldNotBeNull(); + ctx.Hop.ShouldBe("1.2"); + } + + [Fact] + public void Create_ignores_hop_from_client_kind() + { + var headers = BuildHeaders( + (MsgTraceHeaders.TraceDest, "trace.dest"), + (MsgTraceHeaders.TraceHop, "1.2")); + + var ctx = MsgTraceContext.Create( + headers, + clientId: 1, + clientName: "test", + accountName: "$G", + subject: "test", + msgSize: 0, + clientKind: MsgTraceContext.KindClient); + + ctx.ShouldNotBeNull(); + ctx.Hop.ShouldBe(""); // Client hop is ignored + } + + // --- Event recording --- + + [Fact] + public void SetIngressError_sets_error_on_first_event() + { + var ctx = CreateSimpleContext(); + ctx.SetIngressError("publish denied"); + + var ingress = ctx.Event.Events[0].ShouldBeOfType(); + ingress.Error.ShouldBe("publish denied"); + } + + [Fact] + public void AddSubjectMappingEvent_appends_mapping() + { + var ctx = CreateSimpleContext(); + ctx.AddSubjectMappingEvent("orders.mapped"); + + ctx.Event.Events.Count.ShouldBe(2); + var mapping = ctx.Event.Events[1].ShouldBeOfType(); + mapping.Type.ShouldBe(MsgTraceTypes.SubjectMapping); + mapping.MappedTo.ShouldBe("orders.mapped"); + } + + [Fact] + public void AddEgressEvent_appends_egress_with_subscription_and_queue() + { + var ctx = CreateSimpleContext(); + ctx.AddEgressEvent( + clientId: 99, + clientName: "subscriber", + clientKind: MsgTraceContext.KindClient, + subscriptionSubject: "orders.>", + queue: "workers"); + + ctx.Event.Events.Count.ShouldBe(2); + var egress = ctx.Event.Events[1].ShouldBeOfType(); + egress.Type.ShouldBe(MsgTraceTypes.Egress); + egress.Kind.ShouldBe(MsgTraceContext.KindClient); + egress.Cid.ShouldBe(99UL); + egress.Name.ShouldBe("subscriber"); + egress.Subscription.ShouldBe("orders.>"); + egress.Queue.ShouldBe("workers"); + } + + [Fact] + public void AddEgressEvent_records_account_when_different_from_ingress() + { + var ctx = CreateSimpleContext(accountName: "acctA"); + ctx.AddEgressEvent( + clientId: 99, + clientName: "subscriber", + clientKind: MsgTraceContext.KindClient, + subscriptionSubject: "api.>", + account: "acctB"); + + var egress = ctx.Event.Events[1].ShouldBeOfType(); + egress.Account.ShouldBe("acctB"); + } + + [Fact] + public void AddEgressEvent_omits_account_when_same_as_ingress() + { + var ctx = CreateSimpleContext(accountName: "$G"); + ctx.AddEgressEvent( + clientId: 99, + clientName: "subscriber", + clientKind: MsgTraceContext.KindClient, + subscriptionSubject: "test", + account: "$G"); + + var egress = ctx.Event.Events[1].ShouldBeOfType(); + egress.Account.ShouldBeNull(); + } + + [Fact] + public void AddEgressEvent_for_router_omits_subscription_and_queue() + { + var ctx = CreateSimpleContext(); + ctx.AddEgressEvent( + clientId: 1, + clientName: "route-1", + clientKind: MsgTraceContext.KindRouter, + subscriptionSubject: "should.not.appear", + queue: "should.not.appear"); + + var egress = ctx.Event.Events[1].ShouldBeOfType(); + egress.Subscription.ShouldBeNull(); + egress.Queue.ShouldBeNull(); + } + + [Fact] + public void AddEgressEvent_with_error() + { + var ctx = CreateSimpleContext(); + ctx.AddEgressEvent( + clientId: 50, + clientName: "slow-client", + clientKind: MsgTraceContext.KindClient, + error: MsgTraceErrors.ClientClosed); + + var egress = ctx.Event.Events[1].ShouldBeOfType(); + egress.Error.ShouldBe(MsgTraceErrors.ClientClosed); + } + + [Fact] + public void AddStreamExportEvent_records_account_and_target() + { + var ctx = CreateSimpleContext(); + ctx.AddStreamExportEvent("exportAccount", "export.subject"); + + ctx.Event.Events.Count.ShouldBe(2); + var se = ctx.Event.Events[1].ShouldBeOfType(); + se.Type.ShouldBe(MsgTraceTypes.StreamExport); + se.Account.ShouldBe("exportAccount"); + se.To.ShouldBe("export.subject"); + } + + [Fact] + public void AddServiceImportEvent_records_from_and_to() + { + var ctx = CreateSimpleContext(); + ctx.AddServiceImportEvent("importAccount", "from.subject", "to.subject"); + + ctx.Event.Events.Count.ShouldBe(2); + var si = ctx.Event.Events[1].ShouldBeOfType(); + si.Type.ShouldBe(MsgTraceTypes.ServiceImport); + si.Account.ShouldBe("importAccount"); + si.From.ShouldBe("from.subject"); + si.To.ShouldBe("to.subject"); + } + + // --- JetStream events --- + + [Fact] + public void AddJetStreamEvent_records_stream_name() + { + var ctx = CreateSimpleContext(); + ctx.AddJetStreamEvent("ORDERS"); + + ctx.Event.Events.Count.ShouldBe(2); + var js = ctx.Event.Events[1].ShouldBeOfType(); + js.Type.ShouldBe(MsgTraceTypes.JetStream); + js.Stream.ShouldBe("ORDERS"); + } + + [Fact] + public void UpdateJetStreamEvent_sets_subject_and_nointerest() + { + var ctx = CreateSimpleContext(); + ctx.AddJetStreamEvent("ORDERS"); + ctx.UpdateJetStreamEvent("orders.new", noInterest: true); + + var js = ctx.Event.Events[1].ShouldBeOfType(); + js.Subject.ShouldBe("orders.new"); + js.NoInterest.ShouldBeTrue(); + } + + [Fact] + public void SendEventFromJetStream_requires_both_phases() + { + var ctx = CreateSimpleContext(); + ctx.AddJetStreamEvent("ORDERS"); + + bool published = false; + ctx.PublishCallback = (dest, reply, body) => { published = true; }; + + // Phase 1: message path calls SendEvent — should not publish yet + ctx.SendEvent(); + published.ShouldBeFalse(); + + // Phase 2: JetStream path calls SendEventFromJetStream — now publishes + ctx.SendEventFromJetStream(); + published.ShouldBeTrue(); + } + + [Fact] + public void SendEventFromJetStream_with_error() + { + var ctx = CreateSimpleContext(); + ctx.AddJetStreamEvent("ORDERS"); + + object? publishedBody = null; + ctx.PublishCallback = (dest, reply, body) => { publishedBody = body; }; + + ctx.SendEvent(); // Phase 1 + ctx.SendEventFromJetStream("stream full"); // Phase 2 + + publishedBody.ShouldNotBeNull(); + var js = ctx.Event.Events[1].ShouldBeOfType(); + js.Error.ShouldBe("stream full"); + } + + // --- Hop tracking --- + + [Fact] + public void SetHopHeader_increments_and_builds_hop_id() + { + var ctx = CreateSimpleContext(); + + ctx.SetHopHeader(); + ctx.Event.Hops.ShouldBe(1); + ctx.NextHop.ShouldBe("1"); + + ctx.SetHopHeader(); + ctx.Event.Hops.ShouldBe(2); + ctx.NextHop.ShouldBe("2"); + } + + [Fact] + public void SetHopHeader_chains_from_existing_hop() + { + var headers = BuildHeaders( + (MsgTraceHeaders.TraceDest, "trace.dest"), + (MsgTraceHeaders.TraceHop, "1")); + + var ctx = MsgTraceContext.Create( + headers, + clientId: 1, + clientName: "router", + accountName: "$G", + subject: "test", + msgSize: 0, + clientKind: MsgTraceContext.KindRouter); + + ctx.ShouldNotBeNull(); + ctx.Hop.ShouldBe("1"); + + ctx.SetHopHeader(); + ctx.NextHop.ShouldBe("1.1"); + + ctx.SetHopHeader(); + ctx.NextHop.ShouldBe("1.2"); + } + + [Fact] + public void AddEgressEvent_captures_and_clears_next_hop() + { + var ctx = CreateSimpleContext(); + ctx.SetHopHeader(); + ctx.NextHop.ShouldBe("1"); + + ctx.AddEgressEvent(1, "route-1", MsgTraceContext.KindRouter); + + var egress = ctx.Event.Events[1].ShouldBeOfType(); + egress.Hop.ShouldBe("1"); + + // NextHop should be cleared after adding egress + ctx.NextHop.ShouldBe(""); + } + + // --- SendEvent (non-JetStream) --- + + [Fact] + public void SendEvent_publishes_immediately_without_jetstream() + { + var ctx = CreateSimpleContext(); + string? publishedDest = null; + ctx.PublishCallback = (dest, reply, body) => { publishedDest = dest; }; + + ctx.SendEvent(); + publishedDest.ShouldBe("trace.dest"); + } + + // --- JSON serialization --- + + [Fact] + public void MsgTraceEvent_serializes_to_valid_json() + { + var ctx = CreateSimpleContext(); + ctx.Event.Server = new EventServerInfo { Name = "srv", Id = "SRV1" }; + ctx.AddSubjectMappingEvent("mapped.subject"); + ctx.AddEgressEvent(99, "subscriber", MsgTraceContext.KindClient, "test.>", "q1"); + ctx.AddStreamExportEvent("exportAcc", "export.subject"); + + var json = JsonSerializer.Serialize(ctx.Event); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.GetProperty("server").GetProperty("name").GetString().ShouldBe("srv"); + root.GetProperty("request").GetProperty("msgsize").GetInt32().ShouldBe(64); + root.GetProperty("events").GetArrayLength().ShouldBe(4); + + var events = root.GetProperty("events"); + events[0].GetProperty("type").GetString().ShouldBe(MsgTraceTypes.Ingress); + events[1].GetProperty("type").GetString().ShouldBe(MsgTraceTypes.SubjectMapping); + events[2].GetProperty("type").GetString().ShouldBe(MsgTraceTypes.Egress); + events[3].GetProperty("type").GetString().ShouldBe(MsgTraceTypes.StreamExport); + } + + [Fact] + public void MsgTraceIngress_json_omits_null_error() + { + var ingress = new MsgTraceIngress + { + Type = MsgTraceTypes.Ingress, + Cid = 1, + Account = "$G", + Subject = "test", + }; + + var json = JsonSerializer.Serialize(ingress); + var doc = JsonDocument.Parse(json); + doc.RootElement.TryGetProperty("error", out _).ShouldBeFalse(); + } + + [Fact] + public void MsgTraceEgress_json_omits_null_optional_fields() + { + var egress = new MsgTraceEgress + { + Type = MsgTraceTypes.Egress, + Kind = MsgTraceContext.KindRouter, + Cid = 5, + }; + + var json = JsonSerializer.Serialize(egress); + var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.TryGetProperty("hop", out _).ShouldBeFalse(); + root.TryGetProperty("acc", out _).ShouldBeFalse(); + root.TryGetProperty("sub", out _).ShouldBeFalse(); + root.TryGetProperty("queue", out _).ShouldBeFalse(); + root.TryGetProperty("error", out _).ShouldBeFalse(); + } + + [Fact] + public void Full_trace_event_with_all_event_types_serializes_correctly() + { + var ctx = CreateSimpleContext(); + ctx.Event.Server = new EventServerInfo { Name = "test-srv", Id = "ABC123" }; + ctx.AddSubjectMappingEvent("mapped"); + ctx.AddServiceImportEvent("importAcc", "from.sub", "to.sub"); + ctx.AddStreamExportEvent("exportAcc", "export.sub"); + ctx.AddJetStreamEvent("ORDERS"); + ctx.UpdateJetStreamEvent("orders.new", false); + ctx.AddEgressEvent(100, "sub-1", MsgTraceContext.KindClient, "orders.>", "workers"); + ctx.AddEgressEvent(200, "route-east", MsgTraceContext.KindRouter, error: MsgTraceErrors.NoSupport); + + var json = JsonSerializer.Serialize(ctx.Event); + var doc = JsonDocument.Parse(json); + var events = doc.RootElement.GetProperty("events"); + + events.GetArrayLength().ShouldBe(7); + events[0].GetProperty("type").GetString().ShouldBe("in"); + events[1].GetProperty("type").GetString().ShouldBe("sm"); + events[2].GetProperty("type").GetString().ShouldBe("si"); + events[3].GetProperty("type").GetString().ShouldBe("se"); + events[4].GetProperty("type").GetString().ShouldBe("js"); + events[5].GetProperty("type").GetString().ShouldBe("eg"); + events[6].GetProperty("type").GetString().ShouldBe("eg"); + } + + // --- Helper --- + + private static MsgTraceContext CreateSimpleContext(string destination = "trace.dest", string accountName = "$G") + { + var headers = BuildHeaders( + (MsgTraceHeaders.TraceDest, destination)); + + var ctx = MsgTraceContext.Create( + headers, + clientId: 1, + clientName: "publisher", + accountName: accountName, + subject: "test.subject", + msgSize: 64); + + ctx.ShouldNotBeNull(); + return ctx; + } +} diff --git a/tests/NATS.Server.Tests/Monitoring/ConnzFilterTests.cs b/tests/NATS.Server.Tests/Monitoring/ConnzFilterTests.cs new file mode 100644 index 0000000..17af8b0 --- /dev/null +++ b/tests/NATS.Server.Tests/Monitoring/ConnzFilterTests.cs @@ -0,0 +1,420 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server; +using NATS.Server.Auth; +using NATS.Server.Monitoring; + +namespace NATS.Server.Tests.Monitoring; + +/// +/// Tests for ConnzHandler filtering, sorting, pagination, and closed connection +/// ring buffer behavior. +/// Go reference: monitor_test.go — TestConnz, TestConnzSortedByCid, TestConnzSortedByBytesTo, +/// TestConnzFilter, TestConnzWithCID, TestConnzOffsetAndLimit. +/// +public class ConnzFilterTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly NatsOptions _opts; + private readonly CancellationTokenSource _cts = new(); + private readonly List _sockets = []; + + public ConnzFilterTests() + { + _opts = new NatsOptions + { + Port = GetFreePort(), + MaxClosedClients = 100, + Users = + [ + new User { Username = "alice", Password = "pw", Account = "acctA" }, + new User { Username = "bob", Password = "pw", Account = "acctB" }, + ], + }; + _server = new NatsServer(_opts, NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + } + + public async Task DisposeAsync() + { + foreach (var s in _sockets) + { + try { s.Shutdown(SocketShutdown.Both); } catch { } + s.Dispose(); + } + await _cts.CancelAsync(); + _server.Dispose(); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + private async Task ConnectAsync(string user, string? subjectToSubscribe = null) + { + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + _sockets.Add(sock); + await sock.ConnectAsync(IPAddress.Loopback, _opts.Port); + + var buf = new byte[4096]; + await sock.ReceiveAsync(buf, SocketFlags.None); // INFO + + var connect = $"CONNECT {{\"user\":\"{user}\",\"pass\":\"pw\"}}\r\n"; + await sock.SendAsync(Encoding.ASCII.GetBytes(connect)); + + if (subjectToSubscribe != null) + { + await sock.SendAsync(Encoding.ASCII.GetBytes($"SUB {subjectToSubscribe} sid1\r\n")); + } + + await sock.SendAsync("PING\r\n"u8.ToArray()); + await ReadUntilAsync(sock, "PONG"); + return sock; + } + + private Connz GetConnz(string queryString = "") + { + var ctx = new DefaultHttpContext(); + ctx.Request.QueryString = new QueryString(queryString); + return new ConnzHandler(_server).HandleConnz(ctx); + } + + // --- Sort tests --- + + [Fact] + public async Task Sort_by_cid_returns_ascending_order() + { + await ConnectAsync("alice"); + await ConnectAsync("bob"); + await Task.Delay(50); + + var connz = GetConnz("?sort=cid"); + connz.Conns.Length.ShouldBeGreaterThanOrEqualTo(2); + + for (int i = 1; i < connz.Conns.Length; i++) + { + connz.Conns[i].Cid.ShouldBeGreaterThan(connz.Conns[i - 1].Cid); + } + } + + [Fact] + public async Task Sort_by_bytes_to_returns_descending_order() + { + var sock1 = await ConnectAsync("alice"); + var sock2 = await ConnectAsync("bob"); + await Task.Delay(50); + + // Publish some data through sock1 to accumulate bytes + await sock1.SendAsync(Encoding.ASCII.GetBytes("SUB test 1\r\nPUB test 10\r\n1234567890\r\n")); + await Task.Delay(100); + + var connz = GetConnz("?sort=bytes_to"); + connz.Conns.Length.ShouldBeGreaterThanOrEqualTo(2); + + for (int i = 1; i < connz.Conns.Length; i++) + { + connz.Conns[i].OutBytes.ShouldBeLessThanOrEqualTo(connz.Conns[i - 1].OutBytes); + } + } + + [Fact] + public async Task Sort_by_msgs_from_returns_descending_order() + { + var sock1 = await ConnectAsync("alice"); + await Task.Delay(50); + + // Send a PUB to increment InMsgs + await sock1.SendAsync(Encoding.ASCII.GetBytes("PUB test 3\r\nabc\r\n")); + await Task.Delay(100); + + var connz = GetConnz("?sort=msgs_from"); + connz.Conns.Length.ShouldBeGreaterThanOrEqualTo(1); + + for (int i = 1; i < connz.Conns.Length; i++) + { + connz.Conns[i].InMsgs.ShouldBeLessThanOrEqualTo(connz.Conns[i - 1].InMsgs); + } + } + + [Fact] + public async Task Sort_by_subs_returns_descending_order() + { + // Alice has 2 subs, Bob has 1 + var sock1 = await ConnectAsync("alice", "test.a"); + await sock1.SendAsync(Encoding.ASCII.GetBytes("SUB test.b sid2\r\n")); + var sock2 = await ConnectAsync("bob", "test.c"); + await Task.Delay(100); + + var connz = GetConnz("?sort=subs"); + connz.Conns.Length.ShouldBeGreaterThanOrEqualTo(2); + + for (int i = 1; i < connz.Conns.Length; i++) + { + connz.Conns[i].NumSubs.ShouldBeLessThanOrEqualTo(connz.Conns[i - 1].NumSubs); + } + } + + [Fact] + public async Task Sort_by_start_returns_ascending_order() + { + await ConnectAsync("alice"); + await Task.Delay(20); + await ConnectAsync("bob"); + await Task.Delay(50); + + var connz = GetConnz("?sort=start"); + connz.Conns.Length.ShouldBeGreaterThanOrEqualTo(2); + + for (int i = 1; i < connz.Conns.Length; i++) + { + connz.Conns[i].Start.ShouldBeGreaterThanOrEqualTo(connz.Conns[i - 1].Start); + } + } + + // --- Filter tests --- + + [Fact] + public async Task Filter_by_account_returns_only_matching_connections() + { + await ConnectAsync("alice"); + await ConnectAsync("bob"); + await Task.Delay(50); + + var connz = GetConnz("?acc=acctA"); + connz.Conns.ShouldAllBe(c => c.Account == "acctA"); + connz.Conns.ShouldNotBeEmpty(); + } + + [Fact] + public async Task Filter_by_user_returns_only_matching_connections() + { + await ConnectAsync("alice"); + await ConnectAsync("bob"); + await Task.Delay(50); + + var connz = GetConnz("?user=bob"); + connz.Conns.ShouldAllBe(c => c.AuthorizedUser == "bob"); + connz.Conns.ShouldNotBeEmpty(); + } + + [Fact] + public async Task Filter_by_subject_returns_matching_subscribers() + { + await ConnectAsync("alice", "orders.>"); + await ConnectAsync("bob", "payments.>"); + await Task.Delay(50); + + var connz = GetConnz("?filter_subject=orders.new&subs=1"); + connz.Conns.ShouldNotBeEmpty(); + connz.Conns.ShouldAllBe(c => c.Subs.Any(s => s.Contains("orders"))); + } + + // --- Pagination tests --- + + [Fact] + public async Task Offset_and_limit_paginates_results() + { + await ConnectAsync("alice"); + await ConnectAsync("bob"); + await ConnectAsync("alice"); + await Task.Delay(50); + + var page1 = GetConnz("?sort=cid&limit=2&offset=0"); + page1.Conns.Length.ShouldBe(2); + page1.Total.ShouldBeGreaterThanOrEqualTo(3); + page1.Offset.ShouldBe(0); + page1.Limit.ShouldBe(2); + + var page2 = GetConnz("?sort=cid&limit=2&offset=2"); + page2.Conns.Length.ShouldBeGreaterThanOrEqualTo(1); + page2.Offset.ShouldBe(2); + + // Ensure no overlap between pages + var page1Cids = page1.Conns.Select(c => c.Cid).ToHashSet(); + var page2Cids = page2.Conns.Select(c => c.Cid).ToHashSet(); + page1Cids.Overlaps(page2Cids).ShouldBeFalse(); + } + + // --- CID lookup test --- + + [Fact] + public async Task Cid_lookup_returns_single_connection() + { + await ConnectAsync("alice"); + await ConnectAsync("bob"); + await Task.Delay(50); + + // Get all connections to find a known CID + var all = GetConnz("?sort=cid"); + all.Conns.ShouldNotBeEmpty(); + var targetCid = all.Conns[0].Cid; + + var single = GetConnz($"?cid={targetCid}"); + single.Conns.Length.ShouldBe(1); + single.Conns[0].Cid.ShouldBe(targetCid); + } + + [Fact] + public void Cid_lookup_nonexistent_returns_empty() + { + var result = GetConnz("?cid=99999999"); + result.Conns.Length.ShouldBe(0); + result.Total.ShouldBe(0); + } + + // --- Closed connection tests --- + + [Fact] + public async Task Closed_state_shows_disconnected_clients() + { + var sock = await ConnectAsync("alice"); + await Task.Delay(50); + + // Close the connection + sock.Shutdown(SocketShutdown.Both); + sock.Close(); + _sockets.Remove(sock); + await Task.Delay(200); + + var connz = GetConnz("?state=closed"); + connz.Conns.ShouldNotBeEmpty(); + connz.Conns.ShouldAllBe(c => c.Stop != null); + connz.Conns.ShouldAllBe(c => !string.IsNullOrEmpty(c.Reason)); + } + + [Fact] + public async Task All_state_shows_both_open_and_closed() + { + var sock1 = await ConnectAsync("alice"); + var sock2 = await ConnectAsync("bob"); + await Task.Delay(50); + + // Close one connection + sock1.Shutdown(SocketShutdown.Both); + sock1.Close(); + _sockets.Remove(sock1); + await Task.Delay(200); + + var connz = GetConnz("?state=all"); + connz.Total.ShouldBeGreaterThanOrEqualTo(2); + // Should have at least one open (bob) and one closed (alice) + connz.Conns.Any(c => c.Stop == null).ShouldBeTrue("expected at least one open connection"); + connz.Conns.Any(c => c.Stop != null).ShouldBeTrue("expected at least one closed connection"); + } + + [Fact] + public async Task Closed_ring_buffer_caps_at_max() + { + // MaxClosedClients is 100, create and close 5 connections + for (int i = 0; i < 5; i++) + { + var sock = await ConnectAsync("alice"); + await Task.Delay(20); + sock.Shutdown(SocketShutdown.Both); + sock.Close(); + _sockets.Remove(sock); + await Task.Delay(100); + } + + var connz = GetConnz("?state=closed"); + connz.Total.ShouldBeLessThanOrEqualTo(_opts.MaxClosedClients); + } + + // --- Sort fallback tests --- + + [Fact] + public async Task Sort_by_stop_with_open_state_falls_back_to_cid() + { + await ConnectAsync("alice"); + await ConnectAsync("bob"); + await Task.Delay(50); + + // sort=stop with state=open should fall back to cid sorting + var connz = GetConnz("?sort=stop&state=open"); + connz.Conns.Length.ShouldBeGreaterThanOrEqualTo(2); + + for (int i = 1; i < connz.Conns.Length; i++) + { + connz.Conns[i].Cid.ShouldBeGreaterThan(connz.Conns[i - 1].Cid); + } + } + + // --- Combined filter + sort test --- + + [Fact] + public async Task Account_filter_with_bytes_sort_and_limit() + { + // Connect multiple alice clients + for (int i = 0; i < 3; i++) + { + var sock = await ConnectAsync("alice"); + // Send varying amounts of data + var data = new string('x', (i + 1) * 100); + await sock.SendAsync(Encoding.ASCII.GetBytes($"SUB test 1\r\nPUB test {data.Length}\r\n{data}\r\n")); + } + await ConnectAsync("bob"); + await Task.Delay(100); + + var connz = GetConnz("?acc=acctA&sort=bytes_to&limit=2"); + connz.Conns.Length.ShouldBeLessThanOrEqualTo(2); + connz.Conns.ShouldAllBe(c => c.Account == "acctA"); + } + + [Fact] + public async Task Closed_cid_lookup_returns_from_ring_buffer() + { + var sock = await ConnectAsync("alice"); + await Task.Delay(50); + + // Get the CID before closing + var all = GetConnz("?sort=cid"); + all.Conns.ShouldNotBeEmpty(); + var targetCid = all.Conns.Last().Cid; + + // Close the socket + sock.Shutdown(SocketShutdown.Both); + sock.Close(); + _sockets.Remove(sock); + await Task.Delay(200); + + // Look up closed connection by CID + var single = GetConnz($"?cid={targetCid}"); + single.Conns.Length.ShouldBe(1); + single.Conns[0].Cid.ShouldBe(targetCid); + single.Conns[0].Stop.ShouldNotBeNull(); + } + + private static async Task ReadUntilAsync(Socket sock, string expected) + { + var buf = new byte[4096]; + var all = new StringBuilder(); + var deadline = DateTime.UtcNow.AddSeconds(5); + while (DateTime.UtcNow < deadline) + { + if (sock.Available > 0) + { + var n = await sock.ReceiveAsync(buf, SocketFlags.None); + all.Append(Encoding.ASCII.GetString(buf, 0, n)); + if (all.ToString().Contains(expected)) + return; + } + else + { + await Task.Delay(10); + } + } + + throw new TimeoutException($"Did not receive '{expected}' within 5 seconds. Got: {all}"); + } +}