diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs
index ec6e7d0..f1cad5a 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs
@@ -4507,6 +4507,28 @@ public sealed partial class Account : INatsAccount
return string.Empty;
}
+ ///
+ /// Clears the connection-heartbeat timer. Caller must hold the account lock.
+ /// Mirrors Go (a *Account) clearConnectionTimer() in server/events.go.
+ ///
+ internal void ClearConnectionHeartbeatTimer()
+ {
+ ClearTimerLocked(ref _ctmr);
+ }
+
+ ///
+ /// Starts or resets the connection-heartbeat timer.
+ /// Caller must hold the account lock.
+ /// Mirrors Go inline timer setup in sendAccConnsUpdate().
+ ///
+ internal void SetConnectionHeartbeatTimer(long delayMs, Action callback)
+ {
+ if (_ctmr == null)
+ _ctmr = new Timer(_ => callback(), null, delayMs, Timeout.Infinite);
+ else
+ _ctmr.Change(delayMs, Timeout.Infinite);
+ }
+
///
/// Stops and nulls out a timer. Lock must be held by the caller.
/// Mirrors Go clearTimer(t **time.Timer).
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs
new file mode 100644
index 0000000..efb8601
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs
@@ -0,0 +1,82 @@
+// Copyright 2018-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/events.go (package-level helpers) in the NATS server Go source.
+
+namespace ZB.MOM.NatsNet.Server;
+
+///
+/// Package-level helper functions used by the events subsystem.
+/// Mirrors Go package-level functions in server/events.go.
+///
+internal static class EventHelpers
+{
+ // =========================================================================
+ // accForClient — mirrors Go accForClient
+ // =========================================================================
+
+ ///
+ /// Returns the account name for the given client connection.
+ /// Mirrors Go accForClient(c *client) string in server/events.go.
+ ///
+ internal static string AccForClient(ClientConnection c)
+ {
+ var acc = c._account as Account;
+ return acc?.Name ?? "N/A";
+ }
+
+ // =========================================================================
+ // issuerForClient — mirrors Go issuerForClient
+ // =========================================================================
+
+ ///
+ /// Returns the issuer key for the given client connection.
+ /// In Go this inspects c.user.SigningKey / c.user.Account.Name.
+ /// In .NET the NkeyUser signing key is not stored on ClientConnection;
+ /// we return an empty string (same as Go when c.user == nil).
+ /// Mirrors Go issuerForClient(c *client) string in server/events.go.
+ ///
+ internal static string IssuerForClient(ClientConnection c)
+ {
+ // ClientConnection does not expose a stored User/NkeyUser reference
+ // with a SigningKey after registration. Return empty (safe default).
+ return string.Empty;
+ }
+
+ // =========================================================================
+ // respondToUpdate — mirrors Go respondToUpdate
+ // =========================================================================
+
+ ///
+ /// Sends an account JWT update response back to the requesting server.
+ /// Mirrors Go respondToUpdate(s *Server, respSubj, acc, message string, err error)
+ /// in server/accounts.go (called from events.go).
+ ///
+ internal static void RespondToUpdate(NatsServer s, string respSubj, string acc, string message, Exception? err)
+ {
+ if (string.IsNullOrEmpty(respSubj))
+ return;
+
+ string response;
+ if (err != null)
+ response = string.IsNullOrEmpty(acc)
+ ? $"{message}: {err.Message}"
+ : $"[{acc}] {message}: {err.Message}";
+ else
+ response = string.IsNullOrEmpty(acc)
+ ? message
+ : $"[{acc}] {message}";
+
+ s.SendInternalMsg(respSubj, string.Empty, null, response);
+ }
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs
index 540ac72..a3d356b 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs
@@ -151,13 +151,212 @@ public static class EventIntervals
///
public delegate void SysMsgHandler(
Subscription sub,
- NatsClient client,
+ ClientConnection client,
Account acc,
string subject,
string reply,
byte[] hdr,
byte[] msg);
+// ============================================================================
+// MsgHandler — subscription message callback delegate
+// Mirrors Go msgHandler func type in server/events.go.
+// ============================================================================
+
+///
+/// Callback for a subscription message. Identical signature to
+/// ; the distinction exists in Go but is
+/// collapsed here since both carry the same parameters.
+/// Mirrors Go msgHandler in server/events.go.
+///
+public delegate void MsgHandler(
+ Subscription sub,
+ ClientConnection client,
+ Account acc,
+ string subject,
+ string reply,
+ byte[] hdr,
+ byte[] msg);
+
+// ============================================================================
+// ServerApiError — error payload for server API responses
+// Mirrors Go ApiError used in server/events.go responses.
+// ============================================================================
+
+///
+/// Error payload returned in when a
+/// monitoring z-endpoint request fails.
+/// Mirrors Go ApiError struct used by server API responses.
+///
+public sealed class ServerApiError
+{
+ [System.Text.Json.Serialization.JsonPropertyName("code")]
+ public int Code { get; set; }
+
+ [System.Text.Json.Serialization.JsonPropertyName("description")]
+ public string Description { get; set; } = string.Empty;
+}
+
+// ============================================================================
+// ServerApiResponse — wrapper for server API (z-endpoint) responses
+// Mirrors Go ServerAPIResponse in server/events.go.
+// ============================================================================
+
+///
+/// Standard envelope returned by server monitoring API (varz, connz, etc.)
+/// published via the internal system bus.
+/// Mirrors Go ServerAPIResponse in server/events.go.
+///
+public sealed class ServerApiResponse
+{
+ [System.Text.Json.Serialization.JsonPropertyName("server")]
+ public ServerInfo? Server { get; set; }
+
+ [System.Text.Json.Serialization.JsonPropertyName("data")]
+ [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
+ public object? Data { get; set; }
+
+ [System.Text.Json.Serialization.JsonPropertyName("error")]
+ [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
+ public ServerApiError? Error { get; set; }
+}
+
+// ============================================================================
+// EventFilterOptions — server filter options for z-endpoint requests
+// Mirrors Go EventFilterOptions in server/events.go.
+// ============================================================================
+
+///
+/// Filter parameters sent in monitoring z-endpoint request messages that
+/// allow targeting a specific server, cluster, or set of tags.
+/// Mirrors Go EventFilterOptions in server/events.go.
+///
+public class EventFilterOptions
+{
+ [System.Text.Json.Serialization.JsonPropertyName("server_name")]
+ [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
+ public string? Name { get; set; }
+
+ [System.Text.Json.Serialization.JsonPropertyName("cluster")]
+ [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
+ public string? Cluster { get; set; }
+
+ [System.Text.Json.Serialization.JsonPropertyName("host")]
+ [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
+ public string? Host { get; set; }
+
+ [System.Text.Json.Serialization.JsonPropertyName("domain")]
+ [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
+ public string? Domain { get; set; }
+
+ [System.Text.Json.Serialization.JsonPropertyName("tags")]
+ [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
+ public List? Tags { get; set; }
+
+ ///
+ /// When true, name/cluster/host must match exactly; when false, substring
+ /// matching is used.
+ /// Mirrors Go EventFilterOptions.ExactMatch.
+ ///
+ [System.Text.Json.Serialization.JsonPropertyName("exact")]
+ public bool ExactMatch { get; set; }
+}
+
+// ============================================================================
+// UserInfo — user info response payload for $SYS.REQ.USER.INFO
+// Mirrors Go UserInfo struct in server/events.go.
+// ============================================================================
+
+///
+/// Response payload returned by the $SYS.REQ.USER.INFO endpoint.
+/// Contains the authenticated user's identity, account, permissions, and
+/// claim expiry.
+/// Mirrors Go UserInfo struct in server/events.go.
+///
+public sealed class UserInfo
+{
+ [System.Text.Json.Serialization.JsonPropertyName("userId")]
+ public string UserId { get; set; } = string.Empty;
+
+ [System.Text.Json.Serialization.JsonPropertyName("account")]
+ public string Account { get; set; } = string.Empty;
+
+ [System.Text.Json.Serialization.JsonPropertyName("permissions")]
+ [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
+ public object? Permissions { get; set; }
+
+ [System.Text.Json.Serialization.JsonPropertyName("expires")]
+ [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
+ public DateTime Expires { get; set; }
+}
+
+// ============================================================================
+// KickClientReq / LdmClientReq — client control request payloads
+// Mirrors Go anonymous structs used in server/events.go.
+// ============================================================================
+
+///
+/// Request payload for $SYS.REQ.SERVER.{id}.KICK, which asks this
+/// server to forcefully disconnect the client with the given CID.
+/// Mirrors the anonymous struct used in Server.kickClient().
+///
+internal sealed class KickClientReq
+{
+ [System.Text.Json.Serialization.JsonPropertyName("cid")]
+ public ulong Cid { get; set; }
+}
+
+///
+/// Request payload for $SYS.REQ.SERVER.{id}.LDM, which asks this
+/// server to put the client with the given CID into lame-duck mode.
+/// Mirrors the anonymous struct used in Server.ldmClient().
+///
+internal sealed class LdmClientReq
+{
+ [System.Text.Json.Serialization.JsonPropertyName("cid")]
+ public ulong Cid { get; set; }
+}
+
+// ============================================================================
+// StatszEventOptions — options for statsz z-endpoint requests
+// Mirrors Go StatszEventOptions in server/events.go.
+// ============================================================================
+
+///
+/// Options embedded in z-endpoint request messages that allow filtering and
+/// configuring the statsz response.
+/// Mirrors Go StatszEventOptions in server/events.go.
+///
+public sealed class StatszEventOptions : EventFilterOptions
+{
+}
+
+// ============================================================================
+// AccNumSubsReq — request payload for subscription count queries
+// Mirrors Go accNumSubsReq struct in server/events.go.
+// ============================================================================
+
+///
+/// Payload for $SYS.REQ.ACCOUNT.NSUBS requests, which ask a remote
+/// server how many local subscriptions exist for a given account + subject.
+/// Mirrors Go accNumSubsReq in server/events.go.
+///
+internal sealed class AccNumSubsReq
+{
+ [System.Text.Json.Serialization.JsonPropertyName("server")]
+ public ServerInfo Server { get; set; } = new();
+
+ [System.Text.Json.Serialization.JsonPropertyName("acc")]
+ public string Account { get; set; } = string.Empty;
+
+ [System.Text.Json.Serialization.JsonPropertyName("subject")]
+ public string Subject { get; set; } = string.Empty;
+
+ [System.Text.Json.Serialization.JsonPropertyName("queue")]
+ [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
+ public string? Queue { get; set; }
+}
+
// ============================================================================
// InSysMsg — queued internal system message
// Mirrors Go inSysMsg struct in server/events.go.
@@ -170,9 +369,9 @@ public delegate void SysMsgHandler(
///
internal sealed class InSysMsg
{
- public Subscription? Sub { get; set; }
- public NatsClient? Client { get; set; }
- public Account? Acc { get; set; }
+ public Subscription? Sub { get; set; }
+ public ClientConnection? Client { get; set; }
+ public Account? Acc { get; set; }
public string Subject { get; set; } = string.Empty;
public string Reply { get; set; } = string.Empty;
public byte[]? Hdr { get; set; }
@@ -195,10 +394,10 @@ internal sealed class InSysMsg
internal sealed class InternalState
{
// ---- identity / sequencing ----
- public Account? Account { get; set; }
- public NatsClient? Client { get; set; }
- public ulong Seq { get; set; }
- public int Sid { get; set; }
+ public Account? Account { get; set; }
+ public ClientConnection? Client { get; set; }
+ public ulong Seq { get; set; }
+ public int Sid { get; set; }
// ---- remote server tracking ----
/// Map of server ID → serverUpdate. Mirrors Go servers map[string]*serverUpdate.
@@ -216,7 +415,7 @@ internal sealed class InternalState
/// Pending reply subject → handler map.
/// Mirrors Go replies map[string]msgHandler.
///
- public Dictionary> Replies { get; set; } = new();
+ public Dictionary> Replies { get; set; } = new();
// ---- queues ----
/// Outbound message send queue. Mirrors Go sendq *ipQueue[*pubMsg].
@@ -289,8 +488,8 @@ internal sealed class ServerUpdate
///
internal sealed class PubMsg
{
- public NatsClient? Client { get; set; }
- public string Subject { get; set; } = string.Empty;
+ public ClientConnection? Client { get; set; }
+ public string Subject { get; set; } = string.Empty;
public string Reply { get; set; } = string.Empty;
public ServerInfo? Si { get; set; }
public byte[]? Hdr { get; set; }
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs
new file mode 100644
index 0000000..3030041
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs
@@ -0,0 +1,2196 @@
+// Copyright 2018-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/events.go in the NATS server Go source.
+// Batch 45: Server.* events methods — stats, remote tracking, connection events,
+// request handlers, leaf/gateway events, auth events, OCSP events, misc.
+
+using System.Diagnostics;
+using System.Net;
+using System.Runtime.InteropServices;
+using System.Security.Cryptography.X509Certificates;
+using System.Text;
+using System.Text.Json;
+using ZB.MOM.NatsNet.Server.Auth.CertificateIdentityProvider;
+using ZB.MOM.NatsNet.Server.Internal;
+
+namespace ZB.MOM.NatsNet.Server;
+
+///
+/// Partial class containing the server-level events subsystem methods.
+/// Mirrors server/events.go in the NATS server Go source.
+///
+public sealed partial class NatsServer
+{
+ // =========================================================================
+ // Core event infrastructure — EventsEnabled / EventsRunning
+ // =========================================================================
+
+ ///
+ /// Reports whether the server has internal events enabled via a system account.
+ /// Mirrors Go Server.EventsEnabled().
+ ///
+ public bool EventsEnabled()
+ {
+ _mu.EnterReadLock();
+ try { return EventsEnabledLocked(); }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ ///
+ /// Reports whether events are enabled.
+ /// Server read lock must be held on entry.
+ /// Mirrors Go Server.eventsEnabled().
+ ///
+ internal bool EventsEnabledLocked() =>
+ _sys != null && _sys.Client != null && _sys.Account != null;
+
+ ///
+ /// Locked version of checking if events system is running.
+ /// Mirrors Go Server.eventsRunning().
+ ///
+ internal bool EventsRunning()
+ {
+ _mu.EnterReadLock();
+ bool er = Running() && EventsEnabledLocked();
+ _mu.ExitReadLock();
+ return er;
+ }
+
+ // =========================================================================
+ // sendInternalMsg / sendInternalMsgLocked / sendInternalResponse
+ // =========================================================================
+
+ ///
+ /// Queues up an internal message to be sent by the internal send loop.
+ /// Server read lock must be held on entry.
+ /// Mirrors Go Server.sendInternalMsg().
+ ///
+ internal void SendInternalMsg(string subj, string reply, ServerInfo? si, object? msg)
+ {
+ if (_sys?.SendQueue == null) return;
+ var pm = new PubMsg
+ {
+ Client = null,
+ Subject = subj,
+ Reply = reply,
+ Si = si,
+ Hdr = null,
+ Msg = msg,
+ };
+ _sys.SendQueue.Push(pm);
+ }
+
+ ///
+ /// Queues up an internal message; acquires/releases the read lock itself.
+ /// Mirrors Go Server.sendInternalMsgLocked().
+ ///
+ internal void SendInternalMsgLocked(string subj, string reply, ServerInfo? si, object? msg)
+ {
+ _mu.EnterReadLock();
+ SendInternalMsg(subj, reply, si, msg);
+ _mu.ExitReadLock();
+ }
+
+ ///
+ /// Queues an internal message on behalf of a specific account's internal client.
+ /// Mirrors Go Server.sendInternalAccountMsg().
+ ///
+ internal void SendInternalAccountMsg(Account? a, string subject, object? msg)
+ => SendInternalAccountMsgWithReply(a, subject, string.Empty, null, msg, false);
+
+ ///
+ /// Queues an internal message with an optional reply to an arbitrary account.
+ /// Mirrors Go Server.sendInternalAccountMsgWithReply().
+ ///
+ internal void SendInternalAccountMsgWithReply(Account? a, string subject, string reply, byte[]? hdr, object? msg, bool echo)
+ {
+ _mu.EnterReadLock();
+ if (_sys?.SendQueue == null)
+ {
+ _mu.ExitReadLock();
+ return;
+ }
+ ClientConnection? c = _sys.Client;
+ if (a != null)
+ {
+ lock (a)
+ {
+ c = a.InternalClient;
+ }
+ }
+ _sys.SendQueue.Push(new PubMsg
+ {
+ Client = c,
+ Subject = subject,
+ Reply = reply,
+ Si = null,
+ Hdr = hdr,
+ Msg = msg,
+ Echo = echo,
+ });
+ _mu.ExitReadLock();
+ }
+
+ ///
+ /// Queues an account-scoped system message.
+ /// Mirrors Go Server.sendInternalAccountSysMsg().
+ ///
+ internal void SendInternalAccountSysMsg(Account? a, string subj, ServerInfo? si, object? msg, int ct)
+ {
+ _mu.EnterReadLock();
+ if (_sys?.SendQueue == null || a == null)
+ {
+ _mu.ExitReadLock();
+ return;
+ }
+ var sendq = _sys.SendQueue;
+ _mu.ExitReadLock();
+
+ ClientConnection? c;
+ lock (a)
+ {
+ c = a.InternalClient;
+ }
+ sendq.Push(new PubMsg
+ {
+ Client = c,
+ Subject = subj,
+ Reply = string.Empty,
+ Si = si,
+ Hdr = null,
+ Msg = msg,
+ Oct = ct,
+ });
+ }
+
+ ///
+ /// Sends a server API response (varz, connz, etc.) via the internal send loop.
+ /// Mirrors Go Server.sendInternalResponse().
+ ///
+ internal void SendInternalResponse(string subj, ServerApiResponse response)
+ {
+ _mu.EnterReadLock();
+ if (_sys?.SendQueue == null)
+ {
+ _mu.ExitReadLock();
+ return;
+ }
+ _sys.SendQueue.Push(new PubMsg
+ {
+ Client = null,
+ Subject = subj,
+ Reply = string.Empty,
+ Si = response.Server,
+ Hdr = null,
+ Msg = response,
+ });
+ _mu.ExitReadLock();
+ }
+
+ // =========================================================================
+ // nextEventID
+ // =========================================================================
+
+ ///
+ /// Generates a unique event ID.
+ /// Server lock should be held.
+ /// Mirrors Go Server.nextEventID().
+ ///
+ private string NextEventID() => Guid.NewGuid().ToString("N");
+
+ // =========================================================================
+ // wrapChk — executes f under server write-lock, skips if events disabled
+ // =========================================================================
+
+ ///
+ /// Returns a closure that acquires the server write-lock, checks that events
+ /// are enabled, calls , then releases the lock.
+ /// Mirrors Go Server.wrapChk().
+ ///
+ internal Action WrapChk(Action f) => () =>
+ {
+ _mu.EnterWriteLock();
+ if (!EventsEnabledLocked())
+ {
+ _mu.ExitWriteLock();
+ return;
+ }
+ try { f(); }
+ finally { _mu.ExitWriteLock(); }
+ };
+
+ // =========================================================================
+ // noInlineCallback family
+ // =========================================================================
+
+ ///
+ /// Wraps a so that it is queued on the
+ /// regular receive queue rather than called inline.
+ /// Mirrors Go Server.noInlineCallback().
+ ///
+ internal MsgHandler? NoInlineCallback(SysMsgHandler cb) =>
+ NoInlineCallbackRecvQSelect(cb, recvQMuxed);
+
+ ///
+ /// Wraps a so that it is queued on the
+ /// priority (statsz) receive queue.
+ /// Mirrors Go Server.noInlineCallbackStatsz().
+ ///
+ internal MsgHandler? NoInlineCallbackStatsz(SysMsgHandler cb) =>
+ NoInlineCallbackRecvQSelect(cb, recvQStatsz);
+
+ private const int recvQMuxed = 1;
+ private const int recvQStatsz = 2;
+
+ private MsgHandler? NoInlineCallbackRecvQSelect(SysMsgHandler cb, int recvQSelect)
+ {
+ _mu.EnterReadLock();
+ if (!EventsEnabledLocked())
+ {
+ _mu.ExitReadLock();
+ return null;
+ }
+ IpQueue? recvq = recvQSelect == recvQStatsz
+ ? _sys!.RecvQueuePriority
+ : _sys!.RecvQueue;
+ _mu.ExitReadLock();
+
+ return (sub, c, acc, subj, rply, hdr, rmsg) =>
+ {
+ recvq?.Push(new InSysMsg
+ {
+ Sub = sub,
+ Client = c,
+ Acc = acc,
+ Subject = subj,
+ Reply = rply,
+ Hdr = hdr,
+ Msg = rmsg,
+ Cb = cb,
+ });
+ };
+ }
+
+ // =========================================================================
+ // sysSubscribe / sysSubscribeQ / sysSubscribeInternal / systemSubscribe
+ // =========================================================================
+
+ ///
+ /// Creates an internal subscription on the system account.
+ /// Mirrors Go Server.sysSubscribe().
+ ///
+ internal (Subscription? Sub, Exception? Error) SysSubscribe(string subject, MsgHandler? cb) =>
+ SystemSubscribe(subject, string.Empty, false, null, cb);
+
+ ///
+ /// Creates an internal subscription with a queue group.
+ /// Mirrors Go Server.sysSubscribeQ().
+ ///
+ internal (Subscription? Sub, Exception? Error) SysSubscribeQ(string subject, string queue, MsgHandler? cb) =>
+ SystemSubscribe(subject, queue, false, null, cb);
+
+ ///
+ /// Creates an internal subscription but does not forward interest.
+ /// Mirrors Go Server.sysSubscribeInternal().
+ ///
+ internal (Subscription? Sub, Exception? Error) SysSubscribeInternal(string subject, MsgHandler? cb) =>
+ SystemSubscribe(subject, string.Empty, true, null, cb);
+
+ ///
+ /// Core system-subscribe implementation.
+ /// Mirrors Go Server.systemSubscribe().
+ ///
+ private (Subscription? Sub, Exception? Error) SystemSubscribe(string subject, string queue, bool internalOnly, ClientConnection? c, MsgHandler? cb)
+ {
+ _mu.EnterWriteLock();
+ if (!EventsEnabledLocked())
+ {
+ _mu.ExitWriteLock();
+ return (null, new InvalidOperationException("no system account"));
+ }
+ if (cb == null)
+ {
+ _mu.ExitWriteLock();
+ return (null, new ArgumentNullException(nameof(cb), "undefined message handler"));
+ }
+ if (c == null) c = _sys!.Client;
+ _sys!.Sid++;
+ var sid = _sys.Sid.ToString();
+ _mu.ExitWriteLock();
+
+ // Create the subscription via the system client stub.
+ // In a full implementation this calls c.processSub().
+ // Stub: return a synthetic Subscription object.
+ var sub = new Subscription
+ {
+ Subject = Encoding.ASCII.GetBytes(subject),
+ Queue = string.IsNullOrEmpty(queue) ? null : Encoding.ASCII.GetBytes(queue),
+ Sid = Encoding.ASCII.GetBytes(sid),
+ };
+ return (sub, null);
+ }
+
+ // =========================================================================
+ // inboxReply
+ // =========================================================================
+
+ ///
+ /// Handles replies to the server's internal inbox wildcard subscription.
+ /// Mirrors Go Server.inboxReply().
+ ///
+ private void InboxReply(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg)
+ {
+ _mu.EnterReadLock();
+ if (!EventsEnabledLocked() || _sys!.Replies == null)
+ {
+ _mu.ExitReadLock();
+ return;
+ }
+ _sys.Replies.TryGetValue(subject, out var cb);
+ _mu.ExitReadLock();
+
+ cb?.Invoke(sub, c, acc, subject, reply, hdr, msg);
+ }
+
+ // =========================================================================
+ // newRespInbox
+ // =========================================================================
+
+ ///
+ /// Creates a short-lived reply inbox for a single in-flight request.
+ /// Mirrors Go Server.newRespInbox().
+ ///
+ private string NewRespInbox()
+ {
+ var prefix = _sys?.InboxPrefix ?? "$SYS._INBOX.";
+ return $"{prefix}{Guid.NewGuid():N}";
+ }
+
+ // =========================================================================
+ // InitEventTracking
+ // =========================================================================
+
+ ///
+ /// Sets up all system-wide subscriptions for tracking remote servers,
+ /// accounts, connections, and stats.
+ /// Mirrors Go Server.initEventTracking().
+ ///
+ internal void InitEventTracking()
+ {
+ _mu.EnterReadLock();
+ var sys = _sys;
+ _mu.ExitReadLock();
+
+ if (sys?.Client == null || sys.Account == null)
+ return;
+
+ // Create a hash used for server-targeted messages.
+ sys.ShortHash = GetHash(_info.Name);
+
+ // Inbox responses wildcard.
+ var subject = string.Format(SystemSubjects.InboxRespSubj, sys.ShortHash, "*");
+ if (SysSubscribe(subject, (s, c, a, subj, rply, hdr, m) =>
+ InboxReply(s, c, a, subj, rply, hdr ?? [], m ?? [])).Error is { } e1)
+ {
+ Errorf("Error setting up internal tracking: {0}", e1);
+ return;
+ }
+ sys.InboxPrefix = subject;
+
+ // Remote connection accounting (old subject).
+ subject = string.Format(SystemSubjects.AccConnsEventSubjOld, "*");
+ if (SysSubscribe(subject, NoInlineCallback(RemoteConnsUpdate)).Error is { } e2)
+ {
+ Errorf("Error setting up internal tracking for {0}: {1}", subject, e2);
+ return;
+ }
+
+ // Response for account info requests.
+ subject = string.Format(SystemSubjects.ConnsRespSubj, _info.Id);
+ if (SysSubscribe(subject, NoInlineCallback(RemoteConnsUpdate)).Error is { } e3)
+ {
+ Errorf("Error setting up internal tracking: {0}", e3);
+ return;
+ }
+
+ // Account nsubs requests.
+ if (SysSubscribe(SystemSubjects.AccNumSubsReqSubj, NoInlineCallback(NsubsRequest)).Error is { } e4)
+ {
+ Errorf("Error setting up internal tracking: {0}", e4);
+ return;
+ }
+
+ // Remote server statsz broadcast.
+ subject = string.Format(SystemSubjects.ServerStatsSubj, "*");
+ var (rss, e5) = SysSubscribe(subject, NoInlineCallback(RemoteServerUpdate));
+ if (e5 != null)
+ {
+ Errorf("Error setting up internal tracking: {0}", e5);
+ return;
+ }
+ sys.RemoteStatsSub = rss;
+
+ // Remote server shutdown events.
+ subject = string.Format(SystemSubjects.ShutdownEventSubj, "*");
+ if (SysSubscribe(subject, NoInlineCallback(RemoteServerShutdown)).Error is { } e6)
+ {
+ Errorf("Error setting up internal tracking: {0}", e6);
+ return;
+ }
+
+ // Lame-duck events (treated same as shutdown for now).
+ subject = string.Format(SystemSubjects.LameDuckEventSubj, "*");
+ if (SysSubscribe(subject, NoInlineCallback(RemoteServerShutdown)).Error is { } e7)
+ {
+ Errorf("Error setting up internal tracking: {0}", e7);
+ return;
+ }
+
+ // Account claim update subscriptions.
+ bool subscribeToUpdate = true;
+ if (_accResolver != null)
+ subscribeToUpdate = !_accResolver.IsTrackingUpdate();
+ if (subscribeToUpdate)
+ {
+ foreach (var updSubj in new[] { SystemSubjects.AccUpdateEventSubjOld, SystemSubjects.AccUpdateEventSubjNew })
+ {
+ if (SysSubscribe(string.Format(updSubj, "*"), NoInlineCallback(AccountClaimUpdate)).Error is { } eu)
+ {
+ Errorf("Error setting up internal tracking: {0}", eu);
+ return;
+ }
+ }
+ }
+
+ // Legacy statsz ping (kept for backwards compatibility).
+ if (SysSubscribe(SystemSubjects.ServerStatsPingReqSubj, NoInlineCallbackStatsz(StatszReq)).Error is { } e8)
+ {
+ Errorf("Error setting up internal tracking: {0}", e8);
+ return;
+ }
+
+ // Server-level monitoring services.
+ RegisterServerMonSubs();
+
+ // Account-level monitoring services.
+ RegisterAccMonSubs();
+
+ // User info (do not propagate interest).
+ if (SysSubscribeInternal(string.Format(SystemSubjects.UserDirectReqSubj, "*"), NoInlineCallback(UserInfoReq)).Error is { } e9)
+ {
+ Errorf("Error setting up internal tracking: {0}", e9);
+ return;
+ }
+
+ // STATZ account ping equivalent.
+ if (SysSubscribe(string.Format(SystemSubjects.AccPingReqSubj, "STATZ"),
+ NoInlineCallback(StatzAccPingReq)).Error is { } e10)
+ {
+ Errorf("Error setting up internal tracking: {0}", e10);
+ return;
+ }
+
+ // Leaf node connect events.
+ subject = string.Format(SystemSubjects.LeafNodeConnectEventSubj, "*");
+ if (SysSubscribe(subject, NoInlineCallback(LeafNodeConnected)).Error is { } e11)
+ {
+ Errorf("Error setting up internal tracking: {0}", e11);
+ return;
+ }
+
+ // Remote latency tracking.
+ subject = string.Format(SystemSubjects.RemoteLatencyEventSubj, sys.ShortHash);
+ if (SysSubscribe(subject, NoInlineCallback(RemoteLatencyUpdate)).Error is { } e12)
+ {
+ Errorf("Error setting up internal latency tracking: {0}", e12);
+ return;
+ }
+
+ // Debug subscribers.
+ if (SysSubscribeInternal(SystemSubjects.AccSubsSubj, NoInlineCallback(DebugSubscribers)).Error is { } e13)
+ {
+ Errorf("Error setting up internal debug service for subscribers: {0}", e13);
+ return;
+ }
+
+ // Server reload request.
+ subject = string.Format(SystemSubjects.ServerReloadReqSubj, _info.Id);
+ if (SysSubscribe(subject, NoInlineCallback(ReloadConfigHandler)).Error is { } e14)
+ {
+ Errorf("Error setting up server reload handler: {0}", e14);
+ return;
+ }
+
+ // Client kick.
+ subject = string.Format(SystemSubjects.ClientKickReqSubj, _info.Id);
+ if (SysSubscribe(subject, NoInlineCallback(KickClient)).Error is { } e15)
+ {
+ Errorf("Error setting up client kick service: {0}", e15);
+ return;
+ }
+
+ // Client LDM.
+ subject = string.Format(SystemSubjects.ClientLdmReqSubj, _info.Id);
+ if (SysSubscribe(subject, NoInlineCallback(LdmClient)).Error is { } e16)
+ {
+ Errorf("Error setting up client LDM service: {0}", e16);
+ }
+ }
+
+ // Helper: register per-server monitoring subscriptions.
+ private void RegisterServerMonSubs()
+ {
+ // Direct and ping subscriptions for each monitoring verb.
+ void RegisterVerb(string name, MsgHandler? h)
+ {
+ if (h == null) return;
+ var subject = string.Format(SystemSubjects.ServerDirectReqSubj, _info.Id, name);
+ if (SysSubscribe(subject, h).Error is { } e1)
+ { Errorf("Error setting up internal tracking: {0}", e1); return; }
+ subject = string.Format(SystemSubjects.ServerPingReqSubj, name);
+ if (SysSubscribe(subject, h).Error is { } e2)
+ { Errorf("Error setting up internal tracking: {0}", e2); }
+ }
+
+ RegisterVerb("IDZ", NoInlineCallback(IdzReq));
+ RegisterVerb("STATSZ", NoInlineCallbackStatsz(StatszReq));
+ RegisterVerb("VARZ", NoInlineCallback(VarzReq));
+ RegisterVerb("SUBSZ", NoInlineCallback(SubszReq));
+ RegisterVerb("CONNZ", NoInlineCallback(ConnzReq));
+ RegisterVerb("ROUTEZ", NoInlineCallback(RoutezReq));
+ RegisterVerb("GATEWAYZ", NoInlineCallback(GatewayzReq));
+ RegisterVerb("LEAFZ", NoInlineCallback(LeafzReq));
+ RegisterVerb("ACCOUNTZ", NoInlineCallback(AccountzReq));
+ RegisterVerb("JSZ", NoInlineCallback(JszReq));
+ RegisterVerb("HEALTHZ", NoInlineCallback(HealthzReq));
+ RegisterVerb("EXPVARZ", NoInlineCallback(ExpvarzReq));
+ RegisterVerb("IPQUEUESZ", NoInlineCallback(IpqueueszReq));
+ RegisterVerb("RAFTZ", NoInlineCallback(RaftzReq));
+ }
+
+ // Helper: register per-account monitoring subscriptions.
+ private void RegisterAccMonSubs()
+ {
+ void RegisterAccVerb(string name, MsgHandler? h)
+ {
+ if (h == null) return;
+ var subject = string.Format(SystemSubjects.AccDirectReqSubj, "*", name);
+ if (SysSubscribe(subject, h).Error is { } e)
+ Errorf("Error setting up internal tracking: {0}", e);
+ }
+
+ RegisterAccVerb("SUBSZ", NoInlineCallback(AccSubszReq));
+ RegisterAccVerb("CONNZ", NoInlineCallback(AccConnzReq));
+ RegisterAccVerb("LEAFZ", NoInlineCallback(AccLeafzReq));
+ RegisterAccVerb("JSZ", NoInlineCallback(AccJszReq));
+ RegisterAccVerb("INFO", NoInlineCallback(AccInfoReq));
+ RegisterAccVerb("STATZ", NoInlineCallback(AccStatzReq));
+ RegisterAccVerb("CONNS", NoInlineCallback(ConnsRequest));
+ }
+
+ // =========================================================================
+ // Group A: Stats & Heartbeat
+ // =========================================================================
+
+ // -------------------------------------------------------------------------
+ // TrackedRemoteServers
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Returns how many remote servers are being tracked from a system events perspective.
+ /// Returns -1 if events are not enabled.
+ /// Mirrors Go Server.TrackedRemoteServers().
+ ///
+ public int TrackedRemoteServers()
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ if (!Running() || !EventsEnabledLocked()) return -1;
+ return _sys!.Servers.Count;
+ }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ // -------------------------------------------------------------------------
+ // updateServerUsage
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Captures RSS/CPU usage into the advisory stats struct.
+ /// Mirrors Go Server.updateServerUsage().
+ ///
+ private static void UpdateServerUsage(ServerStatsAdvisory v)
+ {
+ v.Cores = Environment.ProcessorCount;
+ v.MaxProcs = Environment.ProcessorCount;
+ try
+ {
+ using var proc = Process.GetCurrentProcess();
+ v.Mem = proc.WorkingSet64;
+ // CPU is expensive to compute accurately — leave at 0 for now.
+ }
+ catch { /* ignore */ }
+ }
+
+ // -------------------------------------------------------------------------
+ // sendStatsz
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Builds and sends a advisory on the given subject.
+ /// Mirrors Go Server.sendStatsz().
+ ///
+ internal void SendStatsz(string subj)
+ {
+ var m = new ServerStatsMsg();
+ UpdateServerUsage(m.Stats);
+
+ if (LimitStatsz(subj)) return;
+
+ _mu.EnterReadLock();
+ try
+ {
+ if (_sys?.Account == null) return;
+
+ // Standalone mode: check for interest before sending.
+ var opts = GetOpts();
+ bool checkInterest = opts.Cluster.Port == 0 && opts.Gateway.Port == 0 && opts.LeafNode.Port == 0
+ && opts.LeafNode.Remotes.Count == 0;
+
+ if (checkInterest)
+ {
+ var sacc = _sys.Account;
+ if (sacc?.Sublist != null)
+ {
+ var rr = sacc.Sublist.Match(subj);
+ int total = rr.PSubs.Count + rr.QSubs.Count;
+ if (total == 0) return;
+ if (total == 1 && rr.PSubs.Count == 1 && rr.PSubs[0] == _sys.RemoteStatsSub) return;
+ }
+ }
+
+ m.Stats.Start = _start;
+ m.Stats.Connections = _clients.Count;
+ m.Stats.TotalConnections = _totalClients;
+ m.Stats.ActiveAccounts = Interlocked.CompareExchange(ref _activeAccounts, 0, 0);
+ m.Stats.Received = new DataStats { Msgs = Interlocked.Read(ref _stats.InMsgs), Bytes = Interlocked.Read(ref _stats.InBytes) };
+ m.Stats.Sent = new DataStats { Msgs = Interlocked.Read(ref _stats.OutMsgs), Bytes = Interlocked.Read(ref _stats.OutBytes) };
+ m.Stats.SlowConsumers = Interlocked.Read(ref _stats.SlowConsumers);
+
+ var scs = new SlowConsumersStats
+ {
+ Clients = (ulong)NumSlowConsumersClients(),
+ Routes = (ulong)NumSlowConsumersRoutes(),
+ Gateways = (ulong)NumSlowConsumersGateways(),
+ Leafs = (ulong)NumSlowConsumersLeafs(),
+ };
+ if (scs.Clients != 0 || scs.Routes != 0 || scs.Gateways != 0 || scs.Leafs != 0)
+ m.Stats.SlowConsumersStats = scs;
+
+ m.Stats.StaleConnections = Interlocked.Read(ref _stats.StaleConnections);
+ m.Stats.StalledClients = Interlocked.Read(ref _stats.Stalls);
+
+ var stcs = new StaleConnectionStats
+ {
+ Clients = (ulong)NumStaleConnectionsClients(),
+ Routes = (ulong)NumStaleConnectionsRoutes(),
+ Gateways = (ulong)NumStaleConnectionsGateways(),
+ Leafs = (ulong)NumStaleConnectionsLeafs(),
+ };
+ if (stcs.Clients != 0 || stcs.Routes != 0 || stcs.Gateways != 0 || stcs.Leafs != 0)
+ m.Stats.StaleConnectionStats = stcs;
+
+ m.Stats.NumSubs = NumSubscriptions();
+ m.Stats.ActiveServers = _sys.Servers.Count + 1;
+
+ // Routes.
+ ForEachRoute(r =>
+ {
+ var rs = new RouteStat { Id = r.Cid };
+ lock (r)
+ {
+ rs.Sent = new DataStats { Msgs = 0L, Bytes = 0L };
+ rs.Received = new DataStats { Msgs = 0L, Bytes = 0L };
+ rs.Pending = (int)(r.OutPb >> 10);
+ if (r.Route != null) rs.Name = r.Route.RemoteName;
+ }
+ m.Stats.Routes ??= [];
+ m.Stats.Routes.Add(rs);
+ });
+
+ SendInternalMsg(subj, string.Empty, m.Server, m);
+ }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ // -------------------------------------------------------------------------
+ // limitStatsz
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Rate-limits statsz publishes to one per .
+ /// Returns true if the publish should be skipped.
+ /// Mirrors Go Server.limitStatsz().
+ ///
+ internal bool LimitStatsz(string subj)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ if (_sys == null) return true;
+
+ // Only limit the normal broadcast subject.
+ if (subj != string.Format(SystemSubjects.ServerStatsSubj, ID()))
+ return false;
+
+ var interval = EventIntervals.DefaultStatszRateLimit;
+ if (_sys.ClientStatszInterval < interval && _sys.ClientStatszInterval > TimeSpan.Zero)
+ interval = _sys.ClientStatszInterval;
+
+ if (DateTime.UtcNow - _sys.LastStatsz < interval)
+ {
+ // Reschedule heartbeat for the next interval.
+ _sys.StatsMsgTimer?.Change((long)(_sys.LastStatsz.Add(interval) - DateTime.UtcNow).TotalMilliseconds, Timeout.Infinite);
+ return true;
+ }
+ _sys.LastStatsz = DateTime.UtcNow;
+ return false;
+ }
+ finally { _mu.ExitWriteLock(); }
+ }
+
+ // -------------------------------------------------------------------------
+ // heartbeatStatsz
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Called by the statsz timer; ramps up the send interval to the max.
+ /// Must be wrapped with .
+ /// Mirrors Go Server.heartbeatStatsz().
+ ///
+ private void HeartbeatStatsz()
+ {
+ if (_sys!.StatsMsgTimer != null)
+ {
+ if (_sys.ClientStatszInterval < _sys.StatszInterval)
+ {
+ _sys.ClientStatszInterval = _sys.ClientStatszInterval + _sys.ClientStatszInterval;
+ if (_sys.ClientStatszInterval > _sys.StatszInterval)
+ _sys.ClientStatszInterval = _sys.StatszInterval;
+ }
+ _sys.StatsMsgTimer.Change((long)_sys.ClientStatszInterval.TotalMilliseconds, Timeout.Infinite);
+ }
+ Task.Run(SendStatszUpdate);
+ }
+
+ // -------------------------------------------------------------------------
+ // resetLastStatsz
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Resets the last-sent timestamp so the next publish is not rate-limited.
+ /// Must be wrapped with .
+ /// Mirrors Go Server.resetLastStatsz().
+ ///
+ private void ResetLastStatsz()
+ {
+ if (_sys != null) _sys.LastStatsz = default;
+ }
+
+ // -------------------------------------------------------------------------
+ // sendStatszUpdate
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Triggers a statsz advisory publish on the server's broadcast subject.
+ /// Mirrors Go Server.sendStatszUpdate().
+ ///
+ private void SendStatszUpdate() =>
+ SendStatsz(string.Format(SystemSubjects.ServerStatsSubj, ID()));
+
+ // -------------------------------------------------------------------------
+ // startStatszTimer
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Starts the statsz heartbeat timer, beginning at 250ms and backing off.
+ /// Must be wrapped with .
+ /// Mirrors Go Server.startStatszTimer().
+ ///
+ private void StartStatszTimer()
+ {
+ _sys!.ClientStatszInterval = TimeSpan.FromMilliseconds(250);
+ var delay = (long)_sys.ClientStatszInterval.TotalMilliseconds;
+ _sys.StatsMsgTimer = new System.Threading.Timer(_ => WrapChk(HeartbeatStatsz)(), null, delay, Timeout.Infinite);
+ }
+
+ // -------------------------------------------------------------------------
+ // startRemoteServerSweepTimer
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Starts the timer that periodically sweeps for orphaned remote servers.
+ /// Must be wrapped with .
+ /// Mirrors Go Server.startRemoteServerSweepTimer().
+ ///
+ private void StartRemoteServerSweepTimer()
+ {
+ var delay = (long)_sys!.CheckOrphan.TotalMilliseconds;
+ _sys.Sweeper = new System.Threading.Timer(_ => WrapChk(CheckRemoteServers)(), null, delay, Timeout.Infinite);
+ }
+
+ // -------------------------------------------------------------------------
+ // checkRemoteServers
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Removes remote server entries that have not sent a heartbeat within the
+ /// orphan timeout window.
+ /// Must be wrapped with .
+ /// Mirrors Go Server.checkRemoteServers().
+ ///
+ private void CheckRemoteServers()
+ {
+ var now = DateTime.UtcNow;
+ var orphans = new List();
+ foreach (var (sid, su) in _sys!.Servers)
+ {
+ if (now - su.LTime > _sys.OrphanMax)
+ {
+ Debugf("Detected orphan remote server: {0}", sid);
+ orphans.Add(sid);
+ }
+ }
+ foreach (var sid in orphans)
+ ProcessRemoteServerShutdown(sid);
+
+ _sys.Sweeper?.Change((long)_sys.CheckOrphan.TotalMilliseconds, Timeout.Infinite);
+ }
+
+ // =========================================================================
+ // Group B: Remote Server Tracking
+ // =========================================================================
+
+ // -------------------------------------------------------------------------
+ // accountClaimUpdate
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Handles an account JWT claim-update message from another server.
+ /// Mirrors Go Server.accountClaimUpdate().
+ ///
+ private void AccountClaimUpdate(Subscription sub, ClientConnection c, Account acc, string subject, string resp, byte[] hdr, byte[] msg)
+ {
+ if (!EventsEnabled()) return;
+
+ var toks = subject.Split('.');
+ string pubKey;
+ if (toks.Length == SystemSubjects.AccUpdateTokensNew)
+ pubKey = toks[SystemSubjects.AccReqAccIndex];
+ else if (toks.Length == SystemSubjects.AccUpdateTokensOld)
+ pubKey = toks[SystemSubjects.AccUpdateAccIdxOld];
+ else
+ {
+ Debugf("Received account claims update on bad subject {0}", subject);
+ return;
+ }
+
+ if (msg.Length == 0)
+ {
+ EventHelpers.RespondToUpdate(this, resp, pubKey, "jwt update error", new InvalidOperationException("request body is empty"));
+ }
+ else
+ {
+ // Try to update the account with the new JWT.
+ if (!_accounts.TryGetValue(pubKey, out var account))
+ {
+ EventHelpers.RespondToUpdate(this, resp, pubKey, "jwt update skipped", null);
+ return;
+ }
+ var err = UpdateAccountWithClaimJwt(account, Encoding.UTF8.GetString(msg));
+ if (err != null)
+ EventHelpers.RespondToUpdate(this, resp, pubKey, "jwt update resulted in error", err);
+ else
+ EventHelpers.RespondToUpdate(this, resp, pubKey, "jwt updated", null);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // processRemoteServerShutdown
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Handles a remote server going away: cleans up account remote-server maps
+ /// and removes from nodeToInfo.
+ /// Server lock must be held on entry.
+ /// Mirrors Go Server.processRemoteServerShutdown().
+ ///
+ private void ProcessRemoteServerShutdown(string sid)
+ {
+ foreach (var acc in _accounts.Values)
+ acc.RemoveRemoteServer(sid);
+
+ foreach (var key in _nodeToInfo.Keys.ToList())
+ {
+ if (_nodeToInfo.TryGetValue(key, out var v) && v is NodeInfo ni && ni.Id == sid)
+ {
+ var updated = new NodeInfo { Name = ni.Name, Version = ni.Version, Cluster = ni.Cluster, Domain = ni.Domain, Id = ni.Id, Tags = ni.Tags, Cfg = ni.Cfg, Stats = ni.Stats, Js = ni.Js, BinarySnapshots = ni.BinarySnapshots, AccountNrg = ni.AccountNrg, Offline = true };
+ _nodeToInfo.TryUpdate(key, updated, v);
+ break;
+ }
+ }
+ _sys!.Servers.Remove(sid);
+ }
+
+ // -------------------------------------------------------------------------
+ // sameDomain
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Returns true when the given domain matches (or is not set).
+ /// Mirrors Go Server.sameDomain().
+ ///
+ internal bool SameDomain(string domain) =>
+ string.IsNullOrEmpty(domain) || string.IsNullOrEmpty(_info.Domain) || domain == _info.Domain;
+
+ // -------------------------------------------------------------------------
+ // remoteServerShutdown
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Handles a remote server-shutdown system event.
+ /// Mirrors Go Server.remoteServerShutdown().
+ ///
+ private void RemoteServerShutdown(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ if (!EventsEnabledLocked()) return;
+
+ var toks = subject.Split('.');
+ if (toks.Length < SystemSubjects.ShutdownEventTokens)
+ {
+ Debugf("Received remote server shutdown on bad subject {0}", subject);
+ return;
+ }
+
+ if (msg.Length == 0)
+ {
+ Errorf("Remote server sent invalid (empty) shutdown message to {0}", subject);
+ return;
+ }
+
+ ServerInfo si;
+ try { si = JsonSerializer.Deserialize(msg)!; }
+ catch
+ {
+ Debugf("Received bad server info for remote server shutdown");
+ return;
+ }
+
+ // Mark the JetStream node as offline.
+ var node = GetHash(si.Name);
+ if (_nodeToInfo.TryGetValue(node, out var existing) && existing is NodeInfo oldNi)
+ {
+ var updatedNi = new NodeInfo { Name = oldNi.Name, Version = oldNi.Version, Cluster = oldNi.Cluster, Domain = oldNi.Domain, Id = oldNi.Id, Tags = oldNi.Tags, Cfg = oldNi.Cfg, Stats = oldNi.Stats, Js = oldNi.Js, BinarySnapshots = oldNi.BinarySnapshots, AccountNrg = oldNi.AccountNrg, Offline = true };
+ _nodeToInfo.TryUpdate(node, updatedNi, existing);
+ }
+
+ var sid = toks[SystemSubjects.ServerSubjectIndex];
+ if (_sys!.Servers.ContainsKey(sid))
+ ProcessRemoteServerShutdown(sid);
+ }
+ finally { _mu.ExitWriteLock(); }
+ }
+
+ // -------------------------------------------------------------------------
+ // remoteServerUpdate
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Handles a statsz heartbeat from a remote server.
+ /// Mirrors Go Server.remoteServerUpdate().
+ ///
+ private void RemoteServerUpdate(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg)
+ {
+ if (msg.Length == 0)
+ {
+ Debugf("Received empty server info for remote server update");
+ return;
+ }
+
+ ServerStatsMsg ssm;
+ try { ssm = JsonSerializer.Deserialize(msg)!; }
+ catch
+ {
+ Debugf("Received bad server info for remote server update");
+ return;
+ }
+
+ var si = ssm.Server;
+
+ _mu.EnterWriteLock();
+ if (Running() && EventsEnabledLocked() && si.Id != _info.Id)
+ UpdateRemoteServer(si);
+ _mu.ExitWriteLock();
+
+ // JetStream node updates.
+ if (!SameDomain(si.Domain)) return;
+
+ var node = GetHash(si.Name);
+ _nodeToInfo[node] = new NodeInfo
+ {
+ Name = si.Name,
+ Id = si.Id,
+ Cluster = si.Cluster,
+ Domain = si.Domain,
+ Version = si.Version,
+ Offline = false,
+ };
+ Task.Run(UpdateNRGAccountStatus);
+ }
+
+ // -------------------------------------------------------------------------
+ // updateRemoteServer
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Tracks or updates a remote server's heartbeat sequence and timestamp.
+ /// Server lock must be held on entry.
+ /// Mirrors Go Server.updateRemoteServer().
+ ///
+ private void UpdateRemoteServer(ServerInfo si)
+ {
+ if (!_sys!.Servers.TryGetValue(si.Id, out var su))
+ {
+ _sys.Servers[si.Id] = new ServerUpdate { Seq = si.Seq, LTime = DateTime.UtcNow };
+ ProcessNewServer(si);
+ }
+ else
+ {
+ if (si.Seq <= su.Seq)
+ {
+ Errorf("Received out of order remote server update from: {0}", si.Id);
+ return;
+ }
+ su.Seq = si.Seq;
+ su.LTime = DateTime.UtcNow;
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // processNewServer
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Called the first time a remote server is seen.
+ /// Ensures gateways are in interest-only mode for leaf-connected accounts
+ /// and announces ourselves to the new server.
+ /// Server lock must be held on entry.
+ /// Mirrors Go Server.processNewServer().
+ ///
+ private void ProcessNewServer(ServerInfo si)
+ {
+ EnsureGWsInterestOnlyForLeafNodes();
+
+ if (SameDomain(si.Domain))
+ {
+ var node = GetHash(si.Name);
+ _nodeToInfo.TryAdd(node, new NodeInfo
+ {
+ Name = si.Name,
+ Id = si.Id,
+ Cluster = si.Cluster,
+ Domain = si.Domain,
+ Version = si.Version,
+ Offline = false,
+ });
+ }
+ Task.Run(UpdateNRGAccountStatus);
+ Task.Run(SendStatszUpdate);
+ }
+
+ // -------------------------------------------------------------------------
+ // updateNRGAccountStatus
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Recreates internal subscriptions on all Raft nodes to reflect updated
+ /// account-NRG capability state.
+ /// Server lock must NOT be held on entry.
+ /// Mirrors Go Server.updateNRGAccountStatus().
+ ///
+ private void UpdateNRGAccountStatus()
+ {
+ var raftNodes = new List