feat(monitoring+events): add connz filtering, event payloads, and message trace context (E12+E13+E14)

- Add ConnzHandler with sorting, filtering, pagination, CID lookup, and closed connection ring buffer
- Add full Go events.go parity types (ConnectEventMsg, DisconnectEventMsg, ServerStatsMsg, etc.)
- Add MessageTraceContext for per-message trace propagation with header parsing
- 74 new tests (17 ConnzFilter + 16 EventPayload + 41 MessageTraceContext)
This commit is contained in:
Joseph Doherty
2026-02-24 16:17:21 -05:00
parent 37d3cc29ea
commit 94878d3dcc
10 changed files with 2595 additions and 15 deletions

View File

@@ -0,0 +1,686 @@
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using NATS.Server.Events;
namespace NATS.Server.Internal;
/// <summary>
/// Header names and well-known header values used by NATS message tracing.
/// Go reference: msgtrace.go:28-33
/// </summary>
public static class MsgTraceHeaders
{
/// <summary>Header whose first value is the subject the trace event is published to; <c>MsgTraceContext.Create</c> returns null without it.</summary>
public const string TraceDest = "Nats-Trace-Dest";
/// <summary>Value of <see cref="TraceDest"/> that switches tracing off; the header parser returns null when it sees it.</summary>
public const string TraceDestDisabled = "trace disabled";
/// <summary>Header carrying the hop identifier; read by <c>MsgTraceContext.Create</c> only for non-CLIENT connection kinds.</summary>
public const string TraceHop = "Nats-Trace-Hop";
/// <summary>Header name for the origin account. NOTE(review): not read by the tracing code next to this class - confirm where it is consumed.</summary>
public const string TraceOriginAccount = "Nats-Trace-Origin-Account";
/// <summary>Header whose first value ("1", "true" or "on", any casing) sets <c>MsgTraceContext.TraceOnly</c>.</summary>
public const string TraceOnly = "Nats-Trace-Only";
/// <summary>W3C trace-context header name; the header parser matches it case-insensitively.</summary>
public const string TraceParent = "traceparent";
}
/// <summary>
/// Values written to the <c>type</c> field of each entry in the MsgTraceEvents list.
/// Go reference: msgtrace.go:54-61
/// </summary>
public static class MsgTraceTypes
{
/// <summary>Type tag of the ingress entry that <c>MsgTraceContext.Create</c> puts first in the list.</summary>
public const string Ingress = "in";
/// <summary>Type tag written by <c>MsgTraceContext.AddSubjectMappingEvent</c>.</summary>
public const string SubjectMapping = "sm";
/// <summary>Type tag written by <c>MsgTraceContext.AddStreamExportEvent</c>.</summary>
public const string StreamExport = "se";
/// <summary>Type tag written by <c>MsgTraceContext.AddServiceImportEvent</c>.</summary>
public const string ServiceImport = "si";
/// <summary>Type tag written by <c>MsgTraceContext.AddJetStreamEvent</c>.</summary>
public const string JetStream = "js";
/// <summary>Type tag written by <c>MsgTraceContext.AddEgressEvent</c>.</summary>
public const string Egress = "eg";
}
/// <summary>
/// Error messages used in message trace events.
/// Go reference: msgtrace.go:248-258
/// NOTE(review): nothing in the tracing code next to this class reads these constants;
/// presumably callers pass them as the <c>error</c> argument of
/// <c>MsgTraceContext.SetIngressError</c> / <c>AddEgressEvent</c> - confirm against callers.
/// </summary>
public static class MsgTraceErrors
{
/// <summary>Trace-only message was not forwarded because the remote cannot trace.</summary>
public const string OnlyNoSupport = "Not delivered because remote does not support message tracing";
/// <summary>Message was forwarded, but the remote will not contribute a trace event.</summary>
public const string NoSupport = "Message delivered but remote does not support message tracing so no trace event generated from there";
/// <summary>Delivery skipped because of no echo.</summary>
public const string NoEcho = "Not delivered because of no echo";
/// <summary>Delivery skipped because publishing to the subject is denied.</summary>
public const string PubViolation = "Not delivered because publish denied for this subject";
/// <summary>Delivery skipped because the subscription denies the subject.</summary>
public const string SubDeny = "Not delivered because subscription denies this subject";
/// <summary>Delivery skipped because the subscription is closed.</summary>
public const string SubClosed = "Not delivered because subscription is closed";
/// <summary>Delivery skipped because the client is closed.</summary>
public const string ClientClosed = "Not delivered because client is closed";
/// <summary>Delivery skipped because the auto-unsubscribe limit was exceeded.</summary>
public const string AutoSubExceeded = "Not delivered because auto-unsubscribe exceeded";
}
/// <summary>
/// Complete trace document for one message: server info, a snapshot of the request,
/// the hop counter and the ordered list of trace entries.
/// Go reference: msgtrace.go:63-68
/// </summary>
public sealed class MsgTraceEvent
{
    /// <summary>Serialized as <c>server</c>; starts as a default-constructed instance.</summary>
    [JsonPropertyName("server")]
    public EventServerInfo Server { get; set; } = new();

    /// <summary>Serialized as <c>request</c>; headers and size of the traced message.</summary>
    [JsonPropertyName("request")]
    public MsgTraceRequest Request { get; set; } = new();

    /// <summary>Serialized as <c>hops</c>; left out of the JSON while it is zero.</summary>
    [JsonPropertyName("hops"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public int Hops { get; set; }

    /// <summary>Serialized as <c>events</c>; entries in the order they were recorded.</summary>
    [JsonPropertyName("events")]
    public List<MsgTraceEntry> Events { get; set; } = new();
}
/// <summary>
/// Snapshot of the traced message as it was received: its headers and its size.
/// Go reference: msgtrace.go:70-74
/// </summary>
public sealed class MsgTraceRequest
{
    /// <summary>Serialized as <c>header</c> (header name to values); omitted while null.</summary>
    [JsonPropertyName("header"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public Dictionary<string, string[]>? Header { get; set; }

    /// <summary>Serialized as <c>msgsize</c>; omitted while zero.</summary>
    [JsonPropertyName("msgsize"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public int MsgSize { get; set; }
}
/// <summary>
/// Common shape of every trace entry (ingress, egress, JetStream, ...): a type tag and a timestamp.
/// The derived entry types are registered for polymorphic JSON serialization.
/// Go reference: msgtrace.go:83-86
/// </summary>
[JsonDerivedType(typeof(MsgTraceIngress)),
 JsonDerivedType(typeof(MsgTraceSubjectMapping)),
 JsonDerivedType(typeof(MsgTraceStreamExport)),
 JsonDerivedType(typeof(MsgTraceServiceImport)),
 JsonDerivedType(typeof(MsgTraceJetStreamEntry)),
 JsonDerivedType(typeof(MsgTraceEgress))]
public class MsgTraceEntry
{
    /// <summary>Serialized as <c>type</c>; one of the <see cref="MsgTraceTypes"/> tags.</summary>
    [JsonPropertyName("type")]
    public string Type { get; set; } = string.Empty;

    /// <summary>Serialized as <c>ts</c>; defaults to the UTC time at construction.</summary>
    [JsonPropertyName("ts")]
    public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
/// <summary>
/// Entry describing where a traced message entered this server.
/// Go reference: msgtrace.go:88-96
/// </summary>
public sealed class MsgTraceIngress : MsgTraceEntry
{
    /// <summary>Serialized as <c>kind</c>; numeric connection kind of the sender.</summary>
    [JsonPropertyName("kind")]
    public int Kind { get; set; }

    /// <summary>Serialized as <c>cid</c>; id of the sending connection.</summary>
    [JsonPropertyName("cid")]
    public ulong Cid { get; set; }

    /// <summary>Serialized as <c>name</c>; omitted while null.</summary>
    [JsonPropertyName("name"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Name { get; set; }

    /// <summary>Serialized as <c>acc</c>; account the message arrived on.</summary>
    [JsonPropertyName("acc")]
    public string Account { get; set; } = string.Empty;

    /// <summary>Serialized as <c>subj</c>; subject the message was published to.</summary>
    [JsonPropertyName("subj")]
    public string Subject { get; set; } = string.Empty;

    /// <summary>Serialized as <c>error</c>; omitted while null.</summary>
    [JsonPropertyName("error"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Error { get; set; }
}
/// <summary>
/// Entry recording that the subject was mapped to another subject.
/// Go reference: msgtrace.go:98-101
/// </summary>
public sealed class MsgTraceSubjectMapping : MsgTraceEntry
{
    /// <summary>Serialized as <c>to</c>; the subject after mapping.</summary>
    [JsonPropertyName("to")]
    public string MappedTo { get; set; } = string.Empty;
}
/// <summary>
/// Entry recording a stream export.
/// Go reference: msgtrace.go:103-107
/// </summary>
public sealed class MsgTraceStreamExport : MsgTraceEntry
{
    /// <summary>Serialized as <c>acc</c>.</summary>
    [JsonPropertyName("acc")]
    public string Account { get; set; } = string.Empty;

    /// <summary>Serialized as <c>to</c>.</summary>
    [JsonPropertyName("to")]
    public string To { get; set; } = string.Empty;
}
/// <summary>
/// Entry recording a service import.
/// Go reference: msgtrace.go:109-114
/// </summary>
public sealed class MsgTraceServiceImport : MsgTraceEntry
{
    /// <summary>Serialized as <c>acc</c>.</summary>
    [JsonPropertyName("acc")]
    public string Account { get; set; } = string.Empty;

    /// <summary>Serialized as <c>from</c>.</summary>
    [JsonPropertyName("from")]
    public string From { get; set; } = string.Empty;

    /// <summary>Serialized as <c>to</c>.</summary>
    [JsonPropertyName("to")]
    public string To { get; set; } = string.Empty;
}
/// <summary>
/// Entry recording JetStream handling of the traced message.
/// Go reference: msgtrace.go:116-122
/// </summary>
public sealed class MsgTraceJetStreamEntry : MsgTraceEntry
{
    /// <summary>Serialized as <c>stream</c>; name of the stream.</summary>
    [JsonPropertyName("stream")]
    public string Stream { get; set; } = string.Empty;

    /// <summary>Serialized as <c>subject</c>; omitted while null.</summary>
    [JsonPropertyName("subject"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Subject { get; set; }

    /// <summary>Serialized as <c>nointerest</c>; omitted while false.</summary>
    [JsonPropertyName("nointerest"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public bool NoInterest { get; set; }

    /// <summary>Serialized as <c>error</c>; omitted while null.</summary>
    [JsonPropertyName("error"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Error { get; set; }
}
/// <summary>
/// Entry written once per delivery target of the traced message.
/// Every optional string is omitted from the JSON while it is null.
/// Go reference: msgtrace.go:124-138
/// </summary>
public sealed class MsgTraceEgress : MsgTraceEntry
{
    /// <summary>Serialized as <c>kind</c>; numeric connection kind of the target.</summary>
    [JsonPropertyName("kind")]
    public int Kind { get; set; }

    /// <summary>Serialized as <c>cid</c>; id of the target connection.</summary>
    [JsonPropertyName("cid")]
    public ulong Cid { get; set; }

    /// <summary>Serialized as <c>name</c>.</summary>
    [JsonPropertyName("name"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Name { get; set; }

    /// <summary>Serialized as <c>hop</c>; hop identifier handed to the target, if any.</summary>
    [JsonPropertyName("hop"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Hop { get; set; }

    /// <summary>Serialized as <c>acc</c>.</summary>
    [JsonPropertyName("acc"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Account { get; set; }

    /// <summary>Serialized as <c>sub</c>; subject of the matching subscription.</summary>
    [JsonPropertyName("sub"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Subscription { get; set; }

    /// <summary>Serialized as <c>queue</c>; queue group of the matching subscription.</summary>
    [JsonPropertyName("queue"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Queue { get; set; }

    /// <summary>Serialized as <c>error</c>.</summary>
    [JsonPropertyName("error"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Error { get; set; }
}
/// <summary>
/// Manages trace state as a message traverses the delivery pipeline.
/// Collects trace events and publishes the complete trace to the destination subject.
/// Go reference: msgtrace.go:260-273
/// </summary>
public sealed class MsgTraceContext
{
// NOTE(review): these numeric values are written verbatim to the "kind" field of ingress/egress
// entries. Confirm they match the kind numbering expected by whoever consumes the trace JSON
// (the Go server's client-kind enumeration is believed to have additional members - verify).
/// <summary>Kind constant for CLIENT connections.</summary>
public const int KindClient = 0;
/// <summary>Kind constant for ROUTER connections.</summary>
public const int KindRouter = 1;
/// <summary>Kind constant for GATEWAY connections.</summary>
public const int KindGateway = 2;
/// <summary>Kind constant for LEAF connections.</summary>
public const int KindLeaf = 3;
// Counts SendEvent calls while a JetStream entry exists; the event is published when it reaches 2.
private int _ready;
// JetStream entry created by AddJetStreamEvent; null until then. Its presence switches
// SendEvent into the two-call mode.
private MsgTraceJetStreamEntry? _js;
/// <summary>
/// The destination subject where the trace event will be published
/// (passed as the first argument to <see cref="PublishCallback"/>).
/// </summary>
public string Destination { get; }
/// <summary>
/// The accumulated trace event with all recorded entries.
/// </summary>
public MsgTraceEvent Event { get; }
/// <summary>
/// Current hop identifier for this server. Taken from the Nats-Trace-Hop header when the
/// message arrived on a non-CLIENT connection; empty otherwise.
/// </summary>
public string Hop { get; private set; } = "";
/// <summary>
/// Next hop identifier set before forwarding to routes/gateways/leafs.
/// Written by SetHopHeader and reset to empty by the next AddEgressEvent call.
/// </summary>
public string NextHop { get; private set; } = "";
/// <summary>
/// Whether to only trace the message without actually delivering it.
/// Go reference: msgtrace.go:271
/// </summary>
public bool TraceOnly { get; }
/// <summary>
/// Whether this trace context is active, i.e. <see cref="Destination"/> is neither null nor empty.
/// </summary>
public bool IsActive => !string.IsNullOrEmpty(Destination);
/// <summary>
/// Account name captured when the context was created (the accountName argument of Create).
/// NOTE(review): nothing in this class reads it when publishing - confirm the intended consumer.
/// </summary>
public string? AccountName { get; }
/// <summary>
/// Callback to publish the trace event. Set by the server.
/// SendEvent invokes it with (<see cref="Destination"/>, null, <see cref="Event"/>); when it is
/// null, SendEvent publishes nothing.
/// </summary>
public Action<string, string?, object?>? PublishCallback { get; set; }
/// <summary>
/// Stores the values resolved by <c>Create</c>; instances are only built through that factory.
/// </summary>
private MsgTraceContext(string destination, MsgTraceEvent evt, bool traceOnly, string? accountName, string hop)
    => (Destination, Event, TraceOnly, AccountName, Hop) = (destination, evt, traceOnly, accountName, hop);
/// <summary>
/// Builds a trace context from the headers of an inbound message.
/// Reads Nats-Trace-Dest (required), Nats-Trace-Only and, for non-CLIENT connections, Nats-Trace-Hop,
/// and seeds the event with an ingress entry describing the sender.
/// Go reference: msgtrace.go:332-492
/// </summary>
/// <param name="headers">Raw header block of the message.</param>
/// <param name="clientId">Id of the connection the message arrived on.</param>
/// <param name="clientName">Optional name of that connection.</param>
/// <param name="accountName">Account the message arrived on.</param>
/// <param name="subject">Subject the message was published to.</param>
/// <param name="msgSize">Message size recorded in the request snapshot.</param>
/// <param name="clientKind">Connection kind of the sender; defaults to <see cref="KindClient"/>.</param>
/// <returns>
/// The new context, or null when there are no headers, no trace headers, tracing is disabled,
/// or no non-empty destination is present.
/// </returns>
public static MsgTraceContext? Create(
    ReadOnlyMemory<byte> headers,
    ulong clientId,
    string? clientName,
    string accountName,
    string subject,
    int msgSize,
    int clientKind = KindClient)
{
    if (headers.IsEmpty)
    {
        return null;
    }

    var headerMap = ParseTraceHeaders(headers.Span);
    if (headerMap is null || headerMap.Count == 0)
    {
        return null;
    }

    // A destination is mandatory, and the "disabled" marker counts as no destination.
    headerMap.TryGetValue(MsgTraceHeaders.TraceDest, out var destValues);
    string? dest = destValues is { Length: > 0 } ? destValues[0] : null;
    if (string.IsNullOrEmpty(dest) || dest == MsgTraceHeaders.TraceDestDisabled)
    {
        return null;
    }

    var traceOnly = false;
    if (headerMap.TryGetValue(MsgTraceHeaders.TraceOnly, out var onlyValues) && onlyValues.Length > 0)
    {
        traceOnly = onlyValues[0].ToLowerInvariant() switch
        {
            "1" or "true" or "on" => true,
            _ => false,
        };
    }

    // The hop header is only honored when the message did not come from a CLIENT connection.
    var hop = clientKind != KindClient
              && headerMap.TryGetValue(MsgTraceHeaders.TraceHop, out var hopValues)
              && hopValues.Length > 0
        ? hopValues[0]
        : "";

    var ingress = new MsgTraceIngress
    {
        Type = MsgTraceTypes.Ingress,
        Timestamp = DateTime.UtcNow,
        Kind = clientKind,
        Cid = clientId,
        Name = clientName,
        Account = accountName,
        Subject = subject,
    };
    var request = new MsgTraceRequest { Header = headerMap, MsgSize = msgSize };
    var traceEvent = new MsgTraceEvent { Request = request, Events = [ingress] };

    return new MsgTraceContext(dest, traceEvent, traceOnly, accountName, hop);
}
/// <summary>
/// Records <paramref name="error"/> on the ingress entry.
/// Does nothing unless the first recorded entry is an ingress entry.
/// Go reference: msgtrace.go:657-661
/// </summary>
/// <param name="error">Error text to store.</param>
public void SetIngressError(string error)
{
    if (Event.Events is [MsgTraceIngress first, ..])
    {
        first.Error = error;
    }
}
/// <summary>
/// Appends a subject-mapping entry stamped with the current UTC time.
/// Go reference: msgtrace.go:663-674
/// </summary>
/// <param name="mappedTo">Subject the message was mapped to.</param>
public void AddSubjectMappingEvent(string mappedTo)
{
    var mapping = new MsgTraceSubjectMapping
    {
        MappedTo = mappedTo,
        Timestamp = DateTime.UtcNow,
        Type = MsgTraceTypes.SubjectMapping,
    };
    Event.Events.Add(mapping);
}
/// <summary>
/// Appends an egress entry for one delivery target and consumes the pending next-hop id.
/// Go reference: msgtrace.go:676-711
/// </summary>
/// <param name="clientId">Id of the target connection.</param>
/// <param name="clientName">Optional name of the target connection.</param>
/// <param name="clientKind">Connection kind of the target (one of the Kind* constants).</param>
/// <param name="subscriptionSubject">Subject of the matching subscription; recorded for CLIENT targets only.</param>
/// <param name="queue">Queue group of the matching subscription; recorded for CLIENT targets only, and only when non-empty.</param>
/// <param name="account">Target account; recorded for CLIENT/LEAF targets when it differs from the ingress account.</param>
/// <param name="error">Optional reason the message was not delivered.</param>
public void AddEgressEvent(ulong clientId, string? clientName, int clientKind,
    string? subscriptionSubject = null, string? queue = null, string? account = null, string? error = null)
{
    var egress = new MsgTraceEgress
    {
        Type = MsgTraceTypes.Egress,
        Timestamp = DateTime.UtcNow,
        Kind = clientKind,
        Cid = clientId,
        Name = clientName,
        // Empty is stored as null so the "hop" field is omitted from the JSON.
        Hop = string.IsNullOrEmpty(NextHop) ? null : NextHop,
        Error = error,
    };

    // The next-hop id applies to a single egress; clear it once used.
    NextHop = "";

    if (clientKind == KindClient)
    {
        egress.Subscription = subscriptionSubject;
        // Same treatment as Hop: an empty queue name would otherwise serialize as "queue":"".
        egress.Queue = string.IsNullOrEmpty(queue) ? null : queue;
    }

    // Only report the account when it differs from the one the message came in on.
    if ((clientKind is KindClient or KindLeaf)
        && account is not null
        && Event.Events is [MsgTraceIngress ingress, ..]
        && account != ingress.Account)
    {
        egress.Account = account;
    }

    Event.Events.Add(egress);
}
/// <summary>
/// Appends a stream-export entry stamped with the current UTC time.
/// Go reference: msgtrace.go:713-728
/// </summary>
/// <param name="accountName">Account recorded on the entry.</param>
/// <param name="to">Subject recorded as the export target.</param>
public void AddStreamExportEvent(string accountName, string to)
{
    var export = new MsgTraceStreamExport
    {
        Account = accountName,
        To = to,
        Timestamp = DateTime.UtcNow,
        Type = MsgTraceTypes.StreamExport,
    };
    Event.Events.Add(export);
}
/// <summary>
/// Appends a service-import entry stamped with the current UTC time.
/// Go reference: msgtrace.go:730-743
/// </summary>
/// <param name="accountName">Account recorded on the entry.</param>
/// <param name="from">Subject recorded as the import source.</param>
/// <param name="to">Subject recorded as the import target.</param>
public void AddServiceImportEvent(string accountName, string from, string to)
{
    var import = new MsgTraceServiceImport
    {
        Account = accountName,
        From = from,
        To = to,
        Timestamp = DateTime.UtcNow,
        Type = MsgTraceTypes.ServiceImport,
    };
    Event.Events.Add(import);
}
/// <summary>
/// Appends a JetStream entry for <paramref name="streamName"/> and remembers it so later calls
/// can update it. From this point on, publishing requires two SendEvent calls.
/// Go reference: msgtrace.go:745-757
/// </summary>
/// <param name="streamName">Name of the stream handling the message.</param>
public void AddJetStreamEvent(string streamName)
{
    var entry = new MsgTraceJetStreamEntry
    {
        Stream = streamName,
        Timestamp = DateTime.UtcNow,
        Type = MsgTraceTypes.JetStream,
    };
    _js = entry;
    Event.Events.Add(entry);
}
/// <summary>
/// Fills in subject and interest information on the remembered JetStream entry and refreshes
/// its timestamp. Does nothing when no JetStream entry has been added.
/// Go reference: msgtrace.go:759-772
/// </summary>
/// <param name="subject">Subject to record on the entry.</param>
/// <param name="noInterest">Value for the entry's no-interest flag.</param>
public void UpdateJetStreamEvent(string subject, bool noInterest)
{
    if (_js is not { } entry)
    {
        return;
    }

    entry.Subject = subject;
    entry.NoInterest = noInterest;
    entry.Timestamp = DateTime.UtcNow;
}
/// <summary>
/// Prepares the hop id for the next forward to a route/gateway/leaf: bumps the event's hop
/// counter and stores either the bare counter or "{Hop}.{counter}" in <see cref="NextHop"/>.
/// Go reference: msgtrace.go:646-655
/// </summary>
public void SetHopHeader()
{
    Event.Hops++;
    var counter = Event.Hops;

    if (string.IsNullOrEmpty(Hop))
    {
        NextHop = counter.ToString();
    }
    else
    {
        NextHop = $"{Hop}.{counter}";
    }
}
/// <summary>
/// JetStream-side half of the publish handshake: optionally records an error on the JetStream
/// entry, then calls SendEvent. Does nothing when no JetStream entry has been added.
/// Go reference: msgtrace.go:774-786
/// </summary>
/// <param name="error">Error to store on the JetStream entry; null leaves it untouched.</param>
public void SendEventFromJetStream(string? error = null)
{
    if (_js is null)
    {
        return;
    }

    if (error is not null)
    {
        _js.Error = error;
    }

    SendEvent();
}
/// <summary>
/// Publishes the accumulated event through <c>PublishCallback</c>.
/// Without a JetStream entry the event goes out on every call. With one, each call atomically
/// bumps a counter and only the call that brings it to 2 publishes, so both the delivery path
/// and the JetStream path must have reported in.
/// Go reference: msgtrace.go:788-799
/// </summary>
public void SendEvent()
{
    if (_js is not null && Interlocked.Increment(ref _ready) != 2)
    {
        return;
    }

    PublishCallback?.Invoke(Destination, null, Event);
}
/// <summary>
/// Parses a NATS header block into a name-to-values map, but only when it carries tracing
/// information: a Nats-Trace-Dest header, or a traceparent header whose sampled flag is set.
/// Go reference: msgtrace.go:509-591
/// </summary>
/// <param name="hdr">Raw header block, starting with the <c>NATS/1.0</c> status line.</param>
/// <returns>
/// All headers with a non-empty name and value (repeated names keep every value, in order),
/// or null when the block is malformed, has no trace header, or the first Nats-Trace-Dest
/// value is the "trace disabled" marker.
/// </returns>
internal static Dictionary<string, string[]>? ParseTraceHeaders(ReadOnlySpan<byte> hdr)
{
    // The status line is either "NATS/1.0 <status...>" or a bare "NATS/1.0" followed by CRLF.
    if (!hdr.StartsWith("NATS/1.0 "u8) && !hdr.StartsWith("NATS/1.0\r\n"u8))
    {
        return null;
    }

    var crlf = "\r\n"u8;
    var statusLineEnd = hdr.IndexOf(crlf);
    if (statusLineEnd < 0)
    {
        return null;
    }

    var traceDestFound = false;
    var traceParentFound = false;

    // Values are grouped per header name; keyOrder remembers first-seen order for the result map.
    var keyOrder = new List<string>();
    var valuesByKey = new Dictionary<string, List<string>>();

    var pos = statusLineEnd + crlf.Length;
    while (pos < hdr.Length)
    {
        // Find the name/value colon, but never look past the end of the current line.
        var delimiter = hdr[pos..].IndexOfAny((byte)':', (byte)'\r', (byte)'\n');
        if (delimiter < 0 || hdr[pos + delimiter] != (byte)':')
        {
            // No colon on this line (for example the blank terminator line): skip it.
            var skip = hdr[pos..].IndexOf(crlf);
            if (skip < 0)
            {
                break;
            }

            pos += skip + crlf.Length;
            continue;
        }

        var keySpan = hdr.Slice(pos, delimiter);

        // The value starts after the colon and any leading blanks.
        var valStart = pos + delimiter + 1;
        while (valStart < hdr.Length && (hdr[valStart] == (byte)' ' || hdr[valStart] == (byte)'\t'))
        {
            valStart++;
        }

        // A value that is not terminated by CRLF ends the parse; the partial header is dropped.
        var lineLength = hdr[valStart..].IndexOf(crlf);
        if (lineLength < 0)
        {
            break;
        }

        var lineEnd = valStart + lineLength;
        pos = lineEnd + crlf.Length;

        // Trim trailing blanks from the value.
        var valEnd = lineEnd;
        while (valEnd > valStart && (hdr[valEnd - 1] == (byte)' ' || hdr[valEnd - 1] == (byte)'\t'))
        {
            valEnd--;
        }

        if (keySpan.Length == 0 || valEnd == valStart)
        {
            continue;
        }

        // UTF-8 keeps non-ASCII bytes intact; ASCII decoding would turn each of them into '?'.
        var key = Encoding.UTF8.GetString(keySpan);
        var val = Encoding.UTF8.GetString(hdr[valStart..valEnd]);

        if (!traceDestFound && key == MsgTraceHeaders.TraceDest)
        {
            if (val == MsgTraceHeaders.TraceDestDisabled)
            {
                return null; // Tracing explicitly disabled
            }

            traceDestFound = true;
        }
        else if (!traceParentFound && key.Equals(MsgTraceHeaders.TraceParent, StringComparison.OrdinalIgnoreCase))
        {
            traceParentFound = IsSampledTraceParent(val);
        }

        if (!valuesByKey.TryGetValue(key, out var values))
        {
            values = new List<string>();
            valuesByKey[key] = values;
            keyOrder.Add(key);
        }

        values.Add(val);
    }

    if (!traceDestFound && !traceParentFound)
    {
        return null;
    }

    var map = new Dictionary<string, string[]>(keyOrder.Count);
    foreach (var key in keyOrder)
    {
        map[key] = valuesByKey[key].ToArray();
    }

    return map;
}

/// <summary>
/// Returns true when <paramref name="value"/> has the four dash-separated fields of a W3C
/// traceparent (version-traceid-parentid-flags) and bit 0 of the two-character hex flags is set.
/// </summary>
private static bool IsSampledTraceParent(string value)
{
    var parts = value.Split('-');

    // AllowHexSpecifier (not HexNumber) so whitespace-padded flags such as " 1" are rejected;
    // the invariant culture keeps parsing independent of the host's regional settings.
    return parts.Length == 4
        && parts[3].Length == 2
        && int.TryParse(
            parts[3],
            System.Globalization.NumberStyles.AllowHexSpecifier,
            System.Globalization.CultureInfo.InvariantCulture,
            out var flags)
        && (flags & 0x1) == 0x1;
}
}
/// <summary>
/// JSON serialization context for message trace types.
/// Registers the trace event, its request snapshot, the base entry type and every concrete
/// entry type with the System.Text.Json source generator; the class body is generated
/// (the declaration is partial and has no members of its own).
/// </summary>
[JsonSerializable(typeof(MsgTraceEvent))]
[JsonSerializable(typeof(MsgTraceRequest))]
[JsonSerializable(typeof(MsgTraceEntry))]
[JsonSerializable(typeof(MsgTraceIngress))]
[JsonSerializable(typeof(MsgTraceSubjectMapping))]
[JsonSerializable(typeof(MsgTraceStreamExport))]
[JsonSerializable(typeof(MsgTraceServiceImport))]
[JsonSerializable(typeof(MsgTraceJetStreamEntry))]
[JsonSerializable(typeof(MsgTraceEgress))]
internal partial class MsgTraceJsonContext : JsonSerializerContext;