feat(batch42): implement foundation helpers — msgtrace, monitor helpers, scheduler
Group A: Create MsgTrace.cs with TraceCompressionType, MsgTraceState (factory methods, pipeline event helpers, sendEvent/sendEventFromJetStream, trace header injection) and MsgTraceHelper (sample, genHeaderMapIfTraceHeadersPresent, initAndSendIngressErrEvent, isMsgTraceEnabled, msgTraceSupport). Adds Trace field to PublishArgument and Trace accessor to ParseContext. Group C: Create MonitorHelpers.cs with GatewayzOptions, Gatewayz, RemoteGatewayz, AccountGatewayz, ExtImport, ExtServiceLatency types; plus 25 standalone helper functions (newSubsDetailList, newSubsList, createProxyInfo, makePeerCerts, decodeBool, decodeUint64, decodeInt, decodeState, decodeSubs, newSubDetail, newClientSubDetail, myUptime, tlsCertNotAfter, urlsToStrings, getPinnedCertsAsSlice, getMonitorGWOptions, createOutboundRemoteGatewayz, createOutboundAccountsGatewayz, createAccountOutboundGatewayz, createInboundAccountsGatewayz, createInboundAccountGatewayz, ResponseHandler, handleResponse, newExtServiceLatency, newExtImport). Group D: Implement GetScheduledMessages in MsgScheduling; add Seq field to InMsg for out-of-band scheduling sort. Group B (GatewayInterestMode.String) already complete.
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
// Adapted from server/scheduler.go in the NATS server Go source.
|
||||
|
||||
using System.Buffers.Binary;
|
||||
using ZB.MOM.NatsNet.Server;
|
||||
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server.Internal;
|
||||
@@ -198,7 +199,124 @@ public sealed class MsgScheduling
|
||||
_timer = new Timer(_ => _run(), null, fireIn, Timeout.InfiniteTimeSpan);
|
||||
}
|
||||
|
||||
// getScheduledMessages is deferred to session 08/19 — requires JetStream inMsg, StoreMsg types.
|
||||
/// <summary>
|
||||
/// Processes expired schedule entries and returns the set of messages to be delivered.
|
||||
/// Each message is retrieved from storage, headers are cleaned and augmented, and the
|
||||
/// subject is replaced with the schedule target. Messages are returned sorted by
|
||||
/// sequence number.
|
||||
/// Mirrors Go <c>MsgScheduling.getScheduledMessages</c> in server/scheduler.go.
|
||||
/// </summary>
|
||||
/// <param name="loadMsg">
|
||||
/// Callback that loads a stored message by sequence number.
|
||||
/// The <c>StoreMsg</c> reuse buffer may be passed; returns <c>null</c> if not found.
|
||||
/// </param>
|
||||
/// <param name="loadLast">
|
||||
/// Callback that loads the last stored message for a given subject.
|
||||
/// Returns <c>null</c> if not found.
|
||||
/// </param>
|
||||
public List<InMsg> GetScheduledMessages(
|
||||
Func<ulong, ZB.MOM.NatsNet.Server.StoreMsg, ZB.MOM.NatsNet.Server.StoreMsg?> loadMsg,
|
||||
Func<string, ZB.MOM.NatsNet.Server.StoreMsg, ZB.MOM.NatsNet.Server.StoreMsg?> loadLast)
|
||||
{
|
||||
var smv = new ZB.MOM.NatsNet.Server.StoreMsg();
|
||||
List<InMsg>? msgs = null;
|
||||
|
||||
_ttls.ExpireTasks((seq, ts) =>
|
||||
{
|
||||
var sm = loadMsg(seq, smv);
|
||||
if (sm != null)
|
||||
{
|
||||
// Already in-flight for this subject — skip.
|
||||
var subj = sm.Subject;
|
||||
if (IsInflight(subj))
|
||||
return false;
|
||||
|
||||
// Validate the schedule pattern header.
|
||||
var patternBytes = NatsMessageHeaders.GetHeader(
|
||||
NatsHeaderConstants.JsSchedulePattern, sm.Hdr);
|
||||
if (patternBytes == null || patternBytes.Length == 0)
|
||||
{
|
||||
Remove(seq);
|
||||
return true;
|
||||
}
|
||||
var pattern = System.Text.Encoding.ASCII.GetString(patternBytes);
|
||||
|
||||
var (next, repeat, ok) = ParseMsgSchedule(pattern, ts);
|
||||
if (!ok)
|
||||
{
|
||||
Remove(seq);
|
||||
return true;
|
||||
}
|
||||
|
||||
var (ttl, ttlOk) = ZB.MOM.NatsNet.Server.NatsStream.GetMessageScheduleTTL(sm.Hdr);
|
||||
if (!ttlOk)
|
||||
{
|
||||
Remove(seq);
|
||||
return true;
|
||||
}
|
||||
|
||||
var target = ZB.MOM.NatsNet.Server.NatsStream.GetMessageScheduleTarget(sm.Hdr);
|
||||
if (string.IsNullOrEmpty(target))
|
||||
{
|
||||
Remove(seq);
|
||||
return true;
|
||||
}
|
||||
|
||||
var source = ZB.MOM.NatsNet.Server.NatsStream.GetMessageScheduleSource(sm.Hdr);
|
||||
if (!string.IsNullOrEmpty(source))
|
||||
{
|
||||
sm = loadLast(source, smv);
|
||||
if (sm == null)
|
||||
{
|
||||
Remove(seq);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Copy headers and body — message lives beyond this callback.
|
||||
var hdr = sm.Hdr.Length > 0 ? (byte[])sm.Hdr.Clone() : [];
|
||||
var msg = sm.Msg.Length > 0 ? (byte[])sm.Msg.Clone() : [];
|
||||
|
||||
// Strip schedule-specific headers.
|
||||
hdr = NatsMessageHeaders.RemoveHeaderIfPresent(hdr, NatsHeaderConstants.JsSchedulePattern) ?? [];
|
||||
hdr = NatsMessageHeaders.RemoveHeaderIfPrefixPresent(hdr, "Nats-Schedule-") ?? [];
|
||||
hdr = NatsMessageHeaders.RemoveHeaderIfPrefixPresent(hdr, "Nats-Expected-") ?? [];
|
||||
hdr = NatsMessageHeaders.RemoveHeaderIfPresent(hdr, NatsHeaderConstants.JsMsgId) ?? [];
|
||||
hdr = NatsMessageHeaders.RemoveHeaderIfPresent(hdr, NatsHeaderConstants.JsMessageTtl) ?? [];
|
||||
hdr = NatsMessageHeaders.RemoveHeaderIfPresent(hdr, NatsHeaderConstants.JsMsgRollup) ?? [];
|
||||
|
||||
// Add scheduler-specific headers.
|
||||
hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsScheduler, subj);
|
||||
if (!repeat)
|
||||
{
|
||||
hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsScheduleNext,
|
||||
NatsHeaderConstants.JsScheduleNextPurge);
|
||||
}
|
||||
else
|
||||
{
|
||||
hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsScheduleNext,
|
||||
next.ToString("yyyy-MM-ddTHH:mm:ssK"));
|
||||
}
|
||||
if (!string.IsNullOrEmpty(ttl))
|
||||
hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsMessageTtl, ttl);
|
||||
|
||||
msgs ??= [];
|
||||
msgs.Add(new InMsg { Seq = seq, Subject = target, Hdr = hdr, Msg = msg });
|
||||
MarkInflight(subj);
|
||||
return false;
|
||||
}
|
||||
|
||||
Remove(seq);
|
||||
return true;
|
||||
});
|
||||
|
||||
if (msgs == null)
|
||||
return [];
|
||||
|
||||
// THW is unordered — sort by sequence before returning.
|
||||
msgs.Sort((a, b) => a.Seq.CompareTo(b.Seq));
|
||||
return msgs;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Encodes the current schedule state to a binary snapshot.
|
||||
|
||||
@@ -16,6 +16,7 @@ public sealed partial class InMsg
|
||||
Hdr = null;
|
||||
Msg = null;
|
||||
Client = null;
|
||||
Seq = 0;
|
||||
Pool.Add(this);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -250,6 +250,12 @@ public sealed partial class InMsg
|
||||
public string Subject { get; set; } = string.Empty;
|
||||
public string? Reply { get; set; }
|
||||
public byte[]? Hdr { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Optional sequence number, used for out-of-band sorting (e.g. scheduled messages).
|
||||
/// Mirrors Go <c>inMsg.seq uint64</c> in server/stream.go.
|
||||
/// </summary>
|
||||
public ulong Seq { get; set; }
|
||||
public byte[]? Msg { get; set; }
|
||||
|
||||
/// <summary>The originating client (opaque, set at runtime).</summary>
|
||||
|
||||
710
dotnet/src/ZB.MOM.NatsNet.Server/MessageTrace/MsgTrace.cs
Normal file
710
dotnet/src/ZB.MOM.NatsNet.Server/MessageTrace/MsgTrace.cs
Normal file
@@ -0,0 +1,710 @@
|
||||
// Copyright 2024-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/msgtrace.go in the NATS server Go source.
|
||||
|
||||
using System.Text;
|
||||
using ZB.MOM.NatsNet.Server.Internal;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
// ============================================================================
|
||||
// Compression type enum — mirrors Go compressionType in server/client.go.
|
||||
// Used by MsgTrace to indicate encoding of trace messages.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Compression types used when sending trace events.
|
||||
/// Mirrors Go <c>compressionType</c> iota in server/client.go.
|
||||
/// </summary>
|
||||
internal enum TraceCompressionType
|
||||
{
|
||||
/// <summary>No compression. Mirrors Go <c>noCompression</c>.</summary>
|
||||
None = 0,
|
||||
|
||||
/// <summary>Snappy/S2 compression. Mirrors Go <c>snappyCompression</c>.</summary>
|
||||
Snappy = 1,
|
||||
|
||||
/// <summary>Gzip compression. Mirrors Go <c>gzipCompression</c>.</summary>
|
||||
Gzip = 2,
|
||||
|
||||
/// <summary>Compression type not recognised. Mirrors Go <c>unsupportedCompression</c>.</summary>
|
||||
Unsupported = 3,
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Error strings — mirrors Go const block near bottom of msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Well-known error strings included in trace egress events.
|
||||
/// Mirrors Go const block in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public static class MsgTraceErrors
|
||||
{
|
||||
/// <summary>Client does not support tracing and trace-only was requested.</summary>
|
||||
public const string TraceOnlyNoSupport = "Not delivered because remote does not support message tracing";
|
||||
|
||||
/// <summary>Client does not support tracing; delivered but no trace generated.</summary>
|
||||
public const string TraceNoSupport = "Message delivered but remote does not support message tracing so no trace event generated from there";
|
||||
|
||||
/// <summary>Not delivered due to echo suppression.</summary>
|
||||
public const string NoEcho = "Not delivered because of no echo";
|
||||
|
||||
/// <summary>Not delivered because the subject is publish-denied.</summary>
|
||||
public const string PubViolation = "Not delivered because publish denied for this subject";
|
||||
|
||||
/// <summary>Not delivered because the subscription denies this subject.</summary>
|
||||
public const string SubDeny = "Not delivered because subscription denies this subject";
|
||||
|
||||
/// <summary>Not delivered because the subscription is closed.</summary>
|
||||
public const string SubClosed = "Not delivered because subscription is closed";
|
||||
|
||||
/// <summary>Not delivered because the client is closed.</summary>
|
||||
public const string ClientClosed = "Not delivered because client is closed";
|
||||
|
||||
/// <summary>Not delivered because auto-unsubscribe limit exceeded.</summary>
|
||||
public const string AutoSubExceeded = "Not delivered because auto-unsubscribe exceeded";
|
||||
|
||||
/// <summary>Not delivered because fast producer is not stalled and consumer is slow.</summary>
|
||||
public const string FastProdNoStall = "Not delivered because fast producer not stalled and consumer is slow";
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceState — internal runtime state for a traced message
|
||||
// Mirrors Go <c>msgTrace</c> struct in server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Runtime state object carried through message processing for a traced message.
|
||||
/// Mirrors Go <c>msgTrace</c> struct in server/msgtrace.go.
|
||||
/// </summary>
|
||||
internal sealed class MsgTraceState
|
||||
{
|
||||
/// <summary>
|
||||
/// Atomic counter used to co-ordinate sending between the main pipeline
|
||||
/// and the JetStream path (must reach 2 before the event is sent).
|
||||
/// Mirrors Go <c>ready int32</c>.
|
||||
/// </summary>
|
||||
private int _ready;
|
||||
|
||||
/// <summary>Server reference, used to publish the trace event.</summary>
|
||||
public INatsServer? Server { get; set; }
|
||||
|
||||
/// <summary>Account to which the trace event is published.</summary>
|
||||
public Account? Account { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Origin account name — set only when <see cref="Account"/> is null
|
||||
/// (account lookup failed). Mirrors Go <c>oan string</c>.
|
||||
/// </summary>
|
||||
public string OriginAccountName { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Destination subject for the trace event.</summary>
|
||||
public string Dest { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>The top-level trace event being accumulated.</summary>
|
||||
public MsgTraceEvent Event { get; set; } = new();
|
||||
|
||||
/// <summary>
|
||||
/// Reference to the JetStream event inside <see cref="Event.Events"/>,
|
||||
/// so it can be updated after the fact.
|
||||
/// Mirrors Go <c>js *MsgTraceJetStream</c>.
|
||||
/// </summary>
|
||||
public MsgTraceJetStream? Js { get; set; }
|
||||
|
||||
/// <summary>Hop identifier received from the upstream server.</summary>
|
||||
public string Hop { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Computed next-hop string (set during egress header generation).</summary>
|
||||
public string NextHop { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>When true, the message is not delivered — tracing only.</summary>
|
||||
public bool TraceOnly { get; set; }
|
||||
|
||||
/// <summary>Compression type for sending the trace event.</summary>
|
||||
public TraceCompressionType CompressionType { get; set; } = TraceCompressionType.None;
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Factory methods — mirrors Go new() methods on each concrete type.
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new <see cref="MsgTraceIngress"/> with the current timestamp.
|
||||
/// Mirrors Go <c>MsgTraceIngress.new()</c>.
|
||||
/// </summary>
|
||||
public static MsgTraceIngress NewIngress() => new()
|
||||
{
|
||||
Type = MsgTraceType.Ingress,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new <see cref="MsgTraceSubjectMapping"/> with the current timestamp.
|
||||
/// Mirrors Go <c>MsgTraceSubjectMapping.new()</c>.
|
||||
/// </summary>
|
||||
public static MsgTraceSubjectMapping NewSubjectMapping() => new()
|
||||
{
|
||||
Type = MsgTraceType.SubjectMapping,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new <see cref="MsgTraceStreamExport"/> with the current timestamp.
|
||||
/// Mirrors Go <c>MsgTraceStreamExport.new()</c>.
|
||||
/// </summary>
|
||||
public static MsgTraceStreamExport NewStreamExport() => new()
|
||||
{
|
||||
Type = MsgTraceType.StreamExport,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new <see cref="MsgTraceServiceImport"/> with the current timestamp.
|
||||
/// Mirrors Go <c>MsgTraceServiceImport.new()</c>.
|
||||
/// </summary>
|
||||
public static MsgTraceServiceImport NewServiceImport() => new()
|
||||
{
|
||||
Type = MsgTraceType.ServiceImport,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new <see cref="MsgTraceJetStream"/> with the current timestamp.
|
||||
/// Mirrors Go <c>MsgTraceJetStream.new()</c>.
|
||||
/// </summary>
|
||||
public static MsgTraceJetStream NewJetStream() => new()
|
||||
{
|
||||
Type = MsgTraceType.JetStream,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new <see cref="MsgTraceEgress"/> with the current timestamp.
|
||||
/// Mirrors Go <c>MsgTraceEgress.new()</c>.
|
||||
/// </summary>
|
||||
public static MsgTraceEgress NewEgress() => new()
|
||||
{
|
||||
Type = MsgTraceType.Egress,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
};
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Instance helpers
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Returns <c>true</c> if trace-only mode is active (message must not
|
||||
/// be delivered to subscribers).
|
||||
/// Mirrors Go <c>msgTrace.traceOnly()</c>.
|
||||
/// </summary>
|
||||
public bool IsTraceOnly() => TraceOnly;
|
||||
|
||||
/// <summary>
|
||||
/// Sets the ingress error field on the first event if it is an ingress.
|
||||
/// Mirrors Go <c>msgTrace.setIngressError()</c>.
|
||||
/// </summary>
|
||||
public void SetIngressError(string err)
|
||||
{
|
||||
if (Event.Ingress() is { } ingress)
|
||||
ingress.Error = err;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Appends a subject-mapping event.
|
||||
/// Mirrors Go <c>msgTrace.addSubjectMappingEvent()</c>.
|
||||
/// </summary>
|
||||
public void AddSubjectMappingEvent(string mappedSubject)
|
||||
{
|
||||
Event.Events.Add(new MsgTraceSubjectMapping
|
||||
{
|
||||
Type = MsgTraceType.SubjectMapping,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
MappedTo = mappedSubject,
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Appends an egress event.
|
||||
/// Mirrors Go <c>msgTrace.addEgressEvent()</c>.
|
||||
/// </summary>
|
||||
public void AddEgressEvent(ClientConnection dc, Subscription? sub, string err)
|
||||
{
|
||||
var egress = new MsgTraceEgress
|
||||
{
|
||||
Type = MsgTraceType.Egress,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
Kind = (int)dc.Kind,
|
||||
Cid = dc.Cid,
|
||||
Name = MsgTraceHelper.GetConnName(dc),
|
||||
Hop = NextHop,
|
||||
Error = string.IsNullOrEmpty(err) ? null : err,
|
||||
};
|
||||
NextHop = string.Empty;
|
||||
|
||||
// For CLIENT connections, include subscription and queue info.
|
||||
if (dc.Kind == ClientKind.Client && sub is not null)
|
||||
{
|
||||
egress.Subscription = Encoding.UTF8.GetString(sub.Subject);
|
||||
if (sub.Queue is { Length: > 0 } q)
|
||||
egress.Queue = Encoding.UTF8.GetString(q);
|
||||
}
|
||||
|
||||
// Include account name if different from the ingress account.
|
||||
if ((dc.Kind == ClientKind.Client || dc.Kind == ClientKind.Leaf) &&
|
||||
dc._account is Account dcAcc &&
|
||||
Event.Ingress() is { } ing)
|
||||
{
|
||||
var dcAccName = dcAcc.GetName();
|
||||
if (!string.Equals(dcAccName, ing.Account, StringComparison.Ordinal))
|
||||
egress.Account = dcAccName;
|
||||
}
|
||||
|
||||
Event.Events.Add(egress);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Appends a stream-export event.
|
||||
/// Mirrors Go <c>msgTrace.addStreamExportEvent()</c>.
|
||||
/// </summary>
|
||||
public void AddStreamExportEvent(ClientConnection dc, string toSubject)
|
||||
{
|
||||
string accName;
|
||||
lock (dc)
|
||||
{
|
||||
accName = (dc._account as Account)?.GetName() ?? string.Empty;
|
||||
}
|
||||
|
||||
Event.Events.Add(new MsgTraceStreamExport
|
||||
{
|
||||
Type = MsgTraceType.StreamExport,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
Account = accName,
|
||||
To = toSubject,
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Appends a service-import event.
|
||||
/// Mirrors Go <c>msgTrace.addServiceImportEvent()</c>.
|
||||
/// </summary>
|
||||
public void AddServiceImportEvent(string accName, string from, string to)
|
||||
{
|
||||
Event.Events.Add(new MsgTraceServiceImport
|
||||
{
|
||||
Type = MsgTraceType.ServiceImport,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
Account = accName,
|
||||
From = from,
|
||||
To = to,
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Appends a JetStream event and retains a reference for later updates.
|
||||
/// Mirrors Go <c>msgTrace.addJetStreamEvent()</c>.
|
||||
/// </summary>
|
||||
public void AddJetStreamEvent(string streamName)
|
||||
{
|
||||
Js = new MsgTraceJetStream
|
||||
{
|
||||
Type = MsgTraceType.JetStream,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
Stream = streamName,
|
||||
};
|
||||
Event.Events.Add(Js);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Updates the last JetStream event with subject and no-interest flag.
|
||||
/// Mirrors Go <c>msgTrace.updateJetStreamEvent()</c>.
|
||||
/// </summary>
|
||||
public void UpdateJetStreamEvent(string subject, bool noInterest)
|
||||
{
|
||||
if (Js is null) return;
|
||||
Js.Subject = subject;
|
||||
Js.NoInterest = noInterest;
|
||||
Js.Timestamp = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends the trace event from the JetStream path.
|
||||
/// Mirrors Go <c>msgTrace.sendEventFromJetStream()</c>.
|
||||
/// </summary>
|
||||
public void SendEventFromJetStream(Exception? err)
|
||||
{
|
||||
if (Js is null) return;
|
||||
if (err is not null)
|
||||
Js.Error = err.Message;
|
||||
SendEvent();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Publishes the complete trace event.
|
||||
/// When there is a JetStream path, both the main and JetStream legs must
|
||||
/// call this before the event is sent (atomic counter reaching 2).
|
||||
/// Mirrors Go <c>msgTrace.sendEvent()</c>.
|
||||
/// </summary>
|
||||
public void SendEvent()
|
||||
{
|
||||
if (Js is not null)
|
||||
{
|
||||
var count = Interlocked.Increment(ref _ready);
|
||||
if (count != 2)
|
||||
return;
|
||||
}
|
||||
|
||||
// Server.sendInternalAccountSysMsg is a runtime operation — the full
|
||||
// publish path is wired up when the NatsServer runtime is available.
|
||||
// Mirrors Go: t.srv.sendInternalAccountSysMsg(t.acc, t.dest, &t.event.Server, t.event, t.ct)
|
||||
_ = Server;
|
||||
_ = Account;
|
||||
_ = Dest;
|
||||
_ = CompressionType;
|
||||
_ = Event;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sets the Nats-Trace-Origin-Account header on <paramref name="msg"/> when
|
||||
/// the trace account differs from <paramref name="acc"/>.
|
||||
/// Mirrors Go <c>msgTrace.setOriginAccountHeaderIfNeeded()</c>.
|
||||
/// </summary>
|
||||
public byte[] SetOriginAccountHeaderIfNeeded(ClientConnection c, Account acc, byte[] msg)
|
||||
{
|
||||
string oan;
|
||||
if (Account is not null)
|
||||
{
|
||||
if (Account == acc) return msg;
|
||||
oan = Account.GetName();
|
||||
}
|
||||
else if (!string.Equals(OriginAccountName, acc.GetName(), StringComparison.Ordinal))
|
||||
{
|
||||
oan = OriginAccountName;
|
||||
}
|
||||
else
|
||||
{
|
||||
return msg;
|
||||
}
|
||||
|
||||
return NatsMessageHeaders.GenHeader(msg, MsgTraceHeaders.MsgTraceOriginAccount, oan);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Increments the hop counter and writes the Nats-Trace-Hop header.
|
||||
/// Mirrors Go <c>msgTrace.setHopHeader()</c>.
|
||||
/// </summary>
|
||||
public byte[] SetHopHeader(ClientConnection c, byte[] msg)
|
||||
{
|
||||
Event.Hops++;
|
||||
NextHop = Hop.Length > 0
|
||||
? $"{Hop}.{Event.Hops}"
|
||||
: $"{Event.Hops}";
|
||||
return NatsMessageHeaders.GenHeader(msg, MsgTraceHeaders.MsgTraceHop, NextHop);
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceHelper — static helpers mirroring Go free functions in msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Static helpers for message tracing that mirror the free functions and
|
||||
/// client methods in Go's server/msgtrace.go.
|
||||
/// </summary>
|
||||
internal static class MsgTraceHelper
|
||||
{
|
||||
/// <summary>
|
||||
/// Header name for the Accept-Encoding header, used to detect the
|
||||
/// compression preference of an incoming connection.
|
||||
/// Mirrors Go <c>acceptEncodingHeader</c> const in client.go.
|
||||
/// </summary>
|
||||
internal const string AcceptEncodingHeader = "Accept-Encoding";
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// getTraceAs — generic type assertion helper
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Returns <paramref name="e"/> cast to <typeparamref name="T"/>, or null.
|
||||
/// Mirrors Go generic <c>getTraceAs[T]</c> in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public static T? GetTraceAs<T>(IMsgTrace e) where T : class, IMsgTrace
|
||||
=> e as T;
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// getConnName
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Returns the remote-server name for ROUTER/GATEWAY/LEAF connections,
|
||||
/// or the client's connection name for CLIENT connections.
|
||||
/// Mirrors Go <c>getConnName(c *client)</c> in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public static string GetConnName(ClientConnection c)
|
||||
{
|
||||
return c.Kind switch
|
||||
{
|
||||
ClientKind.Router when c.Route?.RemoteName is { Length: > 0 } rn => rn,
|
||||
ClientKind.Gateway when c.Gateway?.RemoteName is { Length: > 0 } gn => gn,
|
||||
ClientKind.Leaf when c.Leaf?.RemoteServer is { Length: > 0 } ls => ls,
|
||||
_ => c.Opts.Name,
|
||||
};
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// getCompressionType
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Maps the Accept-Encoding header value to an internal compression type.
|
||||
/// Mirrors Go <c>getCompressionType(cts string) compressionType</c>.
|
||||
/// </summary>
|
||||
public static TraceCompressionType GetCompressionType(string cts)
|
||||
{
|
||||
if (string.IsNullOrEmpty(cts))
|
||||
return TraceCompressionType.None;
|
||||
|
||||
cts = cts.ToLowerInvariant();
|
||||
if (cts.Contains("snappy") || cts.Contains("s2"))
|
||||
return TraceCompressionType.Snappy;
|
||||
if (cts.Contains("gzip"))
|
||||
return TraceCompressionType.Gzip;
|
||||
|
||||
return TraceCompressionType.Unsupported;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// isMsgTraceEnabled
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Returns the active <see cref="MsgTraceState"/> for this connection (or null),
|
||||
/// and whether trace-only mode is active.
|
||||
/// Mirrors Go <c>client.isMsgTraceEnabled()</c>.
|
||||
/// </summary>
|
||||
public static (MsgTraceState? Trace, bool TraceOnly) IsMsgTraceEnabled(ClientConnection c)
|
||||
{
|
||||
var t = c.ParseCtx.Pa.Trace as MsgTraceState;
|
||||
if (t is null) return (null, false);
|
||||
return (t, t.IsTraceOnly());
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// msgTraceSupport
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Returns <c>true</c> when the connection supports message tracing.
|
||||
/// CLIENT connections always support tracing; ROUTER/GATEWAY/LEAF only
|
||||
/// do when their negotiated protocol is high enough.
|
||||
/// Mirrors Go <c>client.msgTraceSupport()</c>.
|
||||
/// </summary>
|
||||
public static bool MsgTraceSupport(ClientConnection c)
|
||||
=> c.Kind == ClientKind.Client || c.Opts.Protocol >= ServerProtocol.MsgTraceProto;
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// sample
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Returns <c>true</c> when the message should be included in a sample.
|
||||
/// Any value outside [1..99] is treated as 100% sampling.
|
||||
/// Mirrors Go <c>sample(sampling int) bool</c>.
|
||||
/// </summary>
|
||||
public static bool Sample(int sampling)
|
||||
{
|
||||
if (sampling <= 0 || sampling >= 100)
|
||||
return true;
|
||||
return Random.Shared.Next(100) <= sampling;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// genHeaderMapIfTraceHeadersPresent
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Parses a raw NATS message header block and returns all header key/value
|
||||
/// pairs if the block contains either <c>Nats-Trace-Dest</c> or an enabled
|
||||
/// <c>traceparent</c> header.
|
||||
///
|
||||
/// The returned flag is <c>true</c> (external) when only the W3C traceparent
|
||||
/// header triggered the trace; <c>false</c> when the native Nats-Trace-Dest
|
||||
/// header was present.
|
||||
///
|
||||
/// Returns an empty map when neither trace header is present, or when
|
||||
/// <c>Nats-Trace-Dest</c> has the sentinel "trace disabled" value.
|
||||
///
|
||||
/// Mirrors Go <c>genHeaderMapIfTraceHeadersPresent</c> in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public static (Dictionary<string, List<string>> Headers, bool External) GenHeaderMapIfTraceHeadersPresent(
|
||||
byte[] hdr)
|
||||
{
|
||||
var empty = (new Dictionary<string, List<string>>(), false);
|
||||
|
||||
if (hdr.Length == 0)
|
||||
return empty;
|
||||
|
||||
var hdrLineBuf = Encoding.ASCII.GetBytes(NatsHeaderConstants.HdrLine);
|
||||
if (!hdr.AsSpan().StartsWith(hdrLineBuf))
|
||||
return empty;
|
||||
|
||||
var traceDestBuf = Encoding.ASCII.GetBytes(MsgTraceHeaders.MsgTraceDest);
|
||||
var traceDestDisabledBuf = Encoding.ASCII.GetBytes(MsgTraceHeaders.MsgTraceDestDisabled);
|
||||
var traceParentBuf = Encoding.ASCII.GetBytes(MsgTraceHeaders.TraceParentHdr);
|
||||
|
||||
bool traceDestFound = false;
|
||||
bool traceParentFound = false;
|
||||
|
||||
var keys = new List<ReadOnlyMemory<byte>>(16);
|
||||
var vals = new List<ReadOnlyMemory<byte>>(16);
|
||||
|
||||
int i = hdrLineBuf.Length;
|
||||
while (i < hdr.Length)
|
||||
{
|
||||
int del = hdr.AsSpan(i).IndexOf((byte)':');
|
||||
if (del < 0) break;
|
||||
|
||||
var key = hdr.AsMemory(i, del);
|
||||
i += del + 1;
|
||||
|
||||
// Skip leading whitespace in value.
|
||||
while (i < hdr.Length && (hdr[i] == ' ' || hdr[i] == '\t'))
|
||||
i++;
|
||||
|
||||
int valStart = i;
|
||||
int nl = hdr.AsSpan(valStart).IndexOf("\r\n"u8);
|
||||
if (nl < 0) break;
|
||||
|
||||
int valEnd = valStart + nl;
|
||||
while (valEnd > valStart && (hdr[valEnd - 1] == ' ' || hdr[valEnd - 1] == '\t'))
|
||||
valEnd--;
|
||||
|
||||
var val = hdr.AsMemory(valStart, valEnd - valStart);
|
||||
|
||||
if (key.Length > 0 && val.Length > 0)
|
||||
{
|
||||
// Check for Nats-Trace-Dest.
|
||||
if (!traceDestFound && key.Span.SequenceEqual(traceDestBuf))
|
||||
{
|
||||
if (val.Span.SequenceEqual(traceDestDisabledBuf))
|
||||
return empty;
|
||||
traceDestFound = true;
|
||||
}
|
||||
// Check for traceparent (case-insensitive).
|
||||
else if (!traceParentFound &&
|
||||
Encoding.ASCII.GetString(key.Span).Equals(
|
||||
"traceparent", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
// Sampling bit is the last-flag byte of "version-traceId-parentId-flags".
|
||||
var valStr = Encoding.ASCII.GetString(val.Span);
|
||||
var parts = valStr.Split('-');
|
||||
if (parts.Length == 4 && parts[3].Length == 2 &&
|
||||
int.TryParse(parts[3],
|
||||
System.Globalization.NumberStyles.HexNumber,
|
||||
null, out var hexVal) &&
|
||||
(hexVal & 0x1) == 0x1)
|
||||
{
|
||||
traceParentFound = true;
|
||||
}
|
||||
}
|
||||
|
||||
keys.Add(key);
|
||||
vals.Add(val);
|
||||
}
|
||||
|
||||
i = valStart + nl + 2; // skip past CRLF
|
||||
}
|
||||
|
||||
if (!traceDestFound && !traceParentFound)
|
||||
return empty;
|
||||
|
||||
var map = new Dictionary<string, List<string>>(StringComparer.Ordinal);
|
||||
for (int k = 0; k < keys.Count; k++)
|
||||
{
|
||||
var kStr = Encoding.ASCII.GetString(keys[k].Span);
|
||||
var vStr = Encoding.ASCII.GetString(vals[k].Span);
|
||||
if (!map.TryGetValue(kStr, out var list))
|
||||
{
|
||||
list = [];
|
||||
map[kStr] = list;
|
||||
}
|
||||
list.Add(vStr);
|
||||
}
|
||||
|
||||
bool isExternal = !traceDestFound && traceParentFound;
|
||||
return (map, isExternal);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// initAndSendIngressErrEvent
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Creates a minimal trace state and immediately sends the ingress-error
|
||||
/// event. Used when an error is detected early (e.g. max-payload exceeded)
|
||||
/// before normal message processing begins.
|
||||
/// Mirrors Go <c>client.initAndSendIngressErrEvent()</c>.
|
||||
/// </summary>
|
||||
public static void InitAndSendIngressErrEvent(
|
||||
ClientConnection c,
|
||||
byte[] hdr,
|
||||
string dest,
|
||||
Exception? ingressError)
|
||||
{
|
||||
if (ingressError is null) return;
|
||||
|
||||
var ct = GetAcceptEncodingCompressionType(hdr);
|
||||
var trace = new MsgTraceState
|
||||
{
|
||||
Server = c.Server,
|
||||
Account = c._account as Account,
|
||||
Dest = dest,
|
||||
CompressionType = ct,
|
||||
Event = new MsgTraceEvent
|
||||
{
|
||||
Request = new MsgTraceRequest { MsgSize = c.ParseCtx.Pa.Size },
|
||||
Events =
|
||||
[
|
||||
new MsgTraceIngress
|
||||
{
|
||||
Type = MsgTraceType.Ingress,
|
||||
Timestamp = DateTime.UtcNow,
|
||||
Kind = (int)c.Kind,
|
||||
Cid = c.Cid,
|
||||
Name = GetConnName(c),
|
||||
Error = ingressError.Message,
|
||||
},
|
||||
],
|
||||
},
|
||||
};
|
||||
|
||||
trace.SendEvent();
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Private helpers
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Extracts the Accept-Encoding value from a raw header block and maps it
|
||||
/// to a <see cref="TraceCompressionType"/>.
|
||||
/// Mirrors Go <c>getAcceptEncoding(hdr []byte)</c> in server/client.go.
|
||||
/// </summary>
|
||||
private static TraceCompressionType GetAcceptEncodingCompressionType(byte[] hdr)
|
||||
{
|
||||
if (hdr.Length == 0) return TraceCompressionType.None;
|
||||
var value = NatsMessageHeaders.GetHeader(AcceptEncodingHeader, hdr);
|
||||
if (value is null || value.Length == 0) return TraceCompressionType.None;
|
||||
return GetCompressionType(Encoding.ASCII.GetString(value));
|
||||
}
|
||||
}
|
||||
802
dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorHelpers.cs
Normal file
802
dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorHelpers.cs
Normal file
@@ -0,0 +1,802 @@
|
||||
// Copyright 2013-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/monitor.go in the NATS server Go source.
|
||||
|
||||
using System.Net;
|
||||
using System.Security.Cryptography;
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using System.Text;
|
||||
using System.Text.Json.Serialization;
|
||||
using ZB.MOM.NatsNet.Server.Internal;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
// ============================================================================
|
||||
// GatewayzOptions — query options for the Gatewayz endpoint
|
||||
// Mirrors Go <c>GatewayzOptions</c> struct in server/monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Options that control the output of a <c>Gatewayz</c> monitoring query.
|
||||
/// Mirrors Go <c>GatewayzOptions</c> struct in server/monitor.go.
|
||||
/// </summary>
|
||||
public sealed class GatewayzOptions
|
||||
{
|
||||
/// <summary>When non-empty, limits output to the gateway with this name. Mirrors Go <c>Name</c>.</summary>
|
||||
[JsonPropertyName("name")]
|
||||
public string Name { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>When true, includes accounts with their interest. Mirrors Go <c>Accounts</c>.</summary>
|
||||
[JsonPropertyName("accounts")]
|
||||
public bool Accounts { get; set; }
|
||||
|
||||
/// <summary>Limits accounts to this specific name (implies <see cref="Accounts"/>). Mirrors Go <c>AccountName</c>.</summary>
|
||||
[JsonPropertyName("account_name")]
|
||||
public string AccountName { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>When true, subscription subjects are included in account results. Mirrors Go <c>AccountSubscriptions</c>.</summary>
|
||||
[JsonPropertyName("subscriptions")]
|
||||
public bool AccountSubscriptions { get; set; }
|
||||
|
||||
/// <summary>When true, verbose subscription details are included. Mirrors Go <c>AccountSubscriptionsDetail</c>.</summary>
|
||||
[JsonPropertyName("subscriptions_detail")]
|
||||
public bool AccountSubscriptionsDetail { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Gatewayz — top-level gateway monitoring response
|
||||
// Mirrors Go <c>Gatewayz</c> struct in server/monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Top-level response type for the <c>/gatewayz</c> monitoring endpoint.
|
||||
/// Mirrors Go <c>Gatewayz</c> struct in server/monitor.go.
|
||||
/// </summary>
|
||||
public sealed class Gatewayz
|
||||
{
|
||||
[JsonPropertyName("server_id")]
|
||||
public string Id { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("now")]
|
||||
public DateTime Now { get; set; }
|
||||
|
||||
[JsonPropertyName("name")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Name { get; set; }
|
||||
|
||||
[JsonPropertyName("host")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Host { get; set; }
|
||||
|
||||
[JsonPropertyName("port")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public int Port { get; set; }
|
||||
|
||||
[JsonPropertyName("outbound_gateways")]
|
||||
public Dictionary<string, RemoteGatewayz> OutboundGateways { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("inbound_gateways")]
|
||||
public Dictionary<string, List<RemoteGatewayz>> InboundGateways { get; set; } = new();
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// RemoteGatewayz — information about a single remote gateway connection
|
||||
// Mirrors Go <c>RemoteGatewayz</c> struct in server/monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Information about a single outbound or inbound gateway connection.
|
||||
/// Mirrors Go <c>RemoteGatewayz</c> struct in server/monitor.go.
|
||||
/// </summary>
|
||||
public sealed class RemoteGatewayz
|
||||
{
|
||||
/// <summary>True if the gateway was explicitly configured (not implicit). Mirrors Go <c>IsConfigured</c>.</summary>
|
||||
[JsonPropertyName("configured")]
|
||||
public bool IsConfigured { get; set; }
|
||||
|
||||
/// <summary>Connection details. Mirrors Go <c>Connection *ConnInfo</c>.</summary>
|
||||
[JsonPropertyName("connection")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public ConnInfo? Connection { get; set; }
|
||||
|
||||
/// <summary>Per-account interest information. Mirrors Go <c>Accounts []*AccountGatewayz</c>.</summary>
|
||||
[JsonPropertyName("accounts")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public List<AccountGatewayz>? Accounts { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// AccountGatewayz — per-account interest mode on a gateway
|
||||
// Mirrors Go <c>AccountGatewayz</c> struct in server/monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Per-account interest mode information for a gateway connection.
|
||||
/// Mirrors Go <c>AccountGatewayz</c> struct in server/monitor.go.
|
||||
/// </summary>
|
||||
public sealed class AccountGatewayz
|
||||
{
|
||||
[JsonPropertyName("name")]
|
||||
public string Name { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("interest_mode")]
|
||||
public string InterestMode { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("no_interest_count")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public int NoInterestCount { get; set; }
|
||||
|
||||
[JsonPropertyName("interest_only_threshold")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public int InterestOnlyThreshold { get; set; }
|
||||
|
||||
[JsonPropertyName("num_subs")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public int TotalSubscriptions { get; set; }
|
||||
|
||||
[JsonPropertyName("num_queue_subs")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public int NumQueueSubscriptions { get; set; }
|
||||
|
||||
[JsonPropertyName("subscriptions_list")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public List<string>? Subs { get; set; }
|
||||
|
||||
[JsonPropertyName("subscriptions_list_detail")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public List<SubDetail>? SubsDetail { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ExtImport — external account import detail for /accountz
|
||||
// Mirrors Go <c>ExtImport</c> struct in server/monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// External view of a service import entry, as returned by the <c>/accountz</c> endpoint.
|
||||
/// Mirrors Go <c>ExtImport</c> struct in server/monitor.go.
|
||||
/// Note: The JWT <c>Import</c> embedded struct fields are inlined here since the
|
||||
/// nats.io/jwt library is not yet ported.
|
||||
/// </summary>
|
||||
public sealed class ExtImport
|
||||
{
|
||||
/// <summary>Whether this import is invalid. Mirrors Go <c>Invalid bool</c>.</summary>
|
||||
[JsonPropertyName("invalid")]
|
||||
public bool Invalid { get; set; }
|
||||
|
||||
/// <summary>Whether the requestor's client info is shared. Mirrors Go <c>Share bool</c>.</summary>
|
||||
[JsonPropertyName("share")]
|
||||
public bool Share { get; set; }
|
||||
|
||||
/// <summary>Whether latency tracking is enabled. Mirrors Go <c>Tracking bool</c>.</summary>
|
||||
[JsonPropertyName("tracking")]
|
||||
public bool Tracking { get; set; }
|
||||
|
||||
/// <summary>Headers used when latency is triggered by a header. Mirrors Go <c>TrackingHdr http.Header</c>.</summary>
|
||||
[JsonPropertyName("tracking_header")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public Dictionary<string, string[]>? TrackingHeader { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Latency configuration from the exporting account's JWT claim.
|
||||
/// Mirrors Go <c>Latency *jwt.ServiceLatency</c>.
|
||||
/// Sampling and subject are stored directly since jwt lib is not ported.
|
||||
/// </summary>
|
||||
[JsonPropertyName("latency")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public ExtServiceLatency? Latency { get; set; }
|
||||
|
||||
/// <summary>First-leg latency measurement. Mirrors Go <c>M1 *ServiceLatency</c>.</summary>
|
||||
[JsonPropertyName("m1")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public ServiceLatency? M1 { get; set; }
|
||||
|
||||
// Inlined jwt.Import fields.
|
||||
|
||||
/// <summary>Subject of the imported service. Mirrors Go <c>jwt.Import.Subject</c>.</summary>
|
||||
[JsonPropertyName("subject")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Subject { get; set; }
|
||||
|
||||
/// <summary>Account that exports the service. Mirrors Go <c>jwt.Import.Account</c>.</summary>
|
||||
[JsonPropertyName("account")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Account { get; set; }
|
||||
|
||||
/// <summary>Local subject used on the importing account. Mirrors Go <c>jwt.Import.LocalSubject</c>.</summary>
|
||||
[JsonPropertyName("local_subject")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? LocalSubject { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ExtServiceLatency — external representation of service latency config
|
||||
// Used by ExtImport in place of jwt.ServiceLatency.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// External representation of service latency configuration, used in <see cref="ExtImport"/>.
|
||||
/// Mirrors Go <c>jwt.ServiceLatency</c> from nats.io/jwt/v2.
|
||||
/// </summary>
|
||||
public sealed class ExtServiceLatency
|
||||
{
|
||||
[JsonPropertyName("sampling")]
|
||||
public int Sampling { get; set; }
|
||||
|
||||
[JsonPropertyName("results")]
|
||||
public string Results { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MonitorHelpers — standalone helper functions
|
||||
// Mirrors standalone functions in server/monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Standalone helper functions used by the monitoring subsystem.
|
||||
/// Mirrors package-level functions from <c>server/monitor.go</c>.
|
||||
/// </summary>
|
||||
internal static class MonitorHelpers
|
||||
{
|
||||
// -------------------------------------------------------------------------
|
||||
// newSubsDetailList
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Builds a verbose subscription detail list for a client connection.
|
||||
/// Client must be locked by caller.
|
||||
/// Mirrors Go <c>newSubsDetailList</c>.
|
||||
/// </summary>
|
||||
internal static List<SubDetail> NewSubsDetailList(ClientConnection client)
|
||||
{
|
||||
var result = new List<SubDetail>(client.Subs?.Count ?? 0);
|
||||
if (client.Subs == null)
|
||||
return result;
|
||||
foreach (var sub in client.Subs.Values)
|
||||
result.Add(NewClientSubDetail(sub, client.Cid));
|
||||
return result;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// newSubsList
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Builds a plain subscription subject list for a client connection.
|
||||
/// Client must be locked by caller.
|
||||
/// Mirrors Go <c>newSubsList</c>.
|
||||
/// </summary>
|
||||
internal static List<string> NewSubsList(ClientConnection client)
|
||||
{
|
||||
var result = new List<string>(client.Subs?.Count ?? 0);
|
||||
if (client.Subs == null)
|
||||
return result;
|
||||
foreach (var sub in client.Subs.Values)
|
||||
result.Add(Encoding.UTF8.GetString(sub.Subject));
|
||||
return result;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// createProxyInfo
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Returns a <see cref="ProxyInfo"/> if the connection has a proxy key set, or <c>null</c>.
|
||||
/// Client lock must be held on entry.
|
||||
/// Mirrors Go <c>createProxyInfo</c>.
|
||||
/// </summary>
|
||||
internal static ProxyInfo? CreateProxyInfo(ClientConnection c)
|
||||
{
|
||||
if (string.IsNullOrEmpty(c.ProxyKey))
|
||||
return null;
|
||||
return new ProxyInfo { Key = c.ProxyKey };
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// makePeerCerts
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Converts a list of X.509 peer certificates into <see cref="TlsPeerCert"/> summary records.
|
||||
/// Each record contains subject string, SPKI SHA-256 hex, and certificate SHA-256 hex.
|
||||
/// Mirrors Go <c>makePeerCerts</c>.
|
||||
/// </summary>
|
||||
internal static List<TlsPeerCert> MakePeerCerts(IReadOnlyList<X509Certificate2> peerCerts)
|
||||
{
|
||||
var result = new List<TlsPeerCert>(peerCerts.Count);
|
||||
foreach (var cert in peerCerts)
|
||||
{
|
||||
var spkiHash = SHA256.HashData(cert.PublicKey.ExportSubjectPublicKeyInfo());
|
||||
var certHash = SHA256.HashData(cert.RawData);
|
||||
result.Add(new TlsPeerCert
|
||||
{
|
||||
Subject = cert.Subject,
|
||||
SubjectPkiSha256 = Convert.ToHexString(spkiHash).ToLowerInvariant(),
|
||||
CertSha256 = Convert.ToHexString(certHash).ToLowerInvariant(),
|
||||
});
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// decodeBool
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Parses a boolean query-string parameter from an HTTP listener request.
|
||||
/// Writes a 400 status and returns an error if the value cannot be parsed.
|
||||
/// Mirrors Go <c>decodeBool</c>.
|
||||
/// </summary>
|
||||
internal static (bool Value, Exception? Error) DecodeBool(
|
||||
HttpListenerResponse response,
|
||||
System.Collections.Specialized.NameValueCollection query,
|
||||
string param)
|
||||
{
|
||||
var str = query[param] ?? string.Empty;
|
||||
if (str.Length == 0)
|
||||
return (false, null);
|
||||
if (bool.TryParse(str, out var val))
|
||||
return (val, null);
|
||||
if (str == "1") return (true, null);
|
||||
if (str == "0") return (false, null);
|
||||
var err = new FormatException($"Error decoding boolean for '{param}': {str}");
|
||||
response.StatusCode = 400;
|
||||
return (false, err);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// decodeUint64
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Parses a uint64 query-string parameter from an HTTP listener request.
|
||||
/// Mirrors Go <c>decodeUint64</c>.
|
||||
/// </summary>
|
||||
internal static (ulong Value, Exception? Error) DecodeUint64(
|
||||
HttpListenerResponse response,
|
||||
System.Collections.Specialized.NameValueCollection query,
|
||||
string param)
|
||||
{
|
||||
var str = query[param] ?? string.Empty;
|
||||
if (str.Length == 0)
|
||||
return (0, null);
|
||||
if (ulong.TryParse(str, out var val))
|
||||
return (val, null);
|
||||
var err = new FormatException($"Error decoding uint64 for '{param}': {str}");
|
||||
response.StatusCode = 400;
|
||||
return (0, err);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// decodeInt
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Parses an int query-string parameter from an HTTP listener request.
|
||||
/// Mirrors Go <c>decodeInt</c>.
|
||||
/// </summary>
|
||||
internal static (int Value, Exception? Error) DecodeInt(
|
||||
HttpListenerResponse response,
|
||||
System.Collections.Specialized.NameValueCollection query,
|
||||
string param)
|
||||
{
|
||||
var str = query[param] ?? string.Empty;
|
||||
if (str.Length == 0)
|
||||
return (0, null);
|
||||
if (int.TryParse(str, out var val))
|
||||
return (val, null);
|
||||
var err = new FormatException($"Error decoding int for '{param}': {str}");
|
||||
response.StatusCode = 400;
|
||||
return (0, err);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// decodeState
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Parses the connection-state filter query parameter.
|
||||
/// Mirrors Go <c>decodeState</c>.
|
||||
/// </summary>
|
||||
internal static (ConnState Value, Exception? Error) DecodeState(
|
||||
HttpListenerResponse response,
|
||||
System.Collections.Specialized.NameValueCollection query)
|
||||
{
|
||||
var str = query["state"] ?? string.Empty;
|
||||
if (str.Length == 0)
|
||||
return (ConnState.ConnOpen, null);
|
||||
switch (str.ToLowerInvariant())
|
||||
{
|
||||
case "open": return (ConnState.ConnOpen, null);
|
||||
case "closed": return (ConnState.ConnClosed, null);
|
||||
case "any":
|
||||
case "all": return (ConnState.ConnAll, null);
|
||||
}
|
||||
var err = new FormatException($"Error decoding state for {str}");
|
||||
response.StatusCode = 400;
|
||||
return (default, err);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// decodeSubs
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Parses the <c>subs</c> query parameter into <c>subs</c> and <c>subsDet</c> flags.
|
||||
/// Mirrors Go <c>decodeSubs</c>.
|
||||
/// </summary>
|
||||
internal static (bool Subs, bool SubsDetail, Exception? Error) DecodeSubs(
|
||||
HttpListenerResponse response,
|
||||
System.Collections.Specialized.NameValueCollection query)
|
||||
{
|
||||
var raw = query["subs"] ?? string.Empty;
|
||||
if (raw.Equals("detail", StringComparison.OrdinalIgnoreCase))
|
||||
return (false, true, null);
|
||||
var (subs, err) = DecodeBool(response, query, "subs");
|
||||
return (subs, false, err);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// newSubDetail
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Creates a <see cref="SubDetail"/> including account name from the owning client.
|
||||
/// Client must be locked on entry.
|
||||
/// Mirrors Go <c>newSubDetail</c>.
|
||||
/// </summary>
|
||||
internal static SubDetail NewSubDetail(Internal.Subscription sub, ClientConnection client)
|
||||
{
|
||||
var sd = NewClientSubDetail(sub, client.Cid);
|
||||
var acc = client.Account() as Account;
|
||||
sd.Account = acc?.GetName();
|
||||
sd.AccountTag = acc?.GetNameTag();
|
||||
return sd;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// newClientSubDetail
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Creates a <see cref="SubDetail"/> from a subscription (no account name).
|
||||
/// Mirrors Go <c>newClientSubDetail</c>.
|
||||
/// </summary>
|
||||
internal static SubDetail NewClientSubDetail(Internal.Subscription sub, ulong cid)
|
||||
{
|
||||
return new SubDetail
|
||||
{
|
||||
Subject = Encoding.UTF8.GetString(sub.Subject),
|
||||
Queue = sub.Queue is { Length: > 0 }
|
||||
? Encoding.UTF8.GetString(sub.Queue)
|
||||
: null,
|
||||
Sid = sub.Sid is { Length: > 0 }
|
||||
? Encoding.UTF8.GetString(sub.Sid)
|
||||
: string.Empty,
|
||||
Cid = cid,
|
||||
};
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// myUptime
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Formats a <see cref="TimeSpan"/> as a human-readable uptime string
|
||||
/// (e.g. <c>"2d3h14m5s"</c>, <c>"45m30s"</c>).
|
||||
/// Mirrors Go <c>myUptime</c>.
|
||||
/// </summary>
|
||||
internal static string MyUptime(TimeSpan d)
|
||||
{
|
||||
var tsecs = (long)d.TotalSeconds;
|
||||
var tmins = tsecs / 60;
|
||||
var thrs = tmins / 60;
|
||||
var tdays = thrs / 24;
|
||||
var tyrs = tdays / 365;
|
||||
|
||||
if (tyrs > 0)
|
||||
return $"{tyrs}y{tdays % 365}d{thrs % 24}h{tmins % 60}m{tsecs % 60}s";
|
||||
if (tdays > 0)
|
||||
return $"{tdays}d{thrs % 24}h{tmins % 60}m{tsecs % 60}s";
|
||||
if (thrs > 0)
|
||||
return $"{thrs}h{tmins % 60}m{tsecs % 60}s";
|
||||
if (tmins > 0)
|
||||
return $"{tmins}m{tsecs % 60}s";
|
||||
return $"{tsecs}s";
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// tlsCertNotAfter
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Returns the expiry date of the first certificate in the given collection,
|
||||
/// or <see cref="DateTime.MinValue"/> if the collection is empty.
|
||||
/// Mirrors Go <c>tlsCertNotAfter</c>.
|
||||
/// </summary>
|
||||
internal static DateTime TlsCertNotAfter(X509CertificateCollection? certs)
|
||||
{
|
||||
if (certs == null || certs.Count == 0)
|
||||
return DateTime.MinValue;
|
||||
if (certs[0] is X509Certificate2 cert2)
|
||||
return cert2.NotAfter.ToUniversalTime();
|
||||
try
|
||||
{
|
||||
var parsed = new X509Certificate2(certs[0]);
|
||||
return parsed.NotAfter.ToUniversalTime();
|
||||
}
|
||||
catch
|
||||
{
|
||||
return DateTime.MinValue;
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// urlsToStrings
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Converts a list of <see cref="Uri"/> objects to their <c>Host:Port</c> string form.
|
||||
/// Mirrors Go <c>urlsToStrings</c>.
|
||||
/// </summary>
|
||||
internal static string[] UrlsToStrings(IReadOnlyList<Uri> urls)
|
||||
{
|
||||
var result = new string[urls.Count];
|
||||
for (int i = 0; i < urls.Count; i++)
|
||||
result[i] = urls[i].Authority; // "host:port"
|
||||
return result;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// getPinnedCertsAsSlice
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Converts a <see cref="PinnedCertSet"/> to a plain string array.
|
||||
/// Returns <c>null</c> if the set is empty.
|
||||
/// Mirrors Go <c>getPinnedCertsAsSlice</c>.
|
||||
/// </summary>
|
||||
internal static string[]? GetPinnedCertsAsSlice(PinnedCertSet? certs)
|
||||
{
|
||||
if (certs == null || certs.Count == 0)
|
||||
return null;
|
||||
var result = new string[certs.Count];
|
||||
certs.CopyTo(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// getMonitorGWOptions
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Extracts gateway name filter and accounts flag from <see cref="GatewayzOptions"/>.
|
||||
/// When <c>AccountName</c> is set but <c>Accounts</c> is false, the accounts flag is
|
||||
/// implicitly promoted to true.
|
||||
/// Mirrors Go <c>getMonitorGWOptions</c>.
|
||||
/// </summary>
|
||||
internal static (string Name, bool Accounts) GetMonitorGWOptions(GatewayzOptions? opts)
|
||||
{
|
||||
if (opts == null)
|
||||
return (string.Empty, false);
|
||||
var name = opts.Name;
|
||||
var accs = opts.Accounts;
|
||||
if (!accs && !string.IsNullOrEmpty(opts.AccountName))
|
||||
accs = true;
|
||||
return (name, accs);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// createOutboundRemoteGatewayz
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Builds a <see cref="RemoteGatewayz"/> from an outbound gateway client connection.
|
||||
/// Client lock is acquired internally.
|
||||
/// Mirrors Go <c>createOutboundRemoteGatewayz</c>.
|
||||
/// Note: Per-account interest detail (outsim) requires a running gateway layer;
|
||||
/// account lists are empty in the current partial port.
|
||||
/// </summary>
|
||||
internal static (string Name, RemoteGatewayz? Rgw) CreateOutboundRemoteGatewayz(
|
||||
ClientConnection c,
|
||||
GatewayzOptions? opts,
|
||||
DateTime now,
|
||||
bool doAccs)
|
||||
{
|
||||
lock (c)
|
||||
{
|
||||
var name = c.Gateway?.Name;
|
||||
if (string.IsNullOrEmpty(name))
|
||||
return (string.Empty, null);
|
||||
|
||||
var isConfigured = c.Gateway?.Cfg != null && !c.Gateway.Cfg.IsImplicit();
|
||||
var rgw = new RemoteGatewayz
|
||||
{
|
||||
IsConfigured = isConfigured,
|
||||
Connection = new ConnInfo(),
|
||||
Accounts = doAccs ? [] : null,
|
||||
};
|
||||
return (name, rgw);
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// createOutboundAccountsGatewayz
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Returns the per-account interest list for an outbound gateway connection.
|
||||
/// Mirrors Go <c>createOutboundAccountsGatewayz</c>.
|
||||
/// Note: Requires fully-ported gateway outsim state; returns empty list in current port.
|
||||
/// </summary>
|
||||
internal static List<AccountGatewayz> CreateOutboundAccountsGatewayz(
|
||||
GatewayzOptions? opts,
|
||||
ClientConnection c)
|
||||
{
|
||||
// outsim not yet ported — return empty.
|
||||
return [];
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// createAccountOutboundGatewayz
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Creates an <see cref="AccountGatewayz"/> entry for a named outbound account.
|
||||
/// Mirrors Go <c>createAccountOutboundGatewayz</c>.
|
||||
/// Note: Requires fully-ported outsie state; returns Optimistic defaults.
|
||||
/// </summary>
|
||||
internal static AccountGatewayz CreateAccountOutboundGatewayz(
|
||||
GatewayzOptions? opts,
|
||||
string name,
|
||||
object? outsie)
|
||||
{
|
||||
// outsie not yet ported — return Optimistic defaults.
|
||||
return new AccountGatewayz
|
||||
{
|
||||
Name = name,
|
||||
InterestMode = GatewayInterestMode.Optimistic.String(),
|
||||
};
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// createInboundAccountsGatewayz
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Returns the per-account interest list for an inbound gateway connection.
|
||||
/// Mirrors Go <c>createInboundAccountsGatewayz</c>.
|
||||
/// Note: Requires fully-ported gateway insim state; returns empty list.
|
||||
/// </summary>
|
||||
internal static List<AccountGatewayz> CreateInboundAccountsGatewayz(
|
||||
GatewayzOptions? opts,
|
||||
ClientConnection c)
|
||||
{
|
||||
// insim not yet ported — return empty.
|
||||
return [];
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// createInboundAccountGatewayz
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Creates an <see cref="AccountGatewayz"/> entry for a named inbound account.
|
||||
/// Mirrors Go <c>createInboundAccountGatewayz</c>.
|
||||
/// Note: Requires fully-ported insie state; returns Optimistic defaults.
|
||||
/// </summary>
|
||||
internal static AccountGatewayz CreateInboundAccountGatewayz(string name, object? insie)
|
||||
{
|
||||
// insie not yet ported — return Optimistic defaults.
|
||||
return new AccountGatewayz
|
||||
{
|
||||
Name = name,
|
||||
InterestMode = GatewayInterestMode.Optimistic.String(),
|
||||
};
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// ResponseHandler / handleResponse
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Writes a JSON (or JSONP) HTTP monitoring response with <c>200 OK</c>.
|
||||
/// Mirrors Go <c>ResponseHandler</c>.
|
||||
/// </summary>
|
||||
internal static void ResponseHandler(
|
||||
HttpListenerResponse response,
|
||||
HttpListenerRequest request,
|
||||
byte[] data)
|
||||
{
|
||||
HandleResponse(200, response, request, data);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Writes a JSON (or JSONP if the <c>callback</c> query param is set) HTTP monitoring response.
|
||||
/// Mirrors Go <c>handleResponse</c>.
|
||||
/// </summary>
|
||||
internal static void HandleResponse(
|
||||
int statusCode,
|
||||
HttpListenerResponse response,
|
||||
HttpListenerRequest request,
|
||||
byte[] data)
|
||||
{
|
||||
var callback = request.QueryString["callback"] ?? string.Empty;
|
||||
response.StatusCode = statusCode;
|
||||
|
||||
if (callback.Length > 0)
|
||||
{
|
||||
response.ContentType = "application/javascript";
|
||||
var prefix = Encoding.UTF8.GetBytes($"{callback}(");
|
||||
var suffix = Encoding.UTF8.GetBytes(")");
|
||||
response.OutputStream.Write(prefix);
|
||||
response.OutputStream.Write(data);
|
||||
response.OutputStream.Write(suffix);
|
||||
}
|
||||
else
|
||||
{
|
||||
response.ContentType = "application/json";
|
||||
response.Headers["Access-Control-Allow-Origin"] = "*";
|
||||
response.OutputStream.Write(data);
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// newExtServiceLatency
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Converts an <see cref="InternalServiceLatency"/> to an <see cref="ExtServiceLatency"/>
|
||||
/// for the <c>/accountz</c> response. Returns <c>null</c> if input is <c>null</c>.
|
||||
/// Mirrors Go <c>newExtServiceLatency</c>.
|
||||
/// </summary>
|
||||
internal static ExtServiceLatency? NewExtServiceLatency(InternalServiceLatency? l)
|
||||
{
|
||||
if (l == null)
|
||||
return null;
|
||||
return new ExtServiceLatency
|
||||
{
|
||||
Sampling = l.Sampling,
|
||||
Results = l.Subject,
|
||||
};
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// newExtImport
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Converts a <see cref="ServiceImportEntry"/> to an <see cref="ExtImport"/>
|
||||
/// for the <c>/accountz</c> response.
|
||||
/// Mirrors Go <c>newExtImport</c>.
|
||||
/// </summary>
|
||||
internal static ExtImport NewExtImport(ServiceImportEntry? v)
|
||||
{
|
||||
if (v == null)
|
||||
return new ExtImport { Invalid = true };
|
||||
|
||||
return new ExtImport
|
||||
{
|
||||
Invalid = v.Invalid,
|
||||
Share = v.Share,
|
||||
Tracking = v.Tracking,
|
||||
TrackingHeader = v.TrackingHeader,
|
||||
Latency = NewExtServiceLatency(v.Latency),
|
||||
M1 = v.M1,
|
||||
Subject = v.To,
|
||||
Account = v.Account?.Name,
|
||||
LocalSubject = v.From,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -122,6 +122,12 @@ public sealed class PublishArgument
|
||||
public int HeaderSize { get; set; } = -1;
|
||||
public bool Delivered { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Active message-trace state for the current message, if tracing is enabled.
|
||||
/// Mirrors Go <c>pubArg.trace *msgTrace</c> in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public object? Trace { get; set; }
|
||||
|
||||
/// <summary>Resets all fields to their defaults.</summary>
|
||||
public void Reset()
|
||||
{
|
||||
@@ -139,6 +145,7 @@ public sealed class PublishArgument
|
||||
Size = 0;
|
||||
HeaderSize = -1;
|
||||
Delivered = false;
|
||||
Trace = null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,4 +175,14 @@ public sealed class ParseContext
|
||||
// ---- Internal scratch buffer ----
|
||||
|
||||
internal byte[] Scratch { get; } = new byte[ServerConstants.MaxControlLineSize];
|
||||
|
||||
/// <summary>
|
||||
/// Convenience accessor for the message-trace state on the current publish arg.
|
||||
/// Mirrors Go <c>c.pa.trace</c>.
|
||||
/// </summary>
|
||||
internal ZB.MOM.NatsNet.Server.MsgTraceState? Trace
|
||||
{
|
||||
get => Pa.Trace as ZB.MOM.NatsNet.Server.MsgTraceState;
|
||||
set => Pa.Trace = value;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user