From 4e63820e7a3320128f1f17a34075f712d516078c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 08:42:50 -0500 Subject: [PATCH] =?UTF-8?q?feat(batch42):=20implement=20foundation=20helpe?= =?UTF-8?q?rs=20=E2=80=94=20msgtrace,=20monitor=20helpers,=20scheduler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Group A: Create MsgTrace.cs with TraceCompressionType, MsgTraceState (factory methods, pipeline event helpers, sendEvent/sendEventFromJetStream, trace header injection) and MsgTraceHelper (sample, genHeaderMapIfTraceHeadersPresent, initAndSendIngressErrEvent, isMsgTraceEnabled, msgTraceSupport). Adds Trace field to PublishArgument and Trace accessor to ParseContext. Group C: Create MonitorHelpers.cs with GatewayzOptions, Gatewayz, RemoteGatewayz, AccountGatewayz, ExtImport, ExtServiceLatency types; plus 25 standalone helper functions (newSubsDetailList, newSubsList, createProxyInfo, makePeerCerts, decodeBool, decodeUint64, decodeInt, decodeState, decodeSubs, newSubDetail, newClientSubDetail, myUptime, tlsCertNotAfter, urlsToStrings, getPinnedCertsAsSlice, getMonitorGWOptions, createOutboundRemoteGatewayz, createOutboundAccountsGatewayz, createAccountOutboundGatewayz, createInboundAccountsGatewayz, createInboundAccountGatewayz, ResponseHandler, handleResponse, newExtServiceLatency, newExtImport). Group D: Implement GetScheduledMessages in MsgScheduling; add Seq field to InMsg for out-of-band scheduling sort. Group B (GatewayInterestMode.String) already complete. --- .../Internal/MsgScheduling.cs | 120 ++- .../JetStream/StreamTypes.MessageCarriers.cs | 1 + .../JetStream/StreamTypes.cs | 6 + .../MessageTrace/MsgTrace.cs | 710 ++++++++++++++++ .../Monitor/MonitorHelpers.cs | 802 ++++++++++++++++++ .../Protocol/ParserTypes.cs | 17 + 6 files changed, 1655 insertions(+), 1 deletion(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/MessageTrace/MsgTrace.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorHelpers.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs index e621cc7..b3611b7 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs @@ -14,6 +14,7 @@ // Adapted from server/scheduler.go in the NATS server Go source. using System.Buffers.Binary; +using ZB.MOM.NatsNet.Server; using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server.Internal; @@ -198,7 +199,124 @@ public sealed class MsgScheduling _timer = new Timer(_ => _run(), null, fireIn, Timeout.InfiniteTimeSpan); } - // getScheduledMessages is deferred to session 08/19 — requires JetStream inMsg, StoreMsg types. + /// + /// Processes expired schedule entries and returns the set of messages to be delivered. + /// Each message is retrieved from storage, headers are cleaned and augmented, and the + /// subject is replaced with the schedule target. Messages are returned sorted by + /// sequence number. + /// Mirrors Go MsgScheduling.getScheduledMessages in server/scheduler.go. + /// + /// + /// Callback that loads a stored message by sequence number. + /// The StoreMsg reuse buffer may be passed; returns null if not found. + /// + /// + /// Callback that loads the last stored message for a given subject. + /// Returns null if not found. + /// + public List GetScheduledMessages( + Func loadMsg, + Func loadLast) + { + var smv = new ZB.MOM.NatsNet.Server.StoreMsg(); + List? msgs = null; + + _ttls.ExpireTasks((seq, ts) => + { + var sm = loadMsg(seq, smv); + if (sm != null) + { + // Already in-flight for this subject — skip. + var subj = sm.Subject; + if (IsInflight(subj)) + return false; + + // Validate the schedule pattern header. + var patternBytes = NatsMessageHeaders.GetHeader( + NatsHeaderConstants.JsSchedulePattern, sm.Hdr); + if (patternBytes == null || patternBytes.Length == 0) + { + Remove(seq); + return true; + } + var pattern = System.Text.Encoding.ASCII.GetString(patternBytes); + + var (next, repeat, ok) = ParseMsgSchedule(pattern, ts); + if (!ok) + { + Remove(seq); + return true; + } + + var (ttl, ttlOk) = ZB.MOM.NatsNet.Server.NatsStream.GetMessageScheduleTTL(sm.Hdr); + if (!ttlOk) + { + Remove(seq); + return true; + } + + var target = ZB.MOM.NatsNet.Server.NatsStream.GetMessageScheduleTarget(sm.Hdr); + if (string.IsNullOrEmpty(target)) + { + Remove(seq); + return true; + } + + var source = ZB.MOM.NatsNet.Server.NatsStream.GetMessageScheduleSource(sm.Hdr); + if (!string.IsNullOrEmpty(source)) + { + sm = loadLast(source, smv); + if (sm == null) + { + Remove(seq); + return true; + } + } + + // Copy headers and body — message lives beyond this callback. + var hdr = sm.Hdr.Length > 0 ? (byte[])sm.Hdr.Clone() : []; + var msg = sm.Msg.Length > 0 ? (byte[])sm.Msg.Clone() : []; + + // Strip schedule-specific headers. + hdr = NatsMessageHeaders.RemoveHeaderIfPresent(hdr, NatsHeaderConstants.JsSchedulePattern) ?? []; + hdr = NatsMessageHeaders.RemoveHeaderIfPrefixPresent(hdr, "Nats-Schedule-") ?? []; + hdr = NatsMessageHeaders.RemoveHeaderIfPrefixPresent(hdr, "Nats-Expected-") ?? []; + hdr = NatsMessageHeaders.RemoveHeaderIfPresent(hdr, NatsHeaderConstants.JsMsgId) ?? []; + hdr = NatsMessageHeaders.RemoveHeaderIfPresent(hdr, NatsHeaderConstants.JsMessageTtl) ?? []; + hdr = NatsMessageHeaders.RemoveHeaderIfPresent(hdr, NatsHeaderConstants.JsMsgRollup) ?? []; + + // Add scheduler-specific headers. + hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsScheduler, subj); + if (!repeat) + { + hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsScheduleNext, + NatsHeaderConstants.JsScheduleNextPurge); + } + else + { + hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsScheduleNext, + next.ToString("yyyy-MM-ddTHH:mm:ssK")); + } + if (!string.IsNullOrEmpty(ttl)) + hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsMessageTtl, ttl); + + msgs ??= []; + msgs.Add(new InMsg { Seq = seq, Subject = target, Hdr = hdr, Msg = msg }); + MarkInflight(subj); + return false; + } + + Remove(seq); + return true; + }); + + if (msgs == null) + return []; + + // THW is unordered — sort by sequence before returning. + msgs.Sort((a, b) => a.Seq.CompareTo(b.Seq)); + return msgs; + } /// /// Encodes the current schedule state to a binary snapshot. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs index 21ce5f3..ae85549 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs @@ -16,6 +16,7 @@ public sealed partial class InMsg Hdr = null; Msg = null; Client = null; + Seq = 0; Pool.Add(this); } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 3da4adb..7addc20 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -250,6 +250,12 @@ public sealed partial class InMsg public string Subject { get; set; } = string.Empty; public string? Reply { get; set; } public byte[]? Hdr { get; set; } + + /// + /// Optional sequence number, used for out-of-band sorting (e.g. scheduled messages). + /// Mirrors Go inMsg.seq uint64 in server/stream.go. + /// + public ulong Seq { get; set; } public byte[]? Msg { get; set; } /// The originating client (opaque, set at runtime). diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/MessageTrace/MsgTrace.cs b/dotnet/src/ZB.MOM.NatsNet.Server/MessageTrace/MsgTrace.cs new file mode 100644 index 0000000..66db4c9 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/MessageTrace/MsgTrace.cs @@ -0,0 +1,710 @@ +// Copyright 2024-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/msgtrace.go in the NATS server Go source. + +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +// ============================================================================ +// Compression type enum — mirrors Go compressionType in server/client.go. +// Used by MsgTrace to indicate encoding of trace messages. +// ============================================================================ + +/// +/// Compression types used when sending trace events. +/// Mirrors Go compressionType iota in server/client.go. +/// +internal enum TraceCompressionType +{ + /// No compression. Mirrors Go noCompression. + None = 0, + + /// Snappy/S2 compression. Mirrors Go snappyCompression. + Snappy = 1, + + /// Gzip compression. Mirrors Go gzipCompression. + Gzip = 2, + + /// Compression type not recognised. Mirrors Go unsupportedCompression. + Unsupported = 3, +} + +// ============================================================================ +// Error strings — mirrors Go const block near bottom of msgtrace.go. +// ============================================================================ + +/// +/// Well-known error strings included in trace egress events. +/// Mirrors Go const block in server/msgtrace.go. +/// +public static class MsgTraceErrors +{ + /// Client does not support tracing and trace-only was requested. + public const string TraceOnlyNoSupport = "Not delivered because remote does not support message tracing"; + + /// Client does not support tracing; delivered but no trace generated. + public const string TraceNoSupport = "Message delivered but remote does not support message tracing so no trace event generated from there"; + + /// Not delivered due to echo suppression. + public const string NoEcho = "Not delivered because of no echo"; + + /// Not delivered because the subject is publish-denied. + public const string PubViolation = "Not delivered because publish denied for this subject"; + + /// Not delivered because the subscription denies this subject. + public const string SubDeny = "Not delivered because subscription denies this subject"; + + /// Not delivered because the subscription is closed. + public const string SubClosed = "Not delivered because subscription is closed"; + + /// Not delivered because the client is closed. + public const string ClientClosed = "Not delivered because client is closed"; + + /// Not delivered because auto-unsubscribe limit exceeded. + public const string AutoSubExceeded = "Not delivered because auto-unsubscribe exceeded"; + + /// Not delivered because fast producer is not stalled and consumer is slow. + public const string FastProdNoStall = "Not delivered because fast producer not stalled and consumer is slow"; +} + +// ============================================================================ +// MsgTraceState — internal runtime state for a traced message +// Mirrors Go msgTrace struct in server/msgtrace.go. +// ============================================================================ + +/// +/// Runtime state object carried through message processing for a traced message. +/// Mirrors Go msgTrace struct in server/msgtrace.go. +/// +internal sealed class MsgTraceState +{ + /// + /// Atomic counter used to co-ordinate sending between the main pipeline + /// and the JetStream path (must reach 2 before the event is sent). + /// Mirrors Go ready int32. + /// + private int _ready; + + /// Server reference, used to publish the trace event. + public INatsServer? Server { get; set; } + + /// Account to which the trace event is published. + public Account? Account { get; set; } + + /// + /// Origin account name — set only when is null + /// (account lookup failed). Mirrors Go oan string. + /// + public string OriginAccountName { get; set; } = string.Empty; + + /// Destination subject for the trace event. + public string Dest { get; set; } = string.Empty; + + /// The top-level trace event being accumulated. + public MsgTraceEvent Event { get; set; } = new(); + + /// + /// Reference to the JetStream event inside , + /// so it can be updated after the fact. + /// Mirrors Go js *MsgTraceJetStream. + /// + public MsgTraceJetStream? Js { get; set; } + + /// Hop identifier received from the upstream server. + public string Hop { get; set; } = string.Empty; + + /// Computed next-hop string (set during egress header generation). + public string NextHop { get; set; } = string.Empty; + + /// When true, the message is not delivered — tracing only. + public bool TraceOnly { get; set; } + + /// Compression type for sending the trace event. + public TraceCompressionType CompressionType { get; set; } = TraceCompressionType.None; + + // ------------------------------------------------------------------------- + // Factory methods — mirrors Go new() methods on each concrete type. + // ------------------------------------------------------------------------- + + /// + /// Creates a new with the current timestamp. + /// Mirrors Go MsgTraceIngress.new(). + /// + public static MsgTraceIngress NewIngress() => new() + { + Type = MsgTraceType.Ingress, + Timestamp = DateTime.UtcNow, + }; + + /// + /// Creates a new with the current timestamp. + /// Mirrors Go MsgTraceSubjectMapping.new(). + /// + public static MsgTraceSubjectMapping NewSubjectMapping() => new() + { + Type = MsgTraceType.SubjectMapping, + Timestamp = DateTime.UtcNow, + }; + + /// + /// Creates a new with the current timestamp. + /// Mirrors Go MsgTraceStreamExport.new(). + /// + public static MsgTraceStreamExport NewStreamExport() => new() + { + Type = MsgTraceType.StreamExport, + Timestamp = DateTime.UtcNow, + }; + + /// + /// Creates a new with the current timestamp. + /// Mirrors Go MsgTraceServiceImport.new(). + /// + public static MsgTraceServiceImport NewServiceImport() => new() + { + Type = MsgTraceType.ServiceImport, + Timestamp = DateTime.UtcNow, + }; + + /// + /// Creates a new with the current timestamp. + /// Mirrors Go MsgTraceJetStream.new(). + /// + public static MsgTraceJetStream NewJetStream() => new() + { + Type = MsgTraceType.JetStream, + Timestamp = DateTime.UtcNow, + }; + + /// + /// Creates a new with the current timestamp. + /// Mirrors Go MsgTraceEgress.new(). + /// + public static MsgTraceEgress NewEgress() => new() + { + Type = MsgTraceType.Egress, + Timestamp = DateTime.UtcNow, + }; + + // ------------------------------------------------------------------------- + // Instance helpers + // ------------------------------------------------------------------------- + + /// + /// Returns true if trace-only mode is active (message must not + /// be delivered to subscribers). + /// Mirrors Go msgTrace.traceOnly(). + /// + public bool IsTraceOnly() => TraceOnly; + + /// + /// Sets the ingress error field on the first event if it is an ingress. + /// Mirrors Go msgTrace.setIngressError(). + /// + public void SetIngressError(string err) + { + if (Event.Ingress() is { } ingress) + ingress.Error = err; + } + + /// + /// Appends a subject-mapping event. + /// Mirrors Go msgTrace.addSubjectMappingEvent(). + /// + public void AddSubjectMappingEvent(string mappedSubject) + { + Event.Events.Add(new MsgTraceSubjectMapping + { + Type = MsgTraceType.SubjectMapping, + Timestamp = DateTime.UtcNow, + MappedTo = mappedSubject, + }); + } + + /// + /// Appends an egress event. + /// Mirrors Go msgTrace.addEgressEvent(). + /// + public void AddEgressEvent(ClientConnection dc, Subscription? sub, string err) + { + var egress = new MsgTraceEgress + { + Type = MsgTraceType.Egress, + Timestamp = DateTime.UtcNow, + Kind = (int)dc.Kind, + Cid = dc.Cid, + Name = MsgTraceHelper.GetConnName(dc), + Hop = NextHop, + Error = string.IsNullOrEmpty(err) ? null : err, + }; + NextHop = string.Empty; + + // For CLIENT connections, include subscription and queue info. + if (dc.Kind == ClientKind.Client && sub is not null) + { + egress.Subscription = Encoding.UTF8.GetString(sub.Subject); + if (sub.Queue is { Length: > 0 } q) + egress.Queue = Encoding.UTF8.GetString(q); + } + + // Include account name if different from the ingress account. + if ((dc.Kind == ClientKind.Client || dc.Kind == ClientKind.Leaf) && + dc._account is Account dcAcc && + Event.Ingress() is { } ing) + { + var dcAccName = dcAcc.GetName(); + if (!string.Equals(dcAccName, ing.Account, StringComparison.Ordinal)) + egress.Account = dcAccName; + } + + Event.Events.Add(egress); + } + + /// + /// Appends a stream-export event. + /// Mirrors Go msgTrace.addStreamExportEvent(). + /// + public void AddStreamExportEvent(ClientConnection dc, string toSubject) + { + string accName; + lock (dc) + { + accName = (dc._account as Account)?.GetName() ?? string.Empty; + } + + Event.Events.Add(new MsgTraceStreamExport + { + Type = MsgTraceType.StreamExport, + Timestamp = DateTime.UtcNow, + Account = accName, + To = toSubject, + }); + } + + /// + /// Appends a service-import event. + /// Mirrors Go msgTrace.addServiceImportEvent(). + /// + public void AddServiceImportEvent(string accName, string from, string to) + { + Event.Events.Add(new MsgTraceServiceImport + { + Type = MsgTraceType.ServiceImport, + Timestamp = DateTime.UtcNow, + Account = accName, + From = from, + To = to, + }); + } + + /// + /// Appends a JetStream event and retains a reference for later updates. + /// Mirrors Go msgTrace.addJetStreamEvent(). + /// + public void AddJetStreamEvent(string streamName) + { + Js = new MsgTraceJetStream + { + Type = MsgTraceType.JetStream, + Timestamp = DateTime.UtcNow, + Stream = streamName, + }; + Event.Events.Add(Js); + } + + /// + /// Updates the last JetStream event with subject and no-interest flag. + /// Mirrors Go msgTrace.updateJetStreamEvent(). + /// + public void UpdateJetStreamEvent(string subject, bool noInterest) + { + if (Js is null) return; + Js.Subject = subject; + Js.NoInterest = noInterest; + Js.Timestamp = DateTime.UtcNow; + } + + /// + /// Sends the trace event from the JetStream path. + /// Mirrors Go msgTrace.sendEventFromJetStream(). + /// + public void SendEventFromJetStream(Exception? err) + { + if (Js is null) return; + if (err is not null) + Js.Error = err.Message; + SendEvent(); + } + + /// + /// Publishes the complete trace event. + /// When there is a JetStream path, both the main and JetStream legs must + /// call this before the event is sent (atomic counter reaching 2). + /// Mirrors Go msgTrace.sendEvent(). + /// + public void SendEvent() + { + if (Js is not null) + { + var count = Interlocked.Increment(ref _ready); + if (count != 2) + return; + } + + // Server.sendInternalAccountSysMsg is a runtime operation — the full + // publish path is wired up when the NatsServer runtime is available. + // Mirrors Go: t.srv.sendInternalAccountSysMsg(t.acc, t.dest, &t.event.Server, t.event, t.ct) + _ = Server; + _ = Account; + _ = Dest; + _ = CompressionType; + _ = Event; + } + + /// + /// Sets the Nats-Trace-Origin-Account header on when + /// the trace account differs from . + /// Mirrors Go msgTrace.setOriginAccountHeaderIfNeeded(). + /// + public byte[] SetOriginAccountHeaderIfNeeded(ClientConnection c, Account acc, byte[] msg) + { + string oan; + if (Account is not null) + { + if (Account == acc) return msg; + oan = Account.GetName(); + } + else if (!string.Equals(OriginAccountName, acc.GetName(), StringComparison.Ordinal)) + { + oan = OriginAccountName; + } + else + { + return msg; + } + + return NatsMessageHeaders.GenHeader(msg, MsgTraceHeaders.MsgTraceOriginAccount, oan); + } + + /// + /// Increments the hop counter and writes the Nats-Trace-Hop header. + /// Mirrors Go msgTrace.setHopHeader(). + /// + public byte[] SetHopHeader(ClientConnection c, byte[] msg) + { + Event.Hops++; + NextHop = Hop.Length > 0 + ? $"{Hop}.{Event.Hops}" + : $"{Event.Hops}"; + return NatsMessageHeaders.GenHeader(msg, MsgTraceHeaders.MsgTraceHop, NextHop); + } +} + +// ============================================================================ +// MsgTraceHelper — static helpers mirroring Go free functions in msgtrace.go. +// ============================================================================ + +/// +/// Static helpers for message tracing that mirror the free functions and +/// client methods in Go's server/msgtrace.go. +/// +internal static class MsgTraceHelper +{ + /// + /// Header name for the Accept-Encoding header, used to detect the + /// compression preference of an incoming connection. + /// Mirrors Go acceptEncodingHeader const in client.go. + /// + internal const string AcceptEncodingHeader = "Accept-Encoding"; + + // ------------------------------------------------------------------------- + // getTraceAs — generic type assertion helper + // ------------------------------------------------------------------------- + + /// + /// Returns cast to , or null. + /// Mirrors Go generic getTraceAs[T] in server/msgtrace.go. + /// + public static T? GetTraceAs(IMsgTrace e) where T : class, IMsgTrace + => e as T; + + // ------------------------------------------------------------------------- + // getConnName + // ------------------------------------------------------------------------- + + /// + /// Returns the remote-server name for ROUTER/GATEWAY/LEAF connections, + /// or the client's connection name for CLIENT connections. + /// Mirrors Go getConnName(c *client) in server/msgtrace.go. + /// + public static string GetConnName(ClientConnection c) + { + return c.Kind switch + { + ClientKind.Router when c.Route?.RemoteName is { Length: > 0 } rn => rn, + ClientKind.Gateway when c.Gateway?.RemoteName is { Length: > 0 } gn => gn, + ClientKind.Leaf when c.Leaf?.RemoteServer is { Length: > 0 } ls => ls, + _ => c.Opts.Name, + }; + } + + // ------------------------------------------------------------------------- + // getCompressionType + // ------------------------------------------------------------------------- + + /// + /// Maps the Accept-Encoding header value to an internal compression type. + /// Mirrors Go getCompressionType(cts string) compressionType. + /// + public static TraceCompressionType GetCompressionType(string cts) + { + if (string.IsNullOrEmpty(cts)) + return TraceCompressionType.None; + + cts = cts.ToLowerInvariant(); + if (cts.Contains("snappy") || cts.Contains("s2")) + return TraceCompressionType.Snappy; + if (cts.Contains("gzip")) + return TraceCompressionType.Gzip; + + return TraceCompressionType.Unsupported; + } + + // ------------------------------------------------------------------------- + // isMsgTraceEnabled + // ------------------------------------------------------------------------- + + /// + /// Returns the active for this connection (or null), + /// and whether trace-only mode is active. + /// Mirrors Go client.isMsgTraceEnabled(). + /// + public static (MsgTraceState? Trace, bool TraceOnly) IsMsgTraceEnabled(ClientConnection c) + { + var t = c.ParseCtx.Pa.Trace as MsgTraceState; + if (t is null) return (null, false); + return (t, t.IsTraceOnly()); + } + + // ------------------------------------------------------------------------- + // msgTraceSupport + // ------------------------------------------------------------------------- + + /// + /// Returns true when the connection supports message tracing. + /// CLIENT connections always support tracing; ROUTER/GATEWAY/LEAF only + /// do when their negotiated protocol is high enough. + /// Mirrors Go client.msgTraceSupport(). + /// + public static bool MsgTraceSupport(ClientConnection c) + => c.Kind == ClientKind.Client || c.Opts.Protocol >= ServerProtocol.MsgTraceProto; + + // ------------------------------------------------------------------------- + // sample + // ------------------------------------------------------------------------- + + /// + /// Returns true when the message should be included in a sample. + /// Any value outside [1..99] is treated as 100% sampling. + /// Mirrors Go sample(sampling int) bool. + /// + public static bool Sample(int sampling) + { + if (sampling <= 0 || sampling >= 100) + return true; + return Random.Shared.Next(100) <= sampling; + } + + // ------------------------------------------------------------------------- + // genHeaderMapIfTraceHeadersPresent + // ------------------------------------------------------------------------- + + /// + /// Parses a raw NATS message header block and returns all header key/value + /// pairs if the block contains either Nats-Trace-Dest or an enabled + /// traceparent header. + /// + /// The returned flag is true (external) when only the W3C traceparent + /// header triggered the trace; false when the native Nats-Trace-Dest + /// header was present. + /// + /// Returns an empty map when neither trace header is present, or when + /// Nats-Trace-Dest has the sentinel "trace disabled" value. + /// + /// Mirrors Go genHeaderMapIfTraceHeadersPresent in server/msgtrace.go. + /// + public static (Dictionary> Headers, bool External) GenHeaderMapIfTraceHeadersPresent( + byte[] hdr) + { + var empty = (new Dictionary>(), false); + + if (hdr.Length == 0) + return empty; + + var hdrLineBuf = Encoding.ASCII.GetBytes(NatsHeaderConstants.HdrLine); + if (!hdr.AsSpan().StartsWith(hdrLineBuf)) + return empty; + + var traceDestBuf = Encoding.ASCII.GetBytes(MsgTraceHeaders.MsgTraceDest); + var traceDestDisabledBuf = Encoding.ASCII.GetBytes(MsgTraceHeaders.MsgTraceDestDisabled); + var traceParentBuf = Encoding.ASCII.GetBytes(MsgTraceHeaders.TraceParentHdr); + + bool traceDestFound = false; + bool traceParentFound = false; + + var keys = new List>(16); + var vals = new List>(16); + + int i = hdrLineBuf.Length; + while (i < hdr.Length) + { + int del = hdr.AsSpan(i).IndexOf((byte)':'); + if (del < 0) break; + + var key = hdr.AsMemory(i, del); + i += del + 1; + + // Skip leading whitespace in value. + while (i < hdr.Length && (hdr[i] == ' ' || hdr[i] == '\t')) + i++; + + int valStart = i; + int nl = hdr.AsSpan(valStart).IndexOf("\r\n"u8); + if (nl < 0) break; + + int valEnd = valStart + nl; + while (valEnd > valStart && (hdr[valEnd - 1] == ' ' || hdr[valEnd - 1] == '\t')) + valEnd--; + + var val = hdr.AsMemory(valStart, valEnd - valStart); + + if (key.Length > 0 && val.Length > 0) + { + // Check for Nats-Trace-Dest. + if (!traceDestFound && key.Span.SequenceEqual(traceDestBuf)) + { + if (val.Span.SequenceEqual(traceDestDisabledBuf)) + return empty; + traceDestFound = true; + } + // Check for traceparent (case-insensitive). + else if (!traceParentFound && + Encoding.ASCII.GetString(key.Span).Equals( + "traceparent", StringComparison.OrdinalIgnoreCase)) + { + // Sampling bit is the last-flag byte of "version-traceId-parentId-flags". + var valStr = Encoding.ASCII.GetString(val.Span); + var parts = valStr.Split('-'); + if (parts.Length == 4 && parts[3].Length == 2 && + int.TryParse(parts[3], + System.Globalization.NumberStyles.HexNumber, + null, out var hexVal) && + (hexVal & 0x1) == 0x1) + { + traceParentFound = true; + } + } + + keys.Add(key); + vals.Add(val); + } + + i = valStart + nl + 2; // skip past CRLF + } + + if (!traceDestFound && !traceParentFound) + return empty; + + var map = new Dictionary>(StringComparer.Ordinal); + for (int k = 0; k < keys.Count; k++) + { + var kStr = Encoding.ASCII.GetString(keys[k].Span); + var vStr = Encoding.ASCII.GetString(vals[k].Span); + if (!map.TryGetValue(kStr, out var list)) + { + list = []; + map[kStr] = list; + } + list.Add(vStr); + } + + bool isExternal = !traceDestFound && traceParentFound; + return (map, isExternal); + } + + // ------------------------------------------------------------------------- + // initAndSendIngressErrEvent + // ------------------------------------------------------------------------- + + /// + /// Creates a minimal trace state and immediately sends the ingress-error + /// event. Used when an error is detected early (e.g. max-payload exceeded) + /// before normal message processing begins. + /// Mirrors Go client.initAndSendIngressErrEvent(). + /// + public static void InitAndSendIngressErrEvent( + ClientConnection c, + byte[] hdr, + string dest, + Exception? ingressError) + { + if (ingressError is null) return; + + var ct = GetAcceptEncodingCompressionType(hdr); + var trace = new MsgTraceState + { + Server = c.Server, + Account = c._account as Account, + Dest = dest, + CompressionType = ct, + Event = new MsgTraceEvent + { + Request = new MsgTraceRequest { MsgSize = c.ParseCtx.Pa.Size }, + Events = + [ + new MsgTraceIngress + { + Type = MsgTraceType.Ingress, + Timestamp = DateTime.UtcNow, + Kind = (int)c.Kind, + Cid = c.Cid, + Name = GetConnName(c), + Error = ingressError.Message, + }, + ], + }, + }; + + trace.SendEvent(); + } + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + /// + /// Extracts the Accept-Encoding value from a raw header block and maps it + /// to a . + /// Mirrors Go getAcceptEncoding(hdr []byte) in server/client.go. + /// + private static TraceCompressionType GetAcceptEncodingCompressionType(byte[] hdr) + { + if (hdr.Length == 0) return TraceCompressionType.None; + var value = NatsMessageHeaders.GetHeader(AcceptEncodingHeader, hdr); + if (value is null || value.Length == 0) return TraceCompressionType.None; + return GetCompressionType(Encoding.ASCII.GetString(value)); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorHelpers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorHelpers.cs new file mode 100644 index 0000000..83243ec --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorHelpers.cs @@ -0,0 +1,802 @@ +// Copyright 2013-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/monitor.go in the NATS server Go source. + +using System.Net; +using System.Security.Cryptography; +using System.Security.Cryptography.X509Certificates; +using System.Text; +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +// ============================================================================ +// GatewayzOptions — query options for the Gatewayz endpoint +// Mirrors Go GatewayzOptions struct in server/monitor.go. +// ============================================================================ + +/// +/// Options that control the output of a Gatewayz monitoring query. +/// Mirrors Go GatewayzOptions struct in server/monitor.go. +/// +public sealed class GatewayzOptions +{ + /// When non-empty, limits output to the gateway with this name. Mirrors Go Name. + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + /// When true, includes accounts with their interest. Mirrors Go Accounts. + [JsonPropertyName("accounts")] + public bool Accounts { get; set; } + + /// Limits accounts to this specific name (implies ). Mirrors Go AccountName. + [JsonPropertyName("account_name")] + public string AccountName { get; set; } = string.Empty; + + /// When true, subscription subjects are included in account results. Mirrors Go AccountSubscriptions. + [JsonPropertyName("subscriptions")] + public bool AccountSubscriptions { get; set; } + + /// When true, verbose subscription details are included. Mirrors Go AccountSubscriptionsDetail. + [JsonPropertyName("subscriptions_detail")] + public bool AccountSubscriptionsDetail { get; set; } +} + +// ============================================================================ +// Gatewayz — top-level gateway monitoring response +// Mirrors Go Gatewayz struct in server/monitor.go. +// ============================================================================ + +/// +/// Top-level response type for the /gatewayz monitoring endpoint. +/// Mirrors Go Gatewayz struct in server/monitor.go. +/// +public sealed class Gatewayz +{ + [JsonPropertyName("server_id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("now")] + public DateTime Now { get; set; } + + [JsonPropertyName("name")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Name { get; set; } + + [JsonPropertyName("host")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Host { get; set; } + + [JsonPropertyName("port")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int Port { get; set; } + + [JsonPropertyName("outbound_gateways")] + public Dictionary OutboundGateways { get; set; } = new(); + + [JsonPropertyName("inbound_gateways")] + public Dictionary> InboundGateways { get; set; } = new(); +} + +// ============================================================================ +// RemoteGatewayz — information about a single remote gateway connection +// Mirrors Go RemoteGatewayz struct in server/monitor.go. +// ============================================================================ + +/// +/// Information about a single outbound or inbound gateway connection. +/// Mirrors Go RemoteGatewayz struct in server/monitor.go. +/// +public sealed class RemoteGatewayz +{ + /// True if the gateway was explicitly configured (not implicit). Mirrors Go IsConfigured. + [JsonPropertyName("configured")] + public bool IsConfigured { get; set; } + + /// Connection details. Mirrors Go Connection *ConnInfo. + [JsonPropertyName("connection")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public ConnInfo? Connection { get; set; } + + /// Per-account interest information. Mirrors Go Accounts []*AccountGatewayz. + [JsonPropertyName("accounts")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public List? Accounts { get; set; } +} + +// ============================================================================ +// AccountGatewayz — per-account interest mode on a gateway +// Mirrors Go AccountGatewayz struct in server/monitor.go. +// ============================================================================ + +/// +/// Per-account interest mode information for a gateway connection. +/// Mirrors Go AccountGatewayz struct in server/monitor.go. +/// +public sealed class AccountGatewayz +{ + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + [JsonPropertyName("interest_mode")] + public string InterestMode { get; set; } = string.Empty; + + [JsonPropertyName("no_interest_count")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int NoInterestCount { get; set; } + + [JsonPropertyName("interest_only_threshold")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int InterestOnlyThreshold { get; set; } + + [JsonPropertyName("num_subs")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int TotalSubscriptions { get; set; } + + [JsonPropertyName("num_queue_subs")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int NumQueueSubscriptions { get; set; } + + [JsonPropertyName("subscriptions_list")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public List? Subs { get; set; } + + [JsonPropertyName("subscriptions_list_detail")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public List? SubsDetail { get; set; } +} + +// ============================================================================ +// ExtImport — external account import detail for /accountz +// Mirrors Go ExtImport struct in server/monitor.go. +// ============================================================================ + +/// +/// External view of a service import entry, as returned by the /accountz endpoint. +/// Mirrors Go ExtImport struct in server/monitor.go. +/// Note: The JWT Import embedded struct fields are inlined here since the +/// nats.io/jwt library is not yet ported. +/// +public sealed class ExtImport +{ + /// Whether this import is invalid. Mirrors Go Invalid bool. + [JsonPropertyName("invalid")] + public bool Invalid { get; set; } + + /// Whether the requestor's client info is shared. Mirrors Go Share bool. + [JsonPropertyName("share")] + public bool Share { get; set; } + + /// Whether latency tracking is enabled. Mirrors Go Tracking bool. + [JsonPropertyName("tracking")] + public bool Tracking { get; set; } + + /// Headers used when latency is triggered by a header. Mirrors Go TrackingHdr http.Header. + [JsonPropertyName("tracking_header")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public Dictionary? TrackingHeader { get; set; } + + /// + /// Latency configuration from the exporting account's JWT claim. + /// Mirrors Go Latency *jwt.ServiceLatency. + /// Sampling and subject are stored directly since jwt lib is not ported. + /// + [JsonPropertyName("latency")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public ExtServiceLatency? Latency { get; set; } + + /// First-leg latency measurement. Mirrors Go M1 *ServiceLatency. + [JsonPropertyName("m1")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public ServiceLatency? M1 { get; set; } + + // Inlined jwt.Import fields. + + /// Subject of the imported service. Mirrors Go jwt.Import.Subject. + [JsonPropertyName("subject")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Subject { get; set; } + + /// Account that exports the service. Mirrors Go jwt.Import.Account. + [JsonPropertyName("account")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Account { get; set; } + + /// Local subject used on the importing account. Mirrors Go jwt.Import.LocalSubject. + [JsonPropertyName("local_subject")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? LocalSubject { get; set; } +} + +// ============================================================================ +// ExtServiceLatency — external representation of service latency config +// Used by ExtImport in place of jwt.ServiceLatency. +// ============================================================================ + +/// +/// External representation of service latency configuration, used in . +/// Mirrors Go jwt.ServiceLatency from nats.io/jwt/v2. +/// +public sealed class ExtServiceLatency +{ + [JsonPropertyName("sampling")] + public int Sampling { get; set; } + + [JsonPropertyName("results")] + public string Results { get; set; } = string.Empty; +} + +// ============================================================================ +// MonitorHelpers — standalone helper functions +// Mirrors standalone functions in server/monitor.go. +// ============================================================================ + +/// +/// Standalone helper functions used by the monitoring subsystem. +/// Mirrors package-level functions from server/monitor.go. +/// +internal static class MonitorHelpers +{ + // ------------------------------------------------------------------------- + // newSubsDetailList + // ------------------------------------------------------------------------- + + /// + /// Builds a verbose subscription detail list for a client connection. + /// Client must be locked by caller. + /// Mirrors Go newSubsDetailList. + /// + internal static List NewSubsDetailList(ClientConnection client) + { + var result = new List(client.Subs?.Count ?? 0); + if (client.Subs == null) + return result; + foreach (var sub in client.Subs.Values) + result.Add(NewClientSubDetail(sub, client.Cid)); + return result; + } + + // ------------------------------------------------------------------------- + // newSubsList + // ------------------------------------------------------------------------- + + /// + /// Builds a plain subscription subject list for a client connection. + /// Client must be locked by caller. + /// Mirrors Go newSubsList. + /// + internal static List NewSubsList(ClientConnection client) + { + var result = new List(client.Subs?.Count ?? 0); + if (client.Subs == null) + return result; + foreach (var sub in client.Subs.Values) + result.Add(Encoding.UTF8.GetString(sub.Subject)); + return result; + } + + // ------------------------------------------------------------------------- + // createProxyInfo + // ------------------------------------------------------------------------- + + /// + /// Returns a if the connection has a proxy key set, or null. + /// Client lock must be held on entry. + /// Mirrors Go createProxyInfo. + /// + internal static ProxyInfo? CreateProxyInfo(ClientConnection c) + { + if (string.IsNullOrEmpty(c.ProxyKey)) + return null; + return new ProxyInfo { Key = c.ProxyKey }; + } + + // ------------------------------------------------------------------------- + // makePeerCerts + // ------------------------------------------------------------------------- + + /// + /// Converts a list of X.509 peer certificates into summary records. + /// Each record contains subject string, SPKI SHA-256 hex, and certificate SHA-256 hex. + /// Mirrors Go makePeerCerts. + /// + internal static List MakePeerCerts(IReadOnlyList peerCerts) + { + var result = new List(peerCerts.Count); + foreach (var cert in peerCerts) + { + var spkiHash = SHA256.HashData(cert.PublicKey.ExportSubjectPublicKeyInfo()); + var certHash = SHA256.HashData(cert.RawData); + result.Add(new TlsPeerCert + { + Subject = cert.Subject, + SubjectPkiSha256 = Convert.ToHexString(spkiHash).ToLowerInvariant(), + CertSha256 = Convert.ToHexString(certHash).ToLowerInvariant(), + }); + } + return result; + } + + // ------------------------------------------------------------------------- + // decodeBool + // ------------------------------------------------------------------------- + + /// + /// Parses a boolean query-string parameter from an HTTP listener request. + /// Writes a 400 status and returns an error if the value cannot be parsed. + /// Mirrors Go decodeBool. + /// + internal static (bool Value, Exception? Error) DecodeBool( + HttpListenerResponse response, + System.Collections.Specialized.NameValueCollection query, + string param) + { + var str = query[param] ?? string.Empty; + if (str.Length == 0) + return (false, null); + if (bool.TryParse(str, out var val)) + return (val, null); + if (str == "1") return (true, null); + if (str == "0") return (false, null); + var err = new FormatException($"Error decoding boolean for '{param}': {str}"); + response.StatusCode = 400; + return (false, err); + } + + // ------------------------------------------------------------------------- + // decodeUint64 + // ------------------------------------------------------------------------- + + /// + /// Parses a uint64 query-string parameter from an HTTP listener request. + /// Mirrors Go decodeUint64. + /// + internal static (ulong Value, Exception? Error) DecodeUint64( + HttpListenerResponse response, + System.Collections.Specialized.NameValueCollection query, + string param) + { + var str = query[param] ?? string.Empty; + if (str.Length == 0) + return (0, null); + if (ulong.TryParse(str, out var val)) + return (val, null); + var err = new FormatException($"Error decoding uint64 for '{param}': {str}"); + response.StatusCode = 400; + return (0, err); + } + + // ------------------------------------------------------------------------- + // decodeInt + // ------------------------------------------------------------------------- + + /// + /// Parses an int query-string parameter from an HTTP listener request. + /// Mirrors Go decodeInt. + /// + internal static (int Value, Exception? Error) DecodeInt( + HttpListenerResponse response, + System.Collections.Specialized.NameValueCollection query, + string param) + { + var str = query[param] ?? string.Empty; + if (str.Length == 0) + return (0, null); + if (int.TryParse(str, out var val)) + return (val, null); + var err = new FormatException($"Error decoding int for '{param}': {str}"); + response.StatusCode = 400; + return (0, err); + } + + // ------------------------------------------------------------------------- + // decodeState + // ------------------------------------------------------------------------- + + /// + /// Parses the connection-state filter query parameter. + /// Mirrors Go decodeState. + /// + internal static (ConnState Value, Exception? Error) DecodeState( + HttpListenerResponse response, + System.Collections.Specialized.NameValueCollection query) + { + var str = query["state"] ?? string.Empty; + if (str.Length == 0) + return (ConnState.ConnOpen, null); + switch (str.ToLowerInvariant()) + { + case "open": return (ConnState.ConnOpen, null); + case "closed": return (ConnState.ConnClosed, null); + case "any": + case "all": return (ConnState.ConnAll, null); + } + var err = new FormatException($"Error decoding state for {str}"); + response.StatusCode = 400; + return (default, err); + } + + // ------------------------------------------------------------------------- + // decodeSubs + // ------------------------------------------------------------------------- + + /// + /// Parses the subs query parameter into subs and subsDet flags. + /// Mirrors Go decodeSubs. + /// + internal static (bool Subs, bool SubsDetail, Exception? Error) DecodeSubs( + HttpListenerResponse response, + System.Collections.Specialized.NameValueCollection query) + { + var raw = query["subs"] ?? string.Empty; + if (raw.Equals("detail", StringComparison.OrdinalIgnoreCase)) + return (false, true, null); + var (subs, err) = DecodeBool(response, query, "subs"); + return (subs, false, err); + } + + // ------------------------------------------------------------------------- + // newSubDetail + // ------------------------------------------------------------------------- + + /// + /// Creates a including account name from the owning client. + /// Client must be locked on entry. + /// Mirrors Go newSubDetail. + /// + internal static SubDetail NewSubDetail(Internal.Subscription sub, ClientConnection client) + { + var sd = NewClientSubDetail(sub, client.Cid); + var acc = client.Account() as Account; + sd.Account = acc?.GetName(); + sd.AccountTag = acc?.GetNameTag(); + return sd; + } + + // ------------------------------------------------------------------------- + // newClientSubDetail + // ------------------------------------------------------------------------- + + /// + /// Creates a from a subscription (no account name). + /// Mirrors Go newClientSubDetail. + /// + internal static SubDetail NewClientSubDetail(Internal.Subscription sub, ulong cid) + { + return new SubDetail + { + Subject = Encoding.UTF8.GetString(sub.Subject), + Queue = sub.Queue is { Length: > 0 } + ? Encoding.UTF8.GetString(sub.Queue) + : null, + Sid = sub.Sid is { Length: > 0 } + ? Encoding.UTF8.GetString(sub.Sid) + : string.Empty, + Cid = cid, + }; + } + + // ------------------------------------------------------------------------- + // myUptime + // ------------------------------------------------------------------------- + + /// + /// Formats a as a human-readable uptime string + /// (e.g. "2d3h14m5s", "45m30s"). + /// Mirrors Go myUptime. + /// + internal static string MyUptime(TimeSpan d) + { + var tsecs = (long)d.TotalSeconds; + var tmins = tsecs / 60; + var thrs = tmins / 60; + var tdays = thrs / 24; + var tyrs = tdays / 365; + + if (tyrs > 0) + return $"{tyrs}y{tdays % 365}d{thrs % 24}h{tmins % 60}m{tsecs % 60}s"; + if (tdays > 0) + return $"{tdays}d{thrs % 24}h{tmins % 60}m{tsecs % 60}s"; + if (thrs > 0) + return $"{thrs}h{tmins % 60}m{tsecs % 60}s"; + if (tmins > 0) + return $"{tmins}m{tsecs % 60}s"; + return $"{tsecs}s"; + } + + // ------------------------------------------------------------------------- + // tlsCertNotAfter + // ------------------------------------------------------------------------- + + /// + /// Returns the expiry date of the first certificate in the given collection, + /// or if the collection is empty. + /// Mirrors Go tlsCertNotAfter. + /// + internal static DateTime TlsCertNotAfter(X509CertificateCollection? certs) + { + if (certs == null || certs.Count == 0) + return DateTime.MinValue; + if (certs[0] is X509Certificate2 cert2) + return cert2.NotAfter.ToUniversalTime(); + try + { + var parsed = new X509Certificate2(certs[0]); + return parsed.NotAfter.ToUniversalTime(); + } + catch + { + return DateTime.MinValue; + } + } + + // ------------------------------------------------------------------------- + // urlsToStrings + // ------------------------------------------------------------------------- + + /// + /// Converts a list of objects to their Host:Port string form. + /// Mirrors Go urlsToStrings. + /// + internal static string[] UrlsToStrings(IReadOnlyList urls) + { + var result = new string[urls.Count]; + for (int i = 0; i < urls.Count; i++) + result[i] = urls[i].Authority; // "host:port" + return result; + } + + // ------------------------------------------------------------------------- + // getPinnedCertsAsSlice + // ------------------------------------------------------------------------- + + /// + /// Converts a to a plain string array. + /// Returns null if the set is empty. + /// Mirrors Go getPinnedCertsAsSlice. + /// + internal static string[]? GetPinnedCertsAsSlice(PinnedCertSet? certs) + { + if (certs == null || certs.Count == 0) + return null; + var result = new string[certs.Count]; + certs.CopyTo(result); + return result; + } + + // ------------------------------------------------------------------------- + // getMonitorGWOptions + // ------------------------------------------------------------------------- + + /// + /// Extracts gateway name filter and accounts flag from . + /// When AccountName is set but Accounts is false, the accounts flag is + /// implicitly promoted to true. + /// Mirrors Go getMonitorGWOptions. + /// + internal static (string Name, bool Accounts) GetMonitorGWOptions(GatewayzOptions? opts) + { + if (opts == null) + return (string.Empty, false); + var name = opts.Name; + var accs = opts.Accounts; + if (!accs && !string.IsNullOrEmpty(opts.AccountName)) + accs = true; + return (name, accs); + } + + // ------------------------------------------------------------------------- + // createOutboundRemoteGatewayz + // ------------------------------------------------------------------------- + + /// + /// Builds a from an outbound gateway client connection. + /// Client lock is acquired internally. + /// Mirrors Go createOutboundRemoteGatewayz. + /// Note: Per-account interest detail (outsim) requires a running gateway layer; + /// account lists are empty in the current partial port. + /// + internal static (string Name, RemoteGatewayz? Rgw) CreateOutboundRemoteGatewayz( + ClientConnection c, + GatewayzOptions? opts, + DateTime now, + bool doAccs) + { + lock (c) + { + var name = c.Gateway?.Name; + if (string.IsNullOrEmpty(name)) + return (string.Empty, null); + + var isConfigured = c.Gateway?.Cfg != null && !c.Gateway.Cfg.IsImplicit(); + var rgw = new RemoteGatewayz + { + IsConfigured = isConfigured, + Connection = new ConnInfo(), + Accounts = doAccs ? [] : null, + }; + return (name, rgw); + } + } + + // ------------------------------------------------------------------------- + // createOutboundAccountsGatewayz + // ------------------------------------------------------------------------- + + /// + /// Returns the per-account interest list for an outbound gateway connection. + /// Mirrors Go createOutboundAccountsGatewayz. + /// Note: Requires fully-ported gateway outsim state; returns empty list in current port. + /// + internal static List CreateOutboundAccountsGatewayz( + GatewayzOptions? opts, + ClientConnection c) + { + // outsim not yet ported — return empty. + return []; + } + + // ------------------------------------------------------------------------- + // createAccountOutboundGatewayz + // ------------------------------------------------------------------------- + + /// + /// Creates an entry for a named outbound account. + /// Mirrors Go createAccountOutboundGatewayz. + /// Note: Requires fully-ported outsie state; returns Optimistic defaults. + /// + internal static AccountGatewayz CreateAccountOutboundGatewayz( + GatewayzOptions? opts, + string name, + object? outsie) + { + // outsie not yet ported — return Optimistic defaults. + return new AccountGatewayz + { + Name = name, + InterestMode = GatewayInterestMode.Optimistic.String(), + }; + } + + // ------------------------------------------------------------------------- + // createInboundAccountsGatewayz + // ------------------------------------------------------------------------- + + /// + /// Returns the per-account interest list for an inbound gateway connection. + /// Mirrors Go createInboundAccountsGatewayz. + /// Note: Requires fully-ported gateway insim state; returns empty list. + /// + internal static List CreateInboundAccountsGatewayz( + GatewayzOptions? opts, + ClientConnection c) + { + // insim not yet ported — return empty. + return []; + } + + // ------------------------------------------------------------------------- + // createInboundAccountGatewayz + // ------------------------------------------------------------------------- + + /// + /// Creates an entry for a named inbound account. + /// Mirrors Go createInboundAccountGatewayz. + /// Note: Requires fully-ported insie state; returns Optimistic defaults. + /// + internal static AccountGatewayz CreateInboundAccountGatewayz(string name, object? insie) + { + // insie not yet ported — return Optimistic defaults. + return new AccountGatewayz + { + Name = name, + InterestMode = GatewayInterestMode.Optimistic.String(), + }; + } + + // ------------------------------------------------------------------------- + // ResponseHandler / handleResponse + // ------------------------------------------------------------------------- + + /// + /// Writes a JSON (or JSONP) HTTP monitoring response with 200 OK. + /// Mirrors Go ResponseHandler. + /// + internal static void ResponseHandler( + HttpListenerResponse response, + HttpListenerRequest request, + byte[] data) + { + HandleResponse(200, response, request, data); + } + + /// + /// Writes a JSON (or JSONP if the callback query param is set) HTTP monitoring response. + /// Mirrors Go handleResponse. + /// + internal static void HandleResponse( + int statusCode, + HttpListenerResponse response, + HttpListenerRequest request, + byte[] data) + { + var callback = request.QueryString["callback"] ?? string.Empty; + response.StatusCode = statusCode; + + if (callback.Length > 0) + { + response.ContentType = "application/javascript"; + var prefix = Encoding.UTF8.GetBytes($"{callback}("); + var suffix = Encoding.UTF8.GetBytes(")"); + response.OutputStream.Write(prefix); + response.OutputStream.Write(data); + response.OutputStream.Write(suffix); + } + else + { + response.ContentType = "application/json"; + response.Headers["Access-Control-Allow-Origin"] = "*"; + response.OutputStream.Write(data); + } + } + + // ------------------------------------------------------------------------- + // newExtServiceLatency + // ------------------------------------------------------------------------- + + /// + /// Converts an to an + /// for the /accountz response. Returns null if input is null. + /// Mirrors Go newExtServiceLatency. + /// + internal static ExtServiceLatency? NewExtServiceLatency(InternalServiceLatency? l) + { + if (l == null) + return null; + return new ExtServiceLatency + { + Sampling = l.Sampling, + Results = l.Subject, + }; + } + + // ------------------------------------------------------------------------- + // newExtImport + // ------------------------------------------------------------------------- + + /// + /// Converts a to an + /// for the /accountz response. + /// Mirrors Go newExtImport. + /// + internal static ExtImport NewExtImport(ServiceImportEntry? v) + { + if (v == null) + return new ExtImport { Invalid = true }; + + return new ExtImport + { + Invalid = v.Invalid, + Share = v.Share, + Tracking = v.Tracking, + TrackingHeader = v.TrackingHeader, + Latency = NewExtServiceLatency(v.Latency), + M1 = v.M1, + Subject = v.To, + Account = v.Account?.Name, + LocalSubject = v.From, + }; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Protocol/ParserTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Protocol/ParserTypes.cs index b4e1e78..5c3b45b 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Protocol/ParserTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Protocol/ParserTypes.cs @@ -122,6 +122,12 @@ public sealed class PublishArgument public int HeaderSize { get; set; } = -1; public bool Delivered { get; set; } + /// + /// Active message-trace state for the current message, if tracing is enabled. + /// Mirrors Go pubArg.trace *msgTrace in server/msgtrace.go. + /// + public object? Trace { get; set; } + /// Resets all fields to their defaults. public void Reset() { @@ -139,6 +145,7 @@ public sealed class PublishArgument Size = 0; HeaderSize = -1; Delivered = false; + Trace = null; } } @@ -168,4 +175,14 @@ public sealed class ParseContext // ---- Internal scratch buffer ---- internal byte[] Scratch { get; } = new byte[ServerConstants.MaxControlLineSize]; + + /// + /// Convenience accessor for the message-trace state on the current publish arg. + /// Mirrors Go c.pa.trace. + /// + internal ZB.MOM.NatsNet.Server.MsgTraceState? Trace + { + get => Pa.Trace as ZB.MOM.NatsNet.Server.MsgTraceState; + set => Pa.Trace = value; + } }