diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs index e621cc7..b3611b7 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs @@ -14,6 +14,7 @@ // Adapted from server/scheduler.go in the NATS server Go source. using System.Buffers.Binary; +using ZB.MOM.NatsNet.Server; using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server.Internal; @@ -198,7 +199,124 @@ public sealed class MsgScheduling _timer = new Timer(_ => _run(), null, fireIn, Timeout.InfiniteTimeSpan); } - // getScheduledMessages is deferred to session 08/19 — requires JetStream inMsg, StoreMsg types. + /// + /// Processes expired schedule entries and returns the set of messages to be delivered. + /// Each message is retrieved from storage, headers are cleaned and augmented, and the + /// subject is replaced with the schedule target. Messages are returned sorted by + /// sequence number. + /// Mirrors Go MsgScheduling.getScheduledMessages in server/scheduler.go. + /// + /// + /// Callback that loads a stored message by sequence number. + /// The StoreMsg reuse buffer may be passed; returns null if not found. + /// + /// + /// Callback that loads the last stored message for a given subject. + /// Returns null if not found. + /// + public List GetScheduledMessages( + Func loadMsg, + Func loadLast) + { + var smv = new ZB.MOM.NatsNet.Server.StoreMsg(); + List? msgs = null; + + _ttls.ExpireTasks((seq, ts) => + { + var sm = loadMsg(seq, smv); + if (sm != null) + { + // Already in-flight for this subject — skip. + var subj = sm.Subject; + if (IsInflight(subj)) + return false; + + // Validate the schedule pattern header. + var patternBytes = NatsMessageHeaders.GetHeader( + NatsHeaderConstants.JsSchedulePattern, sm.Hdr); + if (patternBytes == null || patternBytes.Length == 0) + { + Remove(seq); + return true; + } + var pattern = System.Text.Encoding.ASCII.GetString(patternBytes); + + var (next, repeat, ok) = ParseMsgSchedule(pattern, ts); + if (!ok) + { + Remove(seq); + return true; + } + + var (ttl, ttlOk) = ZB.MOM.NatsNet.Server.NatsStream.GetMessageScheduleTTL(sm.Hdr); + if (!ttlOk) + { + Remove(seq); + return true; + } + + var target = ZB.MOM.NatsNet.Server.NatsStream.GetMessageScheduleTarget(sm.Hdr); + if (string.IsNullOrEmpty(target)) + { + Remove(seq); + return true; + } + + var source = ZB.MOM.NatsNet.Server.NatsStream.GetMessageScheduleSource(sm.Hdr); + if (!string.IsNullOrEmpty(source)) + { + sm = loadLast(source, smv); + if (sm == null) + { + Remove(seq); + return true; + } + } + + // Copy headers and body — message lives beyond this callback. + var hdr = sm.Hdr.Length > 0 ? (byte[])sm.Hdr.Clone() : []; + var msg = sm.Msg.Length > 0 ? (byte[])sm.Msg.Clone() : []; + + // Strip schedule-specific headers. + hdr = NatsMessageHeaders.RemoveHeaderIfPresent(hdr, NatsHeaderConstants.JsSchedulePattern) ?? []; + hdr = NatsMessageHeaders.RemoveHeaderIfPrefixPresent(hdr, "Nats-Schedule-") ?? []; + hdr = NatsMessageHeaders.RemoveHeaderIfPrefixPresent(hdr, "Nats-Expected-") ?? []; + hdr = NatsMessageHeaders.RemoveHeaderIfPresent(hdr, NatsHeaderConstants.JsMsgId) ?? []; + hdr = NatsMessageHeaders.RemoveHeaderIfPresent(hdr, NatsHeaderConstants.JsMessageTtl) ?? []; + hdr = NatsMessageHeaders.RemoveHeaderIfPresent(hdr, NatsHeaderConstants.JsMsgRollup) ?? []; + + // Add scheduler-specific headers. + hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsScheduler, subj); + if (!repeat) + { + hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsScheduleNext, + NatsHeaderConstants.JsScheduleNextPurge); + } + else + { + hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsScheduleNext, + next.ToString("yyyy-MM-ddTHH:mm:ssK")); + } + if (!string.IsNullOrEmpty(ttl)) + hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsMessageTtl, ttl); + + msgs ??= []; + msgs.Add(new InMsg { Seq = seq, Subject = target, Hdr = hdr, Msg = msg }); + MarkInflight(subj); + return false; + } + + Remove(seq); + return true; + }); + + if (msgs == null) + return []; + + // THW is unordered — sort by sequence before returning. + msgs.Sort((a, b) => a.Seq.CompareTo(b.Seq)); + return msgs; + } /// /// Encodes the current schedule state to a binary snapshot. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs index 21ce5f3..ae85549 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs @@ -16,6 +16,7 @@ public sealed partial class InMsg Hdr = null; Msg = null; Client = null; + Seq = 0; Pool.Add(this); } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 3da4adb..7addc20 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -250,6 +250,12 @@ public sealed partial class InMsg public string Subject { get; set; } = string.Empty; public string? Reply { get; set; } public byte[]? Hdr { get; set; } + + /// + /// Optional sequence number, used for out-of-band sorting (e.g. scheduled messages). + /// Mirrors Go inMsg.seq uint64 in server/stream.go. + /// + public ulong Seq { get; set; } public byte[]? Msg { get; set; } /// The originating client (opaque, set at runtime). diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/MessageTrace/MsgTrace.cs b/dotnet/src/ZB.MOM.NatsNet.Server/MessageTrace/MsgTrace.cs new file mode 100644 index 0000000..66db4c9 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/MessageTrace/MsgTrace.cs @@ -0,0 +1,710 @@ +// Copyright 2024-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/msgtrace.go in the NATS server Go source. + +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +// ============================================================================ +// Compression type enum — mirrors Go compressionType in server/client.go. +// Used by MsgTrace to indicate encoding of trace messages. +// ============================================================================ + +/// +/// Compression types used when sending trace events. +/// Mirrors Go compressionType iota in server/client.go. +/// +internal enum TraceCompressionType +{ + /// No compression. Mirrors Go noCompression. + None = 0, + + /// Snappy/S2 compression. Mirrors Go snappyCompression. + Snappy = 1, + + /// Gzip compression. Mirrors Go gzipCompression. + Gzip = 2, + + /// Compression type not recognised. Mirrors Go unsupportedCompression. + Unsupported = 3, +} + +// ============================================================================ +// Error strings — mirrors Go const block near bottom of msgtrace.go. +// ============================================================================ + +/// +/// Well-known error strings included in trace egress events. +/// Mirrors Go const block in server/msgtrace.go. +/// +public static class MsgTraceErrors +{ + /// Client does not support tracing and trace-only was requested. + public const string TraceOnlyNoSupport = "Not delivered because remote does not support message tracing"; + + /// Client does not support tracing; delivered but no trace generated. + public const string TraceNoSupport = "Message delivered but remote does not support message tracing so no trace event generated from there"; + + /// Not delivered due to echo suppression. + public const string NoEcho = "Not delivered because of no echo"; + + /// Not delivered because the subject is publish-denied. + public const string PubViolation = "Not delivered because publish denied for this subject"; + + /// Not delivered because the subscription denies this subject. + public const string SubDeny = "Not delivered because subscription denies this subject"; + + /// Not delivered because the subscription is closed. + public const string SubClosed = "Not delivered because subscription is closed"; + + /// Not delivered because the client is closed. + public const string ClientClosed = "Not delivered because client is closed"; + + /// Not delivered because auto-unsubscribe limit exceeded. + public const string AutoSubExceeded = "Not delivered because auto-unsubscribe exceeded"; + + /// Not delivered because fast producer is not stalled and consumer is slow. + public const string FastProdNoStall = "Not delivered because fast producer not stalled and consumer is slow"; +} + +// ============================================================================ +// MsgTraceState — internal runtime state for a traced message +// Mirrors Go msgTrace struct in server/msgtrace.go. +// ============================================================================ + +/// +/// Runtime state object carried through message processing for a traced message. +/// Mirrors Go msgTrace struct in server/msgtrace.go. +/// +internal sealed class MsgTraceState +{ + /// + /// Atomic counter used to co-ordinate sending between the main pipeline + /// and the JetStream path (must reach 2 before the event is sent). + /// Mirrors Go ready int32. + /// + private int _ready; + + /// Server reference, used to publish the trace event. + public INatsServer? Server { get; set; } + + /// Account to which the trace event is published. + public Account? Account { get; set; } + + /// + /// Origin account name — set only when is null + /// (account lookup failed). Mirrors Go oan string. + /// + public string OriginAccountName { get; set; } = string.Empty; + + /// Destination subject for the trace event. + public string Dest { get; set; } = string.Empty; + + /// The top-level trace event being accumulated. + public MsgTraceEvent Event { get; set; } = new(); + + /// + /// Reference to the JetStream event inside , + /// so it can be updated after the fact. + /// Mirrors Go js *MsgTraceJetStream. + /// + public MsgTraceJetStream? Js { get; set; } + + /// Hop identifier received from the upstream server. + public string Hop { get; set; } = string.Empty; + + /// Computed next-hop string (set during egress header generation). + public string NextHop { get; set; } = string.Empty; + + /// When true, the message is not delivered — tracing only. + public bool TraceOnly { get; set; } + + /// Compression type for sending the trace event. + public TraceCompressionType CompressionType { get; set; } = TraceCompressionType.None; + + // ------------------------------------------------------------------------- + // Factory methods — mirrors Go new() methods on each concrete type. + // ------------------------------------------------------------------------- + + /// + /// Creates a new with the current timestamp. + /// Mirrors Go MsgTraceIngress.new(). + /// + public static MsgTraceIngress NewIngress() => new() + { + Type = MsgTraceType.Ingress, + Timestamp = DateTime.UtcNow, + }; + + /// + /// Creates a new with the current timestamp. + /// Mirrors Go MsgTraceSubjectMapping.new(). + /// + public static MsgTraceSubjectMapping NewSubjectMapping() => new() + { + Type = MsgTraceType.SubjectMapping, + Timestamp = DateTime.UtcNow, + }; + + /// + /// Creates a new with the current timestamp. + /// Mirrors Go MsgTraceStreamExport.new(). + /// + public static MsgTraceStreamExport NewStreamExport() => new() + { + Type = MsgTraceType.StreamExport, + Timestamp = DateTime.UtcNow, + }; + + /// + /// Creates a new with the current timestamp. + /// Mirrors Go MsgTraceServiceImport.new(). + /// + public static MsgTraceServiceImport NewServiceImport() => new() + { + Type = MsgTraceType.ServiceImport, + Timestamp = DateTime.UtcNow, + }; + + /// + /// Creates a new with the current timestamp. + /// Mirrors Go MsgTraceJetStream.new(). + /// + public static MsgTraceJetStream NewJetStream() => new() + { + Type = MsgTraceType.JetStream, + Timestamp = DateTime.UtcNow, + }; + + /// + /// Creates a new with the current timestamp. + /// Mirrors Go MsgTraceEgress.new(). + /// + public static MsgTraceEgress NewEgress() => new() + { + Type = MsgTraceType.Egress, + Timestamp = DateTime.UtcNow, + }; + + // ------------------------------------------------------------------------- + // Instance helpers + // ------------------------------------------------------------------------- + + /// + /// Returns true if trace-only mode is active (message must not + /// be delivered to subscribers). + /// Mirrors Go msgTrace.traceOnly(). + /// + public bool IsTraceOnly() => TraceOnly; + + /// + /// Sets the ingress error field on the first event if it is an ingress. + /// Mirrors Go msgTrace.setIngressError(). + /// + public void SetIngressError(string err) + { + if (Event.Ingress() is { } ingress) + ingress.Error = err; + } + + /// + /// Appends a subject-mapping event. + /// Mirrors Go msgTrace.addSubjectMappingEvent(). + /// + public void AddSubjectMappingEvent(string mappedSubject) + { + Event.Events.Add(new MsgTraceSubjectMapping + { + Type = MsgTraceType.SubjectMapping, + Timestamp = DateTime.UtcNow, + MappedTo = mappedSubject, + }); + } + + /// + /// Appends an egress event. + /// Mirrors Go msgTrace.addEgressEvent(). + /// + public void AddEgressEvent(ClientConnection dc, Subscription? sub, string err) + { + var egress = new MsgTraceEgress + { + Type = MsgTraceType.Egress, + Timestamp = DateTime.UtcNow, + Kind = (int)dc.Kind, + Cid = dc.Cid, + Name = MsgTraceHelper.GetConnName(dc), + Hop = NextHop, + Error = string.IsNullOrEmpty(err) ? null : err, + }; + NextHop = string.Empty; + + // For CLIENT connections, include subscription and queue info. + if (dc.Kind == ClientKind.Client && sub is not null) + { + egress.Subscription = Encoding.UTF8.GetString(sub.Subject); + if (sub.Queue is { Length: > 0 } q) + egress.Queue = Encoding.UTF8.GetString(q); + } + + // Include account name if different from the ingress account. + if ((dc.Kind == ClientKind.Client || dc.Kind == ClientKind.Leaf) && + dc._account is Account dcAcc && + Event.Ingress() is { } ing) + { + var dcAccName = dcAcc.GetName(); + if (!string.Equals(dcAccName, ing.Account, StringComparison.Ordinal)) + egress.Account = dcAccName; + } + + Event.Events.Add(egress); + } + + /// + /// Appends a stream-export event. + /// Mirrors Go msgTrace.addStreamExportEvent(). + /// + public void AddStreamExportEvent(ClientConnection dc, string toSubject) + { + string accName; + lock (dc) + { + accName = (dc._account as Account)?.GetName() ?? string.Empty; + } + + Event.Events.Add(new MsgTraceStreamExport + { + Type = MsgTraceType.StreamExport, + Timestamp = DateTime.UtcNow, + Account = accName, + To = toSubject, + }); + } + + /// + /// Appends a service-import event. + /// Mirrors Go msgTrace.addServiceImportEvent(). + /// + public void AddServiceImportEvent(string accName, string from, string to) + { + Event.Events.Add(new MsgTraceServiceImport + { + Type = MsgTraceType.ServiceImport, + Timestamp = DateTime.UtcNow, + Account = accName, + From = from, + To = to, + }); + } + + /// + /// Appends a JetStream event and retains a reference for later updates. + /// Mirrors Go msgTrace.addJetStreamEvent(). + /// + public void AddJetStreamEvent(string streamName) + { + Js = new MsgTraceJetStream + { + Type = MsgTraceType.JetStream, + Timestamp = DateTime.UtcNow, + Stream = streamName, + }; + Event.Events.Add(Js); + } + + /// + /// Updates the last JetStream event with subject and no-interest flag. + /// Mirrors Go msgTrace.updateJetStreamEvent(). + /// + public void UpdateJetStreamEvent(string subject, bool noInterest) + { + if (Js is null) return; + Js.Subject = subject; + Js.NoInterest = noInterest; + Js.Timestamp = DateTime.UtcNow; + } + + /// + /// Sends the trace event from the JetStream path. + /// Mirrors Go msgTrace.sendEventFromJetStream(). + /// + public void SendEventFromJetStream(Exception? err) + { + if (Js is null) return; + if (err is not null) + Js.Error = err.Message; + SendEvent(); + } + + /// + /// Publishes the complete trace event. + /// When there is a JetStream path, both the main and JetStream legs must + /// call this before the event is sent (atomic counter reaching 2). + /// Mirrors Go msgTrace.sendEvent(). + /// + public void SendEvent() + { + if (Js is not null) + { + var count = Interlocked.Increment(ref _ready); + if (count != 2) + return; + } + + // Server.sendInternalAccountSysMsg is a runtime operation — the full + // publish path is wired up when the NatsServer runtime is available. + // Mirrors Go: t.srv.sendInternalAccountSysMsg(t.acc, t.dest, &t.event.Server, t.event, t.ct) + _ = Server; + _ = Account; + _ = Dest; + _ = CompressionType; + _ = Event; + } + + /// + /// Sets the Nats-Trace-Origin-Account header on when + /// the trace account differs from . + /// Mirrors Go msgTrace.setOriginAccountHeaderIfNeeded(). + /// + public byte[] SetOriginAccountHeaderIfNeeded(ClientConnection c, Account acc, byte[] msg) + { + string oan; + if (Account is not null) + { + if (Account == acc) return msg; + oan = Account.GetName(); + } + else if (!string.Equals(OriginAccountName, acc.GetName(), StringComparison.Ordinal)) + { + oan = OriginAccountName; + } + else + { + return msg; + } + + return NatsMessageHeaders.GenHeader(msg, MsgTraceHeaders.MsgTraceOriginAccount, oan); + } + + /// + /// Increments the hop counter and writes the Nats-Trace-Hop header. + /// Mirrors Go msgTrace.setHopHeader(). + /// + public byte[] SetHopHeader(ClientConnection c, byte[] msg) + { + Event.Hops++; + NextHop = Hop.Length > 0 + ? $"{Hop}.{Event.Hops}" + : $"{Event.Hops}"; + return NatsMessageHeaders.GenHeader(msg, MsgTraceHeaders.MsgTraceHop, NextHop); + } +} + +// ============================================================================ +// MsgTraceHelper — static helpers mirroring Go free functions in msgtrace.go. +// ============================================================================ + +/// +/// Static helpers for message tracing that mirror the free functions and +/// client methods in Go's server/msgtrace.go. +/// +internal static class MsgTraceHelper +{ + /// + /// Header name for the Accept-Encoding header, used to detect the + /// compression preference of an incoming connection. + /// Mirrors Go acceptEncodingHeader const in client.go. + /// + internal const string AcceptEncodingHeader = "Accept-Encoding"; + + // ------------------------------------------------------------------------- + // getTraceAs — generic type assertion helper + // ------------------------------------------------------------------------- + + /// + /// Returns cast to , or null. + /// Mirrors Go generic getTraceAs[T] in server/msgtrace.go. + /// + public static T? GetTraceAs(IMsgTrace e) where T : class, IMsgTrace + => e as T; + + // ------------------------------------------------------------------------- + // getConnName + // ------------------------------------------------------------------------- + + /// + /// Returns the remote-server name for ROUTER/GATEWAY/LEAF connections, + /// or the client's connection name for CLIENT connections. + /// Mirrors Go getConnName(c *client) in server/msgtrace.go. + /// + public static string GetConnName(ClientConnection c) + { + return c.Kind switch + { + ClientKind.Router when c.Route?.RemoteName is { Length: > 0 } rn => rn, + ClientKind.Gateway when c.Gateway?.RemoteName is { Length: > 0 } gn => gn, + ClientKind.Leaf when c.Leaf?.RemoteServer is { Length: > 0 } ls => ls, + _ => c.Opts.Name, + }; + } + + // ------------------------------------------------------------------------- + // getCompressionType + // ------------------------------------------------------------------------- + + /// + /// Maps the Accept-Encoding header value to an internal compression type. + /// Mirrors Go getCompressionType(cts string) compressionType. + /// + public static TraceCompressionType GetCompressionType(string cts) + { + if (string.IsNullOrEmpty(cts)) + return TraceCompressionType.None; + + cts = cts.ToLowerInvariant(); + if (cts.Contains("snappy") || cts.Contains("s2")) + return TraceCompressionType.Snappy; + if (cts.Contains("gzip")) + return TraceCompressionType.Gzip; + + return TraceCompressionType.Unsupported; + } + + // ------------------------------------------------------------------------- + // isMsgTraceEnabled + // ------------------------------------------------------------------------- + + /// + /// Returns the active for this connection (or null), + /// and whether trace-only mode is active. + /// Mirrors Go client.isMsgTraceEnabled(). + /// + public static (MsgTraceState? Trace, bool TraceOnly) IsMsgTraceEnabled(ClientConnection c) + { + var t = c.ParseCtx.Pa.Trace as MsgTraceState; + if (t is null) return (null, false); + return (t, t.IsTraceOnly()); + } + + // ------------------------------------------------------------------------- + // msgTraceSupport + // ------------------------------------------------------------------------- + + /// + /// Returns true when the connection supports message tracing. + /// CLIENT connections always support tracing; ROUTER/GATEWAY/LEAF only + /// do when their negotiated protocol is high enough. + /// Mirrors Go client.msgTraceSupport(). + /// + public static bool MsgTraceSupport(ClientConnection c) + => c.Kind == ClientKind.Client || c.Opts.Protocol >= ServerProtocol.MsgTraceProto; + + // ------------------------------------------------------------------------- + // sample + // ------------------------------------------------------------------------- + + /// + /// Returns true when the message should be included in a sample. + /// Any value outside [1..99] is treated as 100% sampling. + /// Mirrors Go sample(sampling int) bool. + /// + public static bool Sample(int sampling) + { + if (sampling <= 0 || sampling >= 100) + return true; + return Random.Shared.Next(100) <= sampling; + } + + // ------------------------------------------------------------------------- + // genHeaderMapIfTraceHeadersPresent + // ------------------------------------------------------------------------- + + /// + /// Parses a raw NATS message header block and returns all header key/value + /// pairs if the block contains either Nats-Trace-Dest or an enabled + /// traceparent header. + /// + /// The returned flag is true (external) when only the W3C traceparent + /// header triggered the trace; false when the native Nats-Trace-Dest + /// header was present. + /// + /// Returns an empty map when neither trace header is present, or when + /// Nats-Trace-Dest has the sentinel "trace disabled" value. + /// + /// Mirrors Go genHeaderMapIfTraceHeadersPresent in server/msgtrace.go. + /// + public static (Dictionary> Headers, bool External) GenHeaderMapIfTraceHeadersPresent( + byte[] hdr) + { + var empty = (new Dictionary>(), false); + + if (hdr.Length == 0) + return empty; + + var hdrLineBuf = Encoding.ASCII.GetBytes(NatsHeaderConstants.HdrLine); + if (!hdr.AsSpan().StartsWith(hdrLineBuf)) + return empty; + + var traceDestBuf = Encoding.ASCII.GetBytes(MsgTraceHeaders.MsgTraceDest); + var traceDestDisabledBuf = Encoding.ASCII.GetBytes(MsgTraceHeaders.MsgTraceDestDisabled); + var traceParentBuf = Encoding.ASCII.GetBytes(MsgTraceHeaders.TraceParentHdr); + + bool traceDestFound = false; + bool traceParentFound = false; + + var keys = new List>(16); + var vals = new List>(16); + + int i = hdrLineBuf.Length; + while (i < hdr.Length) + { + int del = hdr.AsSpan(i).IndexOf((byte)':'); + if (del < 0) break; + + var key = hdr.AsMemory(i, del); + i += del + 1; + + // Skip leading whitespace in value. + while (i < hdr.Length && (hdr[i] == ' ' || hdr[i] == '\t')) + i++; + + int valStart = i; + int nl = hdr.AsSpan(valStart).IndexOf("\r\n"u8); + if (nl < 0) break; + + int valEnd = valStart + nl; + while (valEnd > valStart && (hdr[valEnd - 1] == ' ' || hdr[valEnd - 1] == '\t')) + valEnd--; + + var val = hdr.AsMemory(valStart, valEnd - valStart); + + if (key.Length > 0 && val.Length > 0) + { + // Check for Nats-Trace-Dest. + if (!traceDestFound && key.Span.SequenceEqual(traceDestBuf)) + { + if (val.Span.SequenceEqual(traceDestDisabledBuf)) + return empty; + traceDestFound = true; + } + // Check for traceparent (case-insensitive). + else if (!traceParentFound && + Encoding.ASCII.GetString(key.Span).Equals( + "traceparent", StringComparison.OrdinalIgnoreCase)) + { + // Sampling bit is the last-flag byte of "version-traceId-parentId-flags". + var valStr = Encoding.ASCII.GetString(val.Span); + var parts = valStr.Split('-'); + if (parts.Length == 4 && parts[3].Length == 2 && + int.TryParse(parts[3], + System.Globalization.NumberStyles.HexNumber, + null, out var hexVal) && + (hexVal & 0x1) == 0x1) + { + traceParentFound = true; + } + } + + keys.Add(key); + vals.Add(val); + } + + i = valStart + nl + 2; // skip past CRLF + } + + if (!traceDestFound && !traceParentFound) + return empty; + + var map = new Dictionary>(StringComparer.Ordinal); + for (int k = 0; k < keys.Count; k++) + { + var kStr = Encoding.ASCII.GetString(keys[k].Span); + var vStr = Encoding.ASCII.GetString(vals[k].Span); + if (!map.TryGetValue(kStr, out var list)) + { + list = []; + map[kStr] = list; + } + list.Add(vStr); + } + + bool isExternal = !traceDestFound && traceParentFound; + return (map, isExternal); + } + + // ------------------------------------------------------------------------- + // initAndSendIngressErrEvent + // ------------------------------------------------------------------------- + + /// + /// Creates a minimal trace state and immediately sends the ingress-error + /// event. Used when an error is detected early (e.g. max-payload exceeded) + /// before normal message processing begins. + /// Mirrors Go client.initAndSendIngressErrEvent(). + /// + public static void InitAndSendIngressErrEvent( + ClientConnection c, + byte[] hdr, + string dest, + Exception? ingressError) + { + if (ingressError is null) return; + + var ct = GetAcceptEncodingCompressionType(hdr); + var trace = new MsgTraceState + { + Server = c.Server, + Account = c._account as Account, + Dest = dest, + CompressionType = ct, + Event = new MsgTraceEvent + { + Request = new MsgTraceRequest { MsgSize = c.ParseCtx.Pa.Size }, + Events = + [ + new MsgTraceIngress + { + Type = MsgTraceType.Ingress, + Timestamp = DateTime.UtcNow, + Kind = (int)c.Kind, + Cid = c.Cid, + Name = GetConnName(c), + Error = ingressError.Message, + }, + ], + }, + }; + + trace.SendEvent(); + } + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + /// + /// Extracts the Accept-Encoding value from a raw header block and maps it + /// to a . + /// Mirrors Go getAcceptEncoding(hdr []byte) in server/client.go. + /// + private static TraceCompressionType GetAcceptEncodingCompressionType(byte[] hdr) + { + if (hdr.Length == 0) return TraceCompressionType.None; + var value = NatsMessageHeaders.GetHeader(AcceptEncodingHeader, hdr); + if (value is null || value.Length == 0) return TraceCompressionType.None; + return GetCompressionType(Encoding.ASCII.GetString(value)); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorHelpers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorHelpers.cs new file mode 100644 index 0000000..83243ec --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorHelpers.cs @@ -0,0 +1,802 @@ +// Copyright 2013-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/monitor.go in the NATS server Go source. + +using System.Net; +using System.Security.Cryptography; +using System.Security.Cryptography.X509Certificates; +using System.Text; +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +// ============================================================================ +// GatewayzOptions — query options for the Gatewayz endpoint +// Mirrors Go GatewayzOptions struct in server/monitor.go. +// ============================================================================ + +/// +/// Options that control the output of a Gatewayz monitoring query. +/// Mirrors Go GatewayzOptions struct in server/monitor.go. +/// +public sealed class GatewayzOptions +{ + /// When non-empty, limits output to the gateway with this name. Mirrors Go Name. + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + /// When true, includes accounts with their interest. Mirrors Go Accounts. + [JsonPropertyName("accounts")] + public bool Accounts { get; set; } + + /// Limits accounts to this specific name (implies ). Mirrors Go AccountName. + [JsonPropertyName("account_name")] + public string AccountName { get; set; } = string.Empty; + + /// When true, subscription subjects are included in account results. Mirrors Go AccountSubscriptions. + [JsonPropertyName("subscriptions")] + public bool AccountSubscriptions { get; set; } + + /// When true, verbose subscription details are included. Mirrors Go AccountSubscriptionsDetail. + [JsonPropertyName("subscriptions_detail")] + public bool AccountSubscriptionsDetail { get; set; } +} + +// ============================================================================ +// Gatewayz — top-level gateway monitoring response +// Mirrors Go Gatewayz struct in server/monitor.go. +// ============================================================================ + +/// +/// Top-level response type for the /gatewayz monitoring endpoint. +/// Mirrors Go Gatewayz struct in server/monitor.go. +/// +public sealed class Gatewayz +{ + [JsonPropertyName("server_id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("now")] + public DateTime Now { get; set; } + + [JsonPropertyName("name")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Name { get; set; } + + [JsonPropertyName("host")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Host { get; set; } + + [JsonPropertyName("port")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int Port { get; set; } + + [JsonPropertyName("outbound_gateways")] + public Dictionary OutboundGateways { get; set; } = new(); + + [JsonPropertyName("inbound_gateways")] + public Dictionary> InboundGateways { get; set; } = new(); +} + +// ============================================================================ +// RemoteGatewayz — information about a single remote gateway connection +// Mirrors Go RemoteGatewayz struct in server/monitor.go. +// ============================================================================ + +/// +/// Information about a single outbound or inbound gateway connection. +/// Mirrors Go RemoteGatewayz struct in server/monitor.go. +/// +public sealed class RemoteGatewayz +{ + /// True if the gateway was explicitly configured (not implicit). Mirrors Go IsConfigured. + [JsonPropertyName("configured")] + public bool IsConfigured { get; set; } + + /// Connection details. Mirrors Go Connection *ConnInfo. + [JsonPropertyName("connection")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public ConnInfo? Connection { get; set; } + + /// Per-account interest information. Mirrors Go Accounts []*AccountGatewayz. + [JsonPropertyName("accounts")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public List? Accounts { get; set; } +} + +// ============================================================================ +// AccountGatewayz — per-account interest mode on a gateway +// Mirrors Go AccountGatewayz struct in server/monitor.go. +// ============================================================================ + +/// +/// Per-account interest mode information for a gateway connection. +/// Mirrors Go AccountGatewayz struct in server/monitor.go. +/// +public sealed class AccountGatewayz +{ + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + [JsonPropertyName("interest_mode")] + public string InterestMode { get; set; } = string.Empty; + + [JsonPropertyName("no_interest_count")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int NoInterestCount { get; set; } + + [JsonPropertyName("interest_only_threshold")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int InterestOnlyThreshold { get; set; } + + [JsonPropertyName("num_subs")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int TotalSubscriptions { get; set; } + + [JsonPropertyName("num_queue_subs")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int NumQueueSubscriptions { get; set; } + + [JsonPropertyName("subscriptions_list")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public List? Subs { get; set; } + + [JsonPropertyName("subscriptions_list_detail")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public List? SubsDetail { get; set; } +} + +// ============================================================================ +// ExtImport — external account import detail for /accountz +// Mirrors Go ExtImport struct in server/monitor.go. +// ============================================================================ + +/// +/// External view of a service import entry, as returned by the /accountz endpoint. +/// Mirrors Go ExtImport struct in server/monitor.go. +/// Note: The JWT Import embedded struct fields are inlined here since the +/// nats.io/jwt library is not yet ported. +/// +public sealed class ExtImport +{ + /// Whether this import is invalid. Mirrors Go Invalid bool. + [JsonPropertyName("invalid")] + public bool Invalid { get; set; } + + /// Whether the requestor's client info is shared. Mirrors Go Share bool. + [JsonPropertyName("share")] + public bool Share { get; set; } + + /// Whether latency tracking is enabled. Mirrors Go Tracking bool. + [JsonPropertyName("tracking")] + public bool Tracking { get; set; } + + /// Headers used when latency is triggered by a header. Mirrors Go TrackingHdr http.Header. + [JsonPropertyName("tracking_header")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public Dictionary? TrackingHeader { get; set; } + + /// + /// Latency configuration from the exporting account's JWT claim. + /// Mirrors Go Latency *jwt.ServiceLatency. + /// Sampling and subject are stored directly since jwt lib is not ported. + /// + [JsonPropertyName("latency")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public ExtServiceLatency? Latency { get; set; } + + /// First-leg latency measurement. Mirrors Go M1 *ServiceLatency. + [JsonPropertyName("m1")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public ServiceLatency? M1 { get; set; } + + // Inlined jwt.Import fields. + + /// Subject of the imported service. Mirrors Go jwt.Import.Subject. + [JsonPropertyName("subject")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Subject { get; set; } + + /// Account that exports the service. Mirrors Go jwt.Import.Account. + [JsonPropertyName("account")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Account { get; set; } + + /// Local subject used on the importing account. Mirrors Go jwt.Import.LocalSubject. + [JsonPropertyName("local_subject")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? LocalSubject { get; set; } +} + +// ============================================================================ +// ExtServiceLatency — external representation of service latency config +// Used by ExtImport in place of jwt.ServiceLatency. +// ============================================================================ + +/// +/// External representation of service latency configuration, used in . +/// Mirrors Go jwt.ServiceLatency from nats.io/jwt/v2. +/// +public sealed class ExtServiceLatency +{ + [JsonPropertyName("sampling")] + public int Sampling { get; set; } + + [JsonPropertyName("results")] + public string Results { get; set; } = string.Empty; +} + +// ============================================================================ +// MonitorHelpers — standalone helper functions +// Mirrors standalone functions in server/monitor.go. +// ============================================================================ + +/// +/// Standalone helper functions used by the monitoring subsystem. +/// Mirrors package-level functions from server/monitor.go. +/// +internal static class MonitorHelpers +{ + // ------------------------------------------------------------------------- + // newSubsDetailList + // ------------------------------------------------------------------------- + + /// + /// Builds a verbose subscription detail list for a client connection. + /// Client must be locked by caller. + /// Mirrors Go newSubsDetailList. + /// + internal static List NewSubsDetailList(ClientConnection client) + { + var result = new List(client.Subs?.Count ?? 0); + if (client.Subs == null) + return result; + foreach (var sub in client.Subs.Values) + result.Add(NewClientSubDetail(sub, client.Cid)); + return result; + } + + // ------------------------------------------------------------------------- + // newSubsList + // ------------------------------------------------------------------------- + + /// + /// Builds a plain subscription subject list for a client connection. + /// Client must be locked by caller. + /// Mirrors Go newSubsList. + /// + internal static List NewSubsList(ClientConnection client) + { + var result = new List(client.Subs?.Count ?? 0); + if (client.Subs == null) + return result; + foreach (var sub in client.Subs.Values) + result.Add(Encoding.UTF8.GetString(sub.Subject)); + return result; + } + + // ------------------------------------------------------------------------- + // createProxyInfo + // ------------------------------------------------------------------------- + + /// + /// Returns a if the connection has a proxy key set, or null. + /// Client lock must be held on entry. + /// Mirrors Go createProxyInfo. + /// + internal static ProxyInfo? CreateProxyInfo(ClientConnection c) + { + if (string.IsNullOrEmpty(c.ProxyKey)) + return null; + return new ProxyInfo { Key = c.ProxyKey }; + } + + // ------------------------------------------------------------------------- + // makePeerCerts + // ------------------------------------------------------------------------- + + /// + /// Converts a list of X.509 peer certificates into summary records. + /// Each record contains subject string, SPKI SHA-256 hex, and certificate SHA-256 hex. + /// Mirrors Go makePeerCerts. + /// + internal static List MakePeerCerts(IReadOnlyList peerCerts) + { + var result = new List(peerCerts.Count); + foreach (var cert in peerCerts) + { + var spkiHash = SHA256.HashData(cert.PublicKey.ExportSubjectPublicKeyInfo()); + var certHash = SHA256.HashData(cert.RawData); + result.Add(new TlsPeerCert + { + Subject = cert.Subject, + SubjectPkiSha256 = Convert.ToHexString(spkiHash).ToLowerInvariant(), + CertSha256 = Convert.ToHexString(certHash).ToLowerInvariant(), + }); + } + return result; + } + + // ------------------------------------------------------------------------- + // decodeBool + // ------------------------------------------------------------------------- + + /// + /// Parses a boolean query-string parameter from an HTTP listener request. + /// Writes a 400 status and returns an error if the value cannot be parsed. + /// Mirrors Go decodeBool. + /// + internal static (bool Value, Exception? Error) DecodeBool( + HttpListenerResponse response, + System.Collections.Specialized.NameValueCollection query, + string param) + { + var str = query[param] ?? string.Empty; + if (str.Length == 0) + return (false, null); + if (bool.TryParse(str, out var val)) + return (val, null); + if (str == "1") return (true, null); + if (str == "0") return (false, null); + var err = new FormatException($"Error decoding boolean for '{param}': {str}"); + response.StatusCode = 400; + return (false, err); + } + + // ------------------------------------------------------------------------- + // decodeUint64 + // ------------------------------------------------------------------------- + + /// + /// Parses a uint64 query-string parameter from an HTTP listener request. + /// Mirrors Go decodeUint64. + /// + internal static (ulong Value, Exception? Error) DecodeUint64( + HttpListenerResponse response, + System.Collections.Specialized.NameValueCollection query, + string param) + { + var str = query[param] ?? string.Empty; + if (str.Length == 0) + return (0, null); + if (ulong.TryParse(str, out var val)) + return (val, null); + var err = new FormatException($"Error decoding uint64 for '{param}': {str}"); + response.StatusCode = 400; + return (0, err); + } + + // ------------------------------------------------------------------------- + // decodeInt + // ------------------------------------------------------------------------- + + /// + /// Parses an int query-string parameter from an HTTP listener request. + /// Mirrors Go decodeInt. + /// + internal static (int Value, Exception? Error) DecodeInt( + HttpListenerResponse response, + System.Collections.Specialized.NameValueCollection query, + string param) + { + var str = query[param] ?? string.Empty; + if (str.Length == 0) + return (0, null); + if (int.TryParse(str, out var val)) + return (val, null); + var err = new FormatException($"Error decoding int for '{param}': {str}"); + response.StatusCode = 400; + return (0, err); + } + + // ------------------------------------------------------------------------- + // decodeState + // ------------------------------------------------------------------------- + + /// + /// Parses the connection-state filter query parameter. + /// Mirrors Go decodeState. + /// + internal static (ConnState Value, Exception? Error) DecodeState( + HttpListenerResponse response, + System.Collections.Specialized.NameValueCollection query) + { + var str = query["state"] ?? string.Empty; + if (str.Length == 0) + return (ConnState.ConnOpen, null); + switch (str.ToLowerInvariant()) + { + case "open": return (ConnState.ConnOpen, null); + case "closed": return (ConnState.ConnClosed, null); + case "any": + case "all": return (ConnState.ConnAll, null); + } + var err = new FormatException($"Error decoding state for {str}"); + response.StatusCode = 400; + return (default, err); + } + + // ------------------------------------------------------------------------- + // decodeSubs + // ------------------------------------------------------------------------- + + /// + /// Parses the subs query parameter into subs and subsDet flags. + /// Mirrors Go decodeSubs. + /// + internal static (bool Subs, bool SubsDetail, Exception? Error) DecodeSubs( + HttpListenerResponse response, + System.Collections.Specialized.NameValueCollection query) + { + var raw = query["subs"] ?? string.Empty; + if (raw.Equals("detail", StringComparison.OrdinalIgnoreCase)) + return (false, true, null); + var (subs, err) = DecodeBool(response, query, "subs"); + return (subs, false, err); + } + + // ------------------------------------------------------------------------- + // newSubDetail + // ------------------------------------------------------------------------- + + /// + /// Creates a including account name from the owning client. + /// Client must be locked on entry. + /// Mirrors Go newSubDetail. + /// + internal static SubDetail NewSubDetail(Internal.Subscription sub, ClientConnection client) + { + var sd = NewClientSubDetail(sub, client.Cid); + var acc = client.Account() as Account; + sd.Account = acc?.GetName(); + sd.AccountTag = acc?.GetNameTag(); + return sd; + } + + // ------------------------------------------------------------------------- + // newClientSubDetail + // ------------------------------------------------------------------------- + + /// + /// Creates a from a subscription (no account name). + /// Mirrors Go newClientSubDetail. + /// + internal static SubDetail NewClientSubDetail(Internal.Subscription sub, ulong cid) + { + return new SubDetail + { + Subject = Encoding.UTF8.GetString(sub.Subject), + Queue = sub.Queue is { Length: > 0 } + ? Encoding.UTF8.GetString(sub.Queue) + : null, + Sid = sub.Sid is { Length: > 0 } + ? Encoding.UTF8.GetString(sub.Sid) + : string.Empty, + Cid = cid, + }; + } + + // ------------------------------------------------------------------------- + // myUptime + // ------------------------------------------------------------------------- + + /// + /// Formats a as a human-readable uptime string + /// (e.g. "2d3h14m5s", "45m30s"). + /// Mirrors Go myUptime. + /// + internal static string MyUptime(TimeSpan d) + { + var tsecs = (long)d.TotalSeconds; + var tmins = tsecs / 60; + var thrs = tmins / 60; + var tdays = thrs / 24; + var tyrs = tdays / 365; + + if (tyrs > 0) + return $"{tyrs}y{tdays % 365}d{thrs % 24}h{tmins % 60}m{tsecs % 60}s"; + if (tdays > 0) + return $"{tdays}d{thrs % 24}h{tmins % 60}m{tsecs % 60}s"; + if (thrs > 0) + return $"{thrs}h{tmins % 60}m{tsecs % 60}s"; + if (tmins > 0) + return $"{tmins}m{tsecs % 60}s"; + return $"{tsecs}s"; + } + + // ------------------------------------------------------------------------- + // tlsCertNotAfter + // ------------------------------------------------------------------------- + + /// + /// Returns the expiry date of the first certificate in the given collection, + /// or if the collection is empty. + /// Mirrors Go tlsCertNotAfter. + /// + internal static DateTime TlsCertNotAfter(X509CertificateCollection? certs) + { + if (certs == null || certs.Count == 0) + return DateTime.MinValue; + if (certs[0] is X509Certificate2 cert2) + return cert2.NotAfter.ToUniversalTime(); + try + { + var parsed = new X509Certificate2(certs[0]); + return parsed.NotAfter.ToUniversalTime(); + } + catch + { + return DateTime.MinValue; + } + } + + // ------------------------------------------------------------------------- + // urlsToStrings + // ------------------------------------------------------------------------- + + /// + /// Converts a list of objects to their Host:Port string form. + /// Mirrors Go urlsToStrings. + /// + internal static string[] UrlsToStrings(IReadOnlyList urls) + { + var result = new string[urls.Count]; + for (int i = 0; i < urls.Count; i++) + result[i] = urls[i].Authority; // "host:port" + return result; + } + + // ------------------------------------------------------------------------- + // getPinnedCertsAsSlice + // ------------------------------------------------------------------------- + + /// + /// Converts a to a plain string array. + /// Returns null if the set is empty. + /// Mirrors Go getPinnedCertsAsSlice. + /// + internal static string[]? GetPinnedCertsAsSlice(PinnedCertSet? certs) + { + if (certs == null || certs.Count == 0) + return null; + var result = new string[certs.Count]; + certs.CopyTo(result); + return result; + } + + // ------------------------------------------------------------------------- + // getMonitorGWOptions + // ------------------------------------------------------------------------- + + /// + /// Extracts gateway name filter and accounts flag from . + /// When AccountName is set but Accounts is false, the accounts flag is + /// implicitly promoted to true. + /// Mirrors Go getMonitorGWOptions. + /// + internal static (string Name, bool Accounts) GetMonitorGWOptions(GatewayzOptions? opts) + { + if (opts == null) + return (string.Empty, false); + var name = opts.Name; + var accs = opts.Accounts; + if (!accs && !string.IsNullOrEmpty(opts.AccountName)) + accs = true; + return (name, accs); + } + + // ------------------------------------------------------------------------- + // createOutboundRemoteGatewayz + // ------------------------------------------------------------------------- + + /// + /// Builds a from an outbound gateway client connection. + /// Client lock is acquired internally. + /// Mirrors Go createOutboundRemoteGatewayz. + /// Note: Per-account interest detail (outsim) requires a running gateway layer; + /// account lists are empty in the current partial port. + /// + internal static (string Name, RemoteGatewayz? Rgw) CreateOutboundRemoteGatewayz( + ClientConnection c, + GatewayzOptions? opts, + DateTime now, + bool doAccs) + { + lock (c) + { + var name = c.Gateway?.Name; + if (string.IsNullOrEmpty(name)) + return (string.Empty, null); + + var isConfigured = c.Gateway?.Cfg != null && !c.Gateway.Cfg.IsImplicit(); + var rgw = new RemoteGatewayz + { + IsConfigured = isConfigured, + Connection = new ConnInfo(), + Accounts = doAccs ? [] : null, + }; + return (name, rgw); + } + } + + // ------------------------------------------------------------------------- + // createOutboundAccountsGatewayz + // ------------------------------------------------------------------------- + + /// + /// Returns the per-account interest list for an outbound gateway connection. + /// Mirrors Go createOutboundAccountsGatewayz. + /// Note: Requires fully-ported gateway outsim state; returns empty list in current port. + /// + internal static List CreateOutboundAccountsGatewayz( + GatewayzOptions? opts, + ClientConnection c) + { + // outsim not yet ported — return empty. + return []; + } + + // ------------------------------------------------------------------------- + // createAccountOutboundGatewayz + // ------------------------------------------------------------------------- + + /// + /// Creates an entry for a named outbound account. + /// Mirrors Go createAccountOutboundGatewayz. + /// Note: Requires fully-ported outsie state; returns Optimistic defaults. + /// + internal static AccountGatewayz CreateAccountOutboundGatewayz( + GatewayzOptions? opts, + string name, + object? outsie) + { + // outsie not yet ported — return Optimistic defaults. + return new AccountGatewayz + { + Name = name, + InterestMode = GatewayInterestMode.Optimistic.String(), + }; + } + + // ------------------------------------------------------------------------- + // createInboundAccountsGatewayz + // ------------------------------------------------------------------------- + + /// + /// Returns the per-account interest list for an inbound gateway connection. + /// Mirrors Go createInboundAccountsGatewayz. + /// Note: Requires fully-ported gateway insim state; returns empty list. + /// + internal static List CreateInboundAccountsGatewayz( + GatewayzOptions? opts, + ClientConnection c) + { + // insim not yet ported — return empty. + return []; + } + + // ------------------------------------------------------------------------- + // createInboundAccountGatewayz + // ------------------------------------------------------------------------- + + /// + /// Creates an entry for a named inbound account. + /// Mirrors Go createInboundAccountGatewayz. + /// Note: Requires fully-ported insie state; returns Optimistic defaults. + /// + internal static AccountGatewayz CreateInboundAccountGatewayz(string name, object? insie) + { + // insie not yet ported — return Optimistic defaults. + return new AccountGatewayz + { + Name = name, + InterestMode = GatewayInterestMode.Optimistic.String(), + }; + } + + // ------------------------------------------------------------------------- + // ResponseHandler / handleResponse + // ------------------------------------------------------------------------- + + /// + /// Writes a JSON (or JSONP) HTTP monitoring response with 200 OK. + /// Mirrors Go ResponseHandler. + /// + internal static void ResponseHandler( + HttpListenerResponse response, + HttpListenerRequest request, + byte[] data) + { + HandleResponse(200, response, request, data); + } + + /// + /// Writes a JSON (or JSONP if the callback query param is set) HTTP monitoring response. + /// Mirrors Go handleResponse. + /// + internal static void HandleResponse( + int statusCode, + HttpListenerResponse response, + HttpListenerRequest request, + byte[] data) + { + var callback = request.QueryString["callback"] ?? string.Empty; + response.StatusCode = statusCode; + + if (callback.Length > 0) + { + response.ContentType = "application/javascript"; + var prefix = Encoding.UTF8.GetBytes($"{callback}("); + var suffix = Encoding.UTF8.GetBytes(")"); + response.OutputStream.Write(prefix); + response.OutputStream.Write(data); + response.OutputStream.Write(suffix); + } + else + { + response.ContentType = "application/json"; + response.Headers["Access-Control-Allow-Origin"] = "*"; + response.OutputStream.Write(data); + } + } + + // ------------------------------------------------------------------------- + // newExtServiceLatency + // ------------------------------------------------------------------------- + + /// + /// Converts an to an + /// for the /accountz response. Returns null if input is null. + /// Mirrors Go newExtServiceLatency. + /// + internal static ExtServiceLatency? NewExtServiceLatency(InternalServiceLatency? l) + { + if (l == null) + return null; + return new ExtServiceLatency + { + Sampling = l.Sampling, + Results = l.Subject, + }; + } + + // ------------------------------------------------------------------------- + // newExtImport + // ------------------------------------------------------------------------- + + /// + /// Converts a to an + /// for the /accountz response. + /// Mirrors Go newExtImport. + /// + internal static ExtImport NewExtImport(ServiceImportEntry? v) + { + if (v == null) + return new ExtImport { Invalid = true }; + + return new ExtImport + { + Invalid = v.Invalid, + Share = v.Share, + Tracking = v.Tracking, + TrackingHeader = v.TrackingHeader, + Latency = NewExtServiceLatency(v.Latency), + M1 = v.M1, + Subject = v.To, + Account = v.Account?.Name, + LocalSubject = v.From, + }; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Protocol/ParserTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Protocol/ParserTypes.cs index b4e1e78..5c3b45b 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Protocol/ParserTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Protocol/ParserTypes.cs @@ -122,6 +122,12 @@ public sealed class PublishArgument public int HeaderSize { get; set; } = -1; public bool Delivered { get; set; } + /// + /// Active message-trace state for the current message, if tracing is enabled. + /// Mirrors Go pubArg.trace *msgTrace in server/msgtrace.go. + /// + public object? Trace { get; set; } + /// Resets all fields to their defaults. public void Reset() { @@ -139,6 +145,7 @@ public sealed class PublishArgument Size = 0; HeaderSize = -1; Delivered = false; + Trace = null; } } @@ -168,4 +175,14 @@ public sealed class ParseContext // ---- Internal scratch buffer ---- internal byte[] Scratch { get; } = new byte[ServerConstants.MaxControlLineSize]; + + /// + /// Convenience accessor for the message-trace state on the current publish arg. + /// Mirrors Go c.pa.trace. + /// + internal ZB.MOM.NatsNet.Server.MsgTraceState? Trace + { + get => Pa.Trace as ZB.MOM.NatsNet.Server.MsgTraceState; + set => Pa.Trace = value; + } }