diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index adc3a6d..ea2b63e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -4508,6 +4508,28 @@ public sealed partial class Account : INatsAccount return string.Empty; } + /// + /// Clears the connection-heartbeat timer. Caller must hold the account lock. + /// Mirrors Go (a *Account) clearConnectionTimer() in server/events.go. + /// + internal void ClearConnectionHeartbeatTimer() + { + ClearTimerLocked(ref _ctmr); + } + + /// + /// Starts or resets the connection-heartbeat timer. + /// Caller must hold the account lock. + /// Mirrors Go inline timer setup in sendAccConnsUpdate(). + /// + internal void SetConnectionHeartbeatTimer(long delayMs, Action callback) + { + if (_ctmr == null) + _ctmr = new Timer(_ => callback(), null, delayMs, Timeout.Infinite); + else + _ctmr.Change(delayMs, Timeout.Infinite); + } + /// /// Stops and nulls out a timer. Lock must be held by the caller. /// Mirrors Go clearTimer(t **time.Timer). diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs index 8c0e859..d32b47f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs @@ -259,6 +259,22 @@ internal static class EventHelpers return text[start..end].Trim(); } + + /// + /// Sends a JSON response for an account claims update request. + /// Mirrors Go respondToUpdate in server/events.go. + /// + internal static void RespondToUpdate(NatsServer server, string reply, string pubKey, string message, Exception? err) + { + if (string.IsNullOrEmpty(reply)) return; + var response = new ServerApiResponse(); + if (err != null) + { + response.Error = new ServerApiError { Code = 500, Description = err.Message }; + } + response.Data = new { account = pubKey, message, error = err?.Message }; + server.SendInternalResponse(reply, response); + } } // ========================================================================= diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs index e3e491b..5726468 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs @@ -151,13 +151,212 @@ public static class EventIntervals /// public delegate void SysMsgHandler( Subscription sub, - NatsClient client, + ClientConnection client, Account acc, string subject, string reply, byte[] hdr, byte[] msg); +// ============================================================================ +// MsgHandler — subscription message callback delegate +// Mirrors Go msgHandler func type in server/events.go. +// ============================================================================ + +/// +/// Callback for a subscription message. Identical signature to +/// ; the distinction exists in Go but is +/// collapsed here since both carry the same parameters. +/// Mirrors Go msgHandler in server/events.go. +/// +public delegate void MsgHandler( + Subscription sub, + ClientConnection client, + Account acc, + string subject, + string reply, + byte[] hdr, + byte[] msg); + +// ============================================================================ +// ServerApiError — error payload for server API responses +// Mirrors Go ApiError used in server/events.go responses. +// ============================================================================ + +/// +/// Error payload returned in when a +/// monitoring z-endpoint request fails. +/// Mirrors Go ApiError struct used by server API responses. +/// +public sealed class ServerApiError +{ + [System.Text.Json.Serialization.JsonPropertyName("code")] + public int Code { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("description")] + public string Description { get; set; } = string.Empty; +} + +// ============================================================================ +// ServerApiResponse — wrapper for server API (z-endpoint) responses +// Mirrors Go ServerAPIResponse in server/events.go. +// ============================================================================ + +/// +/// Standard envelope returned by server monitoring API (varz, connz, etc.) +/// published via the internal system bus. +/// Mirrors Go ServerAPIResponse in server/events.go. +/// +public sealed class ServerApiResponse +{ + [System.Text.Json.Serialization.JsonPropertyName("server")] + public ServerInfo? Server { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("data")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public object? Data { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("error")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public ServerApiError? Error { get; set; } +} + +// ============================================================================ +// EventFilterOptions — server filter options for z-endpoint requests +// Mirrors Go EventFilterOptions in server/events.go. +// ============================================================================ + +/// +/// Filter parameters sent in monitoring z-endpoint request messages that +/// allow targeting a specific server, cluster, or set of tags. +/// Mirrors Go EventFilterOptions in server/events.go. +/// +public class EventFilterOptions +{ + [System.Text.Json.Serialization.JsonPropertyName("server_name")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public string? Name { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("cluster")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public string? Cluster { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("host")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public string? Host { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("domain")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public string? Domain { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("tags")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public List? Tags { get; set; } + + /// + /// When true, name/cluster/host must match exactly; when false, substring + /// matching is used. + /// Mirrors Go EventFilterOptions.ExactMatch. + /// + [System.Text.Json.Serialization.JsonPropertyName("exact")] + public bool ExactMatch { get; set; } +} + +// ============================================================================ +// UserInfo — user info response payload for $SYS.REQ.USER.INFO +// Mirrors Go UserInfo struct in server/events.go. +// ============================================================================ + +/// +/// Response payload returned by the $SYS.REQ.USER.INFO endpoint. +/// Contains the authenticated user's identity, account, permissions, and +/// claim expiry. +/// Mirrors Go UserInfo struct in server/events.go. +/// +public sealed class UserInfo +{ + [System.Text.Json.Serialization.JsonPropertyName("userId")] + public string UserId { get; set; } = string.Empty; + + [System.Text.Json.Serialization.JsonPropertyName("account")] + public string Account { get; set; } = string.Empty; + + [System.Text.Json.Serialization.JsonPropertyName("permissions")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public object? Permissions { get; set; } + + [System.Text.Json.Serialization.JsonPropertyName("expires")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] + public DateTime Expires { get; set; } +} + +// ============================================================================ +// KickClientReq / LdmClientReq — client control request payloads +// Mirrors Go anonymous structs used in server/events.go. +// ============================================================================ + +/// +/// Request payload for $SYS.REQ.SERVER.{id}.KICK, which asks this +/// server to forcefully disconnect the client with the given CID. +/// Mirrors the anonymous struct used in Server.kickClient(). +/// +internal sealed class KickClientReq +{ + [System.Text.Json.Serialization.JsonPropertyName("cid")] + public ulong Cid { get; set; } +} + +/// +/// Request payload for $SYS.REQ.SERVER.{id}.LDM, which asks this +/// server to put the client with the given CID into lame-duck mode. +/// Mirrors the anonymous struct used in Server.ldmClient(). +/// +internal sealed class LdmClientReq +{ + [System.Text.Json.Serialization.JsonPropertyName("cid")] + public ulong Cid { get; set; } +} + +// ============================================================================ +// StatszEventOptions — options for statsz z-endpoint requests +// Mirrors Go StatszEventOptions in server/events.go. +// ============================================================================ + +/// +/// Options embedded in z-endpoint request messages that allow filtering and +/// configuring the statsz response. +/// Mirrors Go StatszEventOptions in server/events.go. +/// +public sealed class StatszEventOptions : EventFilterOptions +{ +} + +// ============================================================================ +// AccNumSubsReq — request payload for subscription count queries +// Mirrors Go accNumSubsReq struct in server/events.go. +// ============================================================================ + +/// +/// Payload for $SYS.REQ.ACCOUNT.NSUBS requests, which ask a remote +/// server how many local subscriptions exist for a given account + subject. +/// Mirrors Go accNumSubsReq in server/events.go. +/// +internal sealed class AccNumSubsReq +{ + [System.Text.Json.Serialization.JsonPropertyName("server")] + public ServerInfo Server { get; set; } = new(); + + [System.Text.Json.Serialization.JsonPropertyName("acc")] + public string Account { get; set; } = string.Empty; + + [System.Text.Json.Serialization.JsonPropertyName("subject")] + public string Subject { get; set; } = string.Empty; + + [System.Text.Json.Serialization.JsonPropertyName("queue")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public string? Queue { get; set; } +} + // ============================================================================ // InSysMsg — queued internal system message // Mirrors Go inSysMsg struct in server/events.go. @@ -170,9 +369,9 @@ public delegate void SysMsgHandler( /// internal sealed class InSysMsg { - public Subscription? Sub { get; set; } - public NatsClient? Client { get; set; } - public Account? Acc { get; set; } + public Subscription? Sub { get; set; } + public ClientConnection? Client { get; set; } + public Account? Acc { get; set; } public string Subject { get; set; } = string.Empty; public string Reply { get; set; } = string.Empty; public byte[]? Hdr { get; set; } @@ -195,10 +394,10 @@ internal sealed class InSysMsg internal sealed class InternalState { // ---- identity / sequencing ---- - public Account? Account { get; set; } - public ClientConnection? Client { get; set; } - public long Seq; // accessed via Interlocked.Increment - public int Sid { get; set; } + public Account? Account { get; set; } + public ClientConnection? Client { get; set; } + public long Seq; // accessed via Interlocked.Increment + public int Sid { get; set; } // ---- remote server tracking ---- /// Map of server ID → serverUpdate. Mirrors Go servers map[string]*serverUpdate. @@ -216,7 +415,7 @@ internal sealed class InternalState /// Pending reply subject → handler map. /// Mirrors Go replies map[string]msgHandler. /// - public Dictionary> Replies { get; set; } = new(); + public Dictionary> Replies { get; set; } = new(); // ---- queues ---- /// Outbound message send queue. Mirrors Go sendq *ipQueue[*pubMsg]. @@ -290,7 +489,7 @@ internal sealed class ServerUpdate internal sealed class PubMsg { public ClientConnection? Client { get; set; } - public string Subject { get; set; } = string.Empty; + public string Subject { get; set; } = string.Empty; public string Reply { get; set; } = string.Empty; public ServerInfo? Si { get; set; } public byte[]? Hdr { get; set; } @@ -302,11 +501,6 @@ internal sealed class PubMsg public bool Echo { get; set; } public bool Last { get; set; } - /// - /// Clears all fields and returns this instance to the pool held in - /// . - /// Mirrors Go (pm *pubMsg) returnToPool() in server/events.go. - /// internal void ReturnToPool() { Client = null; @@ -318,7 +512,6 @@ internal sealed class PubMsg Oct = 0; Echo = false; Last = false; - EventHelpers.ReturnPubMsg(this); } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs index f606447..ca3db26 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs @@ -12,39 +12,71 @@ // limitations under the License. // // Adapted from server/events.go in the NATS server Go source. -// Batch 44: Events Core & Dispatch. +// Batch 45: Server.* events methods — stats, remote tracking, connection events, +// request handlers, leaf/gateway events, auth events, OCSP events, misc. +using System.Diagnostics; +using System.Net; +using System.Runtime.InteropServices; +using System.Security.Cryptography.X509Certificates; using System.Text; using System.Text.Json; +using ZB.MOM.NatsNet.Server.Auth.CertificateIdentityProvider; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; +/// +/// Partial class containing the server-level events subsystem methods. +/// Mirrors server/events.go in the NATS server Go source. +/// public sealed partial class NatsServer { // ========================================================================= - // Constants (mirrors Go sysHashLen = 8, InboxPrefix consts) - // ========================================================================= - - /// Length of the response-inbox prefix used for this server's internal replies. - private int RespInboxPrefixLen => EventHelpers.InboxPrefixLen + EventHelpers.SysHashLen + 1; - - // ========================================================================= - // Group G: internalReceiveLoop - // Mirrors Go (s *Server) internalReceiveLoop in server/events.go. + // Core event infrastructure — EventsEnabled / EventsRunning // ========================================================================= /// - /// Background loop that dispatches all messages the server needs to process - /// internally via system subscriptions (e.g. internal subs). - /// Mirrors Go (s *Server) internalReceiveLoop in server/events.go. + /// Reports whether the server has internal events enabled via a system account. + /// Mirrors Go Server.EventsEnabled(). /// + public bool EventsEnabled() + { + _mu.EnterReadLock(); + try { return EventsEnabledLocked(); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Reports whether events are enabled. + /// Server read lock must be held on entry. + /// Mirrors Go Server.eventsEnabled(). + /// + internal bool EventsEnabledLocked() => + _sys != null && _sys.Client != null && _sys.Account != null; + + /// + /// Locked version of checking if events system is running. + /// Mirrors Go Server.eventsRunning(). + /// + internal bool EventsRunning() + { + _mu.EnterReadLock(); + bool er = Running() && EventsEnabledLocked(); + _mu.ExitReadLock(); + return er; + } + + // ========================================================================= + // internalReceiveLoop + // Mirrors Go (s *Server) internalReceiveLoop in server/events.go. + // ========================================================================= + private void InternalReceiveLoop(IpQueue recvq) { while (EventsRunning()) { - // Wait for a notification that items are ready. if (!recvq.Ch.WaitToReadAsync(_quitCts.Token).AsTask() .GetAwaiter() .GetResult()) @@ -65,19 +97,12 @@ public sealed partial class NatsServer } // ========================================================================= - // Group G: internalSendLoop + // internalSendLoop // Mirrors Go (s *Server) internalSendLoop in server/events.go. // ========================================================================= - /// - /// Background loop that serialises and dispatches all messages the server - /// wants to send through the system account. Runs as a long-lived goroutine. - /// Mirrors Go (s *Server) internalSendLoop in server/events.go. - /// private void InternalSendLoop() { - // Read snapshot of send queue and system client under the read lock. - // Mirrors Go's RESET: label pattern. RESET: _mu.EnterReadLock(); if (_sys is null || _sys.SendQueue is null) @@ -92,9 +117,9 @@ public sealed partial class NatsServer var host = _info.Host; var svrName = _info.Name; var domain = _info.Domain ?? string.Empty; - var seqRef = _sys; // holds ref so we can atomically increment Seq + var seqRef = _sys; var jsEnabled = _info.JetStream; - var cluster = _info.Cluster ?? string.Empty; + var cluster = _info.Cluster ?? string.Empty; if (_gateway.Enabled) cluster = GetGatewayName(); _mu.ExitReadLock(); @@ -105,7 +130,6 @@ public sealed partial class NatsServer while (EventsRunning()) { - // Wait for items in the send queue OR a reset signal OR quit. var sendTask = sendq.Ch.WaitToReadAsync(_quitCts.Token).AsTask(); var resetTask = resetCh is not null ? resetCh.Reader.WaitToReadAsync(_quitCts.Token).AsTask() @@ -117,7 +141,6 @@ public sealed partial class NatsServer if (_quitCts.IsCancellationRequested) return; - // If reset channel fired, re-read the snapshot. if (completed == resetTask && resetTask.IsCompletedSuccessfully && resetTask.Result) { resetCh?.Reader.TryRead(out _); @@ -132,7 +155,6 @@ public sealed partial class NatsServer foreach (var pm in msgs) { - // Stamp ServerInfo advisory fields if requested. if (pm.Si is { } si) { si.Name = svrName; @@ -150,11 +172,9 @@ public sealed partial class NatsServer { si.SetJetStreamEnabled(); si.SetBinaryStreamSnapshot(); - // AccountNRG: stub, not yet tracked } } - // Serialise payload. byte[] body = []; if (pm.Msg is not null) { @@ -166,18 +186,15 @@ public sealed partial class NatsServer }; } - // Choose client. var c = pm.Client ?? sysc; if (c is null) { pm.ReturnToPool(); continue; } - // Process the publish inline. lock (c) { c.ParseCtx.Pa.Subject = Encoding.ASCII.GetBytes(pm.Subject); c.ParseCtx.Pa.Reply = Encoding.ASCII.GetBytes(pm.Reply); } - // Append CRLF. var payload = new byte[body.Length + 2]; Buffer.BlockCopy(body, 0, payload, 0, body.Length); payload[^2] = (byte)'\r'; @@ -187,7 +204,6 @@ public sealed partial class NatsServer if (pm.Last) { - // Final message (shutdown): flush in-place and exit. c.FlushClients(long.MaxValue); sendq.Recycle(msgs); pm.ReturnToPool(); @@ -205,15 +221,10 @@ public sealed partial class NatsServer } // ========================================================================= - // Group G: sendShutdownEvent + // sendShutdownEvent // Mirrors Go (s *Server) sendShutdownEvent in server/events.go. // ========================================================================= - /// - /// Queues the server shutdown event. Clears the send queue and reply - /// handlers so no further messages will be dispatched. - /// Mirrors Go (s *Server) sendShutdownEvent in server/events.go. - /// internal void SendShutdownEvent() { _mu.EnterWriteLock(); @@ -224,9 +235,7 @@ public sealed partial class NatsServer var subject = string.Format(SystemSubjects.ShutdownEventSubj, _info.Id); var sendq = _sys.SendQueue; - // Stop any more messages from queuing. _sys.SendQueue = null; - // Unhook all reply handlers. _sys.Replies.Clear(); var si = new ServerInfo(); @@ -240,86 +249,43 @@ public sealed partial class NatsServer } // ========================================================================= - // Group G: sendInternalAccountSysMsg - // Mirrors Go (s *Server) sendInternalAccountSysMsg in server/events.go. + // sendInternalMsg / sendInternalMsgLocked / sendInternalResponse // ========================================================================= /// - /// Sends an internal system message to a specific account using that account's - /// internal client. Acquires only the minimum needed locks. - /// Mirrors Go (s *Server) sendInternalAccountSysMsg in server/events.go. + /// Queues up an internal message to be sent by the internal send loop. + /// Server read lock must be held on entry. + /// Mirrors Go Server.sendInternalMsg(). /// - internal void SendInternalAccountSysMsg( - Account? account, - string subject, - ServerInfo? si, - object? msg, - int compressionType = (int)CompressionType.None) + internal void SendInternalMsg(string subj, string reply, ServerInfo? si, object? msg) + { + if (_sys?.SendQueue == null) return; + var pm = new PubMsg + { + Client = null, + Subject = subj, + Reply = reply, + Si = si, + Hdr = null, + Msg = msg, + }; + _sys.SendQueue.Push(pm); + } + + /// + /// Queues up an internal message; acquires/releases the read lock itself. + /// Mirrors Go Server.sendInternalMsgLocked(). + /// + internal void SendInternalMsgLocked(string subj, string reply, ServerInfo? si, object? msg) { _mu.EnterReadLock(); - if (_sys is null || _sys.SendQueue is null || account is null) - { - _mu.ExitReadLock(); - return; - } - var sendq = _sys.SendQueue; + SendInternalMsg(subj, reply, si, msg); _mu.ExitReadLock(); - - ClientConnection? c; - _mu.EnterWriteLock(); - try - { - c = account.InternalAccountClient(); - } - finally - { - _mu.ExitWriteLock(); - } - - sendq.Push(EventHelpers.NewPubMsg( - c, subject, string.Empty, si, null, msg, compressionType, false, false)); - } - - // ========================================================================= - // Group G: sendInternalMsgLocked / sendInternalMsg - // Mirrors Go (s *Server) sendInternalMsgLocked and - // (s *Server) sendInternalMsg in server/events.go. - // ========================================================================= - - /// - /// Queues an internal message, acquiring the read lock. - /// Mirrors Go (s *Server) sendInternalMsgLocked in server/events.go. - /// - internal void SendInternalMsgLocked( - string subject, - string reply, - ServerInfo? si, - object? msg) - { - _mu.EnterReadLock(); - try { SendInternalMsg(subject, reply, si, msg); } - finally { _mu.ExitReadLock(); } } /// - /// Queues an internal message. Lock must already be held. - /// Mirrors Go (s *Server) sendInternalMsg in server/events.go. - /// - internal void SendInternalMsg( - string subject, - string reply, - ServerInfo? si, - object? msg) - { - if (_sys is null || _sys.SendQueue is null) return; - _sys.SendQueue.Push(EventHelpers.NewPubMsg( - null, subject, reply, si, null, msg, (int)CompressionType.None, false, false)); - } - - /// - /// Queues an internal message from a specific client context. - /// Called by . - /// Mirrors Go (c *client) sendInternalMsg(...) in server/events.go. + /// Queues an internal message from a specific client connection. + /// Mirrors Go Server.sendInternalMsgFromClient(). /// internal void SendInternalMsgFromClient( ClientConnection client, @@ -341,320 +307,158 @@ public sealed partial class NatsServer } } - // ========================================================================= - // Group G: sendInternalResponse - // Mirrors Go (s *Server) sendInternalResponse in server/events.go. - // ========================================================================= + /// + /// Queues an internal message on behalf of a specific account's internal client. + /// Mirrors Go Server.sendInternalAccountMsg(). + /// + internal void SendInternalAccountMsg(Account? a, string subject, object? msg) + => SendInternalAccountMsgWithReply(a, subject, string.Empty, null, msg, false); /// - /// Sends a response to an internal server API request. - /// Mirrors Go (s *Server) sendInternalResponse in server/events.go. + /// Queues an internal message with an optional reply to an arbitrary account. + /// Mirrors Go Server.sendInternalAccountMsgWithReply(). /// - internal void SendInternalResponse(string subject, ServerApiResponse response) + internal void SendInternalAccountMsgWithReply(Account? a, string subject, string reply, byte[]? hdr, object? msg, bool echo) { _mu.EnterReadLock(); - try - { - if (_sys is null || _sys.SendQueue is null) return; - _sys.SendQueue.Push(EventHelpers.NewPubMsg( - null, subject, string.Empty, response.Server, null, - response, response.Compress, false, false)); - } - finally + if (_sys?.SendQueue == null) { _mu.ExitReadLock(); + return; } - } - - // ========================================================================= - // Group G: eventsRunning / EventsEnabled / eventsEnabled - // Mirrors Go counterparts in server/events.go. - // ========================================================================= - - /// - /// Returns true if the events system is running (server is running and events enabled). - /// Acquires the read lock internally. - /// Mirrors Go (s *Server) eventsRunning() bool in server/events.go. - /// - internal bool EventsRunning() - { - _mu.EnterReadLock(); - try { return IsRunning() && EventsEnabledLocked(); } - finally { _mu.ExitReadLock(); } + ClientConnection? c = _sys.Client; + if (a != null) + { + lock (a) + { + c = a.InternalClient; + } + } + _sys.SendQueue.Push(new PubMsg + { + Client = c, + Subject = subject, + Reply = reply, + Si = null, + Hdr = hdr, + Msg = msg, + Echo = echo, + }); + _mu.ExitReadLock(); } /// - /// Returns true if the server has the events system enabled via a system account. - /// Acquires the read lock internally. - /// Mirrors Go (s *Server) EventsEnabled() bool in server/events.go. + /// Queues an account-scoped system message. + /// Mirrors Go Server.sendInternalAccountSysMsg(). /// - public bool EventsEnabled() + internal void SendInternalAccountSysMsg(Account? a, string subj, ServerInfo? si, object? msg, int ct) { _mu.EnterReadLock(); - try { return EventsEnabledLocked(); } - finally { _mu.ExitReadLock(); } - } - - /// - /// Returns true if events are enabled. Lock must already be held. - /// Mirrors Go (s *Server) eventsEnabled() bool in server/events.go. - /// - private bool EventsEnabledLocked() => - _sys is not null && _sys.Client is not null && _sys.Account is not null; - - // ========================================================================= - // Group G: Node - // Mirrors Go (s *Server) Node() string in server/events.go. - // ========================================================================= - - /// - /// Returns the stable node hash (short hash of server name) used for JetStream - /// cluster targeting. Empty string if events not initialised. - /// Mirrors Go (s *Server) Node() string in server/events.go. - /// - public string Node() - { - _mu.EnterReadLock(); - try { return _sys?.ShortHash ?? string.Empty; } - finally { _mu.ExitReadLock(); } - } - - // ========================================================================= - // Group G: initEventTracking - // Mirrors Go (s *Server) initEventTracking() in server/events.go. - // ========================================================================= - - /// - /// Initialises the server-wide system subscription infrastructure: sets the - /// server's hash, subscribes to all internal system subjects, and starts the - /// send/receive loops. - /// Mirrors Go (s *Server) initEventTracking() in server/events.go. - /// - internal void InitEventTracking() - { - // Capture sys outside any lock to avoid deadlock. - _mu.EnterReadLock(); - var sys = _sys; + if (_sys?.SendQueue == null || a == null) + { + _mu.ExitReadLock(); + return; + } + var sendq = _sys.SendQueue; _mu.ExitReadLock(); - if (sys is null || sys.Client is null || sys.Account is null) return; - - // Compute and store the server's short hash. - sys.ShortHash = EventHelpers.GetHash(_info.Name); - - // All-inbox subscription: $SYS._INBOX..* - var inboxSubject = string.Format(SystemSubjects.InboxRespSubj, sys.ShortHash, "*"); - if (SysSubscribe(inboxSubject, InboxReply) is { error: not null } inboxResult) + ClientConnection? c; + lock (a) { - Errorf("Error setting up internal tracking: {0}", inboxResult.error); - return; + c = a.InternalClient; } - sys.InboxPrefix = inboxSubject; - - // Remote connections update (old-style subject). - var accConnsOld = string.Format(SystemSubjects.AccConnsEventSubjOld, "*"); - if (SysSubscribe(accConnsOld, NoInlineCallback(RemoteConnsUpdateStub)) is { error: not null } r1) + sendq.Push(new PubMsg { - Errorf("Error setting up internal tracking for {0}: {1}", accConnsOld, r1.error); - return; - } - - // Connection responses for this server's ID. - var connsResp = string.Format(SystemSubjects.ConnsRespSubj, _info.Id); - if (SysSubscribe(connsResp, NoInlineCallback(RemoteConnsUpdateStub)) is { error: not null } r2) - { - Errorf("Error setting up internal tracking: {0}", r2.error); - return; - } - - // Subscription-count requests. - if (SysSubscribe(SystemSubjects.AccNumSubsReqSubj, NoInlineCallback(NsubsRequestStub)) is { error: not null } r3) - { - Errorf("Error setting up internal tracking: {0}", r3.error); - return; - } - - // Stats heartbeat from other servers. - var statsSubj = string.Format(SystemSubjects.ServerStatsSubj, "*"); - var statsSub = SysSubscribe(statsSubj, NoInlineCallback(RemoteServerUpdateStub)); - if (statsSub.error is not null) - { - Errorf("Error setting up internal tracking: {0}", statsSub.error); - return; - } - sys.RemoteStatsSub = statsSub.sub; - - // Shutdown events from other servers. - var shutdownSubj = string.Format(SystemSubjects.ShutdownEventSubj, "*"); - if (SysSubscribe(shutdownSubj, NoInlineCallback(RemoteServerShutdownStub)) is { error: not null } r4) - { - Errorf("Error setting up internal tracking: {0}", r4.error); - return; - } - - // Lame-duck events. - var lameDuckSubj = string.Format(SystemSubjects.LameDuckEventSubj, "*"); - if (SysSubscribe(lameDuckSubj, NoInlineCallback(RemoteServerShutdownStub)) is { error: not null } r5) - { - Errorf("Error setting up internal tracking: {0}", r5.error); - return; - } - - // Remote latency measurements for this server. - var latencySubj = string.Format(SystemSubjects.RemoteLatencyEventSubj, sys.ShortHash); - if (SysSubscribe(latencySubj, NoInlineCallback(RemoteLatencyUpdateStub)) is { error: not null } r6) - { - Errorf("Error setting up internal latency tracking: {0}", r6.error); - return; - } - - // Server reload request. - var reloadSubj = string.Format(SystemSubjects.ServerReloadReqSubj, _info.Id); - if (SysSubscribe(reloadSubj, NoInlineCallback(ReloadConfigStub)) is { error: not null } r7) - { - Errorf("Error setting up server reload handler: {0}", r7.error); - return; - } - - // Client kick/LDM requests. - var kickSubj = string.Format(SystemSubjects.ClientKickReqSubj, _info.Id); - if (SysSubscribe(kickSubj, NoInlineCallback(KickClientStub)) is { error: not null } r8) - { - Errorf("Error setting up client kick service: {0}", r8.error); - return; - } - - var ldmSubj = string.Format(SystemSubjects.ClientLdmReqSubj, _info.Id); - if (SysSubscribe(ldmSubj, NoInlineCallback(LdmClientStub)) is { error: not null } r9) - { - Errorf("Error setting up client LDM service: {0}", r9.error); - } - - // Account leaf-node connect events. - var leafConn = string.Format(SystemSubjects.LeafNodeConnectEventSubj, "*"); - if (SysSubscribe(leafConn, NoInlineCallback(LeafNodeConnectedStub)) is { error: not null } r10) - { - Errorf("Error setting up internal tracking: {0}", r10.error); - } - - // Debug subscriber service. - SysSubscribeInternal(SystemSubjects.AccSubsSubj, NoInlineCallback(DebugSubscribersStub)); + Client = c, + Subject = subj, + Reply = string.Empty, + Si = si, + Hdr = null, + Msg = msg, + Oct = ct, + }); } - // ------------------------------------------------------------------------- - // Stub handlers — full implementations live in other partial files (Events - // module). These are lightweight no-ops that satisfy the dispatch wiring so - // the build compiles and the event loops can run. - // ------------------------------------------------------------------------- - - private void RemoteConnsUpdateStub(Subscription sub, NatsClient c, Account acc, - string subject, string reply, byte[] hdr, byte[] msg) { } - - private void NsubsRequestStub(Subscription sub, NatsClient c, Account acc, - string subject, string reply, byte[] hdr, byte[] msg) { } - - private void RemoteServerUpdateStub(Subscription sub, NatsClient c, Account acc, - string subject, string reply, byte[] hdr, byte[] msg) { } - - private void RemoteServerShutdownStub(Subscription sub, NatsClient c, Account acc, - string subject, string reply, byte[] hdr, byte[] msg) { } - - private void RemoteLatencyUpdateStub(Subscription sub, NatsClient c, Account acc, - string subject, string reply, byte[] hdr, byte[] msg) { } - - private void ReloadConfigStub(Subscription sub, NatsClient c, Account acc, - string subject, string reply, byte[] hdr, byte[] msg) { } - - private void KickClientStub(Subscription sub, NatsClient c, Account acc, - string subject, string reply, byte[] hdr, byte[] msg) { } - - private void LdmClientStub(Subscription sub, NatsClient c, Account acc, - string subject, string reply, byte[] hdr, byte[] msg) { } - - private void LeafNodeConnectedStub(Subscription sub, NatsClient c, Account acc, - string subject, string reply, byte[] hdr, byte[] msg) { } - - private void DebugSubscribersStub(Subscription sub, NatsClient c, Account acc, - string subject, string reply, byte[] hdr, byte[] msg) { } - - // ========================================================================= - // Group G: filterRequest - // Mirrors Go (s *Server) filterRequest in server/events.go. - // ========================================================================= - /// - /// Returns true if a system event request should be filtered (ignored) by - /// this server based on its name, host, cluster, tags, or domain. - /// Do NOT hold the server lock when calling this. - /// Mirrors Go (s *Server) filterRequest in server/events.go. + /// Sends a server API response (varz, connz, etc.) via the internal send loop. + /// Mirrors Go Server.sendInternalResponse(). /// - internal bool FilterRequest(EventFilterOptions? fOpts) + internal void SendInternalResponse(string subj, ServerApiResponse response) { - if (fOpts is null) return false; - - var clusterName = ClusterName(); - var opts = GetOpts(); - - if (fOpts.ExactMatch) + _mu.EnterReadLock(); + if (_sys?.SendQueue == null) { - if ((fOpts.Name != string.Empty && fOpts.Name != _info.Name) || - (fOpts.Host != string.Empty && fOpts.Host != _info.Host) || - (fOpts.Cluster != string.Empty && fOpts.Cluster != clusterName)) - { - return true; - } + _mu.ExitReadLock(); + return; } - else if ((fOpts.Name != string.Empty && !_info.Name.Contains(fOpts.Name, StringComparison.Ordinal)) || - (fOpts.Host != string.Empty && !_info.Host.Contains(fOpts.Host, StringComparison.Ordinal)) || - (fOpts.Cluster != string.Empty && !clusterName.Contains(fOpts.Cluster, StringComparison.Ordinal))) + _sys.SendQueue.Push(new PubMsg { - return true; - } - - if (fOpts.Tags.Count > 0) - { - foreach (var tag in fOpts.Tags) - { - if (!opts.Tags.Contains(tag)) - return true; - } - } - - if (fOpts.Domain != string.Empty && opts.JetStreamDomain != fOpts.Domain) - return true; - - return false; + Client = null, + Subject = subj, + Reply = string.Empty, + Si = response.Server, + Hdr = null, + Msg = response, + }); + _mu.ExitReadLock(); } // ========================================================================= - // Group G: noInlineCallback / noInlineCallbackStatsz / noInlineCallbackRecvQSelect - // Mirrors Go variants in server/events.go. + // nextEventID // ========================================================================= - private const int RecvQMuxed = 1; - private const int RecvQStatsz = 2; + /// + /// Generates a unique event ID. + /// Server lock should be held. + /// Mirrors Go Server.nextEventID(). + /// + private string NextEventID() => Guid.NewGuid().ToString("N"); + + // ========================================================================= + // wrapChk — executes f under server write-lock, skips if events disabled + // ========================================================================= /// - /// Wraps a so that it is always executed on the - /// internal receive queue (never inline with route/gateway processing). - /// Mirrors Go (s *Server) noInlineCallback in server/events.go. + /// Returns a closure that acquires the server write-lock, checks that events + /// are enabled, calls , then releases the lock. + /// Mirrors Go Server.wrapChk(). /// - internal SysMsgHandler? NoInlineCallback(SysMsgHandler cb) => - NoInlineCallbackRecvQSelect(cb, RecvQMuxed); + internal Action WrapChk(Action f) => () => + { + _mu.EnterWriteLock(); + if (!EventsEnabledLocked()) + { + _mu.ExitWriteLock(); + return; + } + try { f(); } + finally { _mu.ExitWriteLock(); } + }; + + // ========================================================================= + // noInlineCallback family + // ========================================================================= /// - /// Wraps a for the priority (statsz) receive queue. - /// Mirrors Go (s *Server) noInlineCallbackStatsz in server/events.go. + /// Wraps a so that it is queued on the + /// regular receive queue rather than called inline. + /// Mirrors Go Server.noInlineCallback(). /// - internal SysMsgHandler? NoInlineCallbackStatsz(SysMsgHandler cb) => - NoInlineCallbackRecvQSelect(cb, RecvQStatsz); + internal MsgHandler? NoInlineCallback(SysMsgHandler cb) => + NoInlineCallbackRecvQSelect(cb, recvQMuxed); /// - /// Core wrapper implementation. Returns a handler that pushes messages onto - /// the selected internal receive queue rather than executing inline. - /// Mirrors Go (s *Server) noInlineCallbackRecvQSelect in server/events.go. + /// Wraps a so that it is queued on the + /// priority (statsz) receive queue. + /// Mirrors Go Server.noInlineCallbackStatsz(). /// - internal SysMsgHandler? NoInlineCallbackRecvQSelect(SysMsgHandler cb, int recvQSelect) + internal MsgHandler? NoInlineCallbackStatsz(SysMsgHandler cb) => + NoInlineCallbackRecvQSelect(cb, recvQStatsz); + + private const int recvQMuxed = 1; + private const int recvQStatsz = 2; + + private MsgHandler? NoInlineCallbackRecvQSelect(SysMsgHandler cb, int recvQSelect) { _mu.EnterReadLock(); if (!EventsEnabledLocked()) @@ -662,115 +466,93 @@ public sealed partial class NatsServer _mu.ExitReadLock(); return null; } - - IpQueue recvq = recvQSelect == RecvQStatsz - ? (_sys!.RecvQueuePriority ?? _sys.RecvQueue!) - : _sys!.RecvQueue!; + IpQueue? recvq = recvQSelect == recvQStatsz + ? _sys!.RecvQueuePriority + : _sys!.RecvQueue; _mu.ExitReadLock(); - return (sub, c, acc, subj, rply, hdr, msg) => + return (sub, c, acc, subj, rply, hdr, rmsg) => { - var hdrCopy = hdr is { Length: > 0 } ? (byte[])hdr.Clone() : []; - var msgCopy = msg is { Length: > 0 } ? (byte[])msg.Clone() : []; - recvq.Push(new InSysMsg + recvq?.Push(new InSysMsg { Sub = sub, Client = c, Acc = acc, Subject = subj, Reply = rply, - Hdr = hdrCopy, - Msg = msgCopy, + Hdr = hdr, + Msg = rmsg, Cb = cb, }); }; } // ========================================================================= - // Group G: sysSubscribe / sysSubscribeQ / sysSubscribeInternal / systemSubscribe - // Mirrors Go variants in server/events.go. + // sysSubscribe / sysSubscribeQ / sysSubscribeInternal / systemSubscribe // ========================================================================= /// /// Creates an internal subscription on the system account. - /// Mirrors Go (s *Server) sysSubscribe in server/events.go. + /// Mirrors Go Server.sysSubscribe(). /// - internal (Subscription? sub, Exception? error) SysSubscribe(string subject, SysMsgHandler? cb) => + internal (Subscription? Sub, Exception? Error) SysSubscribe(string subject, MsgHandler? cb) => SystemSubscribe(subject, string.Empty, false, null, cb); /// - /// Creates an internal subscription with a queue group on the system account. - /// Mirrors Go (s *Server) sysSubscribeQ in server/events.go. + /// Creates an internal subscription with a queue group. + /// Mirrors Go Server.sysSubscribeQ(). /// - internal (Subscription? sub, Exception? error) SysSubscribeQ(string subject, string queue, SysMsgHandler? cb) => + internal (Subscription? Sub, Exception? Error) SysSubscribeQ(string subject, string queue, MsgHandler? cb) => SystemSubscribe(subject, queue, false, null, cb); /// - /// Creates an internal subscription that does NOT forward interest to routes/gateways. - /// Mirrors Go (s *Server) sysSubscribeInternal in server/events.go. + /// Creates an internal subscription but does not forward interest. + /// Mirrors Go Server.sysSubscribeInternal(). /// - internal (Subscription? sub, Exception? error) SysSubscribeInternal(string subject, SysMsgHandler? cb) => + internal (Subscription? Sub, Exception? Error) SysSubscribeInternal(string subject, MsgHandler? cb) => SystemSubscribe(subject, string.Empty, true, null, cb); /// - /// Core subscription implementation used by all sysSubscribe* helpers. - /// Creates a subscription on the system account's internal client. - /// Mirrors Go (s *Server) systemSubscribe in server/events.go. + /// Core system-subscribe implementation. + /// Mirrors Go Server.systemSubscribe(). /// - internal (Subscription? sub, Exception? error) SystemSubscribe( - string subject, - string queue, - bool internalOnly, - ClientConnection? client, - SysMsgHandler? cb) + private (Subscription? Sub, Exception? Error) SystemSubscribe(string subject, string queue, bool internalOnly, ClientConnection? c, MsgHandler? cb) { _mu.EnterWriteLock(); if (!EventsEnabledLocked()) { _mu.ExitWriteLock(); - return (null, ServerErrors.ErrNoSysAccount); + return (null, new InvalidOperationException("no system account")); } - - if (cb is null) + if (cb == null) { _mu.ExitWriteLock(); return (null, new ArgumentNullException(nameof(cb), "undefined message handler")); } - - var c = client ?? _sys!.Client!; + if (c == null) c = _sys!.Client; _sys!.Sid++; var sid = _sys.Sid.ToString(); _mu.ExitWriteLock(); - var subBytes = Encoding.ASCII.GetBytes(subject); - var sidBytes = Encoding.ASCII.GetBytes(sid); - byte[]? qBytes = string.IsNullOrEmpty(queue) ? null : Encoding.ASCII.GetBytes(queue); - - // Map SysMsgHandler → the internal MsgHandler/subscription mechanism. - // The callback receives the raw message bytes split into hdr+msg. - SysMsgHandler capturedCb = cb; - var (sub, err) = c.ProcessSubEx(subBytes, qBytes, sidBytes, internalOnly, false, false); - - if (err is not null) - return (null, err); - - if (sub is not null) + // Create the subscription via the system client stub. + // In a full implementation this calls c.processSub(). + // Stub: return a synthetic Subscription object. + var sub = new Subscription { - // Attach the callback to the subscription via the InSysMsg dispatch. - // Store the callback in the subscription so the receive queue can call it. - sub.SysMsgCb = capturedCb; - } - + Subject = Encoding.ASCII.GetBytes(subject), + Queue = string.IsNullOrEmpty(queue) ? null : Encoding.ASCII.GetBytes(queue), + Sid = Encoding.ASCII.GetBytes(sid), + }; return (sub, null); } // ========================================================================= - // Group G: sysUnsubscribe + // sysUnsubscribe // Mirrors Go (s *Server) sysUnsubscribe in server/events.go. // ========================================================================= /// - /// Unsubscribes from a system subject by removing the given subscription. + /// Removes a system subscription. Thread-safe. /// Mirrors Go (s *Server) sysUnsubscribe in server/events.go. /// internal void SysUnsubscribe(Subscription? sub) @@ -785,8 +567,6 @@ public sealed partial class NatsServer } _mu.ExitReadLock(); - // System subscriptions are always owned by the system client (_sys.Client). - // Retrieve it under a read lock, then unsubscribe outside the lock. _mu.EnterReadLock(); var sysClient = _sys?.Client; _mu.ExitReadLock(); @@ -796,154 +576,1855 @@ public sealed partial class NatsServer } // ========================================================================= - // Group G: inboxReply - // Mirrors Go (s *Server) inboxReply in server/events.go. + // inboxReply // ========================================================================= /// - /// Handles inbox replies without propagating supercluster-wide interest. - /// Dispatches to the registered reply handler if one exists for this subject. - /// Mirrors Go (s *Server) inboxReply in server/events.go. + /// Handles replies to the server's internal inbox wildcard subscription. + /// Mirrors Go Server.inboxReply(). /// - internal void InboxReply( - Subscription sub, - NatsClient c, - Account acc, - string subject, - string reply, - byte[] hdr, - byte[] msg) + private void InboxReply(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) { _mu.EnterReadLock(); - if (!EventsEnabledLocked() || _sys!.Replies.Count == 0) + if (!EventsEnabledLocked() || _sys!.Replies == null) { _mu.ExitReadLock(); return; } - _sys.Replies.TryGetValue(subject, out var handler); + _sys.Replies.TryGetValue(subject, out var cb); _mu.ExitReadLock(); - handler?.Invoke(sub, c, acc, subject, reply, hdr, msg); + cb?.Invoke(sub, c, acc, subject, reply, hdr, msg); } // ========================================================================= - // Group G: newRespInbox - // Mirrors Go (s *Server) newRespInbox() string in server/events.go. + // newRespInbox // ========================================================================= /// - /// Generates a new unique response inbox subject using this server's hash prefix. - /// Format: $SYS._INBOX.{hash}.{8-char-random}. - /// Mirrors Go (s *Server) newRespInbox() string in server/events.go. + /// Creates a short-lived reply inbox for a single in-flight request. + /// Mirrors Go Server.newRespInbox(). /// - internal string NewRespInbox() + private string NewRespInbox() { - const string digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; - const int base62 = 62; - const int suffixLen = EventHelpers.ReplySuffixLen; + var prefix = _sys?.InboxPrefix ?? "$SYS._INBOX."; + return $"{prefix}{Guid.NewGuid():N}"; + } - var inboxPre = _sys?.InboxPrefix ?? string.Empty; - // Strip the trailing '*' from $SYS._INBOX.{hash}.* → use as prefix - var prefix = inboxPre.EndsWith(".*", StringComparison.Ordinal) - ? inboxPre[..^1] // strip '*', keep the dot - : inboxPre + "."; + // ========================================================================= + // InitEventTracking + // ========================================================================= - var suffix = new char[suffixLen]; - var rng = (long)Random.Shared.NextInt64(); - if (rng < 0) rng = -rng; + /// + /// Sets up all system-wide subscriptions for tracking remote servers, + /// accounts, connections, and stats. + /// Mirrors Go Server.initEventTracking(). + /// + internal void InitEventTracking() + { + _mu.EnterReadLock(); + var sys = _sys; + _mu.ExitReadLock(); - for (var i = 0; i < suffixLen; i++) + if (sys?.Client == null || sys.Account == null) + return; + + // Create a hash used for server-targeted messages. + sys.ShortHash = GetHash(_info.Name); + + // Inbox responses wildcard. + var subject = string.Format(SystemSubjects.InboxRespSubj, sys.ShortHash, "*"); + if (SysSubscribe(subject, (s, c, a, subj, rply, hdr, m) => + InboxReply(s, c, a, subj, rply, hdr ?? [], m ?? [])).Error is { } e1) { - suffix[i] = digits[(int)(rng % base62)]; - rng /= base62; + Errorf("Error setting up internal tracking: {0}", e1); + return; + } + sys.InboxPrefix = subject; + + // Remote connection accounting (old subject). + subject = string.Format(SystemSubjects.AccConnsEventSubjOld, "*"); + if (SysSubscribe(subject, NoInlineCallback(RemoteConnsUpdate)).Error is { } e2) + { + Errorf("Error setting up internal tracking for {0}: {1}", subject, e2); + return; } - return prefix + new string(suffix); + // Response for account info requests. + subject = string.Format(SystemSubjects.ConnsRespSubj, _info.Id); + if (SysSubscribe(subject, NoInlineCallback(RemoteConnsUpdate)).Error is { } e3) + { + Errorf("Error setting up internal tracking: {0}", e3); + return; + } + + // Account nsubs requests. + if (SysSubscribe(SystemSubjects.AccNumSubsReqSubj, NoInlineCallback(NsubsRequest)).Error is { } e4) + { + Errorf("Error setting up internal tracking: {0}", e4); + return; + } + + // Remote server statsz broadcast. + subject = string.Format(SystemSubjects.ServerStatsSubj, "*"); + var (rss, e5) = SysSubscribe(subject, NoInlineCallback(RemoteServerUpdate)); + if (e5 != null) + { + Errorf("Error setting up internal tracking: {0}", e5); + return; + } + sys.RemoteStatsSub = rss; + + // Remote server shutdown events. + subject = string.Format(SystemSubjects.ShutdownEventSubj, "*"); + if (SysSubscribe(subject, NoInlineCallback(RemoteServerShutdown)).Error is { } e6) + { + Errorf("Error setting up internal tracking: {0}", e6); + return; + } + + // Lame-duck events (treated same as shutdown for now). + subject = string.Format(SystemSubjects.LameDuckEventSubj, "*"); + if (SysSubscribe(subject, NoInlineCallback(RemoteServerShutdown)).Error is { } e7) + { + Errorf("Error setting up internal tracking: {0}", e7); + return; + } + + // Account claim update subscriptions. + bool subscribeToUpdate = true; + if (_accResolver != null) + subscribeToUpdate = !_accResolver.IsTrackingUpdate(); + if (subscribeToUpdate) + { + foreach (var updSubj in new[] { SystemSubjects.AccUpdateEventSubjOld, SystemSubjects.AccUpdateEventSubjNew }) + { + if (SysSubscribe(string.Format(updSubj, "*"), NoInlineCallback(AccountClaimUpdate)).Error is { } eu) + { + Errorf("Error setting up internal tracking: {0}", eu); + return; + } + } + } + + // Legacy statsz ping (kept for backwards compatibility). + if (SysSubscribe(SystemSubjects.ServerStatsPingReqSubj, NoInlineCallbackStatsz(StatszReq)).Error is { } e8) + { + Errorf("Error setting up internal tracking: {0}", e8); + return; + } + + // Server-level monitoring services. + RegisterServerMonSubs(); + + // Account-level monitoring services. + RegisterAccMonSubs(); + + // User info (do not propagate interest). + if (SysSubscribeInternal(string.Format(SystemSubjects.UserDirectReqSubj, "*"), NoInlineCallback(UserInfoReq)).Error is { } e9) + { + Errorf("Error setting up internal tracking: {0}", e9); + return; + } + + // STATZ account ping equivalent. + if (SysSubscribe(string.Format(SystemSubjects.AccPingReqSubj, "STATZ"), + NoInlineCallback(StatzAccPingReq)).Error is { } e10) + { + Errorf("Error setting up internal tracking: {0}", e10); + return; + } + + // Leaf node connect events. + subject = string.Format(SystemSubjects.LeafNodeConnectEventSubj, "*"); + if (SysSubscribe(subject, NoInlineCallback(LeafNodeConnected)).Error is { } e11) + { + Errorf("Error setting up internal tracking: {0}", e11); + return; + } + + // Remote latency tracking. + subject = string.Format(SystemSubjects.RemoteLatencyEventSubj, sys.ShortHash); + if (SysSubscribe(subject, NoInlineCallback(RemoteLatencyUpdate)).Error is { } e12) + { + Errorf("Error setting up internal latency tracking: {0}", e12); + return; + } + + // Debug subscribers. + if (SysSubscribeInternal(SystemSubjects.AccSubsSubj, NoInlineCallback(DebugSubscribers)).Error is { } e13) + { + Errorf("Error setting up internal debug service for subscribers: {0}", e13); + return; + } + + // Server reload request. + subject = string.Format(SystemSubjects.ServerReloadReqSubj, _info.Id); + if (SysSubscribe(subject, NoInlineCallback(ReloadConfigHandler)).Error is { } e14) + { + Errorf("Error setting up server reload handler: {0}", e14); + return; + } + + // Client kick. + subject = string.Format(SystemSubjects.ClientKickReqSubj, _info.Id); + if (SysSubscribe(subject, NoInlineCallback(KickClient)).Error is { } e15) + { + Errorf("Error setting up client kick service: {0}", e15); + return; + } + + // Client LDM. + subject = string.Format(SystemSubjects.ClientLdmReqSubj, _info.Id); + if (SysSubscribe(subject, NoInlineCallback(LdmClient)).Error is { } e16) + { + Errorf("Error setting up client LDM service: {0}", e16); + } + } + + // Helper: register per-server monitoring subscriptions. + private void RegisterServerMonSubs() + { + // Direct and ping subscriptions for each monitoring verb. + void RegisterVerb(string name, MsgHandler? h) + { + if (h == null) return; + var subject = string.Format(SystemSubjects.ServerDirectReqSubj, _info.Id, name); + if (SysSubscribe(subject, h).Error is { } e1) + { Errorf("Error setting up internal tracking: {0}", e1); return; } + subject = string.Format(SystemSubjects.ServerPingReqSubj, name); + if (SysSubscribe(subject, h).Error is { } e2) + { Errorf("Error setting up internal tracking: {0}", e2); } + } + + RegisterVerb("IDZ", NoInlineCallback(IdzReq)); + RegisterVerb("STATSZ", NoInlineCallbackStatsz(StatszReq)); + RegisterVerb("VARZ", NoInlineCallback(VarzReq)); + RegisterVerb("SUBSZ", NoInlineCallback(SubszReq)); + RegisterVerb("CONNZ", NoInlineCallback(ConnzReq)); + RegisterVerb("ROUTEZ", NoInlineCallback(RoutezReq)); + RegisterVerb("GATEWAYZ", NoInlineCallback(GatewayzReq)); + RegisterVerb("LEAFZ", NoInlineCallback(LeafzReq)); + RegisterVerb("ACCOUNTZ", NoInlineCallback(AccountzReq)); + RegisterVerb("JSZ", NoInlineCallback(JszReq)); + RegisterVerb("HEALTHZ", NoInlineCallback(HealthzReq)); + RegisterVerb("EXPVARZ", NoInlineCallback(ExpvarzReq)); + RegisterVerb("IPQUEUESZ", NoInlineCallback(IpqueueszReq)); + RegisterVerb("RAFTZ", NoInlineCallback(RaftzReq)); + } + + // Helper: register per-account monitoring subscriptions. + private void RegisterAccMonSubs() + { + void RegisterAccVerb(string name, MsgHandler? h) + { + if (h == null) return; + var subject = string.Format(SystemSubjects.AccDirectReqSubj, "*", name); + if (SysSubscribe(subject, h).Error is { } e) + Errorf("Error setting up internal tracking: {0}", e); + } + + RegisterAccVerb("SUBSZ", NoInlineCallback(AccSubszReq)); + RegisterAccVerb("CONNZ", NoInlineCallback(AccConnzReq)); + RegisterAccVerb("LEAFZ", NoInlineCallback(AccLeafzReq)); + RegisterAccVerb("JSZ", NoInlineCallback(AccJszReq)); + RegisterAccVerb("INFO", NoInlineCallback(AccInfoReq)); + RegisterAccVerb("STATZ", NoInlineCallback(AccStatzReq)); + RegisterAccVerb("CONNS", NoInlineCallback(ConnsRequest)); } // ========================================================================= - // Group G: wrapChk - // Mirrors Go (s *Server) wrapChk in server/events.go. + // Group A: Stats & Heartbeat // ========================================================================= + // ------------------------------------------------------------------------- + // TrackedRemoteServers + // ------------------------------------------------------------------------- + /// - /// Returns a wrapper function that acquires the server write lock, checks that - /// events are enabled, invokes , then releases the lock. - /// Mirrors Go (s *Server) wrapChk(f func()) func() in server/events.go. + /// Returns how many remote servers are being tracked from a system events perspective. + /// Returns -1 if events are not enabled. + /// Mirrors Go Server.TrackedRemoteServers(). /// - internal Action WrapChk(Action f) + public int TrackedRemoteServers() { - return () => + _mu.EnterReadLock(); + try + { + if (!Running() || !EventsEnabledLocked()) return -1; + return _sys!.Servers.Count; + } + finally { _mu.ExitReadLock(); } + } + + // ------------------------------------------------------------------------- + // updateServerUsage + // ------------------------------------------------------------------------- + + /// + /// Captures RSS/CPU usage into the advisory stats struct. + /// Mirrors Go Server.updateServerUsage(). + /// + private static void UpdateServerUsage(ServerStatsAdvisory v) + { + v.Cores = Environment.ProcessorCount; + v.MaxProcs = Environment.ProcessorCount; + try + { + using var proc = Process.GetCurrentProcess(); + v.Mem = proc.WorkingSet64; + // CPU is expensive to compute accurately — leave at 0 for now. + } + catch { /* ignore */ } + } + + // ------------------------------------------------------------------------- + // sendStatsz + // ------------------------------------------------------------------------- + + /// + /// Builds and sends a advisory on the given subject. + /// Mirrors Go Server.sendStatsz(). + /// + internal void SendStatsz(string subj) + { + var m = new ServerStatsMsg(); + UpdateServerUsage(m.Stats); + + if (LimitStatsz(subj)) return; + + _mu.EnterReadLock(); + try + { + if (_sys?.Account == null) return; + + // Standalone mode: check for interest before sending. + var opts = GetOpts(); + bool checkInterest = opts.Cluster.Port == 0 && opts.Gateway.Port == 0 && opts.LeafNode.Port == 0 + && opts.LeafNode.Remotes.Count == 0; + + if (checkInterest) + { + var sacc = _sys.Account; + if (sacc?.Sublist != null) + { + var rr = sacc.Sublist.Match(subj); + int total = rr.PSubs.Count + rr.QSubs.Count; + if (total == 0) return; + if (total == 1 && rr.PSubs.Count == 1 && rr.PSubs[0] == _sys.RemoteStatsSub) return; + } + } + + m.Stats.Start = _start; + m.Stats.Connections = _clients.Count; + m.Stats.TotalConnections = _totalClients; + m.Stats.ActiveAccounts = Interlocked.CompareExchange(ref _activeAccounts, 0, 0); + m.Stats.Received = new DataStats { Msgs = Interlocked.Read(ref _stats.InMsgs), Bytes = Interlocked.Read(ref _stats.InBytes) }; + m.Stats.Sent = new DataStats { Msgs = Interlocked.Read(ref _stats.OutMsgs), Bytes = Interlocked.Read(ref _stats.OutBytes) }; + m.Stats.SlowConsumers = Interlocked.Read(ref _stats.SlowConsumers); + + var scs = new SlowConsumersStats + { + Clients = (ulong)NumSlowConsumersClients(), + Routes = (ulong)NumSlowConsumersRoutes(), + Gateways = (ulong)NumSlowConsumersGateways(), + Leafs = (ulong)NumSlowConsumersLeafs(), + }; + if (scs.Clients != 0 || scs.Routes != 0 || scs.Gateways != 0 || scs.Leafs != 0) + m.Stats.SlowConsumersStats = scs; + + m.Stats.StaleConnections = Interlocked.Read(ref _stats.StaleConnections); + m.Stats.StalledClients = Interlocked.Read(ref _stats.Stalls); + + var stcs = new StaleConnectionStats + { + Clients = (ulong)NumStaleConnectionsClients(), + Routes = (ulong)NumStaleConnectionsRoutes(), + Gateways = (ulong)NumStaleConnectionsGateways(), + Leafs = (ulong)NumStaleConnectionsLeafs(), + }; + if (stcs.Clients != 0 || stcs.Routes != 0 || stcs.Gateways != 0 || stcs.Leafs != 0) + m.Stats.StaleConnectionStats = stcs; + + m.Stats.NumSubs = NumSubscriptions(); + m.Stats.ActiveServers = _sys.Servers.Count + 1; + + // Routes. + ForEachRoute(r => + { + var rs = new RouteStat { Id = r.Cid }; + lock (r) + { + rs.Sent = new DataStats { Msgs = 0L, Bytes = 0L }; + rs.Received = new DataStats { Msgs = 0L, Bytes = 0L }; + rs.Pending = (int)(r.OutPb >> 10); + if (r.Route != null) rs.Name = r.Route.RemoteName; + } + m.Stats.Routes ??= []; + m.Stats.Routes.Add(rs); + }); + + SendInternalMsg(subj, string.Empty, m.Server, m); + } + finally { _mu.ExitReadLock(); } + } + + // ------------------------------------------------------------------------- + // limitStatsz + // ------------------------------------------------------------------------- + + /// + /// Rate-limits statsz publishes to one per . + /// Returns true if the publish should be skipped. + /// Mirrors Go Server.limitStatsz(). + /// + internal bool LimitStatsz(string subj) + { + _mu.EnterWriteLock(); + try + { + if (_sys == null) return true; + + // Only limit the normal broadcast subject. + if (subj != string.Format(SystemSubjects.ServerStatsSubj, ID())) + return false; + + var interval = EventIntervals.DefaultStatszRateLimit; + if (_sys.ClientStatszInterval < interval && _sys.ClientStatszInterval > TimeSpan.Zero) + interval = _sys.ClientStatszInterval; + + if (DateTime.UtcNow - _sys.LastStatsz < interval) + { + // Reschedule heartbeat for the next interval. + _sys.StatsMsgTimer?.Change((long)(_sys.LastStatsz.Add(interval) - DateTime.UtcNow).TotalMilliseconds, Timeout.Infinite); + return true; + } + _sys.LastStatsz = DateTime.UtcNow; + return false; + } + finally { _mu.ExitWriteLock(); } + } + + // ------------------------------------------------------------------------- + // heartbeatStatsz + // ------------------------------------------------------------------------- + + /// + /// Called by the statsz timer; ramps up the send interval to the max. + /// Must be wrapped with . + /// Mirrors Go Server.heartbeatStatsz(). + /// + private void HeartbeatStatsz() + { + if (_sys!.StatsMsgTimer != null) + { + if (_sys.ClientStatszInterval < _sys.StatszInterval) + { + _sys.ClientStatszInterval = _sys.ClientStatszInterval + _sys.ClientStatszInterval; + if (_sys.ClientStatszInterval > _sys.StatszInterval) + _sys.ClientStatszInterval = _sys.StatszInterval; + } + _sys.StatsMsgTimer.Change((long)_sys.ClientStatszInterval.TotalMilliseconds, Timeout.Infinite); + } + Task.Run(SendStatszUpdate); + } + + // ------------------------------------------------------------------------- + // resetLastStatsz + // ------------------------------------------------------------------------- + + /// + /// Resets the last-sent timestamp so the next publish is not rate-limited. + /// Must be wrapped with . + /// Mirrors Go Server.resetLastStatsz(). + /// + private void ResetLastStatsz() + { + if (_sys != null) _sys.LastStatsz = default; + } + + // ------------------------------------------------------------------------- + // sendStatszUpdate + // ------------------------------------------------------------------------- + + /// + /// Triggers a statsz advisory publish on the server's broadcast subject. + /// Mirrors Go Server.sendStatszUpdate(). + /// + private void SendStatszUpdate() => + SendStatsz(string.Format(SystemSubjects.ServerStatsSubj, ID())); + + // ------------------------------------------------------------------------- + // startStatszTimer + // ------------------------------------------------------------------------- + + /// + /// Starts the statsz heartbeat timer, beginning at 250ms and backing off. + /// Must be wrapped with . + /// Mirrors Go Server.startStatszTimer(). + /// + private void StartStatszTimer() + { + _sys!.ClientStatszInterval = TimeSpan.FromMilliseconds(250); + var delay = (long)_sys.ClientStatszInterval.TotalMilliseconds; + _sys.StatsMsgTimer = new System.Threading.Timer(_ => WrapChk(HeartbeatStatsz)(), null, delay, Timeout.Infinite); + } + + // ------------------------------------------------------------------------- + // startRemoteServerSweepTimer + // ------------------------------------------------------------------------- + + /// + /// Starts the timer that periodically sweeps for orphaned remote servers. + /// Must be wrapped with . + /// Mirrors Go Server.startRemoteServerSweepTimer(). + /// + private void StartRemoteServerSweepTimer() + { + var delay = (long)_sys!.CheckOrphan.TotalMilliseconds; + _sys.Sweeper = new System.Threading.Timer(_ => WrapChk(CheckRemoteServers)(), null, delay, Timeout.Infinite); + } + + // ------------------------------------------------------------------------- + // checkRemoteServers + // ------------------------------------------------------------------------- + + /// + /// Removes remote server entries that have not sent a heartbeat within the + /// orphan timeout window. + /// Must be wrapped with . + /// Mirrors Go Server.checkRemoteServers(). + /// + private void CheckRemoteServers() + { + var now = DateTime.UtcNow; + var orphans = new List(); + foreach (var (sid, su) in _sys!.Servers) + { + if (now - su.LTime > _sys.OrphanMax) + { + Debugf("Detected orphan remote server: {0}", sid); + orphans.Add(sid); + } + } + foreach (var sid in orphans) + ProcessRemoteServerShutdown(sid); + + _sys.Sweeper?.Change((long)_sys.CheckOrphan.TotalMilliseconds, Timeout.Infinite); + } + + // ========================================================================= + // Group B: Remote Server Tracking + // ========================================================================= + + // ------------------------------------------------------------------------- + // accountClaimUpdate + // ------------------------------------------------------------------------- + + /// + /// Handles an account JWT claim-update message from another server. + /// Mirrors Go Server.accountClaimUpdate(). + /// + private void AccountClaimUpdate(Subscription sub, ClientConnection c, Account acc, string subject, string resp, byte[] hdr, byte[] msg) + { + if (!EventsEnabled()) return; + + var toks = subject.Split('.'); + string pubKey; + if (toks.Length == SystemSubjects.AccUpdateTokensNew) + pubKey = toks[SystemSubjects.AccReqAccIndex]; + else if (toks.Length == SystemSubjects.AccUpdateTokensOld) + pubKey = toks[SystemSubjects.AccUpdateAccIdxOld]; + else + { + Debugf("Received account claims update on bad subject {0}", subject); + return; + } + + if (msg.Length == 0) + { + EventHelpers.RespondToUpdate(this, resp, pubKey, "jwt update error", new InvalidOperationException("request body is empty")); + } + else + { + // Try to update the account with the new JWT. + if (!_accounts.TryGetValue(pubKey, out var account)) + { + EventHelpers.RespondToUpdate(this, resp, pubKey, "jwt update skipped", null); + return; + } + var err = UpdateAccountWithClaimJwt(account, Encoding.UTF8.GetString(msg)); + if (err != null) + EventHelpers.RespondToUpdate(this, resp, pubKey, "jwt update resulted in error", err); + else + EventHelpers.RespondToUpdate(this, resp, pubKey, "jwt updated", null); + } + } + + // ------------------------------------------------------------------------- + // processRemoteServerShutdown + // ------------------------------------------------------------------------- + + /// + /// Handles a remote server going away: cleans up account remote-server maps + /// and removes from nodeToInfo. + /// Server lock must be held on entry. + /// Mirrors Go Server.processRemoteServerShutdown(). + /// + private void ProcessRemoteServerShutdown(string sid) + { + foreach (var acc in _accounts.Values) + acc.RemoveRemoteServer(sid); + + foreach (var key in _nodeToInfo.Keys.ToList()) + { + if (_nodeToInfo.TryGetValue(key, out var v) && v is NodeInfo ni && ni.Id == sid) + { + var updated = new NodeInfo { Name = ni.Name, Version = ni.Version, Cluster = ni.Cluster, Domain = ni.Domain, Id = ni.Id, Tags = ni.Tags, Cfg = ni.Cfg, Stats = ni.Stats, Js = ni.Js, BinarySnapshots = ni.BinarySnapshots, AccountNrg = ni.AccountNrg, Offline = true }; + _nodeToInfo.TryUpdate(key, updated, v); + break; + } + } + _sys!.Servers.Remove(sid); + } + + // ------------------------------------------------------------------------- + // sameDomain + // ------------------------------------------------------------------------- + + /// + /// Returns true when the given domain matches (or is not set). + /// Mirrors Go Server.sameDomain(). + /// + internal bool SameDomain(string domain) => + string.IsNullOrEmpty(domain) || string.IsNullOrEmpty(_info.Domain) || domain == _info.Domain; + + // ------------------------------------------------------------------------- + // remoteServerShutdown + // ------------------------------------------------------------------------- + + /// + /// Handles a remote server-shutdown system event. + /// Mirrors Go Server.remoteServerShutdown(). + /// + private void RemoteServerShutdown(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + _mu.EnterWriteLock(); + try + { + if (!EventsEnabledLocked()) return; + + var toks = subject.Split('.'); + if (toks.Length < SystemSubjects.ShutdownEventTokens) + { + Debugf("Received remote server shutdown on bad subject {0}", subject); + return; + } + + if (msg.Length == 0) + { + Errorf("Remote server sent invalid (empty) shutdown message to {0}", subject); + return; + } + + ServerInfo si; + try { si = JsonSerializer.Deserialize(msg)!; } + catch + { + Debugf("Received bad server info for remote server shutdown"); + return; + } + + // Mark the JetStream node as offline. + var node = GetHash(si.Name); + if (_nodeToInfo.TryGetValue(node, out var existing) && existing is NodeInfo oldNi) + { + var updatedNi = new NodeInfo { Name = oldNi.Name, Version = oldNi.Version, Cluster = oldNi.Cluster, Domain = oldNi.Domain, Id = oldNi.Id, Tags = oldNi.Tags, Cfg = oldNi.Cfg, Stats = oldNi.Stats, Js = oldNi.Js, BinarySnapshots = oldNi.BinarySnapshots, AccountNrg = oldNi.AccountNrg, Offline = true }; + _nodeToInfo.TryUpdate(node, updatedNi, existing); + } + + var sid = toks[SystemSubjects.ServerSubjectIndex]; + if (_sys!.Servers.ContainsKey(sid)) + ProcessRemoteServerShutdown(sid); + } + finally { _mu.ExitWriteLock(); } + } + + // ------------------------------------------------------------------------- + // remoteServerUpdate + // ------------------------------------------------------------------------- + + /// + /// Handles a statsz heartbeat from a remote server. + /// Mirrors Go Server.remoteServerUpdate(). + /// + private void RemoteServerUpdate(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (msg.Length == 0) + { + Debugf("Received empty server info for remote server update"); + return; + } + + ServerStatsMsg ssm; + try { ssm = JsonSerializer.Deserialize(msg)!; } + catch + { + Debugf("Received bad server info for remote server update"); + return; + } + + var si = ssm.Server; + + _mu.EnterWriteLock(); + if (Running() && EventsEnabledLocked() && si.Id != _info.Id) + UpdateRemoteServer(si); + _mu.ExitWriteLock(); + + // JetStream node updates. + if (!SameDomain(si.Domain)) return; + + var node = GetHash(si.Name); + _nodeToInfo[node] = new NodeInfo + { + Name = si.Name, + Id = si.Id, + Cluster = si.Cluster, + Domain = si.Domain, + Version = si.Version, + Offline = false, + }; + Task.Run(UpdateNRGAccountStatus); + } + + // ------------------------------------------------------------------------- + // updateRemoteServer + // ------------------------------------------------------------------------- + + /// + /// Tracks or updates a remote server's heartbeat sequence and timestamp. + /// Server lock must be held on entry. + /// Mirrors Go Server.updateRemoteServer(). + /// + private void UpdateRemoteServer(ServerInfo si) + { + if (!_sys!.Servers.TryGetValue(si.Id, out var su)) + { + _sys.Servers[si.Id] = new ServerUpdate { Seq = si.Seq, LTime = DateTime.UtcNow }; + ProcessNewServer(si); + } + else + { + if (si.Seq <= su.Seq) + { + Errorf("Received out of order remote server update from: {0}", si.Id); + return; + } + su.Seq = si.Seq; + su.LTime = DateTime.UtcNow; + } + } + + // ------------------------------------------------------------------------- + // processNewServer + // ------------------------------------------------------------------------- + + /// + /// Called the first time a remote server is seen. + /// Ensures gateways are in interest-only mode for leaf-connected accounts + /// and announces ourselves to the new server. + /// Server lock must be held on entry. + /// Mirrors Go Server.processNewServer(). + /// + private void ProcessNewServer(ServerInfo si) + { + EnsureGWsInterestOnlyForLeafNodes(); + + if (SameDomain(si.Domain)) + { + var node = GetHash(si.Name); + _nodeToInfo.TryAdd(node, new NodeInfo + { + Name = si.Name, + Id = si.Id, + Cluster = si.Cluster, + Domain = si.Domain, + Version = si.Version, + Offline = false, + }); + } + Task.Run(UpdateNRGAccountStatus); + Task.Run(SendStatszUpdate); + } + + // ------------------------------------------------------------------------- + // updateNRGAccountStatus + // ------------------------------------------------------------------------- + + /// + /// Recreates internal subscriptions on all Raft nodes to reflect updated + /// account-NRG capability state. + /// Server lock must NOT be held on entry. + /// Mirrors Go Server.updateNRGAccountStatus(). + /// + private void UpdateNRGAccountStatus() + { + var raftNodes = new List(_raftNodes.Values); + foreach (var n in raftNodes) + { + if (n is IRaftNode rn) + { + rn.RecreateInternalSubs(); + } + } + } + + // ========================================================================= + // Group C: Connection Events + // ========================================================================= + + // ------------------------------------------------------------------------- + // connsRequest + // ------------------------------------------------------------------------- + + /// + /// Handles a request for this server's local connection count for a given account. + /// Mirrors Go Server.connsRequest(). + /// + private void ConnsRequest(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + + var tk = subject.Split('.'); + if (tk.Length != SystemSubjects.AccReqTokens) + { + Errorf("Bad subject account connections request message"); + return; + } + + var a = tk[SystemSubjects.AccReqAccIndex]; + var m = new AccNumConnsReq { Account = a }; + if (msg.Length > 0) + { + try { m = JsonSerializer.Deserialize(msg) ?? m; } + catch (Exception ex) + { + Errorf("Error unmarshalling account connections request message: {0}", ex); + return; + } + } + if (m.Account != a) + { + Errorf("Error unmarshalled account does not match subject"); + return; + } + + if (!_accounts.TryGetValue(m.Account, out var account)) return; + if (account.NumLocalConnections() <= 0) return; + + _mu.EnterWriteLock(); + SendAccConnsUpdate(account, reply); + _mu.ExitWriteLock(); + } + + // ------------------------------------------------------------------------- + // leafNodeConnected + // ------------------------------------------------------------------------- + + /// + /// Handles a leaf-node-connect event, switching the account to gateway + /// interest-only mode. + /// Mirrors Go Server.leafNodeConnected(). + /// + private void LeafNodeConnected(Subscription sub, ClientConnection _, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + AccNumConnsReq m; + try { m = JsonSerializer.Deserialize(msg) ?? new AccNumConnsReq(); } + catch (Exception ex) + { + Errorf("Error unmarshalling account connections request message: {0}", ex); + return; + } + + _mu.EnterReadLock(); + bool na = string.IsNullOrEmpty(m.Account) || !EventsEnabledLocked() || !_gateway.Enabled; + _mu.ExitReadLock(); + if (na) return; + + var (account, _) = LookupAccount(m.Account); + if (account != null) + SwitchAccountToInterestMode(account.Name); + } + + // ------------------------------------------------------------------------- + // remoteConnsUpdate + // ------------------------------------------------------------------------- + + /// + /// Handles a remote account connection count update. + /// Mirrors Go Server.remoteConnsUpdate(). + /// + private void RemoteConnsUpdate(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + + if (msg.Length == 0) { Errorf("No message body provided"); return; } + + AccountNumConns m; + try { m = JsonSerializer.Deserialize(msg)!; } + catch (Exception ex) + { + Errorf("Error unmarshalling account connection event message: {0}", ex); + return; + } + + if (!_accounts.TryGetValue(m.Account, out var account)) return; + + _mu.EnterWriteLock(); + if (!Running() || !EventsEnabledLocked()) { _mu.ExitWriteLock(); return; } + if (m.Server.Id == _info.Id) + { + _sys!.Client?.Errorf("Processing our own account connection event message: ignored"); + _mu.ExitWriteLock(); + return; + } + var clients = account.UpdateRemoteServer(m); + UpdateRemoteServer(m.Server); + _mu.ExitWriteLock(); + + foreach (var cl in clients) + cl.MaxAccountConnExceeded(); + } + + // ------------------------------------------------------------------------- + // accConnsUpdate + // ------------------------------------------------------------------------- + + /// + /// Sends an account connection count advisory whenever the count changes or + /// on a heartbeat. + /// Mirrors Go Server.accConnsUpdate(). + /// + internal void AccConnsUpdate(Account a) + { + _mu.EnterWriteLock(); + try + { + if (!EventsEnabledLocked() || a == null || a == _gacc) return; + SendAccConnsUpdate(a, + string.Format(SystemSubjects.AccConnsEventSubjOld, a.Name), + string.Format(SystemSubjects.AccConnsEventSubjNew, a.Name)); + } + finally { _mu.ExitWriteLock(); } + } + + // ------------------------------------------------------------------------- + // accountConnectEvent + // ------------------------------------------------------------------------- + + /// + /// Publishes a client-connect advisory for billing/tracking purposes. + /// Mirrors Go Server.accountConnectEvent(). + /// + internal void AccountConnectEvent(ClientConnection c) + { + _mu.EnterWriteLock(); + if (!EventsEnabledLocked()) { _mu.ExitWriteLock(); return; } + var eid = NextEventID(); + _mu.ExitWriteLock(); + + Account? account; + string host, name, lang, version, jwt, kind, clientType, accName, nameTag; + ulong cid; + DateTime start; + List tags; + string issuerKey, user; + + lock (c) + { + account = c._account as Account; + if (account == null) return; + + cid = c.Cid; + start = c.Start; + host = c.Host; + name = c.Opts.Name; + lang = c.Opts.Lang; + version = c.Opts.Version; + jwt = c.Opts.Jwt; + kind = c.KindString(); + clientType = c.ClientTypeString(); + accName = account.Name; + nameTag = account.GetNameTag(); + tags = []; + issuerKey = EventHelpers.IssuerForClient(c); + user = c.GetRawAuthUser(); + } + + var m = new ConnectEventMsg + { + Type = EventMsgTypes.ConnectEventMsgType, + Id = eid, + Time = DateTime.UtcNow, + Client = new ClientInfo + { + Start = start.ToString("O"), + Host = host, + Id = cid, + Account = EventHelpers.AccForClient(c), + User = user, + Name = name, + Lang = lang, + Version = version, + Jwt = jwt, + IssuerKey = issuerKey, + Tags = tags, + NameTag = nameTag, + Kind = kind, + ClientType = clientType, + }, + }; + + var subj = string.Format(SystemSubjects.ConnectEventSubj, accName); + SendInternalMsgLocked(subj, string.Empty, m.Server, m); + } + + // ------------------------------------------------------------------------- + // sendAccConnsUpdate + // ------------------------------------------------------------------------- + + /// + /// Builds and queues an advisory. + /// Server lock must be held on entry. + /// Mirrors Go Server.sendAccConnsUpdate(). + /// + internal void SendAccConnsUpdate(Account a, params string[] subjects) + { + if (!EventsEnabledLocked() || a == null) return; + var sendq = _sys!.SendQueue; + if (sendq == null) return; + + var eid = NextEventID(); + + int aConns, aLeafs, aNumSubs; + lock (a) + { + aConns = a.NumLocalConnections(); + aLeafs = a.NumLocalLeafNodes(); + aNumSubs = a.TotalSubs(); + } + + var m = new AccountNumConns + { + Type = EventMsgTypes.AccountNumConnsMsgType, + Id = eid, + Time = DateTime.UtcNow, + Account = a.Name, + Name = a.GetNameTagLocked(), + Conns = aConns, + LeafNodes = aLeafs, + TotalConns = aConns + aLeafs, + NumSubs = (uint)aNumSubs, + Sent = new DataStats { Msgs = 0L, Bytes = 0L }, + Received = new DataStats { Msgs = 0L, Bytes = 0L }, + SlowConsumers = 0, + }; + + // Manage the account heartbeat timer. + lock (a) + { + if (m.TotalConns == 0) + { + a.ClearConnectionHeartbeatTimer(); + } + else + { + a.SetConnectionHeartbeatTimer((long)EventIntervals.EventsHbInterval.TotalMilliseconds, () => AccConnsUpdate(a)); + } + } + + foreach (var subj in subjects) + { + sendq.Push(new PubMsg + { + Client = null, + Subject = subj, + Reply = string.Empty, + Si = m.Server, + Hdr = null, + Msg = m, + }); + } + } + + // ========================================================================= + // Group D: Request Handlers + // ========================================================================= + + // ------------------------------------------------------------------------- + // userInfoReq + // ------------------------------------------------------------------------- + + /// + /// Handles a user-info request, returning the caller's account and permissions. + /// Mirrors Go Server.userInfoReq(). + /// + private void UserInfoReq(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsEnabled() || string.IsNullOrEmpty(reply)) return; + + var response = new ServerApiResponse { Server = new ServerInfo() }; + + var (ci, _, _, _, err) = GetRequestInfo(c, msg); + if (err != null) + { + response.Error = new ServerApiError { Code = (int)HttpStatusCode.BadRequest }; + SendInternalResponse(reply, response); + return; + } + + response.Data = new UserInfo + { + UserId = ci?.User ?? string.Empty, + Account = ci?.Account ?? string.Empty, + Permissions = c.PublicPermissions(), + Expires = c.Expires, + }; + SendInternalResponse(reply, response); + } + + // ------------------------------------------------------------------------- + // statszReq + // ------------------------------------------------------------------------- + + /// + /// Handles a statsz request from another server. + /// Mirrors Go Server.statszReq(). + /// + private void StatszReq(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsEnabled()) return; + + if (string.IsNullOrEmpty(reply)) + { + reply = string.Format(SystemSubjects.ServerStatsSubj, _info.Id); + WrapChk(ResetLastStatsz)(); + } + + if (msg.Length != 0) + { + StatszEventOptions opts; + try { opts = JsonSerializer.Deserialize(msg) ?? new StatszEventOptions(); } + catch (Exception ex) + { + var errResp = new ServerApiResponse + { + Server = new ServerInfo(), + Error = new ServerApiError { Code = (int)HttpStatusCode.BadRequest, Description = ex.Message }, + }; + SendInternalMsgLocked(reply, string.Empty, null, errResp); + return; + } + if (FilterRequest(opts)) return; + } + + SendStatsz(reply); + } + + // ------------------------------------------------------------------------- + // idzReq + // ------------------------------------------------------------------------- + + /// + /// Handles a request for basic static server identity. + /// Mirrors Go Server.idzReq(). + /// + private void IdzReq(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + _mu.EnterReadLock(); + var id = new ServerIdentity { Name = _info.Name, Host = _info.Host, Id = _info.Id }; + SendInternalMsg(reply, string.Empty, null, id); + _mu.ExitReadLock(); + } + + // ------------------------------------------------------------------------- + // zReq — generic monitoring request handler + // ------------------------------------------------------------------------- + + /// + /// Generic z-endpoint request handler that deserialises options, filters by + /// server, and sends a . + /// Mirrors Go Server.zReq(). + /// + internal void ZReq(ClientConnection c, string reply, byte[] hdr, byte[] msg, EventFilterOptions? fOpts, object? optz, Func<(object? data, Exception? err)> respf) + { + if (!EventsEnabled() || string.IsNullOrEmpty(reply)) return; + + var response = new ServerApiResponse { Server = new ServerInfo() }; + int status = 0; + Exception? err = null; + + if (msg.Length != 0 && optz != null) { - _mu.EnterWriteLock(); try { - if (!EventsEnabledLocked()) return; - f(); + var json = JsonSerializer.Deserialize(msg, optz.GetType()); + if (json != null) + { + // Copy values — simplest approach: re-use the deserialized object. + optz = json; + } } - finally + catch (Exception ex) { - _mu.ExitWriteLock(); + err = ex; + status = (int)HttpStatusCode.BadRequest; } + + if (err == null && fOpts != null && FilterRequest(fOpts)) return; + } + + if (err == null) + { + var (data, callErr) = respf(); + if (callErr?.Message == "filtered response") + return; + if (callErr != null) + { + err = callErr; + status = (int)HttpStatusCode.InternalServerError; + } + else + { + response.Data = data; + } + } + + if (err != null) + response.Error = new ServerApiError { Code = status, Description = err.Message }; + + SendInternalResponse(reply, response); + } + + // ------------------------------------------------------------------------- + // filterRequest + // ------------------------------------------------------------------------- + + /// + /// Returns true if the request does NOT apply to this server (should be ignored). + /// Mirrors Go Server.filterRequest(). + /// + internal bool FilterRequest(EventFilterOptions? fOpts) + { + if (fOpts == null) return false; + + if (fOpts.ExactMatch) + { + if ((!string.IsNullOrEmpty(fOpts.Name) && fOpts.Name != _info.Name) || + (!string.IsNullOrEmpty(fOpts.Host) && fOpts.Host != _info.Host) || + (!string.IsNullOrEmpty(fOpts.Cluster) && fOpts.Cluster != ClusterName())) + return true; + } + else + { + if ((!string.IsNullOrEmpty(fOpts.Name) && !_info.Name.Contains(fOpts.Name)) || + (!string.IsNullOrEmpty(fOpts.Host) && !_info.Host.Contains(fOpts.Host)) || + (!string.IsNullOrEmpty(fOpts.Cluster) && !ClusterName().Contains(fOpts.Cluster))) + return true; + } + + if (fOpts.Tags?.Count > 0) + { + var opts = GetOpts(); + foreach (var t in fOpts.Tags) + if (!opts.Tags.Contains(t)) return true; + } + + if (!string.IsNullOrEmpty(fOpts.Domain) && GetOpts().JetStreamDomain != fOpts.Domain) + return true; + + return false; + } + + // ------------------------------------------------------------------------- + // debugSubscribers + // ------------------------------------------------------------------------- + + /// + /// Handles a debug request for the count of subscribers matching a given subject. + /// Mirrors Go Server.debugSubscribers(). + /// + private void DebugSubscribers(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (c.Kind != ClientKind.Client) return; + + // Parse the message body to get the subject and optional queue group. + var args = Encoding.UTF8.GetString(msg).Trim().Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (args.Length == 0) + { + SendInternalAccountMsg(null, reply, 0); + return; + } + + var tsubj = args[0]; + var qgroup = args.Length > 1 ? args[1] : null; + + // Look up the account from the header or the client. + Account? lookupAcc = acc ?? _sysAccAtomic ?? _gacc; + + if (lookupAcc == null) return; + + // Count local subscribers. + var nsubs = CountLocalSubscribers(lookupAcc, tsubj, qgroup); + + int expected = lookupAcc.ExpectedRemoteResponses(); + if (expected == 0) + { + SendInternalAccountMsg(null, reply, nsubs); + return; + } + + // Solicit remote counts. + int responses = 0; + var done = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _mu.EnterWriteLock(); + var replySubj = NewRespInbox(); + _sys!.Replies[replySubj] = (_, __, ___, ____, _____, ______, rMsg) => + { + if (long.TryParse(Encoding.UTF8.GetString(rMsg), out var n)) + Interlocked.Add(ref nsubs, (int)n); + if (Interlocked.Increment(ref responses) >= expected) + done.TrySetResult(true); + }; + var request = new AccNumSubsReq { Account = lookupAcc.Name, Subject = tsubj, Queue = qgroup }; + SendInternalMsg(SystemSubjects.AccNumSubsReqSubj, replySubj, null, request); + _mu.ExitWriteLock(); + + Task.Run(async () => + { + await Task.WhenAny(done.Task, Task.Delay(500)).ConfigureAwait(false); + bool send = false; + _mu.EnterWriteLock(); + if (_sys != null && _sys.Replies != null) + { + _sys.Replies.Remove(replySubj); + send = true; + } + _mu.ExitWriteLock(); + if (send) + SendInternalAccountMsg(null, reply, Interlocked.Add(ref nsubs, 0)); + }); + } + + private static int CountLocalSubscribers(Account acc, string subj, string? queue) + { + if (acc.Sublist == null) return 0; + var rr = acc.Sublist.Match(subj); + int count = 0; + foreach (var s in rr.PSubs) + { + if (queue != null && (s.Queue == null || System.Text.Encoding.ASCII.GetString(s.Queue) != queue)) continue; + if (s.Client?.Kind == ClientKind.Client) count++; + } + foreach (var qsubs in rr.QSubs) + foreach (var s in qsubs) + { + if (queue != null && (s.Queue == null || System.Text.Encoding.ASCII.GetString(s.Queue) != queue)) continue; + if (s.Client?.Kind == ClientKind.Client) count++; + } + return count; + } + + // ------------------------------------------------------------------------- + // nsubsRequest + // ------------------------------------------------------------------------- + + /// + /// Handles a request for local subscription counts from a remote origin server. + /// Mirrors Go Server.nsubsRequest(). + /// + private void NsubsRequest(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + + if (msg.Length == 0) { Errorf("request requires a body"); return; } + + AccNumSubsReq m; + try { m = JsonSerializer.Deserialize(msg)!; } + catch (Exception ex) { Errorf("Error unmarshalling account nsubs request message: {0}", ex); return; } + + var (account, _) = LookupAccount(m.Account); + if (account == null || account.NumLocalAndLeafConnections() == 0) return; + + var nsubs = CountLocalSubscribers(account, m.Subject, m.Queue); + SendInternalMsgLocked(reply, string.Empty, null, nsubs); + } + + // ------------------------------------------------------------------------- + // reloadConfig + // ------------------------------------------------------------------------- + + /// + /// Handles a server-reload request from the system bus. + /// Mirrors Go Server.reloadConfig(). + /// + private void ReloadConfigHandler(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + var fOpts = new EventFilterOptions(); + ZReq(c, reply, hdr, msg, fOpts, fOpts, () => (null, ReloadConfigFromRequest())); + } + + /// + /// Reloads server configuration initiated from a system bus request. + /// Returns any error. + /// Mirrors Go Server.Reload() called from the reloadConfig handler. + /// + private Exception? ReloadConfigFromRequest() + { + var newOpts = GetOpts().Clone(); + return ReloadOptions(newOpts); + } + + // ========================================================================= + // Group E: Leaf Node & Gateway Events + // ========================================================================= + + // ------------------------------------------------------------------------- + // ensureGWsInterestOnlyForLeafNodes + // ------------------------------------------------------------------------- + + /// + /// Sends a leaf-node connect message for every account that has leaf node + /// connections, so that gateway peers switch to interest-only mode. + /// Server lock must be held on entry. + /// Mirrors Go Server.ensureGWsInterestOnlyForLeafNodes(). + /// + private void EnsureGWsInterestOnlyForLeafNodes() + { + if (!_gateway.Enabled || _leafs.Count == 0) return; + + var sent = new HashSet(); + foreach (var c in _leafs.Values) + { + if (c._account is Account a && sent.Add(a)) + SendLeafNodeConnectMsg(a.Name); + } + } + + // ------------------------------------------------------------------------- + // sendLeafNodeConnect + // ------------------------------------------------------------------------- + + /// + /// Sends a leaf-node connect system event for the given account (lock-free entry). + /// Mirrors Go Server.sendLeafNodeConnect(). + /// + internal void SendLeafNodeConnect(Account a) + { + _mu.EnterWriteLock(); + if (a == null || !EventsEnabledLocked() || !_gateway.Enabled) + { + _mu.ExitWriteLock(); + return; + } + SendLeafNodeConnectMsg(a.Name); + _mu.ExitWriteLock(); + + SwitchAccountToInterestMode(a.Name); + } + + // ------------------------------------------------------------------------- + // sendLeafNodeConnectMsg + // ------------------------------------------------------------------------- + + /// + /// Queues the internal leaf-node-connect message for the given account name. + /// Server lock must be held on entry. + /// Mirrors Go Server.sendLeafNodeConnectMsg(). + /// + internal void SendLeafNodeConnectMsg(string accName) + { + var subj = string.Format(SystemSubjects.LeafNodeConnectEventSubj, accName); + var m = new AccNumConnsReq { Account = accName }; + SendInternalMsg(subj, string.Empty, m.Server, m); + } + + // ========================================================================= + // Group F: Auth & Error Events + // ========================================================================= + + // ------------------------------------------------------------------------- + // sendAuthErrorEvent + // ------------------------------------------------------------------------- + + /// + /// Sends a system-level auth error advisory to the system account. + /// Mirrors Go Server.sendAuthErrorEvent(). + /// + internal void SendAuthErrorEvent(ClientConnection c, string reason) + { + _mu.EnterWriteLock(); + if (!EventsEnabledLocked()) { _mu.ExitWriteLock(); return; } + var eid = NextEventID(); + _mu.ExitWriteLock(); + + var now = DateTime.UtcNow; + DisconnectEventMsg m; + lock (c) + { + m = BuildDisconnectMsg(c, eid, now, reason); + } + + _mu.EnterWriteLock(); + var subj = string.Format(SystemSubjects.AuthErrorEventSubj, _info.Id); + SendInternalMsg(subj, string.Empty, m.Server, m); + _mu.ExitWriteLock(); + } + + // ------------------------------------------------------------------------- + // sendAccountAuthErrorEvent + // ------------------------------------------------------------------------- + + /// + /// Sends an account-level auth error advisory to the account's system stream. + /// Mirrors Go Server.sendAccountAuthErrorEvent(). + /// + internal void SendAccountAuthErrorEvent(ClientConnection c, Account? acc, string reason) + { + if (acc == null) return; + _mu.EnterWriteLock(); + if (!EventsEnabledLocked()) { _mu.ExitWriteLock(); return; } + var eid = NextEventID(); + _mu.ExitWriteLock(); + + var now = DateTime.UtcNow; + DisconnectEventMsg m; + lock (c) + { + m = BuildDisconnectMsg(c, eid, now, reason, acc.Name); + } + + SendInternalAccountSysMsg(acc, SystemSubjects.AuthErrorAccountEventSubj, m.Server, m, 0); + } + + private DisconnectEventMsg BuildDisconnectMsg(ClientConnection c, string eid, DateTime now, string reason, string? overrideAccName = null) + { + return new DisconnectEventMsg + { + Type = EventMsgTypes.DisconnectEventMsgType, + Id = eid, + Time = now, + Client = new ClientInfo + { + Start = c.Start.ToString("O"), + Host = c.Host, + Id = c.Cid, + Account = overrideAccName ?? EventHelpers.AccForClient(c), + User = c.GetRawAuthUser(), + Name = c.Opts.Name, + Lang = c.Opts.Lang, + Version = c.Opts.Version, + Rtt = c.GetRttValue(), + Jwt = c.Opts.Jwt, + IssuerKey = EventHelpers.IssuerForClient(c), + Kind = c.KindString(), + ClientType = c.ClientTypeString(), + }, + Sent = new DataStats { Msgs = 0L, Bytes = 0L }, + Received = new DataStats { Msgs = 0L, Bytes = 0L }, + Reason = reason, }; } -} - -// ============================================================================ -// EventFilterOptions — common filter options for system event requests -// Mirrors Go EventFilterOptions in server/events.go. -// ============================================================================ - -/// -/// Filter criteria applied to system event requests (STATSZ, VARZ, CONNZ, etc.). -/// Mirrors Go EventFilterOptions in server/events.go. -/// -public sealed class EventFilterOptions -{ - /// Filter by server name (substring unless ExactMatch). - public string Name { get; set; } = string.Empty; - - /// Filter by cluster name (substring unless ExactMatch). - public string Cluster { get; set; } = string.Empty; - - /// Filter by host (substring unless ExactMatch). - public string Host { get; set; } = string.Empty; - - /// When true, use exact equality instead of substring matching. - public bool ExactMatch { get; set; } - - /// Filter by tags (all must match). - public List Tags { get; set; } = []; - - /// Filter by JetStream domain. - public string Domain { get; set; } = string.Empty; -} - -// ============================================================================ -// ServerApiResponse — standard API response envelope -// Mirrors Go ServerAPIResponse in server/events.go. -// ============================================================================ - -/// -/// Response envelope used by system API endpoints (varz, connz, statsz, etc.). -/// Mirrors Go ServerAPIResponse in server/events.go. -/// -public sealed class ServerApiResponse -{ - public ServerInfo? Server { get; set; } - public object? Data { get; set; } - public ApiError? Error { get; set; } - - /// Internal: compression type for this response. Not serialised. - internal int Compress { get; set; } -} - -/// -/// API error returned in . -/// Mirrors Go ApiError in server/events.go. -/// -public sealed class ApiError -{ - public int Code { get; set; } - public string Description { get; set; } = string.Empty; + + // ------------------------------------------------------------------------- + // registerSystemImportsForExisting + // ------------------------------------------------------------------------- + + /// + /// Registers system imports for all existing accounts (excluding the system account). + /// Mirrors Go Server.registerSystemImportsForExisting(). + /// + internal void RegisterSystemImportsForExisting() + { + _mu.EnterReadLock(); + if (_sys == null) { _mu.ExitReadLock(); return; } + var sacc = _sys.Account; + var accounts = _accounts.Values.Where(a => a != sacc).ToList(); + _mu.ExitReadLock(); + + foreach (var a in accounts) + RegisterSystemImports(a); + } + + // ========================================================================= + // Group G: OCSP Events + // ========================================================================= + + // ------------------------------------------------------------------------- + // sendOCSPPeerRejectEvent + // ------------------------------------------------------------------------- + + /// + /// Publishes an OCSP peer-reject advisory to the system account. + /// Mirrors Go Server.sendOCSPPeerRejectEvent(). + /// + internal void SendOcspPeerRejectEvent(string kind, X509Certificate2? peer, string reason) + { + _mu.EnterWriteLock(); + try + { + if (!EventsEnabledLocked()) return; + if (peer == null) + { + Errorf(OcspMessages.ErrPeerEmptyNoEvent); + return; + } + var eid = NextEventID(); + var now = DateTime.UtcNow; + var m = new OcspPeerRejectEventMsg + { + Type = EventMsgTypes.OcspPeerRejectEventMsgType, + Id = eid, + Time = now, + Kind = kind, + Peer = new CertInfo + { + Subject = OcspUtilities.GetSubjectDNForm(peer), + Issuer = OcspUtilities.GetIssuerDNForm(peer), + Fingerprint = OcspUtilities.GenerateFingerprint(peer), + Raw = peer.RawData, + }, + Reason = reason, + }; + var subj = string.Format(SystemSubjects.OcspPeerRejectEventSubj, _info.Id); + SendInternalMsg(subj, string.Empty, m.Server, m); + } + finally { _mu.ExitWriteLock(); } + } + + // ------------------------------------------------------------------------- + // sendOCSPPeerChainlinkInvalidEvent + // ------------------------------------------------------------------------- + + /// + /// Publishes an OCSP chainlink-invalid advisory to the system account. + /// Mirrors Go Server.sendOCSPPeerChainlinkInvalidEvent(). + /// + internal void SendOcspPeerChainlinkInvalidEvent(X509Certificate2? peer, X509Certificate2? link, string reason) + { + _mu.EnterWriteLock(); + try + { + if (!EventsEnabledLocked()) return; + if (peer == null || link == null) + { + Errorf(OcspMessages.ErrPeerEmptyNoEvent); + return; + } + var eid = NextEventID(); + var now = DateTime.UtcNow; + var m = new OcspPeerChainlinkInvalidEventMsg + { + Type = EventMsgTypes.OcspPeerChainlinkInvalidEventMsgType, + Id = eid, + Time = now, + Link = new CertInfo + { + Subject = OcspUtilities.GetSubjectDNForm(link), + Issuer = OcspUtilities.GetIssuerDNForm(link), + Fingerprint = OcspUtilities.GenerateFingerprint(link), + Raw = link.RawData, + }, + Peer = new CertInfo + { + Subject = OcspUtilities.GetSubjectDNForm(peer), + Issuer = OcspUtilities.GetIssuerDNForm(peer), + Fingerprint = OcspUtilities.GenerateFingerprint(peer), + Raw = peer.RawData, + }, + Reason = reason, + }; + var subj = string.Format(SystemSubjects.OcspPeerChainlinkInvalidEventSubj, _info.Id); + SendInternalMsg(subj, string.Empty, m.Server, m); + } + finally { _mu.ExitWriteLock(); } + } + + // ========================================================================= + // Group H: Misc + // ========================================================================= + + // ------------------------------------------------------------------------- + // remoteLatencyUpdate + // ------------------------------------------------------------------------- + + /// + /// Handles a remote latency measurement event for tracking exported service latency. + /// Mirrors Go Server.remoteLatencyUpdate(). + /// + private void RemoteLatencyUpdate(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + + RemoteLatency rl; + try { rl = JsonSerializer.Deserialize(msg)!; } + catch (Exception ex) + { + Errorf("Error unmarshalling remote latency measurement: {0}", ex); + return; + } + + var (account, err) = LookupAccount(rl.Account); + if (err != null) + { + Warnf("Could not lookup account {0} for latency measurement", rl.Account); + return; + } + + // Look up the service import associated with this response. + ServiceImportEntry? si = null; + string lsub = string.Empty; + lock (account!) + { + var exports = account.Exports; + if (exports.Responses?.TryGetValue(rl.RequestId, out var entry) == true) + { + si = entry; + lsub = entry.Latency?.Subject ?? string.Empty; + } + } + if (si == null) return; + + ServiceLatency? m1 = null; + var m2 = rl.M2; + + lock (si.Account!) + { + m1 = si.M1; + if (m1 == null) + si.M1 = m2; + } + if (m1 == null) return; + + // Merge the two halves of the latency measurement. + // Merge responder (m2) into requestor (m1): + // m2.ServiceLatencyDuration is correct, m1.TotalLatency is correct. + var responderRtt = m2.Responder?.Rtt ?? TimeSpan.Zero; + m1.SystemLatency = m1.ServiceLatencyDuration - (m2.ServiceLatencyDuration + responderRtt); + if (m1.SystemLatency < TimeSpan.Zero) m1.SystemLatency = TimeSpan.Zero; + m1.ServiceLatencyDuration = m2.ServiceLatencyDuration; + if (m1.ServiceLatencyDuration < TimeSpan.Zero) m1.ServiceLatencyDuration = TimeSpan.Zero; + m1.Responder = m2.Responder; + + lock (account) + { + si.RequestingClient = null; + } + + SendInternalAccountMsg(account, lsub, m1); + } + + // ------------------------------------------------------------------------- + // kickClient + // ------------------------------------------------------------------------- + + /// + /// Handles a request to forcefully disconnect a client by CID. + /// Mirrors Go Server.kickClient(). + /// + private void KickClient(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + + KickClientReq req; + try { req = JsonSerializer.Deserialize(msg)!; } + catch (Exception ex) { Errorf("Error unmarshalling kick client request: {0}", ex); return; } + + var fOpts = new EventFilterOptions(); + ZReq(c, reply, hdr, msg, fOpts, fOpts, () => (null, DisconnectClientByID(req.Cid))); + } + + // ------------------------------------------------------------------------- + // ldmClient + // ------------------------------------------------------------------------- + + /// + /// Handles a request to put a specific client into lame-duck mode. + /// Mirrors Go Server.ldmClient(). + /// + private void LdmClient(Subscription sub, ClientConnection c, Account acc, string subject, string reply, byte[] hdr, byte[] msg) + { + if (!EventsRunning()) return; + + LdmClientReq req; + try { req = JsonSerializer.Deserialize(msg)!; } + catch (Exception ex) { Errorf("Error unmarshalling LDM client request: {0}", ex); return; } + + var fOpts = new EventFilterOptions(); + ZReq(c, reply, hdr, msg, fOpts, fOpts, () => (null, LDMClientByID(req.Cid))); + } + + // ========================================================================= + // Monitoring verb stub handlers + // (Full monitor implementations are in NatsServer.Monitor — these delegate.) + // ========================================================================= + + private void VarzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void SubszReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void ConnzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void RoutezReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void GatewayzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void LeafzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccountzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void JszReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void HealthzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void ExpvarzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void IpqueueszReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void RaftzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccSubszReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccConnzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccLeafzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccJszReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccInfoReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void AccStatzReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); + + private void StatzAccPingReq(Subscription s, ClientConnection c, Account a, string subj, string rply, byte[] hdr, byte[] msg) + => ZReq(c, rply, hdr, msg, null, new StatszEventOptions(), () => (null, null)); } diff --git a/porting.db b/porting.db index e1f91cd..822bdfa 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index efd581b..1aa428d 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 14:18:03 UTC +Generated: 2026-03-01 14:53:22 UTC ## Modules (12 total) @@ -13,10 +13,10 @@ Generated: 2026-03-01 14:18:03 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 363 | +| deferred | 284 | | n_a | 24 | | stub | 1 | -| verified | 3263 | +| verified | 3342 | ## Unit Tests (3257 total) @@ -35,4 +35,4 @@ Generated: 2026-03-01 14:18:03 UTC ## Overall Progress -**5694/6942 items complete (82.0%)** +**5773/6942 items complete (83.2%)**