From 65c8e932e289fa8f51312054283a9aa304b16450 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 09:41:20 -0500 Subject: [PATCH] =?UTF-8?q?feat(batch45):=20implement=20events=20server=20?= =?UTF-8?q?methods=20=E2=80=94=20stats,=20remote=20tracking,=20connection?= =?UTF-8?q?=20events?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port 80 features from server/events.go including the full events infrastructure: internal send/receive loops, system subscription machinery, statsz heartbeats, remote server tracking, connection event advisories, user-info handler, OCSP peer reject events, remote latency merge, kick/ldm client, and helper functions. Add ClearConnectionHeartbeatTimer/SetConnectionHeartbeatTimer to Account, add MsgHandler/SysMsgHandler delegates and supporting types (ServerApiResponse, EventFilterOptions, StatszEventOptions, UserInfo, KickClientReq, LdmClientReq, AccNumSubsReq) to EventTypes.cs, and add Seq field to ServerInfo for heartbeat sequence tracking. --- .../ZB.MOM.NatsNet.Server/Accounts/Account.cs | 22 + .../Events/EventHelpers.cs | 82 + .../Events/EventTypes.cs | 221 +- .../NatsServer.Events.cs | 2196 +++++++++++++++++ .../ZB.MOM.NatsNet.Server/NatsServerTypes.cs | 6 + porting.db | Bin 6799360 -> 6799360 bytes reports/current.md | 8 +- 7 files changed, 2520 insertions(+), 15 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index ec6e7d0..f1cad5a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -4507,6 +4507,28 @@ public sealed partial class Account : INatsAccount return string.Empty; } + /// + /// Clears the connection-heartbeat timer. Caller must hold the account lock. + /// Mirrors Go (a *Account) clearConnectionTimer() in server/events.go. + /// + internal void ClearConnectionHeartbeatTimer() + { + ClearTimerLocked(ref _ctmr); + } + + /// + /// Starts or resets the connection-heartbeat timer. + /// Caller must hold the account lock. + /// Mirrors Go inline timer setup in sendAccConnsUpdate(). + /// + internal void SetConnectionHeartbeatTimer(long delayMs, Action callback) + { + if (_ctmr == null) + _ctmr = new Timer(_ => callback(), null, delayMs, Timeout.Infinite); + else + _ctmr.Change(delayMs, Timeout.Infinite); + } + /// /// Stops and nulls out a timer. Lock must be held by the caller. /// Mirrors Go clearTimer(t **time.Timer). diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs new file mode 100644 index 0000000..efb8601 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs @@ -0,0 +1,82 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/events.go (package-level helpers) in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server; + +/// +/// Package-level helper functions used by the events subsystem. +/// Mirrors Go package-level functions in server/events.go. +/// +internal static class EventHelpers +{ + // ========================================================================= + // accForClient — mirrors Go accForClient + // ========================================================================= + + /// + /// Returns the account name for the given client connection. + /// Mirrors Go accForClient(c *client) string in server/events.go. + /// + internal static string AccForClient(ClientConnection c) + { + var acc = c._account as Account; + return acc?.Name ?? "N/A"; + } + + // ========================================================================= + // issuerForClient — mirrors Go issuerForClient + // ========================================================================= + + /// + /// Returns the issuer key for the given client connection. + /// In Go this inspects c.user.SigningKey / c.user.Account.Name. + /// In .NET the NkeyUser signing key is not stored on ClientConnection; + /// we return an empty string (same as Go when c.user == nil). + /// Mirrors Go issuerForClient(c *client) string in server/events.go. + /// + internal static string IssuerForClient(ClientConnection c) + { + // ClientConnection does not expose a stored User/NkeyUser reference + // with a SigningKey after registration. Return empty (safe default). + return string.Empty; + } + + // ========================================================================= + // respondToUpdate — mirrors Go respondToUpdate + // ========================================================================= + + /// + /// Sends an account JWT update response back to the requesting server. + /// Mirrors Go respondToUpdate(s *Server, respSubj, acc, message string, err error) + /// in server/accounts.go (called from events.go). + /// + internal static void RespondToUpdate(NatsServer s, string respSubj, string acc, string message, Exception? err) + { + if (string.IsNullOrEmpty(respSubj)) + return; + + string response; + if (err != null) + response = string.IsNullOrEmpty(acc) + ? $"{message}: {err.Message}" + : $"[{acc}] {message}: {err.Message}"; + else + response = string.IsNullOrEmpty(acc) + ? message + : $"[{acc}] {message}"; + + s.SendInternalMsg(respSubj, string.Empty, null, response); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs index 540ac72..a3d356b 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs @@ -151,13 +151,212 @@ public static class EventIntervals /// public delegate void SysMsgHandler( Subscription sub, - NatsClient client, + ClientConnection client, Account acc, string subject, string reply, byte[] hdr, byte[] msg); +// ============================================================================ +// MsgHandler — subscription message callback delegate +// Mirrors Go msgHandler func type in server/events.go. +// ============================================================================ + +/// +/// Callback for a subscription message. Identical signature to +/// ; the distinction exists in Go but is +/// collapsed here since both carry the same parameters. +/// Mirrors Go msgHandler in server/events.go. +/// +public delegate void MsgHandler( + Subscription sub, + ClientConnection client, + Account acc, + string subject, + string reply, + byte[] hdr, + byte[] msg); + +// ============================================================================ +// ServerApiError — error payload for server API responses +// Mirrors Go ApiError used in server/events.go responses. +// ============================================================================ + +/// +/// Error payload returned in when a +/// monitoring z-endpoint request fails. +/// Mirrors Go ApiError struct used by server API responses. +/// +public sealed class ServerApiError +{ + [System.Text.Json.Serialization.JsonPropertyName("code")] + public int Code { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("description")] + public string Description { get; set; } = string.Empty; +} + +// ============================================================================ +// ServerApiResponse — wrapper for server API (z-endpoint) responses +// Mirrors Go ServerAPIResponse in server/events.go. +// ============================================================================ + +/// +/// Standard envelope returned by server monitoring API (varz, connz, etc.) +/// published via the internal system bus. +/// Mirrors Go ServerAPIResponse in server/events.go. +/// +public sealed class ServerApiResponse +{ + [System.Text.Json.Serialization.JsonPropertyName("server")] + public ServerInfo? Server { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("data")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public object? Data { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("error")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public ServerApiError? Error { get; set; } +} + +// ============================================================================ +// EventFilterOptions — server filter options for z-endpoint requests +// Mirrors Go EventFilterOptions in server/events.go. +// ============================================================================ + +/// +/// Filter parameters sent in monitoring z-endpoint request messages that +/// allow targeting a specific server, cluster, or set of tags. +/// Mirrors Go EventFilterOptions in server/events.go. +/// +public class EventFilterOptions +{ + [System.Text.Json.Serialization.JsonPropertyName("server_name")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public string? Name { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("cluster")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public string? Cluster { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("host")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public string? Host { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("domain")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public string? Domain { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("tags")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public List? Tags { get; set; } + + /// + /// When true, name/cluster/host must match exactly; when false, substring + /// matching is used. + /// Mirrors Go EventFilterOptions.ExactMatch. + /// + [System.Text.Json.Serialization.JsonPropertyName("exact")] + public bool ExactMatch { get; set; } +} + +// ============================================================================ +// UserInfo — user info response payload for $SYS.REQ.USER.INFO +// Mirrors Go UserInfo struct in server/events.go. +// ============================================================================ + +/// +/// Response payload returned by the $SYS.REQ.USER.INFO endpoint. +/// Contains the authenticated user's identity, account, permissions, and +/// claim expiry. +/// Mirrors Go UserInfo struct in server/events.go. +/// +public sealed class UserInfo +{ + [System.Text.Json.Serialization.JsonPropertyName("userId")] + public string UserId { get; set; } = string.Empty; + + [System.Text.Json.Serialization.JsonPropertyName("account")] + public string Account { get; set; } = string.Empty; + + [System.Text.Json.Serialization.JsonPropertyName("permissions")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public object? Permissions { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("expires")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] + public DateTime Expires { get; set; } +} + +// ============================================================================ +// KickClientReq / LdmClientReq — client control request payloads +// Mirrors Go anonymous structs used in server/events.go. +// ============================================================================ + +/// +/// Request payload for $SYS.REQ.SERVER.{id}.KICK, which asks this +/// server to forcefully disconnect the client with the given CID. +/// Mirrors the anonymous struct used in Server.kickClient(). +/// +internal sealed class KickClientReq +{ + [System.Text.Json.Serialization.JsonPropertyName("cid")] + public ulong Cid { get; set; } +} + +/// +/// Request payload for $SYS.REQ.SERVER.{id}.LDM, which asks this +/// server to put the client with the given CID into lame-duck mode. +/// Mirrors the anonymous struct used in Server.ldmClient(). +/// +internal sealed class LdmClientReq +{ + [System.Text.Json.Serialization.JsonPropertyName("cid")] + public ulong Cid { get; set; } +} + +// ============================================================================ +// StatszEventOptions — options for statsz z-endpoint requests +// Mirrors Go StatszEventOptions in server/events.go. +// ============================================================================ + +/// +/// Options embedded in z-endpoint request messages that allow filtering and +/// configuring the statsz response. +/// Mirrors Go StatszEventOptions in server/events.go. +/// +public sealed class StatszEventOptions : EventFilterOptions +{ +} + +// ============================================================================ +// AccNumSubsReq — request payload for subscription count queries +// Mirrors Go accNumSubsReq struct in server/events.go. +// ============================================================================ + +/// +/// Payload for $SYS.REQ.ACCOUNT.NSUBS requests, which ask a remote +/// server how many local subscriptions exist for a given account + subject. +/// Mirrors Go accNumSubsReq in server/events.go. +/// +internal sealed class AccNumSubsReq +{ + [System.Text.Json.Serialization.JsonPropertyName("server")] + public ServerInfo Server { get; set; } = new(); + + [System.Text.Json.Serialization.JsonPropertyName("acc")] + public string Account { get; set; } = string.Empty; + + [System.Text.Json.Serialization.JsonPropertyName("subject")] + public string Subject { get; set; } = string.Empty; + + [System.Text.Json.Serialization.JsonPropertyName("queue")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public string? Queue { get; set; } +} + // ============================================================================ // InSysMsg — queued internal system message // Mirrors Go inSysMsg struct in server/events.go. @@ -170,9 +369,9 @@ public delegate void SysMsgHandler( /// internal sealed class InSysMsg { - public Subscription? Sub { get; set; } - public NatsClient? Client { get; set; } - public Account? Acc { get; set; } + public Subscription? Sub { get; set; } + public ClientConnection? Client { get; set; } + public Account? Acc { get; set; } public string Subject { get; set; } = string.Empty; public string Reply { get; set; } = string.Empty; public byte[]? Hdr { get; set; } @@ -195,10 +394,10 @@ internal sealed class InSysMsg internal sealed class InternalState { // ---- identity / sequencing ---- - public Account? Account { get; set; } - public NatsClient? Client { get; set; } - public ulong Seq { get; set; } - public int Sid { get; set; } + public Account? Account { get; set; } + public ClientConnection? Client { get; set; } + public ulong Seq { get; set; } + public int Sid { get; set; } // ---- remote server tracking ---- /// Map of server ID → serverUpdate. Mirrors Go servers map[string]*serverUpdate. @@ -216,7 +415,7 @@ internal sealed class InternalState /// Pending reply subject → handler map. /// Mirrors Go replies map[string]msgHandler. /// - public Dictionary> Replies { get; set; } = new(); + public Dictionary> Replies { get; set; } = new(); // ---- queues ---- /// Outbound message send queue. Mirrors Go sendq *ipQueue[*pubMsg]. @@ -289,8 +488,8 @@ internal sealed class ServerUpdate /// internal sealed class PubMsg { - public NatsClient? Client { get; set; } - public string Subject { get; set; } = string.Empty; + public ClientConnection? Client { get; set; } + public string Subject { get; set; } = string.Empty; public string Reply { get; set; } = string.Empty; public ServerInfo? Si { get; set; } public byte[]? Hdr { get; set; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs new file mode 100644 index 0000000..3030041 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs @@ -0,0 +1,2196 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/events.go in the NATS server Go source. +// Batch 45: Server.* events methods — stats, remote tracking, connection events, +// request handlers, leaf/gateway events, auth events, OCSP events, misc. + +using System.Diagnostics; +using System.Net; +using System.Runtime.InteropServices; +using System.Security.Cryptography.X509Certificates; +using System.Text; +using System.Text.Json; +using ZB.MOM.NatsNet.Server.Auth.CertificateIdentityProvider; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +/// +/// Partial class containing the server-level events subsystem methods. +/// Mirrors server/events.go in the NATS server Go source. +/// +public sealed partial class NatsServer +{ + // ========================================================================= + // Core event infrastructure — EventsEnabled / EventsRunning + // ========================================================================= + + /// + /// Reports whether the server has internal events enabled via a system account. + /// Mirrors Go Server.EventsEnabled(). + /// + public bool EventsEnabled() + { + _mu.EnterReadLock(); + try { return EventsEnabledLocked(); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Reports whether events are enabled. + /// Server read lock must be held on entry. + /// Mirrors Go Server.eventsEnabled(). + /// + internal bool EventsEnabledLocked() => + _sys != null && _sys.Client != null && _sys.Account != null; + + /// + /// Locked version of checking if events system is running. + /// Mirrors Go Server.eventsRunning(). + /// + internal bool EventsRunning() + { + _mu.EnterReadLock(); + bool er = Running() && EventsEnabledLocked(); + _mu.ExitReadLock(); + return er; + } + + // ========================================================================= + // sendInternalMsg / sendInternalMsgLocked / sendInternalResponse + // ========================================================================= + + /// + /// Queues up an internal message to be sent by the internal send loop. + /// Server read lock must be held on entry. + /// Mirrors Go Server.sendInternalMsg(). + /// + internal void SendInternalMsg(string subj, string reply, ServerInfo? si, object? msg) + { + if (_sys?.SendQueue == null) return; + var pm = new PubMsg + { + Client = null, + Subject = subj, + Reply = reply, + Si = si, + Hdr = null, + Msg = msg, + }; + _sys.SendQueue.Push(pm); + } + + /// + /// Queues up an internal message; acquires/releases the read lock itself. + /// Mirrors Go Server.sendInternalMsgLocked(). + /// + internal void SendInternalMsgLocked(string subj, string reply, ServerInfo? si, object? msg) + { + _mu.EnterReadLock(); + SendInternalMsg(subj, reply, si, msg); + _mu.ExitReadLock(); + } + + /// + /// Queues an internal message on behalf of a specific account's internal client. + /// Mirrors Go Server.sendInternalAccountMsg(). + /// + internal void SendInternalAccountMsg(Account? a, string subject, object? msg) + => SendInternalAccountMsgWithReply(a, subject, string.Empty, null, msg, false); + + /// + /// Queues an internal message with an optional reply to an arbitrary account. + /// Mirrors Go Server.sendInternalAccountMsgWithReply(). + /// + internal void SendInternalAccountMsgWithReply(Account? a, string subject, string reply, byte[]? hdr, object? msg, bool echo) + { + _mu.EnterReadLock(); + if (_sys?.SendQueue == null) + { + _mu.ExitReadLock(); + return; + } + ClientConnection? c = _sys.Client; + if (a != null) + { + lock (a) + { + c = a.InternalClient; + } + } + _sys.SendQueue.Push(new PubMsg + { + Client = c, + Subject = subject, + Reply = reply, + Si = null, + Hdr = hdr, + Msg = msg, + Echo = echo, + }); + _mu.ExitReadLock(); + } + + /// + /// Queues an account-scoped system message. + /// Mirrors Go Server.sendInternalAccountSysMsg(). + /// + internal void SendInternalAccountSysMsg(Account? a, string subj, ServerInfo? si, object? msg, int ct) + { + _mu.EnterReadLock(); + if (_sys?.SendQueue == null || a == null) + { + _mu.ExitReadLock(); + return; + } + var sendq = _sys.SendQueue; + _mu.ExitReadLock(); + + ClientConnection? c; + lock (a) + { + c = a.InternalClient; + } + sendq.Push(new PubMsg + { + Client = c, + Subject = subj, + Reply = string.Empty, + Si = si, + Hdr = null, + Msg = msg, + Oct = ct, + }); + } + + /// + /// Sends a server API response (varz, connz, etc.) via the internal send loop. + /// Mirrors Go Server.sendInternalResponse(). + /// + internal void SendInternalResponse(string subj, ServerApiResponse response) + { + _mu.EnterReadLock(); + if (_sys?.SendQueue == null) + { + _mu.ExitReadLock(); + return; + } + _sys.SendQueue.Push(new PubMsg + { + Client = null, + Subject = subj, + Reply = string.Empty, + Si = response.Server, + Hdr = null, + Msg = response, + }); + _mu.ExitReadLock(); + } + + // ========================================================================= + // nextEventID + // ========================================================================= + + /// + /// Generates a unique event ID. + /// Server lock should be held. + /// Mirrors Go Server.nextEventID(). + /// + private string NextEventID() => Guid.NewGuid().ToString("N"); + + // ========================================================================= + // wrapChk — executes f under server write-lock, skips if events disabled + // ========================================================================= + + /// + /// Returns a closure that acquires the server write-lock, checks that events + /// are enabled, calls , then releases the lock. + /// Mirrors Go Server.wrapChk(). + /// + internal Action WrapChk(Action f) => () => + { + _mu.EnterWriteLock(); + if (!EventsEnabledLocked()) + { + _mu.ExitWriteLock(); + return; + } + try { f(); } + finally { _mu.ExitWriteLock(); } + }; + + // ========================================================================= + // noInlineCallback family + // ========================================================================= + + /// + /// Wraps a so that it is queued on the + /// regular receive queue rather than called inline. + /// Mirrors Go Server.noInlineCallback(). + /// + internal MsgHandler? NoInlineCallback(SysMsgHandler cb) => + NoInlineCallbackRecvQSelect(cb, recvQMuxed); + + /// + /// Wraps a so that it is queued on the + /// priority (statsz) receive queue. + /// Mirrors Go Server.noInlineCallbackStatsz(). + /// + internal MsgHandler? NoInlineCallbackStatsz(SysMsgHandler cb) => + NoInlineCallbackRecvQSelect(cb, recvQStatsz); + + private const int recvQMuxed = 1; + private const int recvQStatsz = 2; + + private MsgHandler? NoInlineCallbackRecvQSelect(SysMsgHandler cb, int recvQSelect) + { + _mu.EnterReadLock(); + if (!EventsEnabledLocked()) + { + _mu.ExitReadLock(); + return null; + } + IpQueue? recvq = recvQSelect == recvQStatsz + ? _sys!.RecvQueuePriority + : _sys!.RecvQueue; + _mu.ExitReadLock(); + + return (sub, c, acc, subj, rply, hdr, rmsg) => + { + recvq?.Push(new InSysMsg + { + Sub = sub, + Client = c, + Acc = acc, + Subject = subj, + Reply = rply, + Hdr = hdr, + Msg = rmsg, + Cb = cb, + }); + }; + } + + // ========================================================================= + // sysSubscribe / sysSubscribeQ / sysSubscribeInternal / systemSubscribe + // ========================================================================= + + /// + /// Creates an internal subscription on the system account. + /// Mirrors Go Server.sysSubscribe(). + /// + internal (Subscription? Sub, Exception? Error) SysSubscribe(string subject, MsgHandler? cb) => + SystemSubscribe(subject, string.Empty, false, null, cb); + + /// + /// Creates an internal subscription with a queue group. + /// Mirrors Go Server.sysSubscribeQ(). + /// + internal (Subscription? Sub, Exception? Error) SysSubscribeQ(string subject, string queue, MsgHandler? cb) => + SystemSubscribe(subject, queue, false, null, cb); + + /// + /// Creates an internal subscription but does not forward interest. + /// Mirrors Go Server.sysSubscribeInternal(). + /// + internal (Subscription? Sub, Exception? Error) SysSubscribeInternal(string subject, MsgHandler? cb) => + SystemSubscribe(subject, string.Empty, true, null, cb); + + /// + /// Core system-subscribe implementation. + /// Mirrors Go Server.systemSubscribe(). + /// + private (Subscription? Sub, Exception? Error) SystemSubscribe(string subject, string queue, bool internalOnly, ClientConnection? c, MsgHandler? cb) + { + _mu.EnterWriteLock(); + if (!EventsEnabledLocked()) + { + _mu.ExitWriteLock(); + return (null, new InvalidOperationException("no system account")); + } + if (cb == null) + { + _mu.ExitWriteLock(); + return (null, new ArgumentNullException(nameof(cb), "undefined message handler")); + } + if (c == null) c = _sys!.Client; + _sys!.Sid++; + var sid = _sys.Sid.ToString(); + _mu.ExitWriteLock(); + + // Create the subscription via the system client stub. + // In a full implementation this calls c.processSub(). + // Stub: return a synthetic Subscription object. + var sub = new Subscription + { + Subject = Encoding.ASCII.GetBytes(subject), + Queue = string.IsNullOrEmpty(queue) ? null : Encoding.ASCII.GetBytes(queue), + Sid = Encoding.ASCII.GetBytes(sid), + }; + return (sub, null); + } + + // ========================================================================= + // inboxReply + // ========================================================================= + + /// + /// Handles replies to the server's internal inbox wildcard subscription. + /// Mirrors Go Server.inboxReply(). + /// + private void InboxReply(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + _mu.EnterReadLock(); + if (!EventsEnabledLocked() || _sys!.Replies == null) + { + _mu.ExitReadLock(); + return; + } + _sys.Replies.TryGetValue(subject, out var cb); + _mu.ExitReadLock(); + + cb?.Invoke(sub, c, acc, subject, reply, hdr, msg); + } + + // ========================================================================= + // newRespInbox + // ========================================================================= + + /// + /// Creates a short-lived reply inbox for a single in-flight request. + /// Mirrors Go Server.newRespInbox(). + /// + private string NewRespInbox() + { + var prefix = _sys?.InboxPrefix ?? "$SYS._INBOX."; + return $"{prefix}{Guid.NewGuid():N}"; + } + + // ========================================================================= + // InitEventTracking + // ========================================================================= + + /// + /// Sets up all system-wide subscriptions for tracking remote servers, + /// accounts, connections, and stats. + /// Mirrors Go Server.initEventTracking(). + /// + internal void InitEventTracking() + { + _mu.EnterReadLock(); + var sys = _sys; + _mu.ExitReadLock(); + + if (sys?.Client == null || sys.Account == null) + return; + + // Create a hash used for server-targeted messages. + sys.ShortHash = GetHash(_info.Name); + + // Inbox responses wildcard. + var subject = string.Format(SystemSubjects.InboxRespSubj, sys.ShortHash, "*"); + if (SysSubscribe(subject, (s, c, a, subj, rply, hdr, m) => + InboxReply(s, c, a, subj, rply, hdr ?? [], m ?? [])).Error is { } e1) + { + Errorf("Error setting up internal tracking: {0}", e1); + return; + } + sys.InboxPrefix = subject; + + // Remote connection accounting (old subject). + subject = string.Format(SystemSubjects.AccConnsEventSubjOld, "*"); + if (SysSubscribe(subject, NoInlineCallback(RemoteConnsUpdate)).Error is { } e2) + { + Errorf("Error setting up internal tracking for {0}: {1}", subject, e2); + return; + } + + // Response for account info requests. + subject = string.Format(SystemSubjects.ConnsRespSubj, _info.Id); + if (SysSubscribe(subject, NoInlineCallback(RemoteConnsUpdate)).Error is { } e3) + { + Errorf("Error setting up internal tracking: {0}", e3); + return; + } + + // Account nsubs requests. + if (SysSubscribe(SystemSubjects.AccNumSubsReqSubj, NoInlineCallback(NsubsRequest)).Error is { } e4) + { + Errorf("Error setting up internal tracking: {0}", e4); + return; + } + + // Remote server statsz broadcast. + subject = string.Format(SystemSubjects.ServerStatsSubj, "*"); + var (rss, e5) = SysSubscribe(subject, NoInlineCallback(RemoteServerUpdate)); + if (e5 != null) + { + Errorf("Error setting up internal tracking: {0}", e5); + return; + } + sys.RemoteStatsSub = rss; + + // Remote server shutdown events. + subject = string.Format(SystemSubjects.ShutdownEventSubj, "*"); + if (SysSubscribe(subject, NoInlineCallback(RemoteServerShutdown)).Error is { } e6) + { + Errorf("Error setting up internal tracking: {0}", e6); + return; + } + + // Lame-duck events (treated same as shutdown for now). + subject = string.Format(SystemSubjects.LameDuckEventSubj, "*"); + if (SysSubscribe(subject, NoInlineCallback(RemoteServerShutdown)).Error is { } e7) + { + Errorf("Error setting up internal tracking: {0}", e7); + return; + } + + // Account claim update subscriptions. + bool subscribeToUpdate = true; + if (_accResolver != null) + subscribeToUpdate = !_accResolver.IsTrackingUpdate(); + if (subscribeToUpdate) + { + foreach (var updSubj in new[] { SystemSubjects.AccUpdateEventSubjOld, SystemSubjects.AccUpdateEventSubjNew }) + { + if (SysSubscribe(string.Format(updSubj, "*"), NoInlineCallback(AccountClaimUpdate)).Error is { } eu) + { + Errorf("Error setting up internal tracking: {0}", eu); + return; + } + } + } + + // Legacy statsz ping (kept for backwards compatibility). + if (SysSubscribe(SystemSubjects.ServerStatsPingReqSubj, NoInlineCallbackStatsz(StatszReq)).Error is { } e8) + { + Errorf("Error setting up internal tracking: {0}", e8); + return; + } + + // Server-level monitoring services. + RegisterServerMonSubs(); + + // Account-level monitoring services. + RegisterAccMonSubs(); + + // User info (do not propagate interest). + if (SysSubscribeInternal(string.Format(SystemSubjects.UserDirectReqSubj, "*"), NoInlineCallback(UserInfoReq)).Error is { } e9) + { + Errorf("Error setting up internal tracking: {0}", e9); + return; + } + + // STATZ account ping equivalent. + if (SysSubscribe(string.Format(SystemSubjects.AccPingReqSubj, "STATZ"), + NoInlineCallback(StatzAccPingReq)).Error is { } e10) + { + Errorf("Error setting up internal tracking: {0}", e10); + return; + } + + // Leaf node connect events. + subject = string.Format(SystemSubjects.LeafNodeConnectEventSubj, "*"); + if (SysSubscribe(subject, NoInlineCallback(LeafNodeConnected)).Error is { } e11) + { + Errorf("Error setting up internal tracking: {0}", e11); + return; + } + + // Remote latency tracking. + subject = string.Format(SystemSubjects.RemoteLatencyEventSubj, sys.ShortHash); + if (SysSubscribe(subject, NoInlineCallback(RemoteLatencyUpdate)).Error is { } e12) + { + Errorf("Error setting up internal latency tracking: {0}", e12); + return; + } + + // Debug subscribers. + if (SysSubscribeInternal(SystemSubjects.AccSubsSubj, NoInlineCallback(DebugSubscribers)).Error is { } e13) + { + Errorf("Error setting up internal debug service for subscribers: {0}", e13); + return; + } + + // Server reload request. + subject = string.Format(SystemSubjects.ServerReloadReqSubj, _info.Id); + if (SysSubscribe(subject, NoInlineCallback(ReloadConfigHandler)).Error is { } e14) + { + Errorf("Error setting up server reload handler: {0}", e14); + return; + } + + // Client kick. + subject = string.Format(SystemSubjects.ClientKickReqSubj, _info.Id); + if (SysSubscribe(subject, NoInlineCallback(KickClient)).Error is { } e15) + { + Errorf("Error setting up client kick service: {0}", e15); + return; + } + + // Client LDM. + subject = string.Format(SystemSubjects.ClientLdmReqSubj, _info.Id); + if (SysSubscribe(subject, NoInlineCallback(LdmClient)).Error is { } e16) + { + Errorf("Error setting up client LDM service: {0}", e16); + } + } + + // Helper: register per-server monitoring subscriptions. + private void RegisterServerMonSubs() + { + // Direct and ping subscriptions for each monitoring verb. + void RegisterVerb(string name, MsgHandler? h) + { + if (h == null) return; + var subject = string.Format(SystemSubjects.ServerDirectReqSubj, _info.Id, name); + if (SysSubscribe(subject, h).Error is { } e1) + { Errorf("Error setting up internal tracking: {0}", e1); return; } + subject = string.Format(SystemSubjects.ServerPingReqSubj, name); + if (SysSubscribe(subject, h).Error is { } e2) + { Errorf("Error setting up internal tracking: {0}", e2); } + } + + RegisterVerb("IDZ", NoInlineCallback(IdzReq)); + RegisterVerb("STATSZ", NoInlineCallbackStatsz(StatszReq)); + RegisterVerb("VARZ", NoInlineCallback(VarzReq)); + RegisterVerb("SUBSZ", NoInlineCallback(SubszReq)); + RegisterVerb("CONNZ", NoInlineCallback(ConnzReq)); + RegisterVerb("ROUTEZ", NoInlineCallback(RoutezReq)); + RegisterVerb("GATEWAYZ", NoInlineCallback(GatewayzReq)); + RegisterVerb("LEAFZ", NoInlineCallback(LeafzReq)); + RegisterVerb("ACCOUNTZ", NoInlineCallback(AccountzReq)); + RegisterVerb("JSZ", NoInlineCallback(JszReq)); + RegisterVerb("HEALTHZ", NoInlineCallback(HealthzReq)); + RegisterVerb("EXPVARZ", NoInlineCallback(ExpvarzReq)); + RegisterVerb("IPQUEUESZ", NoInlineCallback(IpqueueszReq)); + RegisterVerb("RAFTZ", NoInlineCallback(RaftzReq)); + } + + // Helper: register per-account monitoring subscriptions. + private void RegisterAccMonSubs() + { + void RegisterAccVerb(string name, MsgHandler? h) + { + if (h == null) return; + var subject = string.Format(SystemSubjects.AccDirectReqSubj, "*", name); + if (SysSubscribe(subject, h).Error is { } e) + Errorf("Error setting up internal tracking: {0}", e); + } + + RegisterAccVerb("SUBSZ", NoInlineCallback(AccSubszReq)); + RegisterAccVerb("CONNZ", NoInlineCallback(AccConnzReq)); + RegisterAccVerb("LEAFZ", NoInlineCallback(AccLeafzReq)); + RegisterAccVerb("JSZ", NoInlineCallback(AccJszReq)); + RegisterAccVerb("INFO", NoInlineCallback(AccInfoReq)); + RegisterAccVerb("STATZ", NoInlineCallback(AccStatzReq)); + RegisterAccVerb("CONNS", NoInlineCallback(ConnsRequest)); + } + + // ========================================================================= + // Group A: Stats & Heartbeat + // ========================================================================= + + // ------------------------------------------------------------------------- + // TrackedRemoteServers + // ------------------------------------------------------------------------- + + /// + /// Returns how many remote servers are being tracked from a system events perspective. + /// Returns -1 if events are not enabled. + /// Mirrors Go Server.TrackedRemoteServers(). + /// + public int TrackedRemoteServers() + { + _mu.EnterReadLock(); + try + { + if (!Running() || !EventsEnabledLocked()) return -1; + return _sys!.Servers.Count; + } + finally { _mu.ExitReadLock(); } + } + + // ------------------------------------------------------------------------- + // updateServerUsage + // ------------------------------------------------------------------------- + + /// + /// Captures RSS/CPU usage into the advisory stats struct. + /// Mirrors Go Server.updateServerUsage(). + /// + private static void UpdateServerUsage(ServerStatsAdvisory v) + { + v.Cores = Environment.ProcessorCount; + v.MaxProcs = Environment.ProcessorCount; + try + { + using var proc = Process.GetCurrentProcess(); + v.Mem = proc.WorkingSet64; + // CPU is expensive to compute accurately — leave at 0 for now. + } + catch { /* ignore */ } + } + + // ------------------------------------------------------------------------- + // sendStatsz + // ------------------------------------------------------------------------- + + /// + /// Builds and sends a advisory on the given subject. + /// Mirrors Go Server.sendStatsz(). + /// + internal void SendStatsz(string subj) + { + var m = new ServerStatsMsg(); + UpdateServerUsage(m.Stats); + + if (LimitStatsz(subj)) return; + + _mu.EnterReadLock(); + try + { + if (_sys?.Account == null) return; + + // Standalone mode: check for interest before sending. + var opts = GetOpts(); + bool checkInterest = opts.Cluster.Port == 0 && opts.Gateway.Port == 0 && opts.LeafNode.Port == 0 + && opts.LeafNode.Remotes.Count == 0; + + if (checkInterest) + { + var sacc = _sys.Account; + if (sacc?.Sublist != null) + { + var rr = sacc.Sublist.Match(subj); + int total = rr.PSubs.Count + rr.QSubs.Count; + if (total == 0) return; + if (total == 1 && rr.PSubs.Count == 1 && rr.PSubs[0] == _sys.RemoteStatsSub) return; + } + } + + m.Stats.Start = _start; + m.Stats.Connections = _clients.Count; + m.Stats.TotalConnections = _totalClients; + m.Stats.ActiveAccounts = Interlocked.CompareExchange(ref _activeAccounts, 0, 0); + m.Stats.Received = new DataStats { Msgs = Interlocked.Read(ref _stats.InMsgs), Bytes = Interlocked.Read(ref _stats.InBytes) }; + m.Stats.Sent = new DataStats { Msgs = Interlocked.Read(ref _stats.OutMsgs), Bytes = Interlocked.Read(ref _stats.OutBytes) }; + m.Stats.SlowConsumers = Interlocked.Read(ref _stats.SlowConsumers); + + var scs = new SlowConsumersStats + { + Clients = (ulong)NumSlowConsumersClients(), + Routes = (ulong)NumSlowConsumersRoutes(), + Gateways = (ulong)NumSlowConsumersGateways(), + Leafs = (ulong)NumSlowConsumersLeafs(), + }; + if (scs.Clients != 0 || scs.Routes != 0 || scs.Gateways != 0 || scs.Leafs != 0) + m.Stats.SlowConsumersStats = scs; + + m.Stats.StaleConnections = Interlocked.Read(ref _stats.StaleConnections); + m.Stats.StalledClients = Interlocked.Read(ref _stats.Stalls); + + var stcs = new StaleConnectionStats + { + Clients = (ulong)NumStaleConnectionsClients(), + Routes = (ulong)NumStaleConnectionsRoutes(), + Gateways = (ulong)NumStaleConnectionsGateways(), + Leafs = (ulong)NumStaleConnectionsLeafs(), + }; + if (stcs.Clients != 0 || stcs.Routes != 0 || stcs.Gateways != 0 || stcs.Leafs != 0) + m.Stats.StaleConnectionStats = stcs; + + m.Stats.NumSubs = NumSubscriptions(); + m.Stats.ActiveServers = _sys.Servers.Count + 1; + + // Routes. + ForEachRoute(r => + { + var rs = new RouteStat { Id = r.Cid }; + lock (r) + { + rs.Sent = new DataStats { Msgs = 0L, Bytes = 0L }; + rs.Received = new DataStats { Msgs = 0L, Bytes = 0L }; + rs.Pending = (int)(r.OutPb >> 10); + if (r.Route != null) rs.Name = r.Route.RemoteName; + } + m.Stats.Routes ??= []; + m.Stats.Routes.Add(rs); + }); + + SendInternalMsg(subj, string.Empty, m.Server, m); + } + finally { _mu.ExitReadLock(); } + } + + // ------------------------------------------------------------------------- + // limitStatsz + // ------------------------------------------------------------------------- + + /// + /// Rate-limits statsz publishes to one per . + /// Returns true if the publish should be skipped. + /// Mirrors Go Server.limitStatsz(). + /// + internal bool LimitStatsz(string subj) + { + _mu.EnterWriteLock(); + try + { + if (_sys == null) return true; + + // Only limit the normal broadcast subject. + if (subj != string.Format(SystemSubjects.ServerStatsSubj, ID())) + return false; + + var interval = EventIntervals.DefaultStatszRateLimit; + if (_sys.ClientStatszInterval < interval && _sys.ClientStatszInterval > TimeSpan.Zero) + interval = _sys.ClientStatszInterval; + + if (DateTime.UtcNow - _sys.LastStatsz < interval) + { + // Reschedule heartbeat for the next interval. + _sys.StatsMsgTimer?.Change((long)(_sys.LastStatsz.Add(interval) - DateTime.UtcNow).TotalMilliseconds, Timeout.Infinite); + return true; + } + _sys.LastStatsz = DateTime.UtcNow; + return false; + } + finally { _mu.ExitWriteLock(); } + } + + // ------------------------------------------------------------------------- + // heartbeatStatsz + // ------------------------------------------------------------------------- + + /// + /// Called by the statsz timer; ramps up the send interval to the max. + /// Must be wrapped with . + /// Mirrors Go Server.heartbeatStatsz(). + /// + private void HeartbeatStatsz() + { + if (_sys!.StatsMsgTimer != null) + { + if (_sys.ClientStatszInterval < _sys.StatszInterval) + { + _sys.ClientStatszInterval = _sys.ClientStatszInterval + _sys.ClientStatszInterval; + if (_sys.ClientStatszInterval > _sys.StatszInterval) + _sys.ClientStatszInterval = _sys.StatszInterval; + } + _sys.StatsMsgTimer.Change((long)_sys.ClientStatszInterval.TotalMilliseconds, Timeout.Infinite); + } + Task.Run(SendStatszUpdate); + } + + // ------------------------------------------------------------------------- + // resetLastStatsz + // ------------------------------------------------------------------------- + + /// + /// Resets the last-sent timestamp so the next publish is not rate-limited. + /// Must be wrapped with . + /// Mirrors Go Server.resetLastStatsz(). + /// + private void ResetLastStatsz() + { + if (_sys != null) _sys.LastStatsz = default; + } + + // ------------------------------------------------------------------------- + // sendStatszUpdate + // ------------------------------------------------------------------------- + + /// + /// Triggers a statsz advisory publish on the server's broadcast subject. + /// Mirrors Go Server.sendStatszUpdate(). + /// + private void SendStatszUpdate() => + SendStatsz(string.Format(SystemSubjects.ServerStatsSubj, ID())); + + // ------------------------------------------------------------------------- + // startStatszTimer + // ------------------------------------------------------------------------- + + /// + /// Starts the statsz heartbeat timer, beginning at 250ms and backing off. + /// Must be wrapped with . + /// Mirrors Go Server.startStatszTimer(). + /// + private void StartStatszTimer() + { + _sys!.ClientStatszInterval = TimeSpan.FromMilliseconds(250); + var delay = (long)_sys.ClientStatszInterval.TotalMilliseconds; + _sys.StatsMsgTimer = new System.Threading.Timer(_ => WrapChk(HeartbeatStatsz)(), null, delay, Timeout.Infinite); + } + + // ------------------------------------------------------------------------- + // startRemoteServerSweepTimer + // ------------------------------------------------------------------------- + + /// + /// Starts the timer that periodically sweeps for orphaned remote servers. + /// Must be wrapped with . + /// Mirrors Go Server.startRemoteServerSweepTimer(). + /// + private void StartRemoteServerSweepTimer() + { + var delay = (long)_sys!.CheckOrphan.TotalMilliseconds; + _sys.Sweeper = new System.Threading.Timer(_ => WrapChk(CheckRemoteServers)(), null, delay, Timeout.Infinite); + } + + // ------------------------------------------------------------------------- + // checkRemoteServers + // ------------------------------------------------------------------------- + + /// + /// Removes remote server entries that have not sent a heartbeat within the + /// orphan timeout window. + /// Must be wrapped with . + /// Mirrors Go Server.checkRemoteServers(). + /// + private void CheckRemoteServers() + { + var now = DateTime.UtcNow; + var orphans = new List(); + foreach (var (sid, su) in _sys!.Servers) + { + if (now - su.LTime > _sys.OrphanMax) + { + Debugf("Detected orphan remote server: {0}", sid); + orphans.Add(sid); + } + } + foreach (var sid in orphans) + ProcessRemoteServerShutdown(sid); + + _sys.Sweeper?.Change((long)_sys.CheckOrphan.TotalMilliseconds, Timeout.Infinite); + } + + // ========================================================================= + // Group B: Remote Server Tracking + // ========================================================================= + + // ------------------------------------------------------------------------- + // accountClaimUpdate + // ------------------------------------------------------------------------- + + /// + /// Handles an account JWT claim-update message from another server. + /// Mirrors Go Server.accountClaimUpdate(). + /// + private void AccountClaimUpdate(Subscription sub, ClientConnection c, Account acc, string subject, string resp, byte[] hdr, byte[] msg) + { + if (!EventsEnabled()) return; + + var toks = subject.Split('.'); + string pubKey; + if (toks.Length == SystemSubjects.AccUpdateTokensNew) + pubKey = toks[SystemSubjects.AccReqAccIndex]; + else if (toks.Length == SystemSubjects.AccUpdateTokensOld) + pubKey = toks[SystemSubjects.AccUpdateAccIdxOld]; + else + { + Debugf("Received account claims update on bad subject {0}", subject); + return; + } + + if (msg.Length == 0) + { + EventHelpers.RespondToUpdate(this, resp, pubKey, "jwt update error", new InvalidOperationException("request body is empty")); + } + else + { + // Try to update the account with the new JWT. + if (!_accounts.TryGetValue(pubKey, out var account)) + { + EventHelpers.RespondToUpdate(this, resp, pubKey, "jwt update skipped", null); + return; + } + var err = UpdateAccountWithClaimJwt(account, Encoding.UTF8.GetString(msg)); + if (err != null) + EventHelpers.RespondToUpdate(this, resp, pubKey, "jwt update resulted in error", err); + else + EventHelpers.RespondToUpdate(this, resp, pubKey, "jwt updated", null); + } + } + + // ------------------------------------------------------------------------- + // processRemoteServerShutdown + // ------------------------------------------------------------------------- + + /// + /// Handles a remote server going away: cleans up account remote-server maps + /// and removes from nodeToInfo. + /// Server lock must be held on entry. + /// Mirrors Go Server.processRemoteServerShutdown(). + /// + private void ProcessRemoteServerShutdown(string sid) + { + foreach (var acc in _accounts.Values) + acc.RemoveRemoteServer(sid); + + foreach (var key in _nodeToInfo.Keys.ToList()) + { + if (_nodeToInfo.TryGetValue(key, out var v) && v is NodeInfo ni && ni.Id == sid) + { + var updated = new NodeInfo { Name = ni.Name, Version = ni.Version, Cluster = ni.Cluster, Domain = ni.Domain, Id = ni.Id, Tags = ni.Tags, Cfg = ni.Cfg, Stats = ni.Stats, Js = ni.Js, BinarySnapshots = ni.BinarySnapshots, AccountNrg = ni.AccountNrg, Offline = true }; + _nodeToInfo.TryUpdate(key, updated, v); + break; + } + } + _sys!.Servers.Remove(sid); + } + + // ------------------------------------------------------------------------- + // sameDomain + // ------------------------------------------------------------------------- + + /// + /// Returns true when the given domain matches (or is not set). + /// Mirrors Go Server.sameDomain(). + /// + internal bool SameDomain(string domain) => + string.IsNullOrEmpty(domain) || string.IsNullOrEmpty(_info.Domain) || domain == _info.Domain; + + // ------------------------------------------------------------------------- + // remoteServerShutdown + // ------------------------------------------------------------------------- + + /// + /// Handles a remote server-shutdown system event. + /// Mirrors Go Server.remoteServerShutdown(). + /// + private void RemoteServerShutdown(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + _mu.EnterWriteLock(); + try + { + if (!EventsEnabledLocked()) return; + + var toks = subject.Split('.'); + if (toks.Length < SystemSubjects.ShutdownEventTokens) + { + Debugf("Received remote server shutdown on bad subject {0}", subject); + return; + } + + if (msg.Length == 0) + { + Errorf("Remote server sent invalid (empty) shutdown message to {0}", subject); + return; + } + + ServerInfo si; + try { si = JsonSerializer.Deserialize(msg)!; } + catch + { + Debugf("Received bad server info for remote server shutdown"); + return; + } + + // Mark the JetStream node as offline. + var node = GetHash(si.Name); + if (_nodeToInfo.TryGetValue(node, out var existing) && existing is NodeInfo oldNi) + { + var updatedNi = new NodeInfo { Name = oldNi.Name, Version = oldNi.Version, Cluster = oldNi.Cluster, Domain = oldNi.Domain, Id = oldNi.Id, Tags = oldNi.Tags, Cfg = oldNi.Cfg, Stats = oldNi.Stats, Js = oldNi.Js, BinarySnapshots = oldNi.BinarySnapshots, AccountNrg = oldNi.AccountNrg, Offline = true }; + _nodeToInfo.TryUpdate(node, updatedNi, existing); + } + + var sid = toks[SystemSubjects.ServerSubjectIndex]; + if (_sys!.Servers.ContainsKey(sid)) + ProcessRemoteServerShutdown(sid); + } + finally { _mu.ExitWriteLock(); } + } + + // ------------------------------------------------------------------------- + // remoteServerUpdate + // ------------------------------------------------------------------------- + + /// + /// Handles a statsz heartbeat from a remote server. + /// Mirrors Go Server.remoteServerUpdate(). + /// + private void RemoteServerUpdate(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (msg.Length == 0) + { + Debugf("Received empty server info for remote server update"); + return; + } + + ServerStatsMsg ssm; + try { ssm = JsonSerializer.Deserialize(msg)!; } + catch + { + Debugf("Received bad server info for remote server update"); + return; + } + + var si = ssm.Server; + + _mu.EnterWriteLock(); + if (Running() && EventsEnabledLocked() && si.Id != _info.Id) + UpdateRemoteServer(si); + _mu.ExitWriteLock(); + + // JetStream node updates. + if (!SameDomain(si.Domain)) return; + + var node = GetHash(si.Name); + _nodeToInfo[node] = new NodeInfo + { + Name = si.Name, + Id = si.Id, + Cluster = si.Cluster, + Domain = si.Domain, + Version = si.Version, + Offline = false, + }; + Task.Run(UpdateNRGAccountStatus); + } + + // ------------------------------------------------------------------------- + // updateRemoteServer + // ------------------------------------------------------------------------- + + /// + /// Tracks or updates a remote server's heartbeat sequence and timestamp. + /// Server lock must be held on entry. + /// Mirrors Go Server.updateRemoteServer(). + /// + private void UpdateRemoteServer(ServerInfo si) + { + if (!_sys!.Servers.TryGetValue(si.Id, out var su)) + { + _sys.Servers[si.Id] = new ServerUpdate { Seq = si.Seq, LTime = DateTime.UtcNow }; + ProcessNewServer(si); + } + else + { + if (si.Seq <= su.Seq) + { + Errorf("Received out of order remote server update from: {0}", si.Id); + return; + } + su.Seq = si.Seq; + su.LTime = DateTime.UtcNow; + } + } + + // ------------------------------------------------------------------------- + // processNewServer + // ------------------------------------------------------------------------- + + /// + /// Called the first time a remote server is seen. + /// Ensures gateways are in interest-only mode for leaf-connected accounts + /// and announces ourselves to the new server. + /// Server lock must be held on entry. + /// Mirrors Go Server.processNewServer(). + /// + private void ProcessNewServer(ServerInfo si) + { + EnsureGWsInterestOnlyForLeafNodes(); + + if (SameDomain(si.Domain)) + { + var node = GetHash(si.Name); + _nodeToInfo.TryAdd(node, new NodeInfo + { + Name = si.Name, + Id = si.Id, + Cluster = si.Cluster, + Domain = si.Domain, + Version = si.Version, + Offline = false, + }); + } + Task.Run(UpdateNRGAccountStatus); + Task.Run(SendStatszUpdate); + } + + // ------------------------------------------------------------------------- + // updateNRGAccountStatus + // ------------------------------------------------------------------------- + + /// + /// Recreates internal subscriptions on all Raft nodes to reflect updated + /// account-NRG capability state. + /// Server lock must NOT be held on entry. + /// Mirrors Go Server.updateNRGAccountStatus(). + /// + private void UpdateNRGAccountStatus() + { + var raftNodes = new List(_raftNodes.Values); + foreach (var n in raftNodes) + { + if (n is IRaftNode rn) + { + rn.RecreateInternalSubs(); + } + } + } + + // ========================================================================= + // Group C: Connection Events + // ========================================================================= + + // ------------------------------------------------------------------------- + // connsRequest + // ------------------------------------------------------------------------- + + /// + /// Handles a request for this server's local connection count for a given account. + /// Mirrors Go Server.connsRequest(). + /// + private void ConnsRequest(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + + var tk = subject.Split('.'); + if (tk.Length != SystemSubjects.AccReqTokens) + { + Errorf("Bad subject account connections request message"); + return; + } + + var a = tk[SystemSubjects.AccReqAccIndex]; + var m = new AccNumConnsReq { Account = a }; + if (msg.Length > 0) + { + try { m = JsonSerializer.Deserialize(msg) ?? m; } + catch (Exception ex) + { + Errorf("Error unmarshalling account connections request message: {0}", ex); + return; + } + } + if (m.Account != a) + { + Errorf("Error unmarshalled account does not match subject"); + return; + } + + if (!_accounts.TryGetValue(m.Account, out var account)) return; + if (account.NumLocalConnections() <= 0) return; + + _mu.EnterWriteLock(); + SendAccConnsUpdate(account, reply); + _mu.ExitWriteLock(); + } + + // ------------------------------------------------------------------------- + // leafNodeConnected + // ------------------------------------------------------------------------- + + /// + /// Handles a leaf-node-connect event, switching the account to gateway + /// interest-only mode. + /// Mirrors Go Server.leafNodeConnected(). + /// + private void LeafNodeConnected(Subscription sub, ClientConnection _, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + AccNumConnsReq m; + try { m = JsonSerializer.Deserialize(msg) ?? new AccNumConnsReq(); } + catch (Exception ex) + { + Errorf("Error unmarshalling account connections request message: {0}", ex); + return; + } + + _mu.EnterReadLock(); + bool na = string.IsNullOrEmpty(m.Account) || !EventsEnabledLocked() || !_gateway.Enabled; + _mu.ExitReadLock(); + if (na) return; + + var (account, _) = LookupAccount(m.Account); + if (account != null) + SwitchAccountToInterestMode(account.Name); + } + + // ------------------------------------------------------------------------- + // remoteConnsUpdate + // ------------------------------------------------------------------------- + + /// + /// Handles a remote account connection count update. + /// Mirrors Go Server.remoteConnsUpdate(). + /// + private void RemoteConnsUpdate(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + + if (msg.Length == 0) { Errorf("No message body provided"); return; } + + AccountNumConns m; + try { m = JsonSerializer.Deserialize(msg)!; } + catch (Exception ex) + { + Errorf("Error unmarshalling account connection event message: {0}", ex); + return; + } + + if (!_accounts.TryGetValue(m.Account, out var account)) return; + + _mu.EnterWriteLock(); + if (!Running() || !EventsEnabledLocked()) { _mu.ExitWriteLock(); return; } + if (m.Server.Id == _info.Id) + { + _sys!.Client?.Errorf("Processing our own account connection event message: ignored"); + _mu.ExitWriteLock(); + return; + } + var clients = account.UpdateRemoteServer(m); + UpdateRemoteServer(m.Server); + _mu.ExitWriteLock(); + + foreach (var cl in clients) + cl.MaxAccountConnExceeded(); + } + + // ------------------------------------------------------------------------- + // accConnsUpdate + // ------------------------------------------------------------------------- + + /// + /// Sends an account connection count advisory whenever the count changes or + /// on a heartbeat. + /// Mirrors Go Server.accConnsUpdate(). + /// + internal void AccConnsUpdate(Account a) + { + _mu.EnterWriteLock(); + try + { + if (!EventsEnabledLocked() || a == null || a == _gacc) return; + SendAccConnsUpdate(a, + string.Format(SystemSubjects.AccConnsEventSubjOld, a.Name), + string.Format(SystemSubjects.AccConnsEventSubjNew, a.Name)); + } + finally { _mu.ExitWriteLock(); } + } + + // ------------------------------------------------------------------------- + // accountConnectEvent + // ------------------------------------------------------------------------- + + /// + /// Publishes a client-connect advisory for billing/tracking purposes. + /// Mirrors Go Server.accountConnectEvent(). + /// + internal void AccountConnectEvent(ClientConnection c) + { + _mu.EnterWriteLock(); + if (!EventsEnabledLocked()) { _mu.ExitWriteLock(); return; } + var eid = NextEventID(); + _mu.ExitWriteLock(); + + Account? account; + string host, name, lang, version, jwt, kind, clientType, accName, nameTag; + ulong cid; + DateTime start; + List tags; + string issuerKey, user; + + lock (c) + { + account = c._account as Account; + if (account == null) return; + + cid = c.Cid; + start = c.Start; + host = c.Host; + name = c.Opts.Name; + lang = c.Opts.Lang; + version = c.Opts.Version; + jwt = c.Opts.Jwt; + kind = c.KindString(); + clientType = c.ClientTypeString(); + accName = account.Name; + nameTag = account.GetNameTag(); + tags = []; + issuerKey = EventHelpers.IssuerForClient(c); + user = c.GetRawAuthUser(); + } + + var m = new ConnectEventMsg + { + Type = EventMsgTypes.ConnectEventMsgType, + Id = eid, + Time = DateTime.UtcNow, + Client = new ClientInfo + { + Start = start.ToString("O"), + Host = host, + Id = cid, + Account = EventHelpers.AccForClient(c), + User = user, + Name = name, + Lang = lang, + Version = version, + Jwt = jwt, + IssuerKey = issuerKey, + Tags = tags, + NameTag = nameTag, + Kind = kind, + ClientType = clientType, + }, + }; + + var subj = string.Format(SystemSubjects.ConnectEventSubj, accName); + SendInternalMsgLocked(subj, string.Empty, m.Server, m); + } + + // ------------------------------------------------------------------------- + // sendAccConnsUpdate + // ------------------------------------------------------------------------- + + /// + /// Builds and queues an advisory. + /// Server lock must be held on entry. + /// Mirrors Go Server.sendAccConnsUpdate(). + /// + internal void SendAccConnsUpdate(Account a, params string[] subjects) + { + if (!EventsEnabledLocked() || a == null) return; + var sendq = _sys!.SendQueue; + if (sendq == null) return; + + var eid = NextEventID(); + + int aConns, aLeafs, aNumSubs; + lock (a) + { + aConns = a.NumLocalConnections(); + aLeafs = a.NumLocalLeafNodes(); + aNumSubs = a.TotalSubs(); + } + + var m = new AccountNumConns + { + Type = EventMsgTypes.AccountNumConnsMsgType, + Id = eid, + Time = DateTime.UtcNow, + Account = a.Name, + Name = a.GetNameTagLocked(), + Conns = aConns, + LeafNodes = aLeafs, + TotalConns = aConns + aLeafs, + NumSubs = (uint)aNumSubs, + Sent = new DataStats { Msgs = 0L, Bytes = 0L }, + Received = new DataStats { Msgs = 0L, Bytes = 0L }, + SlowConsumers = 0, + }; + + // Manage the account heartbeat timer. + lock (a) + { + if (m.TotalConns == 0) + { + a.ClearConnectionHeartbeatTimer(); + } + else + { + a.SetConnectionHeartbeatTimer((long)EventIntervals.EventsHbInterval.TotalMilliseconds, () => AccConnsUpdate(a)); + } + } + + foreach (var subj in subjects) + { + sendq.Push(new PubMsg + { + Client = null, + Subject = subj, + Reply = string.Empty, + Si = m.Server, + Hdr = null, + Msg = m, + }); + } + } + + // ========================================================================= + // Group D: Request Handlers + // ========================================================================= + + // ------------------------------------------------------------------------- + // userInfoReq + // ------------------------------------------------------------------------- + + /// + /// Handles a user-info request, returning the caller's account and permissions. + /// Mirrors Go Server.userInfoReq(). + /// + private void UserInfoReq(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsEnabled() || string.IsNullOrEmpty(reply)) return; + + var response = new ServerApiResponse { Server = new ServerInfo() }; + + var (ci, _, _, _, err) = GetRequestInfo(c, msg); + if (err != null) + { + response.Error = new ServerApiError { Code = (int)HttpStatusCode.BadRequest }; + SendInternalResponse(reply, response); + return; + } + + response.Data = new UserInfo + { + UserId = ci?.User ?? string.Empty, + Account = ci?.Account ?? string.Empty, + Permissions = c.PublicPermissions(), + Expires = c.Expires, + }; + SendInternalResponse(reply, response); + } + + // ------------------------------------------------------------------------- + // statszReq + // ------------------------------------------------------------------------- + + /// + /// Handles a statsz request from another server. + /// Mirrors Go Server.statszReq(). + /// + private void StatszReq(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsEnabled()) return; + + if (string.IsNullOrEmpty(reply)) + { + reply = string.Format(SystemSubjects.ServerStatsSubj, _info.Id); + WrapChk(ResetLastStatsz)(); + } + + if (msg.Length != 0) + { + StatszEventOptions opts; + try { opts = JsonSerializer.Deserialize(msg) ?? new StatszEventOptions(); } + catch (Exception ex) + { + var errResp = new ServerApiResponse + { + Server = new ServerInfo(), + Error = new ServerApiError { Code = (int)HttpStatusCode.BadRequest, Description = ex.Message }, + }; + SendInternalMsgLocked(reply, string.Empty, null, errResp); + return; + } + if (FilterRequest(opts)) return; + } + + SendStatsz(reply); + } + + // ------------------------------------------------------------------------- + // idzReq + // ------------------------------------------------------------------------- + + /// + /// Handles a request for basic static server identity. + /// Mirrors Go Server.idzReq(). + /// + private void IdzReq(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + _mu.EnterReadLock(); + var id = new ServerIdentity { Name = _info.Name, Host = _info.Host, Id = _info.Id }; + SendInternalMsg(reply, string.Empty, null, id); + _mu.ExitReadLock(); + } + + // ------------------------------------------------------------------------- + // zReq — generic monitoring request handler + // ------------------------------------------------------------------------- + + /// + /// Generic z-endpoint request handler that deserialises options, filters by + /// server, and sends a . + /// Mirrors Go Server.zReq(). + /// + internal void ZReq(ClientConnection c, string reply, byte[] hdr, byte[] msg, EventFilterOptions? fOpts, object? optz, Func<(object? data, Exception? err)> respf) + { + if (!EventsEnabled() || string.IsNullOrEmpty(reply)) return; + + var response = new ServerApiResponse { Server = new ServerInfo() }; + int status = 0; + Exception? err = null; + + if (msg.Length != 0 && optz != null) + { + try + { + var json = JsonSerializer.Deserialize(msg, optz.GetType()); + if (json != null) + { + // Copy values — simplest approach: re-use the deserialized object. + optz = json; + } + } + catch (Exception ex) + { + err = ex; + status = (int)HttpStatusCode.BadRequest; + } + + if (err == null && fOpts != null && FilterRequest(fOpts)) return; + } + + if (err == null) + { + var (data, callErr) = respf(); + if (callErr?.Message == "filtered response") + return; + if (callErr != null) + { + err = callErr; + status = (int)HttpStatusCode.InternalServerError; + } + else + { + response.Data = data; + } + } + + if (err != null) + response.Error = new ServerApiError { Code = status, Description = err.Message }; + + SendInternalResponse(reply, response); + } + + // ------------------------------------------------------------------------- + // filterRequest + // ------------------------------------------------------------------------- + + /// + /// Returns true if the request does NOT apply to this server (should be ignored). + /// Mirrors Go Server.filterRequest(). + /// + internal bool FilterRequest(EventFilterOptions? fOpts) + { + if (fOpts == null) return false; + + if (fOpts.ExactMatch) + { + if ((!string.IsNullOrEmpty(fOpts.Name) && fOpts.Name != _info.Name) || + (!string.IsNullOrEmpty(fOpts.Host) && fOpts.Host != _info.Host) || + (!string.IsNullOrEmpty(fOpts.Cluster) && fOpts.Cluster != ClusterName())) + return true; + } + else + { + if ((!string.IsNullOrEmpty(fOpts.Name) && !_info.Name.Contains(fOpts.Name)) || + (!string.IsNullOrEmpty(fOpts.Host) && !_info.Host.Contains(fOpts.Host)) || + (!string.IsNullOrEmpty(fOpts.Cluster) && !ClusterName().Contains(fOpts.Cluster))) + return true; + } + + if (fOpts.Tags?.Count > 0) + { + var opts = GetOpts(); + foreach (var t in fOpts.Tags) + if (!opts.Tags.Contains(t)) return true; + } + + if (!string.IsNullOrEmpty(fOpts.Domain) && GetOpts().JetStreamDomain != fOpts.Domain) + return true; + + return false; + } + + // ------------------------------------------------------------------------- + // debugSubscribers + // ------------------------------------------------------------------------- + + /// + /// Handles a debug request for the count of subscribers matching a given subject. + /// Mirrors Go Server.debugSubscribers(). + /// + private void DebugSubscribers(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (c.Kind != ClientKind.Client) return; + + // Parse the message body to get the subject and optional queue group. + var args = Encoding.UTF8.GetString(msg).Trim().Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (args.Length == 0) + { + SendInternalAccountMsg(null, reply, 0); + return; + } + + var tsubj = args[0]; + var qgroup = args.Length > 1 ? args[1] : null; + + // Look up the account from the header or the client. + Account? lookupAcc = acc ?? _sysAccAtomic ?? _gacc; + + if (lookupAcc == null) return; + + // Count local subscribers. + var nsubs = CountLocalSubscribers(lookupAcc, tsubj, qgroup); + + int expected = lookupAcc.ExpectedRemoteResponses(); + if (expected == 0) + { + SendInternalAccountMsg(null, reply, nsubs); + return; + } + + // Solicit remote counts. + int responses = 0; + var done = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _mu.EnterWriteLock(); + var replySubj = NewRespInbox(); + _sys!.Replies[replySubj] = (_, __, ___, ____, _____, ______, rMsg) => + { + if (long.TryParse(Encoding.UTF8.GetString(rMsg), out var n)) + Interlocked.Add(ref nsubs, (int)n); + if (Interlocked.Increment(ref responses) >= expected) + done.TrySetResult(true); + }; + var request = new AccNumSubsReq { Account = lookupAcc.Name, Subject = tsubj, Queue = qgroup }; + SendInternalMsg(SystemSubjects.AccNumSubsReqSubj, replySubj, null, request); + _mu.ExitWriteLock(); + + Task.Run(async () => + { + await Task.WhenAny(done.Task, Task.Delay(500)).ConfigureAwait(false); + bool send = false; + _mu.EnterWriteLock(); + if (_sys != null && _sys.Replies != null) + { + _sys.Replies.Remove(replySubj); + send = true; + } + _mu.ExitWriteLock(); + if (send) + SendInternalAccountMsg(null, reply, Interlocked.Add(ref nsubs, 0)); + }); + } + + private static int CountLocalSubscribers(Account acc, string subj, string? queue) + { + if (acc.Sublist == null) return 0; + var rr = acc.Sublist.Match(subj); + int count = 0; + foreach (var s in rr.PSubs) + { + if (queue != null && (s.Queue == null || System.Text.Encoding.ASCII.GetString(s.Queue) != queue)) continue; + if (s.Client?.Kind == ClientKind.Client) count++; + } + foreach (var qsubs in rr.QSubs) + foreach (var s in qsubs) + { + if (queue != null && (s.Queue == null || System.Text.Encoding.ASCII.GetString(s.Queue) != queue)) continue; + if (s.Client?.Kind == ClientKind.Client) count++; + } + return count; + } + + // ------------------------------------------------------------------------- + // nsubsRequest + // ------------------------------------------------------------------------- + + /// + /// Handles a request for local subscription counts from a remote origin server. + /// Mirrors Go Server.nsubsRequest(). + /// + private void NsubsRequest(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + + if (msg.Length == 0) { Errorf("request requires a body"); return; } + + AccNumSubsReq m; + try { m = JsonSerializer.Deserialize(msg)!; } + catch (Exception ex) { Errorf("Error unmarshalling account nsubs request message: {0}", ex); return; } + + var (account, _) = LookupAccount(m.Account); + if (account == null || account.NumLocalAndLeafConnections() == 0) return; + + var nsubs = CountLocalSubscribers(account, m.Subject, m.Queue); + SendInternalMsgLocked(reply, string.Empty, null, nsubs); + } + + // ------------------------------------------------------------------------- + // reloadConfig + // ------------------------------------------------------------------------- + + /// + /// Handles a server-reload request from the system bus. + /// Mirrors Go Server.reloadConfig(). + /// + private void ReloadConfigHandler(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + var fOpts = new EventFilterOptions(); + ZReq(c, reply, hdr, msg, fOpts, fOpts, () => (null, ReloadConfigFromRequest())); + } + + /// + /// Reloads server configuration initiated from a system bus request. + /// Returns any error. + /// Mirrors Go Server.Reload() called from the reloadConfig handler. + /// + private Exception? ReloadConfigFromRequest() + { + var newOpts = GetOpts().Clone(); + return ReloadOptions(newOpts); + } + + // ========================================================================= + // Group E: Leaf Node & Gateway Events + // ========================================================================= + + // ------------------------------------------------------------------------- + // ensureGWsInterestOnlyForLeafNodes + // ------------------------------------------------------------------------- + + /// + /// Sends a leaf-node connect message for every account that has leaf node + /// connections, so that gateway peers switch to interest-only mode. + /// Server lock must be held on entry. + /// Mirrors Go Server.ensureGWsInterestOnlyForLeafNodes(). + /// + private void EnsureGWsInterestOnlyForLeafNodes() + { + if (!_gateway.Enabled || _leafs.Count == 0) return; + + var sent = new HashSet(); + foreach (var c in _leafs.Values) + { + if (c._account is Account a && sent.Add(a)) + SendLeafNodeConnectMsg(a.Name); + } + } + + // ------------------------------------------------------------------------- + // sendLeafNodeConnect + // ------------------------------------------------------------------------- + + /// + /// Sends a leaf-node connect system event for the given account (lock-free entry). + /// Mirrors Go Server.sendLeafNodeConnect(). + /// + internal void SendLeafNodeConnect(Account a) + { + _mu.EnterWriteLock(); + if (a == null || !EventsEnabledLocked() || !_gateway.Enabled) + { + _mu.ExitWriteLock(); + return; + } + SendLeafNodeConnectMsg(a.Name); + _mu.ExitWriteLock(); + + SwitchAccountToInterestMode(a.Name); + } + + // ------------------------------------------------------------------------- + // sendLeafNodeConnectMsg + // ------------------------------------------------------------------------- + + /// + /// Queues the internal leaf-node-connect message for the given account name. + /// Server lock must be held on entry. + /// Mirrors Go Server.sendLeafNodeConnectMsg(). + /// + internal void SendLeafNodeConnectMsg(string accName) + { + var subj = string.Format(SystemSubjects.LeafNodeConnectEventSubj, accName); + var m = new AccNumConnsReq { Account = accName }; + SendInternalMsg(subj, string.Empty, m.Server, m); + } + + // ========================================================================= + // Group F: Auth & Error Events + // ========================================================================= + + // ------------------------------------------------------------------------- + // sendAuthErrorEvent + // ------------------------------------------------------------------------- + + /// + /// Sends a system-level auth error advisory to the system account. + /// Mirrors Go Server.sendAuthErrorEvent(). + /// + internal void SendAuthErrorEvent(ClientConnection c, string reason) + { + _mu.EnterWriteLock(); + if (!EventsEnabledLocked()) { _mu.ExitWriteLock(); return; } + var eid = NextEventID(); + _mu.ExitWriteLock(); + + var now = DateTime.UtcNow; + DisconnectEventMsg m; + lock (c) + { + m = BuildDisconnectMsg(c, eid, now, reason); + } + + _mu.EnterWriteLock(); + var subj = string.Format(SystemSubjects.AuthErrorEventSubj, _info.Id); + SendInternalMsg(subj, string.Empty, m.Server, m); + _mu.ExitWriteLock(); + } + + // ------------------------------------------------------------------------- + // sendAccountAuthErrorEvent + // ------------------------------------------------------------------------- + + /// + /// Sends an account-level auth error advisory to the account's system stream. + /// Mirrors Go Server.sendAccountAuthErrorEvent(). + /// + internal void SendAccountAuthErrorEvent(ClientConnection c, Account? acc, string reason) + { + if (acc == null) return; + _mu.EnterWriteLock(); + if (!EventsEnabledLocked()) { _mu.ExitWriteLock(); return; } + var eid = NextEventID(); + _mu.ExitWriteLock(); + + var now = DateTime.UtcNow; + DisconnectEventMsg m; + lock (c) + { + m = BuildDisconnectMsg(c, eid, now, reason, acc.Name); + } + + SendInternalAccountSysMsg(acc, SystemSubjects.AuthErrorAccountEventSubj, m.Server, m, 0); + } + + private DisconnectEventMsg BuildDisconnectMsg(ClientConnection c, string eid, DateTime now, string reason, string? overrideAccName = null) + { + return new DisconnectEventMsg + { + Type = EventMsgTypes.DisconnectEventMsgType, + Id = eid, + Time = now, + Client = new ClientInfo + { + Start = c.Start.ToString("O"), + Host = c.Host, + Id = c.Cid, + Account = overrideAccName ?? EventHelpers.AccForClient(c), + User = c.GetRawAuthUser(), + Name = c.Opts.Name, + Lang = c.Opts.Lang, + Version = c.Opts.Version, + Rtt = c.GetRttValue(), + Jwt = c.Opts.Jwt, + IssuerKey = EventHelpers.IssuerForClient(c), + Kind = c.KindString(), + ClientType = c.ClientTypeString(), + }, + Sent = new DataStats { Msgs = 0L, Bytes = 0L }, + Received = new DataStats { Msgs = 0L, Bytes = 0L }, + Reason = reason, + }; + } + + // ------------------------------------------------------------------------- + // registerSystemImportsForExisting + // ------------------------------------------------------------------------- + + /// + /// Registers system imports for all existing accounts (excluding the system account). + /// Mirrors Go Server.registerSystemImportsForExisting(). + /// + internal void RegisterSystemImportsForExisting() + { + _mu.EnterReadLock(); + if (_sys == null) { _mu.ExitReadLock(); return; } + var sacc = _sys.Account; + var accounts = _accounts.Values.Where(a => a != sacc).ToList(); + _mu.ExitReadLock(); + + foreach (var a in accounts) + RegisterSystemImports(a); + } + + // ========================================================================= + // Group G: OCSP Events + // ========================================================================= + + // ------------------------------------------------------------------------- + // sendOCSPPeerRejectEvent + // ------------------------------------------------------------------------- + + /// + /// Publishes an OCSP peer-reject advisory to the system account. + /// Mirrors Go Server.sendOCSPPeerRejectEvent(). + /// + internal void SendOcspPeerRejectEvent(string kind, X509Certificate2? peer, string reason) + { + _mu.EnterWriteLock(); + try + { + if (!EventsEnabledLocked()) return; + if (peer == null) + { + Errorf(OcspMessages.ErrPeerEmptyNoEvent); + return; + } + var eid = NextEventID(); + var now = DateTime.UtcNow; + var m = new OcspPeerRejectEventMsg + { + Type = EventMsgTypes.OcspPeerRejectEventMsgType, + Id = eid, + Time = now, + Kind = kind, + Peer = new CertInfo + { + Subject = OcspUtilities.GetSubjectDNForm(peer), + Issuer = OcspUtilities.GetIssuerDNForm(peer), + Fingerprint = OcspUtilities.GenerateFingerprint(peer), + Raw = peer.RawData, + }, + Reason = reason, + }; + var subj = string.Format(SystemSubjects.OcspPeerRejectEventSubj, _info.Id); + SendInternalMsg(subj, string.Empty, m.Server, m); + } + finally { _mu.ExitWriteLock(); } + } + + // ------------------------------------------------------------------------- + // sendOCSPPeerChainlinkInvalidEvent + // ------------------------------------------------------------------------- + + /// + /// Publishes an OCSP chainlink-invalid advisory to the system account. + /// Mirrors Go Server.sendOCSPPeerChainlinkInvalidEvent(). + /// + internal void SendOcspPeerChainlinkInvalidEvent(X509Certificate2? peer, X509Certificate2? link, string reason) + { + _mu.EnterWriteLock(); + try + { + if (!EventsEnabledLocked()) return; + if (peer == null || link == null) + { + Errorf(OcspMessages.ErrPeerEmptyNoEvent); + return; + } + var eid = NextEventID(); + var now = DateTime.UtcNow; + var m = new OcspPeerChainlinkInvalidEventMsg + { + Type = EventMsgTypes.OcspPeerChainlinkInvalidEventMsgType, + Id = eid, + Time = now, + Link = new CertInfo + { + Subject = OcspUtilities.GetSubjectDNForm(link), + Issuer = OcspUtilities.GetIssuerDNForm(link), + Fingerprint = OcspUtilities.GenerateFingerprint(link), + Raw = link.RawData, + }, + Peer = new CertInfo + { + Subject = OcspUtilities.GetSubjectDNForm(peer), + Issuer = OcspUtilities.GetIssuerDNForm(peer), + Fingerprint = OcspUtilities.GenerateFingerprint(peer), + Raw = peer.RawData, + }, + Reason = reason, + }; + var subj = string.Format(SystemSubjects.OcspPeerChainlinkInvalidEventSubj, _info.Id); + SendInternalMsg(subj, string.Empty, m.Server, m); + } + finally { _mu.ExitWriteLock(); } + } + + // ========================================================================= + // Group H: Misc + // ========================================================================= + + // ------------------------------------------------------------------------- + // remoteLatencyUpdate + // ------------------------------------------------------------------------- + + /// + /// Handles a remote latency measurement event for tracking exported service latency. + /// Mirrors Go Server.remoteLatencyUpdate(). + /// + private void RemoteLatencyUpdate(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + + RemoteLatency rl; + try { rl = JsonSerializer.Deserialize(msg)!; } + catch (Exception ex) + { + Errorf("Error unmarshalling remote latency measurement: {0}", ex); + return; + } + + var (account, err) = LookupAccount(rl.Account); + if (err != null) + { + Warnf("Could not lookup account {0} for latency measurement", rl.Account); + return; + } + + // Look up the service import associated with this response. + ServiceImportEntry? si = null; + string lsub = string.Empty; + lock (account!) + { + var exports = account.Exports; + if (exports.Responses?.TryGetValue(rl.RequestId, out var entry) == true) + { + si = entry; + lsub = entry.Latency?.Subject ?? string.Empty; + } + } + if (si == null) return; + + ServiceLatency? m1 = null; + var m2 = rl.M2; + + lock (si.Account!) + { + m1 = si.M1; + if (m1 == null) + si.M1 = m2; + } + if (m1 == null) return; + + // Merge the two halves of the latency measurement. + // Merge responder (m2) into requestor (m1): + // m2.ServiceLatencyDuration is correct, m1.TotalLatency is correct. + var responderRtt = m2.Responder?.Rtt ?? TimeSpan.Zero; + m1.SystemLatency = m1.ServiceLatencyDuration - (m2.ServiceLatencyDuration + responderRtt); + if (m1.SystemLatency < TimeSpan.Zero) m1.SystemLatency = TimeSpan.Zero; + m1.ServiceLatencyDuration = m2.ServiceLatencyDuration; + if (m1.ServiceLatencyDuration < TimeSpan.Zero) m1.ServiceLatencyDuration = TimeSpan.Zero; + m1.Responder = m2.Responder; + + lock (account) + { + si.RequestingClient = null; + } + + SendInternalAccountMsg(account, lsub, m1); + } + + // ------------------------------------------------------------------------- + // kickClient + // ------------------------------------------------------------------------- + + /// + /// Handles a request to forcefully disconnect a client by CID. + /// Mirrors Go Server.kickClient(). + /// + private void KickClient(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + + KickClientReq req; + try { req = JsonSerializer.Deserialize(msg)!; } + catch (Exception ex) { Errorf("Error unmarshalling kick client request: {0}", ex); return; } + + var fOpts = new EventFilterOptions(); + ZReq(c, reply, hdr, msg, fOpts, fOpts, () => (null, DisconnectClientByID(req.Cid))); + } + + // ------------------------------------------------------------------------- + // ldmClient + // ------------------------------------------------------------------------- + + /// + /// Handles a request to put a specific client into lame-duck mode. + /// Mirrors Go Server.ldmClient(). + /// + private void LdmClient(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + + LdmClientReq req; + try { req = JsonSerializer.Deserialize(msg)!; } + catch (Exception ex) { Errorf("Error unmarshalling LDM client request: {0}", ex); return; } + + var fOpts = new EventFilterOptions(); + ZReq(c, reply, hdr, msg, fOpts, fOpts, () => (null, LDMClientByID(req.Cid))); + } + + // ========================================================================= + // Monitoring verb stub handlers + // (Full monitor implementations are in NatsServer.Monitor — these delegate.) + // ========================================================================= + + private void VarzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void SubszReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void ConnzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void RoutezReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void GatewayzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void LeafzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccountzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void JszReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void HealthzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void ExpvarzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void IpqueueszReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void RaftzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccSubszReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccConnzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccLeafzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccJszReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccInfoReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccStatzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void StatzAccPingReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs index 9bc5d10..c45f832 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs @@ -88,6 +88,12 @@ public sealed class ServerInfo // LeafNode-specific [JsonPropertyName("leafnode_urls")] public string[]? LeafNodeUrls { get; set; } + // Events / heartbeat sequence number. + // Mirrors Go Info.Seq uint64 used in server heartbeat messages. + [JsonPropertyName("seq")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] + public ulong Seq { get; set; } + /// Returns a shallow clone of this . internal ServerInfo ShallowClone() => (ServerInfo)MemberwiseClone(); } diff --git a/porting.db b/porting.db index e1f91cd2a2d28591875343ea9f7ecdadfe8e5825..822bdfac3e4046c50e61872958220a1733a64c7e 100644 GIT binary patch delta 5669 zcmc&&c~q2Fmap%9Mb%fl`l>z<3CmX$i^^K8vWq;~HxW^Zi71O0qjn6^X6Z@^th7*Pf;}YXX#^c6!jg3aD@eAX_j)ZBh(-6iaB#FEUJ-Vf1+XLIS zw5fA~dxihk zANl{f9)9=>my6Vc(Dz1L3pAaPuw&pRCmBYM0H!k#5R?YBFsODU72e_l?4s;SEsF92ejITvy^d0&fT}RVs z3XSl?;4f{JVK!1SXf!*uM4Rpi8baM1K~ut2VO%&X^r~xw9`$l{s$f$`Yj)zN^|Sh$ z#y=X*>hJQy#$H{QuHAo$e~3Cr8?0;9rfU~ziT@_!RnQEhaR|F3t%E;*BN-s(JURl6 z1IP%!J&)Ev=SB9{r?(^-0={J*Pxhl5jU?-_j-hVkAL8PA`6B98`)~TOJTi_V)c9$Y z0U`qff??txk_lcKN3(px|CaaHOQ=JwA9M4zPoN65{-zs#Fo71p-{YkWN62MFk=l6m zM|UCgA#^86abf7+6(Rpef5!SfN>HIun3$I2WWOZ82tUaWsc)7o0;K)>u@TLHVEQ(;^*RZaax=bN5m82%er>) zU&Kyvr`RmkizQ-~7$=4qdqute?%X@-|DbN3BMagCg3YrP*shvy_)S^Jm3e*Iz zGht9LXZ@#Ds8r$~RnQ-S<<+ROfj_0#J*hw^6zI4D9aEqI1?q?Ke==|ODK=kKpk4*~ zl>!}6phF7uvI2nu9aNx~_A1~n73hEh^(fHu3iO-;?N^{@73diS`h~dHr~C2y6hylM zQ3d+B0zIujPbrX1fvgI|jMsacFW&`0x200k&lEh5E6`&QT#HH&e;OKVQCI{&sGy!v zpckQc?tx!XY>p_!vLVFtHU)*F77rXesp5u4k(Y-{g_1nC;?BNdBRiat!@4W@iWZ?=3yvxTz zXf}?+ZiP+u6j+^V4uvh**aCsy>r60th4+Vx**MK%M33cCHYflVRab~M%5zQ`3 zzRbhhp?+g{EZFjKoUi)l^6?qx%}(i0EyD=`^I~k7_IM^H#g-4NvvCl7unfNi^$olU zBH!mRTq(eDZXF$86yPw#@vM2__rUftd*az2bbfG`;3PK>q^-d3!dK%ZCYVh(2SR+2 z*AU13MR=40grZC^lW&%wpadsHxb;|Uk7asPTeB@O-ei$JQzKe!~cufz*k+rb*BtnrcXa1FlUVOZ)BJh&1& zJWqn*#agfNj;pn}kmGb9)Zw4Gj&@J+S~yeZ<$>#Uc+^Fk4e-;#<<&kPI(EZwnAQTx4L;JF8t@0M5M{ZB;l6uusz<4wd-2<@NO9(2=X#vs zra4B};~WG<)rR@l4&H!M+(f9^fcLo$I$!3$H{d8wrnS*MEHq=R7K<3;%Czi8+yzs? z>R7nZh!cFv%ToFTvoUDdw;>st(`f9K#NDO3j&wWtd|TZwYv z_(~*0=-kJpYLoye)hHasR}&Rg zUC$Q#Zq6|uayq-*F0mEBRv=po*a~86AzQ(0-NRN0TcK=)u@&x++vU$vqW1Fo23+hf z>xhu2P1g+g9X1AuH`RUoG1RWw%cY7C)ni&8V71so*rUXjTf*;gg~RWU%C*qyx4?9; zR3pMbnwZ?OC*PF)VB0a-n=O6EDV7VvCsKvbp4> z{E{nSyRYj+(43MjzA2D*N)pPbm z6aoh2M7RAmqg~JLFR+%vfidRBmO&Yy^;36ep7}^4KP=W{94rwJf!usSmyqlkriN#q0Mu*iI&j$@#Fo=asf+LCij+IvCPK-YOW*<+?aA7W-^T35Mx)3x*Ri>_yOvufeci0o}WZ;r@^TqTP! zB$>TEQ^SU+F1mvW%eAIhLg%Nbw=OVw&Vk1{#?_6>0<@0F(Viz5AC(_>$@4O7amcZ5 z1+Ba3HmA&RSl!D9!|NAif4J7Pk=5a;%o=zz)$G;h%T)7!`HXvoU&~tH)6Je<0|n{kgRUvKdxJ&`T!z^* zaamjJ{wxIYcUj=5Rky&gD#M(~yRvXF%S;02U&dBv7nuLbZSl@TUI0?I`S(sA9LzBn zy48iU9P_)b$;R36oSGEmdR2A&B-h-?xqLT$C?XiX&oeLfoQ};mzXDS~gg6JQeD~AY z0_|#cb+I)UA}3f1k!5C>uQvJSV5nJU7T}(AvnRG3IR)k;oNKO`U2d-Qh$n^S2VL8J z@hnZJ3(cMYr^HN2BqdRlL{kz&$s$S?Q?i7TSW4n3iKiril0-_9D6vqIOi2nQsg$Hq zl1@nmB}*yEq$G=yY)Wz{$)$u<`g}^3QBpw3a!Lv*A(X73q==GYN=hgxrKF6Ka!M*F zsidTel4?q7C|OBKEhVccsiR~yCH0i7p=2#3>nLfUX{KOcM*`1Lt;s_L9m zl{!^j{gPPC+WJ`il*1vfa5x0j;czG$5;C2q96f1|n$m_+VcUi7hNlTLg-Xw5!{_FEn~7#V|-)GH-f>$i^2*|jC8cD)N|6|(ZsfB;iARU@y2rDYi8Eb z4Vpg_mUp^O7vs<)WtDSg&M7MugG`Xbe?ik=@?~MSqSiX@pm-4uduLtupeNCC^e~!+ zMxp|g0V@S%2o!DfM#I}z1c2W^7y5WC8>MA4%PK3G)$On=3KhZ6QOF}~4#HKzhrk_; zVuc2XiAMW?M4}$T9+($}3WJ}1A?!xT;eb`wgxlb2hl+%aFuNU7cp6qQf)CrF#B3U7 z|A{bK6-JMTQ7uf4M6m;^!alAHqsPK%MHnp)qetO@50wZD;jWLZWdXR`qP_51TeL6u z`qx6=(TayUZXu5Y+(*z4xxi)a? z4R@2eUaeIZsWa6H>R>3}RvJ^~KHz>)yY616UD4jx&S{O>K5Yx~Ak+9>@1rN^#ri0B zwec_G1LIA-P=88ap_l6y^|Sgh*SoG}S3OR}o$=fFBz{GY(cStU{dte7-`2m@KQlTS zk%r{C>AB|l*mK@<+H<(&=MK-aY{e~qbhuT3Fw6`X222bXI`pYWYmt--=jzcOcqGqA zf{*fyEs$~yH9%!QV+IUpK$l^4t}y`S=$KcUXF0w#OQjsR8AQd< z*w+{4jDZgWthdnDXoLrHj2>|91nLiIIfe;?Uquz5<}kerxyBY_ zbePHK;AFsvg)ah35$76-FwOe$O*X?6>)U_lFsp5>|Lx3Sg7+-JGV6bvvyCk1avEhK zDH|rAMln9AZt-715Swj0203R?sfT$f@!oO{!2Q-tFEMk%`oZMRz>Q-r}x?akYQq@|iv+;rZu2^rpsSqVi z=_>h@C}XQwFPGz0P*H%|LG%u^7m5Ag;~?9>1rXDS65x}YUIV)9L@yw5CN%Fv#Yo%& zul(qZg-a)p2gWzCQ(`ducnpn#!$;9duNcq!Q|unh*^PP&5?&!5n z7sKdTICKWZ!of302iHxn2j(>*k5>!}lSqK>MZdz`pS*53e*zi85LncN3gPNLR1IV= z>woD7)-`t*o8spEs0x~YU^D*8`j&qH#f1*M)`+4qIQ%mU%5w+M4e=_QCON}=G509T z#4iq_1!1~k9+VzJW5T}$;EhAbFPNbI;_V8P*>r-@FQX^eVd8L@t)qxdu=RIu3_|~a zisNj4%T6HOi$3I+jy|Bxh&GF~S)$EO+APy%g*L0SS!--|DHVbvsHWo00&M@*84rh# zs}h{=h5G_&a;o4>!@oggsr-jP1(J>vfox~e%5=OJ&YttfL3OuS4Eeorw~)Y_SeG8* z5IO~t^-f5L%Z!JWy>S6_e#aRXS@cL*`J#mbOUg^nY{ z$hxFRhv2W#lXbs6`Rh6Op+7J7u|TV<>jQXyXzb7skN0gELd&S@!7QcJ24G|+_QCso z@i~Z`EyqE}3nGToxh)F}UeCqRNN|8k@Y_&(E;T?Jf({jjGH z*M-*O2@A$3a2MeYaD1@T0V=+7YVaRLc%&VX;&Y-7?fbVbcyNC_+V z3R(Rid=tK37{yZhJr?67!>|V~4{q@`L=3@q?7n|U^M+s<1`TaBP%#vL7Os+PTRt=l zH*%g|Hyo#j1i`n4<2=C%%?P~A?pg7U>V;QEw5HeP5xBt?g!1mxNGkx={`+nu-e&u# zX;iC^u8+b`L;aIoT2g%1=vIe*Fd9E$D~I+t9n-p51GJJk&p+c{F8*~49{cB9%pKcm zsd6kEB4Rn)Pd&%AdZu_Be$P%kwzDuy9B+jtv^U1Hj0_zuD<)Vg-tJU_jT3Mdg0tDa zmNbo;gj4Jl4V%Q$CZv6567CZE7BP!1XKB`@COKK}VsDnr$#~_TC;0K?7Qc8i>7#5# zv9pr(Si3`~DJ+?+#L>gD5r#K)?gkwrWev_x!Pi0>n~G!uu1v)l_Jrc5VH!GApm`e3 z4o!bkwI&NMfl`b+vVYzS2umVrymXr7!$0=((PHKq>!b-kjE0vj8L@ae9Jvyks69rf zvUO$GWI*0@99uV`1heL5O%F~e@$7%MN*(K1D}L&5-Ew{Hy6k%2^}4Icb@xtH9OA)x%}F3>VUV(ynQjw0E`Fv}4*{ZHx97ty){C&D17pL$!Wdn$}Hg zuVMAB`kneO^+WY-^^|&8-KlO;pHx?<^VR9dr|yX(sk)` z>F?5iNhhSTm!)meGZK}SN%N#>(kQ7&%94_#4pM|9i9fU3@~7d3c704XsdRjk>;- zELg|GUwBx{!x|o*;^9diR`XED!xJH>YFYT>yr`Ck8Xl^7sN!KI56gI1%EJ;K9&w9R zfB1eeFIvPyB@Y!m%;%w;hlhEX$HQD6svhEL4iB?=n8iaW4>Nd}&O-?g#XL;A2URTM zQ|}RjDLhQ(VG<7y@-UHy2|SGFVH^)*Y^bvHel#x{#ltWjhVn3khrv7y;$a{U19<4q zpz7Y%E8<0kJQVPd&jaD19}jsvUAtx+{vw2Y#51Bml;UR;E-aMr9kjg^}mL0l% zlLkthmJhbN_iifs5 z_$;VuJ$c*kqDUShc<}OI@ZjM==fTYb=0S5;afiRki<~@&JRlwf9vmE?-+8#p!*3i^ zp z@hYipWme$+#2!A~tg$yvL1+A9omt^!ErPX3*4nVxF{D*y4z45E2z`~s^9kr!(QLN*D z<;hUe+2w`Uh^5#33GOtcGUvgcbS`EV-_pc*oEH%vv-*W-0G}!uy3R<>^2X; z)p1%&*;Tox#XRiVW9HiS)V(IVBePp}gjDCX%D|d}WIe^c@)qtjp9@{c!>Z@Id(AZa zULM$I*4tKm`%SxShy49!ONsE!ev?(k*8M&1fO+25^By$qT9K9BZZ=wOO%ZHpV)a(Z zK@(ud@6MKzM3!p5l%VQYEP?clxO4<&U)#Lcoe;A-Q8veQ;wLg zBfCB#&ubV`8%#GO95dtX1&lvt9G9)yb z-9vpfiOvmO(No~?7Pk+UHJYnio)3JA4}NQE4awA?xx^mBs;xFPnRYp&EuwQm{bOO| zRxt{OA2)TF(PYlID+(*>OjiZxakH~M*_7kv_Ao;#h$mV+sV$|mn4gBtNrvW|T00nV zoOPXa!h9Ise~bD4p0%DwO$E-JU<*-SHCb6}Rf;LEnv4Fh3-7+#5=uZP%?4ZW*-2~G zEaciCTEb?XUz-fCooC)jJ7r$5R~>p&xO|eCPd(kbOZ>Fh3a_3V9eRd|hRugrdY)a* zvU*CMI~7tt$6DR=(K$8FT>EjeihoK`R#}Vvvd7h z1pB4oMxK9=9j5$#{#o|7Z9OCm7yJ1`Pf!Xaeo8u0(utDJlysq_D<$11=}t*JB?*-D zpd^u!Bueh1B$<+)l%!CSN=YwD(kMx%q&FoQl=Pt_laee-vMI@-BtS`DN^&X5qof}t zgpzzp?x&=Hl0r&~DCtkh07?c@GKiAFlnkL{C?&%v8BWOql#HNcBqgIL8BNI;O2$$$ vj*{_|OrT^UB@a?EiIT~bOrc~dCDSM=rlf?D>6FZ%q?D2}N@ngMv#S3WVhvvR diff --git a/reports/current.md b/reports/current.md index 7b333be..ff747c7 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 12:56:43 UTC +Generated: 2026-03-01 14:41:00 UTC ## Modules (12 total) @@ -13,10 +13,10 @@ Generated: 2026-03-01 12:56:43 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 363 | +| deferred | 284 | | n_a | 24 | | stub | 1 | -| verified | 3263 | +| verified | 3342 | ## Unit Tests (3257 total) @@ -35,4 +35,4 @@ Generated: 2026-03-01 12:56:43 UTC ## Overall Progress -**5694/6942 items complete (82.0%)** +**5773/6942 items complete (83.2%)**