diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.Events.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.Events.cs
new file mode 100644
index 0000000..39e7254
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.Events.cs
@@ -0,0 +1,143 @@
+// Copyright 2018-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/accounts.go (statz) and server/events.go in the NATS server Go source.
+
+namespace ZB.MOM.NatsNet.Server;
+
+// ============================================================================
+// AccountTrafficStats — per-kind traffic counters for an account
+// Mirrors the anonymous embedded `stats` struct in Go's Account.stats.
+// ============================================================================
+
+///
+/// Per-kind (total, gateway, route, leaf) message and byte traffic counters
+/// for an account. Protected by the owning lock.
+/// Mirrors Go stats struct (the small one embedded in Account.stats).
+///
+internal sealed class AccountTrafficStats
+{
+ public long InMsgs;
+ public long OutMsgs;
+ public long InBytes;
+ public long OutBytes;
+ public long SlowConsumers;
+}
+
+///
+/// Aggregated traffic statistics for an account, including breakdowns by
+/// gateway, route, and leaf-node traffic.
+/// Mirrors Go's embedded anonymous stats struct in Account.
+///
+internal sealed class AccountTrafficStatSet
+{
+ private readonly object _mu = new();
+
+ public AccountTrafficStats Total { get; } = new();
+ public AccountTrafficStats Gateways { get; } = new();
+ public AccountTrafficStats Routes { get; } = new();
+ public AccountTrafficStats Leafs { get; } = new();
+
+ public void Lock() => Monitor.Enter(_mu);
+ public void Unlock() => Monitor.Exit(_mu);
+}
+
+// ============================================================================
+// Account partial — events / statz
+// ============================================================================
+
+public sealed partial class Account
+{
+ // -------------------------------------------------------------------------
+ // Traffic statistics (mirrors Go Account.stats embedded struct)
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Aggregated account traffic statistics.
+ /// Mirrors Go's stats struct { sync.Mutex; stats; gw stats; rt stats; ln stats }
+ /// embedded in Account.
+ ///
+ internal readonly AccountTrafficStatSet TrafficStats = new();
+
+ // -------------------------------------------------------------------------
+ // Group F: Account.statz
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Computes and returns a snapshot of the account's current statistics.
+ /// Lock (account _mu write lock) should be held on entry.
+ /// Mirrors Go (a *Account) statz() *AccountStat in server/events.go.
+ ///
+ internal AccountStat Statz()
+ {
+ var localConns = NumLocalConnectionsLocked();
+ var leafConns = NumLocalLeafNodes();
+
+ TrafficStats.Lock();
+ var received = new DataStats
+ {
+ Msgs = TrafficStats.Total.InMsgs,
+ Bytes = TrafficStats.Total.InBytes,
+ Gateways = new MsgBytes
+ {
+ Msgs = TrafficStats.Gateways.InMsgs,
+ Bytes = TrafficStats.Gateways.InBytes,
+ },
+ Routes = new MsgBytes
+ {
+ Msgs = TrafficStats.Routes.InMsgs,
+ Bytes = TrafficStats.Routes.InBytes,
+ },
+ Leafs = new MsgBytes
+ {
+ Msgs = TrafficStats.Leafs.InMsgs,
+ Bytes = TrafficStats.Leafs.InBytes,
+ },
+ };
+ var sent = new DataStats
+ {
+ Msgs = TrafficStats.Total.OutMsgs,
+ Bytes = TrafficStats.Total.OutBytes,
+ Gateways = new MsgBytes
+ {
+ Msgs = TrafficStats.Gateways.OutMsgs,
+ Bytes = TrafficStats.Gateways.OutBytes,
+ },
+ Routes = new MsgBytes
+ {
+ Msgs = TrafficStats.Routes.OutMsgs,
+ Bytes = TrafficStats.Routes.OutBytes,
+ },
+ Leafs = new MsgBytes
+ {
+ Msgs = TrafficStats.Leafs.OutMsgs,
+ Bytes = TrafficStats.Leafs.OutBytes,
+ },
+ };
+ var slowConsumers = TrafficStats.Total.SlowConsumers;
+ TrafficStats.Unlock();
+
+ return new AccountStat
+ {
+ Account = Name,
+ Name = GetNameTagLocked(),
+ Conns = localConns,
+ LeafNodes = leafConns,
+ TotalConns = localConns + leafConns,
+ NumSubs = Sublist?.Count() ?? 0,
+ Received = received,
+ Sent = sent,
+ SlowConsumers = slowConsumers,
+ };
+ }
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Events.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Events.cs
new file mode 100644
index 0000000..5e5307b
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Events.cs
@@ -0,0 +1,40 @@
+// Copyright 2012-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/events.go (client.sendInternalMsg) in the NATS server Go source.
+
+namespace ZB.MOM.NatsNet.Server;
+
+public sealed partial class ClientConnection
+{
+ // =========================================================================
+ // Group E: sendInternalMsg from a client context
+ // =========================================================================
+
+ ///
+ /// Sends an internal message from this client's context, avoiding no-echo issues
+ /// when the message originates from a non-system client.
+ /// Mirrors Go (c *client) sendInternalMsg(...) in server/events.go.
+ ///
+ internal void SendInternalMsg(
+ string subject,
+ string reply,
+ ServerInfo? si,
+ object? msg)
+ {
+ var srv = Server as NatsServer;
+ if (srv is null) return;
+
+ srv.SendInternalMsgFromClient(this, subject, reply, si, msg);
+ }
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs
index ca53a62..8115a96 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs
@@ -329,6 +329,53 @@ public sealed class ClientInfo
///
public string ServiceAccount() =>
string.IsNullOrWhiteSpace(ServiceName) ? Account : ServiceName;
+
+ // =========================================================================
+ // Group C: ClientInfo projection methods
+ // Mirrors Go methods on *ClientInfo in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Returns the minimum amount of ClientInfo needed for assignment snapshots
+ /// (account, service, cluster only).
+ /// Mirrors Go (ci *ClientInfo) forAssignmentSnap() *ClientInfo in server/events.go.
+ ///
+ public ClientInfo ForAssignmentSnap() => new()
+ {
+ Account = Account,
+ ServiceName = ServiceName,
+ Cluster = Cluster,
+ };
+
+ ///
+ /// Returns a copy of this ClientInfo with JWT and issuer key stripped,
+ /// suitable for raft proposals.
+ /// Mirrors Go (ci *ClientInfo) forProposal() *ClientInfo in server/events.go.
+ ///
+ public ClientInfo? ForProposal()
+ {
+ var copy = (ClientInfo)MemberwiseClone();
+ copy.Jwt = string.Empty;
+ copy.IssuerKey = string.Empty;
+ // Deep-copy the Tags list to avoid sharing.
+ copy.Tags = [..Tags];
+ copy.Alternates = [..Alternates];
+ return copy;
+ }
+
+ ///
+ /// Returns a copy of this ClientInfo with JWT stripped and Alternates nulled,
+ /// suitable for JetStream advisory events.
+ /// Mirrors Go (ci *ClientInfo) forAdvisory() *ClientInfo in server/events.go.
+ ///
+ public ClientInfo? ForAdvisory()
+ {
+ var copy = (ClientInfo)MemberwiseClone();
+ copy.Jwt = string.Empty;
+ copy.Alternates = [];
+ copy.Tags = [..Tags];
+ return copy;
+ }
}
// ============================================================================
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs
new file mode 100644
index 0000000..8c0e859
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs
@@ -0,0 +1,278 @@
+// Copyright 2018-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/events.go in the NATS server Go source.
+
+using System.Collections.Concurrent;
+using System.Security.Cryptography;
+using System.Text;
+using ZB.MOM.NatsNet.Server.Internal;
+using ZB.MOM.NatsNet.Server.Internal.DataStructures;
+
+namespace ZB.MOM.NatsNet.Server;
+
+///
+/// Static helpers used by the event system.
+/// Mirrors various package-level helper functions in server/events.go.
+///
+internal static class EventHelpers
+{
+ // =========================================================================
+ // Base-62 alphabet used by NATS hash functions.
+ // Mirrors Go package-level const in server/accounts.go.
+ // =========================================================================
+
+ private const string Digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+ private const int Base = 62;
+
+ // =========================================================================
+ // System hash length (mirrors Go sysHashLen = 8)
+ // =========================================================================
+
+ internal const int SysHashLen = 8;
+
+ // =========================================================================
+ // Inbox constants (mirrors Go InboxPrefix / related consts)
+ // =========================================================================
+
+ internal const string InboxPrefix = "$SYS._INBOX.";
+ internal const int InboxPrefixLen = 12; // len("$SYS._INBOX.")
+ internal const int ReplySuffixLen = 8; // Gives us 62^8 unique values
+
+ // =========================================================================
+ // PubMsg pool
+ // =========================================================================
+
+ private static readonly ConcurrentBag PubMsgPool = [];
+
+ ///
+ /// Factory for an internally-queued outbound publish message, using a pool
+ /// to amortise allocations.
+ /// Mirrors Go newPubMsg in server/events.go.
+ ///
+ internal static PubMsg NewPubMsg(
+ ClientConnection? client,
+ string subject,
+ string reply,
+ ServerInfo? si,
+ byte[]? hdr,
+ object? msg,
+ int oct,
+ bool echo,
+ bool last)
+ {
+ if (!PubMsgPool.TryTake(out var pm))
+ pm = new PubMsg();
+
+ pm.Client = client;
+ pm.Subject = subject;
+ pm.Reply = reply;
+ pm.Si = si;
+ pm.Hdr = hdr;
+ pm.Msg = msg;
+ pm.Oct = oct;
+ pm.Echo = echo;
+ pm.Last = last;
+ return pm;
+ }
+
+ ///
+ /// Returns a to the pool after all fields have been cleared.
+ /// Called by .
+ ///
+ internal static void ReturnPubMsg(PubMsg pm) => PubMsgPool.Add(pm);
+
+ // =========================================================================
+ // Group A: helpers
+ // =========================================================================
+
+ ///
+ /// Generate a RouteStat for a statz update from a route client.
+ /// Mirrors Go routeStat in server/events.go.
+ /// Note: full in/out message counters require ClientConnection to carry those fields
+ /// (added in a later batch). Until then, returns zero-valued counters.
+ ///
+ internal static RouteStat? RouteStat(ClientConnection? route)
+ {
+ if (route is null) return null;
+
+ lock (route)
+ {
+ var rs = new RouteStat
+ {
+ Id = route.Cid,
+ Sent = new DataStats { Msgs = 0, Bytes = 0 },
+ Received = new DataStats { Msgs = 0, Bytes = 0 },
+ Pending = (int)(route.OutPb),
+ };
+ if (route.Route != null)
+ rs.Name = route.Route.RemoteName;
+ return rs;
+ }
+ }
+
+ ///
+ /// Computes an 8-character base-62 hash of .
+ /// Mirrors Go getHash(name string) string in server/events.go.
+ ///
+ internal static string GetHash(string name) => GetHashSize(name, SysHashLen);
+
+ ///
+ /// Computes a hash of characters for .
+ /// Uses SHA-256, then maps the first bytes through the
+ /// base-62 digit alphabet — identical to Go's getHashSize.
+ /// Mirrors Go getHashSize(name string, size int) string in server/events.go.
+ ///
+ internal static string GetHashSize(string name, int size)
+ {
+ var hash = SHA256.HashData(Encoding.UTF8.GetBytes(name));
+ var result = new char[size];
+ for (var i = 0; i < size; i++)
+ result[i] = Digits[(int)(hash[i] % Base)];
+ return new string(result);
+ }
+
+ ///
+ /// Parses an Accept-Encoding header to determine which compression type
+ /// the remote supports (snappy / gzip / none).
+ /// Mirrors Go getAcceptEncoding(hdr []byte) compressionType in server/events.go.
+ ///
+ internal static CompressionType GetAcceptEncoding(byte[]? hdr)
+ {
+ if (hdr is null || hdr.Length == 0) return CompressionType.None;
+
+ // Extract the Accept-Encoding header value.
+ const string headerName = "Accept-Encoding";
+ var ae = GetHeaderValue(headerName, hdr)?.ToLowerInvariant() ?? string.Empty;
+
+ if (string.IsNullOrEmpty(ae)) return CompressionType.None;
+ if (ae.Contains("snappy") || ae.Contains("s2")) return CompressionType.Snappy;
+ if (ae.Contains("gzip")) return CompressionType.Gzip;
+ return CompressionType.Unsupported;
+ }
+
+ ///
+ /// Generate the tracking subject for remote latency from a response subject.
+ /// Mirrors Go remoteLatencySubjectForResponse in server/events.go.
+ ///
+ internal static string RemoteLatencySubjectForResponse(ReadOnlySpan subject)
+ {
+ if (!Account.IsTrackedReply(subject)) return string.Empty;
+
+ // Split on '.' and take second-to-last token.
+ var str = Encoding.ASCII.GetString(subject);
+ var toks = str.Split('.');
+ if (toks.Length < 2) return string.Empty;
+
+ return string.Format(SystemSubjects.RemoteLatencyEventSubj, toks[^2]);
+ }
+
+ ///
+ /// Sum subscription counts from a sublist result for clients and hub leaf nodes.
+ /// Mirrors Go totalSubs in server/events.go.
+ ///
+ internal static int TotalSubs(SubscriptionIndexResult? rr, byte[]? queueGroup)
+ {
+ if (rr is null) return 0;
+
+ var count = 0;
+
+ bool Matches(Subscription sub)
+ {
+ if (queueGroup != null && !sub.Queue.AsSpan().SequenceEqual(queueGroup))
+ return false;
+ return sub.Client?.Kind == ClientKind.Client || (sub.Client?.IsHubLeafNode() ?? false);
+ }
+
+ if (queueGroup is null)
+ {
+ foreach (var sub in rr.PSubs)
+ if (Matches(sub)) count++;
+ }
+
+ foreach (var qsubs in rr.QSubs)
+ foreach (var sub in qsubs)
+ if (Matches(sub)) count++;
+
+ return count;
+ }
+
+ ///
+ /// Returns the account name for a client (or "N/A" if none).
+ /// Mirrors Go accForClient(c *client) string in server/events.go.
+ ///
+ internal static string AccForClient(ClientConnection c)
+ {
+ var acc = c._account as Account;
+ return acc?.Name ?? "N/A";
+ }
+
+ ///
+ /// Returns the issuer key for a client (empty if none).
+ /// Mirrors Go issuerForClient(c *client) string in server/events.go.
+ ///
+ internal static string IssuerForClient(ClientConnection c)
+ {
+ if (c.Perms is null) return string.Empty;
+ // Stub: full user/signing-key lookup requires auth integration.
+ return string.Empty;
+ }
+
+ ///
+ /// Safely stops and nulls a timer reference.
+ /// Mirrors Go clearTimer(tp **time.Timer) in server/events.go.
+ ///
+ internal static void ClearTimer(ref System.Threading.Timer? timer)
+ {
+ var t = timer;
+ timer = null;
+ t?.Dispose();
+ }
+
+ // =========================================================================
+ // Header parsing helper
+ // =========================================================================
+
+ ///
+ /// Extracts the value of a named header from a NATS wire-format header block.
+ ///
+ private static string? GetHeaderValue(string headerName, byte[] hdr)
+ {
+ var text = Encoding.ASCII.GetString(hdr);
+ var search = headerName + ":";
+ var idx = text.IndexOf(search, StringComparison.OrdinalIgnoreCase);
+ if (idx < 0) return null;
+
+ var start = idx + search.Length;
+ var end = text.IndexOf('\r', start);
+ if (end < 0) end = text.Length;
+
+ return text[start..end].Trim();
+ }
+}
+
+// =========================================================================
+// CompressionType — mirrors Go compressionType enum in events.go
+// =========================================================================
+
+///
+/// Compression type for internal message publishing.
+/// Mirrors Go compressionType in server/events.go.
+///
+internal enum CompressionType : int
+{
+ None = 0,
+ Gzip = 1,
+ Snappy = 2,
+ Unsupported = 3,
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs
index 540ac72..e3e491b 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs
@@ -196,8 +196,8 @@ internal sealed class InternalState
{
// ---- identity / sequencing ----
public Account? Account { get; set; }
- public NatsClient? Client { get; set; }
- public ulong Seq { get; set; }
+ public ClientConnection? Client { get; set; }
+ public long Seq; // accessed via Interlocked.Increment
public int Sid { get; set; }
// ---- remote server tracking ----
@@ -289,7 +289,7 @@ internal sealed class ServerUpdate
///
internal sealed class PubMsg
{
- public NatsClient? Client { get; set; }
+ public ClientConnection? Client { get; set; }
public string Subject { get; set; } = string.Empty;
public string Reply { get; set; } = string.Empty;
public ServerInfo? Si { get; set; }
@@ -302,7 +302,24 @@ internal sealed class PubMsg
public bool Echo { get; set; }
public bool Last { get; set; }
- // TODO: session 12 — add pool return helper (returnToPool).
+ ///
+ /// Clears all fields and returns this instance to the pool held in
+ /// .
+ /// Mirrors Go (pm *pubMsg) returnToPool() in server/events.go.
+ ///
+ internal void ReturnToPool()
+ {
+ Client = null;
+ Subject = string.Empty;
+ Reply = string.Empty;
+ Si = null;
+ Hdr = null;
+ Msg = null;
+ Oct = 0;
+ Echo = false;
+ Last = false;
+ EventHelpers.ReturnPubMsg(this);
+ }
}
// ============================================================================
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs
index bf63462..aa88c1a 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs
@@ -13,6 +13,8 @@
//
// Adapted from server/client.go (subscription struct) in the NATS server Go source.
+using ZB.MOM.NatsNet.Server;
+
namespace ZB.MOM.NatsNet.Server.Internal;
///
@@ -40,6 +42,13 @@ public sealed class Subscription
/// The client that owns this subscription. Null in test/stub scenarios.
public NatsClient? Client { get; set; }
+ ///
+ /// System message callback. Set on subscriptions created via systemSubscribe
+ /// so that the internal receive queue can dispatch to the right handler.
+ /// Mirrors Go subscription.icb sysMsgHandler in server/events.go.
+ ///
+ public SysMsgHandler? SysMsgCb { get; set; }
+
/// Marks this subscription as closed.
public void Close() => Interlocked.Exchange(ref _closed, 1);
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs
new file mode 100644
index 0000000..f606447
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs
@@ -0,0 +1,949 @@
+// Copyright 2018-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/events.go in the NATS server Go source.
+// Batch 44: Events Core & Dispatch.
+
+using System.Text;
+using System.Text.Json;
+using ZB.MOM.NatsNet.Server.Internal;
+using ZB.MOM.NatsNet.Server.Internal.DataStructures;
+
+namespace ZB.MOM.NatsNet.Server;
+
+public sealed partial class NatsServer
+{
+ // =========================================================================
+ // Constants (mirrors Go sysHashLen = 8, InboxPrefix consts)
+ // =========================================================================
+
+ /// Length of the response-inbox prefix used for this server's internal replies.
+ private int RespInboxPrefixLen => EventHelpers.InboxPrefixLen + EventHelpers.SysHashLen + 1;
+
+ // =========================================================================
+ // Group G: internalReceiveLoop
+ // Mirrors Go (s *Server) internalReceiveLoop in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Background loop that dispatches all messages the server needs to process
+ /// internally via system subscriptions (e.g. internal subs).
+ /// Mirrors Go (s *Server) internalReceiveLoop in server/events.go.
+ ///
+ private void InternalReceiveLoop(IpQueue recvq)
+ {
+ while (EventsRunning())
+ {
+ // Wait for a notification that items are ready.
+ if (!recvq.Ch.WaitToReadAsync(_quitCts.Token).AsTask()
+ .GetAwaiter()
+ .GetResult())
+ {
+ return;
+ }
+
+ var msgs = recvq.Pop();
+ if (msgs is not null)
+ {
+ foreach (var m in msgs)
+ {
+ m.Cb?.Invoke(m.Sub, m.Client, m.Acc, m.Subject, m.Reply, m.Hdr, m.Msg);
+ }
+ recvq.Recycle(msgs);
+ }
+ }
+ }
+
+ // =========================================================================
+ // Group G: internalSendLoop
+ // Mirrors Go (s *Server) internalSendLoop in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Background loop that serialises and dispatches all messages the server
+ /// wants to send through the system account. Runs as a long-lived goroutine.
+ /// Mirrors Go (s *Server) internalSendLoop in server/events.go.
+ ///
+ private void InternalSendLoop()
+ {
+ // Read snapshot of send queue and system client under the read lock.
+ // Mirrors Go's RESET: label pattern.
+ RESET:
+ _mu.EnterReadLock();
+ if (_sys is null || _sys.SendQueue is null)
+ {
+ _mu.ExitReadLock();
+ return;
+ }
+ var sysc = _sys.Client;
+ var resetCh = _sys.ResetChannel;
+ var sendq = _sys.SendQueue;
+ var id = _info.Id;
+ var host = _info.Host;
+ var svrName = _info.Name;
+ var domain = _info.Domain ?? string.Empty;
+ var seqRef = _sys; // holds ref so we can atomically increment Seq
+ var jsEnabled = _info.JetStream;
+ var cluster = _info.Cluster ?? string.Empty;
+ if (_gateway.Enabled)
+ cluster = GetGatewayName();
+ _mu.ExitReadLock();
+
+ var opts = GetOpts();
+ var tags = opts.Tags;
+ var metadata = opts.Metadata;
+
+ while (EventsRunning())
+ {
+ // Wait for items in the send queue OR a reset signal OR quit.
+ var sendTask = sendq.Ch.WaitToReadAsync(_quitCts.Token).AsTask();
+ var resetTask = resetCh is not null
+ ? resetCh.Reader.WaitToReadAsync(_quitCts.Token).AsTask()
+ : Task.FromResult(false);
+
+ var completed = Task.WhenAny(sendTask, resetTask, Task.Delay(Timeout.Infinite, _quitCts.Token))
+ .GetAwaiter().GetResult();
+
+ if (_quitCts.IsCancellationRequested)
+ return;
+
+ // If reset channel fired, re-read the snapshot.
+ if (completed == resetTask && resetTask.IsCompletedSuccessfully && resetTask.Result)
+ {
+ resetCh?.Reader.TryRead(out _);
+ goto RESET;
+ }
+
+ if (!sendTask.IsCompletedSuccessfully || !sendTask.Result)
+ continue;
+
+ var msgs = sendq.Pop();
+ if (msgs is null) continue;
+
+ foreach (var pm in msgs)
+ {
+ // Stamp ServerInfo advisory fields if requested.
+ if (pm.Si is { } si)
+ {
+ si.Name = svrName;
+ si.Domain = domain;
+ si.Host = host;
+ si.Cluster = cluster;
+ si.Id = id;
+ si.Seq = (ulong)Interlocked.Increment(ref seqRef.Seq);
+ si.Version = ServerConstants.Version;
+ si.Time = DateTime.UtcNow;
+ si.Tags = tags.Count > 0 ? [..tags] : null;
+ si.Metadata = metadata.Count > 0 ? new Dictionary(metadata) : null;
+ si.Capabilities = 0;
+ if (jsEnabled)
+ {
+ si.SetJetStreamEnabled();
+ si.SetBinaryStreamSnapshot();
+ // AccountNRG: stub, not yet tracked
+ }
+ }
+
+ // Serialise payload.
+ byte[] body = [];
+ if (pm.Msg is not null)
+ {
+ body = pm.Msg switch
+ {
+ string s => Encoding.UTF8.GetBytes(s),
+ byte[] b => b,
+ _ => JsonSerializer.SerializeToUtf8Bytes(pm.Msg),
+ };
+ }
+
+ // Choose client.
+ var c = pm.Client ?? sysc;
+ if (c is null) { pm.ReturnToPool(); continue; }
+
+ // Process the publish inline.
+ lock (c)
+ {
+ c.ParseCtx.Pa.Subject = Encoding.ASCII.GetBytes(pm.Subject);
+ c.ParseCtx.Pa.Reply = Encoding.ASCII.GetBytes(pm.Reply);
+ }
+
+ // Append CRLF.
+ var payload = new byte[body.Length + 2];
+ Buffer.BlockCopy(body, 0, payload, 0, body.Length);
+ payload[^2] = (byte)'\r';
+ payload[^1] = (byte)'\n';
+
+ c.ProcessInboundClientMsg(payload);
+
+ if (pm.Last)
+ {
+ // Final message (shutdown): flush in-place and exit.
+ c.FlushClients(long.MaxValue);
+ sendq.Recycle(msgs);
+ pm.ReturnToPool();
+ return;
+ }
+ else
+ {
+ c.FlushClients(0);
+ }
+
+ pm.ReturnToPool();
+ }
+ sendq.Recycle(msgs);
+ }
+ }
+
+ // =========================================================================
+ // Group G: sendShutdownEvent
+ // Mirrors Go (s *Server) sendShutdownEvent in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Queues the server shutdown event. Clears the send queue and reply
+ /// handlers so no further messages will be dispatched.
+ /// Mirrors Go (s *Server) sendShutdownEvent in server/events.go.
+ ///
+ internal void SendShutdownEvent()
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ if (_sys is null || _sys.SendQueue is null) return;
+
+ var subject = string.Format(SystemSubjects.ShutdownEventSubj, _info.Id);
+ var sendq = _sys.SendQueue;
+
+ // Stop any more messages from queuing.
+ _sys.SendQueue = null;
+ // Unhook all reply handlers.
+ _sys.Replies.Clear();
+
+ var si = new ServerInfo();
+ sendq.Push(EventHelpers.NewPubMsg(null, subject, string.Empty, si, null, si,
+ (int)CompressionType.None, false, true));
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ }
+
+ // =========================================================================
+ // Group G: sendInternalAccountSysMsg
+ // Mirrors Go (s *Server) sendInternalAccountSysMsg in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Sends an internal system message to a specific account using that account's
+ /// internal client. Acquires only the minimum needed locks.
+ /// Mirrors Go (s *Server) sendInternalAccountSysMsg in server/events.go.
+ ///
+ internal void SendInternalAccountSysMsg(
+ Account? account,
+ string subject,
+ ServerInfo? si,
+ object? msg,
+ int compressionType = (int)CompressionType.None)
+ {
+ _mu.EnterReadLock();
+ if (_sys is null || _sys.SendQueue is null || account is null)
+ {
+ _mu.ExitReadLock();
+ return;
+ }
+ var sendq = _sys.SendQueue;
+ _mu.ExitReadLock();
+
+ ClientConnection? c;
+ _mu.EnterWriteLock();
+ try
+ {
+ c = account.InternalAccountClient();
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+
+ sendq.Push(EventHelpers.NewPubMsg(
+ c, subject, string.Empty, si, null, msg, compressionType, false, false));
+ }
+
+ // =========================================================================
+ // Group G: sendInternalMsgLocked / sendInternalMsg
+ // Mirrors Go (s *Server) sendInternalMsgLocked and
+ // (s *Server) sendInternalMsg in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Queues an internal message, acquiring the read lock.
+ /// Mirrors Go (s *Server) sendInternalMsgLocked in server/events.go.
+ ///
+ internal void SendInternalMsgLocked(
+ string subject,
+ string reply,
+ ServerInfo? si,
+ object? msg)
+ {
+ _mu.EnterReadLock();
+ try { SendInternalMsg(subject, reply, si, msg); }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ ///
+ /// Queues an internal message. Lock must already be held.
+ /// Mirrors Go (s *Server) sendInternalMsg in server/events.go.
+ ///
+ internal void SendInternalMsg(
+ string subject,
+ string reply,
+ ServerInfo? si,
+ object? msg)
+ {
+ if (_sys is null || _sys.SendQueue is null) return;
+ _sys.SendQueue.Push(EventHelpers.NewPubMsg(
+ null, subject, reply, si, null, msg, (int)CompressionType.None, false, false));
+ }
+
+ ///
+ /// Queues an internal message from a specific client context.
+ /// Called by .
+ /// Mirrors Go (c *client) sendInternalMsg(...) in server/events.go.
+ ///
+ internal void SendInternalMsgFromClient(
+ ClientConnection client,
+ string subject,
+ string reply,
+ ServerInfo? si,
+ object? msg)
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ if (_sys is null || _sys.SendQueue is null) return;
+ _sys.SendQueue.Push(EventHelpers.NewPubMsg(
+ client, subject, reply, si, null, msg, (int)CompressionType.None, false, false));
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ // =========================================================================
+ // Group G: sendInternalResponse
+ // Mirrors Go (s *Server) sendInternalResponse in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Sends a response to an internal server API request.
+ /// Mirrors Go (s *Server) sendInternalResponse in server/events.go.
+ ///
+ internal void SendInternalResponse(string subject, ServerApiResponse response)
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ if (_sys is null || _sys.SendQueue is null) return;
+ _sys.SendQueue.Push(EventHelpers.NewPubMsg(
+ null, subject, string.Empty, response.Server, null,
+ response, response.Compress, false, false));
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ // =========================================================================
+ // Group G: eventsRunning / EventsEnabled / eventsEnabled
+ // Mirrors Go counterparts in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Returns true if the events system is running (server is running and events enabled).
+ /// Acquires the read lock internally.
+ /// Mirrors Go (s *Server) eventsRunning() bool in server/events.go.
+ ///
+ internal bool EventsRunning()
+ {
+ _mu.EnterReadLock();
+ try { return IsRunning() && EventsEnabledLocked(); }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ ///
+ /// Returns true if the server has the events system enabled via a system account.
+ /// Acquires the read lock internally.
+ /// Mirrors Go (s *Server) EventsEnabled() bool in server/events.go.
+ ///
+ public bool EventsEnabled()
+ {
+ _mu.EnterReadLock();
+ try { return EventsEnabledLocked(); }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ ///
+ /// Returns true if events are enabled. Lock must already be held.
+ /// Mirrors Go (s *Server) eventsEnabled() bool in server/events.go.
+ ///
+ private bool EventsEnabledLocked() =>
+ _sys is not null && _sys.Client is not null && _sys.Account is not null;
+
+ // =========================================================================
+ // Group G: Node
+ // Mirrors Go (s *Server) Node() string in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Returns the stable node hash (short hash of server name) used for JetStream
+ /// cluster targeting. Empty string if events not initialised.
+ /// Mirrors Go (s *Server) Node() string in server/events.go.
+ ///
+ public string Node()
+ {
+ _mu.EnterReadLock();
+ try { return _sys?.ShortHash ?? string.Empty; }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ // =========================================================================
+ // Group G: initEventTracking
+ // Mirrors Go (s *Server) initEventTracking() in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Initialises the server-wide system subscription infrastructure: sets the
+ /// server's hash, subscribes to all internal system subjects, and starts the
+ /// send/receive loops.
+ /// Mirrors Go (s *Server) initEventTracking() in server/events.go.
+ ///
+ internal void InitEventTracking()
+ {
+ // Capture sys outside any lock to avoid deadlock.
+ _mu.EnterReadLock();
+ var sys = _sys;
+ _mu.ExitReadLock();
+
+ if (sys is null || sys.Client is null || sys.Account is null) return;
+
+ // Compute and store the server's short hash.
+ sys.ShortHash = EventHelpers.GetHash(_info.Name);
+
+ // All-inbox subscription: $SYS._INBOX..*
+ var inboxSubject = string.Format(SystemSubjects.InboxRespSubj, sys.ShortHash, "*");
+ if (SysSubscribe(inboxSubject, InboxReply) is { error: not null } inboxResult)
+ {
+ Errorf("Error setting up internal tracking: {0}", inboxResult.error);
+ return;
+ }
+ sys.InboxPrefix = inboxSubject;
+
+ // Remote connections update (old-style subject).
+ var accConnsOld = string.Format(SystemSubjects.AccConnsEventSubjOld, "*");
+ if (SysSubscribe(accConnsOld, NoInlineCallback(RemoteConnsUpdateStub)) is { error: not null } r1)
+ {
+ Errorf("Error setting up internal tracking for {0}: {1}", accConnsOld, r1.error);
+ return;
+ }
+
+ // Connection responses for this server's ID.
+ var connsResp = string.Format(SystemSubjects.ConnsRespSubj, _info.Id);
+ if (SysSubscribe(connsResp, NoInlineCallback(RemoteConnsUpdateStub)) is { error: not null } r2)
+ {
+ Errorf("Error setting up internal tracking: {0}", r2.error);
+ return;
+ }
+
+ // Subscription-count requests.
+ if (SysSubscribe(SystemSubjects.AccNumSubsReqSubj, NoInlineCallback(NsubsRequestStub)) is { error: not null } r3)
+ {
+ Errorf("Error setting up internal tracking: {0}", r3.error);
+ return;
+ }
+
+ // Stats heartbeat from other servers.
+ var statsSubj = string.Format(SystemSubjects.ServerStatsSubj, "*");
+ var statsSub = SysSubscribe(statsSubj, NoInlineCallback(RemoteServerUpdateStub));
+ if (statsSub.error is not null)
+ {
+ Errorf("Error setting up internal tracking: {0}", statsSub.error);
+ return;
+ }
+ sys.RemoteStatsSub = statsSub.sub;
+
+ // Shutdown events from other servers.
+ var shutdownSubj = string.Format(SystemSubjects.ShutdownEventSubj, "*");
+ if (SysSubscribe(shutdownSubj, NoInlineCallback(RemoteServerShutdownStub)) is { error: not null } r4)
+ {
+ Errorf("Error setting up internal tracking: {0}", r4.error);
+ return;
+ }
+
+ // Lame-duck events.
+ var lameDuckSubj = string.Format(SystemSubjects.LameDuckEventSubj, "*");
+ if (SysSubscribe(lameDuckSubj, NoInlineCallback(RemoteServerShutdownStub)) is { error: not null } r5)
+ {
+ Errorf("Error setting up internal tracking: {0}", r5.error);
+ return;
+ }
+
+ // Remote latency measurements for this server.
+ var latencySubj = string.Format(SystemSubjects.RemoteLatencyEventSubj, sys.ShortHash);
+ if (SysSubscribe(latencySubj, NoInlineCallback(RemoteLatencyUpdateStub)) is { error: not null } r6)
+ {
+ Errorf("Error setting up internal latency tracking: {0}", r6.error);
+ return;
+ }
+
+ // Server reload request.
+ var reloadSubj = string.Format(SystemSubjects.ServerReloadReqSubj, _info.Id);
+ if (SysSubscribe(reloadSubj, NoInlineCallback(ReloadConfigStub)) is { error: not null } r7)
+ {
+ Errorf("Error setting up server reload handler: {0}", r7.error);
+ return;
+ }
+
+ // Client kick/LDM requests.
+ var kickSubj = string.Format(SystemSubjects.ClientKickReqSubj, _info.Id);
+ if (SysSubscribe(kickSubj, NoInlineCallback(KickClientStub)) is { error: not null } r8)
+ {
+ Errorf("Error setting up client kick service: {0}", r8.error);
+ return;
+ }
+
+ var ldmSubj = string.Format(SystemSubjects.ClientLdmReqSubj, _info.Id);
+ if (SysSubscribe(ldmSubj, NoInlineCallback(LdmClientStub)) is { error: not null } r9)
+ {
+ Errorf("Error setting up client LDM service: {0}", r9.error);
+ }
+
+ // Account leaf-node connect events.
+ var leafConn = string.Format(SystemSubjects.LeafNodeConnectEventSubj, "*");
+ if (SysSubscribe(leafConn, NoInlineCallback(LeafNodeConnectedStub)) is { error: not null } r10)
+ {
+ Errorf("Error setting up internal tracking: {0}", r10.error);
+ }
+
+ // Debug subscriber service.
+ SysSubscribeInternal(SystemSubjects.AccSubsSubj, NoInlineCallback(DebugSubscribersStub));
+ }
+
+ // -------------------------------------------------------------------------
+ // Stub handlers — full implementations live in other partial files (Events
+ // module). These are lightweight no-ops that satisfy the dispatch wiring so
+ // the build compiles and the event loops can run.
+ // -------------------------------------------------------------------------
+
+ private void RemoteConnsUpdateStub(Subscription sub, NatsClient c, Account acc,
+ string subject, string reply, byte[] hdr, byte[] msg) { }
+
+ private void NsubsRequestStub(Subscription sub, NatsClient c, Account acc,
+ string subject, string reply, byte[] hdr, byte[] msg) { }
+
+ private void RemoteServerUpdateStub(Subscription sub, NatsClient c, Account acc,
+ string subject, string reply, byte[] hdr, byte[] msg) { }
+
+ private void RemoteServerShutdownStub(Subscription sub, NatsClient c, Account acc,
+ string subject, string reply, byte[] hdr, byte[] msg) { }
+
+ private void RemoteLatencyUpdateStub(Subscription sub, NatsClient c, Account acc,
+ string subject, string reply, byte[] hdr, byte[] msg) { }
+
+ private void ReloadConfigStub(Subscription sub, NatsClient c, Account acc,
+ string subject, string reply, byte[] hdr, byte[] msg) { }
+
+ private void KickClientStub(Subscription sub, NatsClient c, Account acc,
+ string subject, string reply, byte[] hdr, byte[] msg) { }
+
+ private void LdmClientStub(Subscription sub, NatsClient c, Account acc,
+ string subject, string reply, byte[] hdr, byte[] msg) { }
+
+ private void LeafNodeConnectedStub(Subscription sub, NatsClient c, Account acc,
+ string subject, string reply, byte[] hdr, byte[] msg) { }
+
+ private void DebugSubscribersStub(Subscription sub, NatsClient c, Account acc,
+ string subject, string reply, byte[] hdr, byte[] msg) { }
+
+ // =========================================================================
+ // Group G: filterRequest
+ // Mirrors Go (s *Server) filterRequest in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Returns true if a system event request should be filtered (ignored) by
+ /// this server based on its name, host, cluster, tags, or domain.
+ /// Do NOT hold the server lock when calling this.
+ /// Mirrors Go (s *Server) filterRequest in server/events.go.
+ ///
+ internal bool FilterRequest(EventFilterOptions? fOpts)
+ {
+ if (fOpts is null) return false;
+
+ var clusterName = ClusterName();
+ var opts = GetOpts();
+
+ if (fOpts.ExactMatch)
+ {
+ if ((fOpts.Name != string.Empty && fOpts.Name != _info.Name) ||
+ (fOpts.Host != string.Empty && fOpts.Host != _info.Host) ||
+ (fOpts.Cluster != string.Empty && fOpts.Cluster != clusterName))
+ {
+ return true;
+ }
+ }
+ else if ((fOpts.Name != string.Empty && !_info.Name.Contains(fOpts.Name, StringComparison.Ordinal)) ||
+ (fOpts.Host != string.Empty && !_info.Host.Contains(fOpts.Host, StringComparison.Ordinal)) ||
+ (fOpts.Cluster != string.Empty && !clusterName.Contains(fOpts.Cluster, StringComparison.Ordinal)))
+ {
+ return true;
+ }
+
+ if (fOpts.Tags.Count > 0)
+ {
+ foreach (var tag in fOpts.Tags)
+ {
+ if (!opts.Tags.Contains(tag))
+ return true;
+ }
+ }
+
+ if (fOpts.Domain != string.Empty && opts.JetStreamDomain != fOpts.Domain)
+ return true;
+
+ return false;
+ }
+
+ // =========================================================================
+ // Group G: noInlineCallback / noInlineCallbackStatsz / noInlineCallbackRecvQSelect
+ // Mirrors Go variants in server/events.go.
+ // =========================================================================
+
+ private const int RecvQMuxed = 1;
+ private const int RecvQStatsz = 2;
+
+ ///
+ /// Wraps a so that it is always executed on the
+ /// internal receive queue (never inline with route/gateway processing).
+ /// Mirrors Go (s *Server) noInlineCallback in server/events.go.
+ ///
+ internal SysMsgHandler? NoInlineCallback(SysMsgHandler cb) =>
+ NoInlineCallbackRecvQSelect(cb, RecvQMuxed);
+
+ ///
+ /// Wraps a for the priority (statsz) receive queue.
+ /// Mirrors Go (s *Server) noInlineCallbackStatsz in server/events.go.
+ ///
+ internal SysMsgHandler? NoInlineCallbackStatsz(SysMsgHandler cb) =>
+ NoInlineCallbackRecvQSelect(cb, RecvQStatsz);
+
+ ///
+ /// Core wrapper implementation. Returns a handler that pushes messages onto
+ /// the selected internal receive queue rather than executing inline.
+ /// Mirrors Go (s *Server) noInlineCallbackRecvQSelect in server/events.go.
+ ///
+ internal SysMsgHandler? NoInlineCallbackRecvQSelect(SysMsgHandler cb, int recvQSelect)
+ {
+ _mu.EnterReadLock();
+ if (!EventsEnabledLocked())
+ {
+ _mu.ExitReadLock();
+ return null;
+ }
+
+ IpQueue recvq = recvQSelect == RecvQStatsz
+ ? (_sys!.RecvQueuePriority ?? _sys.RecvQueue!)
+ : _sys!.RecvQueue!;
+ _mu.ExitReadLock();
+
+ return (sub, c, acc, subj, rply, hdr, msg) =>
+ {
+ var hdrCopy = hdr is { Length: > 0 } ? (byte[])hdr.Clone() : [];
+ var msgCopy = msg is { Length: > 0 } ? (byte[])msg.Clone() : [];
+ recvq.Push(new InSysMsg
+ {
+ Sub = sub,
+ Client = c,
+ Acc = acc,
+ Subject = subj,
+ Reply = rply,
+ Hdr = hdrCopy,
+ Msg = msgCopy,
+ Cb = cb,
+ });
+ };
+ }
+
+ // =========================================================================
+ // Group G: sysSubscribe / sysSubscribeQ / sysSubscribeInternal / systemSubscribe
+ // Mirrors Go variants in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Creates an internal subscription on the system account.
+ /// Mirrors Go (s *Server) sysSubscribe in server/events.go.
+ ///
+ internal (Subscription? sub, Exception? error) SysSubscribe(string subject, SysMsgHandler? cb) =>
+ SystemSubscribe(subject, string.Empty, false, null, cb);
+
+ ///
+ /// Creates an internal subscription with a queue group on the system account.
+ /// Mirrors Go (s *Server) sysSubscribeQ in server/events.go.
+ ///
+ internal (Subscription? sub, Exception? error) SysSubscribeQ(string subject, string queue, SysMsgHandler? cb) =>
+ SystemSubscribe(subject, queue, false, null, cb);
+
+ ///
+ /// Creates an internal subscription that does NOT forward interest to routes/gateways.
+ /// Mirrors Go (s *Server) sysSubscribeInternal in server/events.go.
+ ///
+ internal (Subscription? sub, Exception? error) SysSubscribeInternal(string subject, SysMsgHandler? cb) =>
+ SystemSubscribe(subject, string.Empty, true, null, cb);
+
+ ///
+ /// Core subscription implementation used by all sysSubscribe* helpers.
+ /// Creates a subscription on the system account's internal client.
+ /// Mirrors Go (s *Server) systemSubscribe in server/events.go.
+ ///
+ internal (Subscription? sub, Exception? error) SystemSubscribe(
+ string subject,
+ string queue,
+ bool internalOnly,
+ ClientConnection? client,
+ SysMsgHandler? cb)
+ {
+ _mu.EnterWriteLock();
+ if (!EventsEnabledLocked())
+ {
+ _mu.ExitWriteLock();
+ return (null, ServerErrors.ErrNoSysAccount);
+ }
+
+ if (cb is null)
+ {
+ _mu.ExitWriteLock();
+ return (null, new ArgumentNullException(nameof(cb), "undefined message handler"));
+ }
+
+ var c = client ?? _sys!.Client!;
+ _sys!.Sid++;
+ var sid = _sys.Sid.ToString();
+ _mu.ExitWriteLock();
+
+ var subBytes = Encoding.ASCII.GetBytes(subject);
+ var sidBytes = Encoding.ASCII.GetBytes(sid);
+ byte[]? qBytes = string.IsNullOrEmpty(queue) ? null : Encoding.ASCII.GetBytes(queue);
+
+ // Map SysMsgHandler → the internal MsgHandler/subscription mechanism.
+ // The callback receives the raw message bytes split into hdr+msg.
+ SysMsgHandler capturedCb = cb;
+ var (sub, err) = c.ProcessSubEx(subBytes, qBytes, sidBytes, internalOnly, false, false);
+
+ if (err is not null)
+ return (null, err);
+
+ if (sub is not null)
+ {
+ // Attach the callback to the subscription via the InSysMsg dispatch.
+ // Store the callback in the subscription so the receive queue can call it.
+ sub.SysMsgCb = capturedCb;
+ }
+
+ return (sub, null);
+ }
+
+ // =========================================================================
+ // Group G: sysUnsubscribe
+ // Mirrors Go (s *Server) sysUnsubscribe in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Unsubscribes from a system subject by removing the given subscription.
+ /// Mirrors Go (s *Server) sysUnsubscribe in server/events.go.
+ ///
+ internal void SysUnsubscribe(Subscription? sub)
+ {
+ if (sub is null) return;
+
+ _mu.EnterReadLock();
+ if (!EventsEnabledLocked())
+ {
+ _mu.ExitReadLock();
+ return;
+ }
+ _mu.ExitReadLock();
+
+ // System subscriptions are always owned by the system client (_sys.Client).
+ // Retrieve it under a read lock, then unsubscribe outside the lock.
+ _mu.EnterReadLock();
+ var sysClient = _sys?.Client;
+ _mu.ExitReadLock();
+
+ if (sub.Sid is not null && sysClient is not null)
+ sysClient.RemoveSubBySid(sub.Sid);
+ }
+
+ // =========================================================================
+ // Group G: inboxReply
+ // Mirrors Go (s *Server) inboxReply in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Handles inbox replies without propagating supercluster-wide interest.
+ /// Dispatches to the registered reply handler if one exists for this subject.
+ /// Mirrors Go (s *Server) inboxReply in server/events.go.
+ ///
+ internal void InboxReply(
+ Subscription sub,
+ NatsClient c,
+ Account acc,
+ string subject,
+ string reply,
+ byte[] hdr,
+ byte[] msg)
+ {
+ _mu.EnterReadLock();
+ if (!EventsEnabledLocked() || _sys!.Replies.Count == 0)
+ {
+ _mu.ExitReadLock();
+ return;
+ }
+ _sys.Replies.TryGetValue(subject, out var handler);
+ _mu.ExitReadLock();
+
+ handler?.Invoke(sub, c, acc, subject, reply, hdr, msg);
+ }
+
+ // =========================================================================
+ // Group G: newRespInbox
+ // Mirrors Go (s *Server) newRespInbox() string in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Generates a new unique response inbox subject using this server's hash prefix.
+ /// Format: $SYS._INBOX.{hash}.{8-char-random}.
+ /// Mirrors Go (s *Server) newRespInbox() string in server/events.go.
+ ///
+ internal string NewRespInbox()
+ {
+ const string digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+ const int base62 = 62;
+ const int suffixLen = EventHelpers.ReplySuffixLen;
+
+ var inboxPre = _sys?.InboxPrefix ?? string.Empty;
+ // Strip the trailing '*' from $SYS._INBOX.{hash}.* → use as prefix
+ var prefix = inboxPre.EndsWith(".*", StringComparison.Ordinal)
+ ? inboxPre[..^1] // strip '*', keep the dot
+ : inboxPre + ".";
+
+ var suffix = new char[suffixLen];
+ var rng = (long)Random.Shared.NextInt64();
+ if (rng < 0) rng = -rng;
+
+ for (var i = 0; i < suffixLen; i++)
+ {
+ suffix[i] = digits[(int)(rng % base62)];
+ rng /= base62;
+ }
+
+ return prefix + new string(suffix);
+ }
+
+ // =========================================================================
+ // Group G: wrapChk
+ // Mirrors Go (s *Server) wrapChk in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Returns a wrapper function that acquires the server write lock, checks that
+ /// events are enabled, invokes , then releases the lock.
+ /// Mirrors Go (s *Server) wrapChk(f func()) func() in server/events.go.
+ ///
+ internal Action WrapChk(Action f)
+ {
+ return () =>
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ if (!EventsEnabledLocked()) return;
+ f();
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ };
+ }
+}
+
+// ============================================================================
+// EventFilterOptions — common filter options for system event requests
+// Mirrors Go EventFilterOptions in server/events.go.
+// ============================================================================
+
+///
+/// Filter criteria applied to system event requests (STATSZ, VARZ, CONNZ, etc.).
+/// Mirrors Go EventFilterOptions in server/events.go.
+///
+public sealed class EventFilterOptions
+{
+ /// Filter by server name (substring unless ExactMatch).
+ public string Name { get; set; } = string.Empty;
+
+ /// Filter by cluster name (substring unless ExactMatch).
+ public string Cluster { get; set; } = string.Empty;
+
+ /// Filter by host (substring unless ExactMatch).
+ public string Host { get; set; } = string.Empty;
+
+ /// When true, use exact equality instead of substring matching.
+ public bool ExactMatch { get; set; }
+
+ /// Filter by tags (all must match).
+ public List Tags { get; set; } = [];
+
+ /// Filter by JetStream domain.
+ public string Domain { get; set; } = string.Empty;
+}
+
+// ============================================================================
+// ServerApiResponse — standard API response envelope
+// Mirrors Go ServerAPIResponse in server/events.go.
+// ============================================================================
+
+///
+/// Response envelope used by system API endpoints (varz, connz, statsz, etc.).
+/// Mirrors Go ServerAPIResponse in server/events.go.
+///
+public sealed class ServerApiResponse
+{
+ public ServerInfo? Server { get; set; }
+ public object? Data { get; set; }
+ public ApiError? Error { get; set; }
+
+ /// Internal: compression type for this response. Not serialised.
+ internal int Compress { get; set; }
+}
+
+///
+/// API error returned in .
+/// Mirrors Go ApiError in server/events.go.
+///
+public sealed class ApiError
+{
+ public int Code { get; set; }
+ public string Description { get; set; } = string.Empty;
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs
index 9bc5d10..20140e9 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs
@@ -88,6 +88,100 @@ public sealed class ServerInfo
// LeafNode-specific
[JsonPropertyName("leafnode_urls")] public string[]? LeafNodeUrls { get; set; }
+ // =========================================================================
+ // Advisory / event-system fields
+ // Mirrors Go server/events.go ServerInfo advisory struct fields.
+ // These are populated by the internal send loop when publishing advisories.
+ // =========================================================================
+
+ ///
+ /// Sequence number for this server's advisory messages.
+ /// Mirrors Go Seq uint64 in ServerInfo (events.go).
+ ///
+ [JsonPropertyName("seq")]
+ public ulong Seq { get; set; }
+
+ ///
+ /// UTC timestamp of this advisory message.
+ /// Mirrors Go Time time.Time in ServerInfo (events.go).
+ ///
+ [JsonPropertyName("time")]
+ public DateTime Time { get; set; }
+
+ ///
+ /// Capability flags for this server (bitmask of ).
+ /// Mirrors Go Flags ServerCapability in ServerInfo (events.go).
+ ///
+ [JsonPropertyName("flags")]
+ public ulong Capabilities { get; set; }
+
+ ///
+ /// Server tags from options.
+ /// Mirrors Go Tags []string in ServerInfo (events.go).
+ ///
+ [JsonPropertyName("tags")]
+ [JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
+ public List? Tags { get; set; }
+
+ ///
+ /// Server metadata from options.
+ /// Mirrors Go Metadata map[string]string in ServerInfo (events.go).
+ ///
+ [JsonPropertyName("metadata")]
+ [JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
+ public Dictionary? Metadata { get; set; }
+
+ // =========================================================================
+ // Capability helpers (Group B)
+ // Mirrors Go ServerInfo capability methods in server/events.go.
+ // =========================================================================
+
+ ///
+ /// Sets the JetStream capability bit and the legacy JetStream bool field.
+ /// Mirrors Go (si *ServerInfo) SetJetStreamEnabled() in server/events.go.
+ ///
+ public void SetJetStreamEnabled()
+ {
+ Capabilities |= (ulong)ServerCapability.JetStreamEnabled;
+ JetStream = true;
+ }
+
+ ///
+ /// Returns whether this server has JetStream enabled, taking into account
+ /// both the capability flag and the legacy bool field.
+ /// Mirrors Go (si *ServerInfo) JetStreamEnabled() bool in server/events.go.
+ ///
+ public bool IsJetStreamEnabled() =>
+ (Capabilities & (ulong)ServerCapability.JetStreamEnabled) != 0 || JetStream;
+
+ ///
+ /// Sets the binary-stream-snapshot capability bit.
+ /// Mirrors Go (si *ServerInfo) SetBinaryStreamSnapshot() in server/events.go.
+ ///
+ public void SetBinaryStreamSnapshot() =>
+ Capabilities |= (ulong)ServerCapability.BinaryStreamSnapshot;
+
+ ///
+ /// Returns whether this server supports binary stream snapshots.
+ /// Mirrors Go (si *ServerInfo) BinaryStreamSnapshot() bool in server/events.go.
+ ///
+ public bool IsBinaryStreamSnapshot() =>
+ (Capabilities & (ulong)ServerCapability.BinaryStreamSnapshot) != 0;
+
+ ///
+ /// Sets the account-NRG capability bit.
+ /// Mirrors Go (si *ServerInfo) SetAccountNRG() in server/events.go.
+ ///
+ public void SetAccountNrg() =>
+ Capabilities |= (ulong)ServerCapability.AccountNrg;
+
+ ///
+ /// Returns whether this server supports moving NRG traffic into the asset account.
+ /// Mirrors Go (si *ServerInfo) AccountNRG() bool in server/events.go.
+ ///
+ public bool IsAccountNrg() =>
+ (Capabilities & (ulong)ServerCapability.AccountNrg) != 0;
+
/// Returns a shallow clone of this .
internal ServerInfo ShallowClone() => (ServerInfo)MemberwiseClone();
}