diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.Events.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.Events.cs new file mode 100644 index 0000000..39e7254 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.Events.cs @@ -0,0 +1,143 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/accounts.go (statz) and server/events.go in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server; + +// ============================================================================ +// AccountTrafficStats — per-kind traffic counters for an account +// Mirrors the anonymous embedded `stats` struct in Go's Account.stats. +// ============================================================================ + +/// +/// Per-kind (total, gateway, route, leaf) message and byte traffic counters +/// for an account. Protected by the owning lock. +/// Mirrors Go stats struct (the small one embedded in Account.stats). +/// +internal sealed class AccountTrafficStats +{ + public long InMsgs; + public long OutMsgs; + public long InBytes; + public long OutBytes; + public long SlowConsumers; +} + +/// +/// Aggregated traffic statistics for an account, including breakdowns by +/// gateway, route, and leaf-node traffic. +/// Mirrors Go's embedded anonymous stats struct in Account. +/// +internal sealed class AccountTrafficStatSet +{ + private readonly object _mu = new(); + + public AccountTrafficStats Total { get; } = new(); + public AccountTrafficStats Gateways { get; } = new(); + public AccountTrafficStats Routes { get; } = new(); + public AccountTrafficStats Leafs { get; } = new(); + + public void Lock() => Monitor.Enter(_mu); + public void Unlock() => Monitor.Exit(_mu); +} + +// ============================================================================ +// Account partial — events / statz +// ============================================================================ + +public sealed partial class Account +{ + // ------------------------------------------------------------------------- + // Traffic statistics (mirrors Go Account.stats embedded struct) + // ------------------------------------------------------------------------- + + /// + /// Aggregated account traffic statistics. + /// Mirrors Go's stats struct { sync.Mutex; stats; gw stats; rt stats; ln stats } + /// embedded in Account. + /// + internal readonly AccountTrafficStatSet TrafficStats = new(); + + // ------------------------------------------------------------------------- + // Group F: Account.statz + // ------------------------------------------------------------------------- + + /// + /// Computes and returns a snapshot of the account's current statistics. + /// Lock (account _mu write lock) should be held on entry. + /// Mirrors Go (a *Account) statz() *AccountStat in server/events.go. + /// + internal AccountStat Statz() + { + var localConns = NumLocalConnectionsLocked(); + var leafConns = NumLocalLeafNodes(); + + TrafficStats.Lock(); + var received = new DataStats + { + Msgs = TrafficStats.Total.InMsgs, + Bytes = TrafficStats.Total.InBytes, + Gateways = new MsgBytes + { + Msgs = TrafficStats.Gateways.InMsgs, + Bytes = TrafficStats.Gateways.InBytes, + }, + Routes = new MsgBytes + { + Msgs = TrafficStats.Routes.InMsgs, + Bytes = TrafficStats.Routes.InBytes, + }, + Leafs = new MsgBytes + { + Msgs = TrafficStats.Leafs.InMsgs, + Bytes = TrafficStats.Leafs.InBytes, + }, + }; + var sent = new DataStats + { + Msgs = TrafficStats.Total.OutMsgs, + Bytes = TrafficStats.Total.OutBytes, + Gateways = new MsgBytes + { + Msgs = TrafficStats.Gateways.OutMsgs, + Bytes = TrafficStats.Gateways.OutBytes, + }, + Routes = new MsgBytes + { + Msgs = TrafficStats.Routes.OutMsgs, + Bytes = TrafficStats.Routes.OutBytes, + }, + Leafs = new MsgBytes + { + Msgs = TrafficStats.Leafs.OutMsgs, + Bytes = TrafficStats.Leafs.OutBytes, + }, + }; + var slowConsumers = TrafficStats.Total.SlowConsumers; + TrafficStats.Unlock(); + + return new AccountStat + { + Account = Name, + Name = GetNameTagLocked(), + Conns = localConns, + LeafNodes = leafConns, + TotalConns = localConns + leafConns, + NumSubs = Sublist?.Count() ?? 0, + Received = received, + Sent = sent, + SlowConsumers = slowConsumers, + }; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Events.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Events.cs new file mode 100644 index 0000000..5e5307b --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Events.cs @@ -0,0 +1,40 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/events.go (client.sendInternalMsg) in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + // ========================================================================= + // Group E: sendInternalMsg from a client context + // ========================================================================= + + /// + /// Sends an internal message from this client's context, avoiding no-echo issues + /// when the message originates from a non-system client. + /// Mirrors Go (c *client) sendInternalMsg(...) in server/events.go. + /// + internal void SendInternalMsg( + string subject, + string reply, + ServerInfo? si, + object? msg) + { + var srv = Server as NatsServer; + if (srv is null) return; + + srv.SendInternalMsgFromClient(this, subject, reply, si, msg); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs index ca53a62..8115a96 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs @@ -329,6 +329,53 @@ public sealed class ClientInfo /// public string ServiceAccount() => string.IsNullOrWhiteSpace(ServiceName) ? Account : ServiceName; + + // ========================================================================= + // Group C: ClientInfo projection methods + // Mirrors Go methods on *ClientInfo in server/events.go. + // ========================================================================= + + /// + /// Returns the minimum amount of ClientInfo needed for assignment snapshots + /// (account, service, cluster only). + /// Mirrors Go (ci *ClientInfo) forAssignmentSnap() *ClientInfo in server/events.go. + /// + public ClientInfo ForAssignmentSnap() => new() + { + Account = Account, + ServiceName = ServiceName, + Cluster = Cluster, + }; + + /// + /// Returns a copy of this ClientInfo with JWT and issuer key stripped, + /// suitable for raft proposals. + /// Mirrors Go (ci *ClientInfo) forProposal() *ClientInfo in server/events.go. + /// + public ClientInfo? ForProposal() + { + var copy = (ClientInfo)MemberwiseClone(); + copy.Jwt = string.Empty; + copy.IssuerKey = string.Empty; + // Deep-copy the Tags list to avoid sharing. + copy.Tags = [..Tags]; + copy.Alternates = [..Alternates]; + return copy; + } + + /// + /// Returns a copy of this ClientInfo with JWT stripped and Alternates nulled, + /// suitable for JetStream advisory events. + /// Mirrors Go (ci *ClientInfo) forAdvisory() *ClientInfo in server/events.go. + /// + public ClientInfo? ForAdvisory() + { + var copy = (ClientInfo)MemberwiseClone(); + copy.Jwt = string.Empty; + copy.Alternates = []; + copy.Tags = [..Tags]; + return copy; + } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs new file mode 100644 index 0000000..8c0e859 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs @@ -0,0 +1,278 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/events.go in the NATS server Go source. + +using System.Collections.Concurrent; +using System.Security.Cryptography; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +/// +/// Static helpers used by the event system. +/// Mirrors various package-level helper functions in server/events.go. +/// +internal static class EventHelpers +{ + // ========================================================================= + // Base-62 alphabet used by NATS hash functions. + // Mirrors Go package-level const in server/accounts.go. + // ========================================================================= + + private const string Digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + private const int Base = 62; + + // ========================================================================= + // System hash length (mirrors Go sysHashLen = 8) + // ========================================================================= + + internal const int SysHashLen = 8; + + // ========================================================================= + // Inbox constants (mirrors Go InboxPrefix / related consts) + // ========================================================================= + + internal const string InboxPrefix = "$SYS._INBOX."; + internal const int InboxPrefixLen = 12; // len("$SYS._INBOX.") + internal const int ReplySuffixLen = 8; // Gives us 62^8 unique values + + // ========================================================================= + // PubMsg pool + // ========================================================================= + + private static readonly ConcurrentBag PubMsgPool = []; + + /// + /// Factory for an internally-queued outbound publish message, using a pool + /// to amortise allocations. + /// Mirrors Go newPubMsg in server/events.go. + /// + internal static PubMsg NewPubMsg( + ClientConnection? client, + string subject, + string reply, + ServerInfo? si, + byte[]? hdr, + object? msg, + int oct, + bool echo, + bool last) + { + if (!PubMsgPool.TryTake(out var pm)) + pm = new PubMsg(); + + pm.Client = client; + pm.Subject = subject; + pm.Reply = reply; + pm.Si = si; + pm.Hdr = hdr; + pm.Msg = msg; + pm.Oct = oct; + pm.Echo = echo; + pm.Last = last; + return pm; + } + + /// + /// Returns a to the pool after all fields have been cleared. + /// Called by . + /// + internal static void ReturnPubMsg(PubMsg pm) => PubMsgPool.Add(pm); + + // ========================================================================= + // Group A: helpers + // ========================================================================= + + /// + /// Generate a RouteStat for a statz update from a route client. + /// Mirrors Go routeStat in server/events.go. + /// Note: full in/out message counters require ClientConnection to carry those fields + /// (added in a later batch). Until then, returns zero-valued counters. + /// + internal static RouteStat? RouteStat(ClientConnection? route) + { + if (route is null) return null; + + lock (route) + { + var rs = new RouteStat + { + Id = route.Cid, + Sent = new DataStats { Msgs = 0, Bytes = 0 }, + Received = new DataStats { Msgs = 0, Bytes = 0 }, + Pending = (int)(route.OutPb), + }; + if (route.Route != null) + rs.Name = route.Route.RemoteName; + return rs; + } + } + + /// + /// Computes an 8-character base-62 hash of . + /// Mirrors Go getHash(name string) string in server/events.go. + /// + internal static string GetHash(string name) => GetHashSize(name, SysHashLen); + + /// + /// Computes a hash of characters for . + /// Uses SHA-256, then maps the first bytes through the + /// base-62 digit alphabet — identical to Go's getHashSize. + /// Mirrors Go getHashSize(name string, size int) string in server/events.go. + /// + internal static string GetHashSize(string name, int size) + { + var hash = SHA256.HashData(Encoding.UTF8.GetBytes(name)); + var result = new char[size]; + for (var i = 0; i < size; i++) + result[i] = Digits[(int)(hash[i] % Base)]; + return new string(result); + } + + /// + /// Parses an Accept-Encoding header to determine which compression type + /// the remote supports (snappy / gzip / none). + /// Mirrors Go getAcceptEncoding(hdr []byte) compressionType in server/events.go. + /// + internal static CompressionType GetAcceptEncoding(byte[]? hdr) + { + if (hdr is null || hdr.Length == 0) return CompressionType.None; + + // Extract the Accept-Encoding header value. + const string headerName = "Accept-Encoding"; + var ae = GetHeaderValue(headerName, hdr)?.ToLowerInvariant() ?? string.Empty; + + if (string.IsNullOrEmpty(ae)) return CompressionType.None; + if (ae.Contains("snappy") || ae.Contains("s2")) return CompressionType.Snappy; + if (ae.Contains("gzip")) return CompressionType.Gzip; + return CompressionType.Unsupported; + } + + /// + /// Generate the tracking subject for remote latency from a response subject. + /// Mirrors Go remoteLatencySubjectForResponse in server/events.go. + /// + internal static string RemoteLatencySubjectForResponse(ReadOnlySpan subject) + { + if (!Account.IsTrackedReply(subject)) return string.Empty; + + // Split on '.' and take second-to-last token. + var str = Encoding.ASCII.GetString(subject); + var toks = str.Split('.'); + if (toks.Length < 2) return string.Empty; + + return string.Format(SystemSubjects.RemoteLatencyEventSubj, toks[^2]); + } + + /// + /// Sum subscription counts from a sublist result for clients and hub leaf nodes. + /// Mirrors Go totalSubs in server/events.go. + /// + internal static int TotalSubs(SubscriptionIndexResult? rr, byte[]? queueGroup) + { + if (rr is null) return 0; + + var count = 0; + + bool Matches(Subscription sub) + { + if (queueGroup != null && !sub.Queue.AsSpan().SequenceEqual(queueGroup)) + return false; + return sub.Client?.Kind == ClientKind.Client || (sub.Client?.IsHubLeafNode() ?? false); + } + + if (queueGroup is null) + { + foreach (var sub in rr.PSubs) + if (Matches(sub)) count++; + } + + foreach (var qsubs in rr.QSubs) + foreach (var sub in qsubs) + if (Matches(sub)) count++; + + return count; + } + + /// + /// Returns the account name for a client (or "N/A" if none). + /// Mirrors Go accForClient(c *client) string in server/events.go. + /// + internal static string AccForClient(ClientConnection c) + { + var acc = c._account as Account; + return acc?.Name ?? "N/A"; + } + + /// + /// Returns the issuer key for a client (empty if none). + /// Mirrors Go issuerForClient(c *client) string in server/events.go. + /// + internal static string IssuerForClient(ClientConnection c) + { + if (c.Perms is null) return string.Empty; + // Stub: full user/signing-key lookup requires auth integration. + return string.Empty; + } + + /// + /// Safely stops and nulls a timer reference. + /// Mirrors Go clearTimer(tp **time.Timer) in server/events.go. + /// + internal static void ClearTimer(ref System.Threading.Timer? timer) + { + var t = timer; + timer = null; + t?.Dispose(); + } + + // ========================================================================= + // Header parsing helper + // ========================================================================= + + /// + /// Extracts the value of a named header from a NATS wire-format header block. + /// + private static string? GetHeaderValue(string headerName, byte[] hdr) + { + var text = Encoding.ASCII.GetString(hdr); + var search = headerName + ":"; + var idx = text.IndexOf(search, StringComparison.OrdinalIgnoreCase); + if (idx < 0) return null; + + var start = idx + search.Length; + var end = text.IndexOf('\r', start); + if (end < 0) end = text.Length; + + return text[start..end].Trim(); + } +} + +// ========================================================================= +// CompressionType — mirrors Go compressionType enum in events.go +// ========================================================================= + +/// +/// Compression type for internal message publishing. +/// Mirrors Go compressionType in server/events.go. +/// +internal enum CompressionType : int +{ + None = 0, + Gzip = 1, + Snappy = 2, + Unsupported = 3, +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs index 540ac72..e3e491b 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs @@ -196,8 +196,8 @@ internal sealed class InternalState { // ---- identity / sequencing ---- public Account? Account { get; set; } - public NatsClient? Client { get; set; } - public ulong Seq { get; set; } + public ClientConnection? Client { get; set; } + public long Seq; // accessed via Interlocked.Increment public int Sid { get; set; } // ---- remote server tracking ---- @@ -289,7 +289,7 @@ internal sealed class ServerUpdate /// internal sealed class PubMsg { - public NatsClient? Client { get; set; } + public ClientConnection? Client { get; set; } public string Subject { get; set; } = string.Empty; public string Reply { get; set; } = string.Empty; public ServerInfo? Si { get; set; } @@ -302,7 +302,24 @@ internal sealed class PubMsg public bool Echo { get; set; } public bool Last { get; set; } - // TODO: session 12 — add pool return helper (returnToPool). + /// + /// Clears all fields and returns this instance to the pool held in + /// . + /// Mirrors Go (pm *pubMsg) returnToPool() in server/events.go. + /// + internal void ReturnToPool() + { + Client = null; + Subject = string.Empty; + Reply = string.Empty; + Si = null; + Hdr = null; + Msg = null; + Oct = 0; + Echo = false; + Last = false; + EventHelpers.ReturnPubMsg(this); + } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs index bf63462..aa88c1a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs @@ -13,6 +13,8 @@ // // Adapted from server/client.go (subscription struct) in the NATS server Go source. +using ZB.MOM.NatsNet.Server; + namespace ZB.MOM.NatsNet.Server.Internal; /// @@ -40,6 +42,13 @@ public sealed class Subscription /// The client that owns this subscription. Null in test/stub scenarios. public NatsClient? Client { get; set; } + /// + /// System message callback. Set on subscriptions created via systemSubscribe + /// so that the internal receive queue can dispatch to the right handler. + /// Mirrors Go subscription.icb sysMsgHandler in server/events.go. + /// + public SysMsgHandler? SysMsgCb { get; set; } + /// Marks this subscription as closed. public void Close() => Interlocked.Exchange(ref _closed, 1); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs new file mode 100644 index 0000000..f606447 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs @@ -0,0 +1,949 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/events.go in the NATS server Go source. +// Batch 44: Events Core & Dispatch. + +using System.Text; +using System.Text.Json; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + // ========================================================================= + // Constants (mirrors Go sysHashLen = 8, InboxPrefix consts) + // ========================================================================= + + /// Length of the response-inbox prefix used for this server's internal replies. + private int RespInboxPrefixLen => EventHelpers.InboxPrefixLen + EventHelpers.SysHashLen + 1; + + // ========================================================================= + // Group G: internalReceiveLoop + // Mirrors Go (s *Server) internalReceiveLoop in server/events.go. + // ========================================================================= + + /// + /// Background loop that dispatches all messages the server needs to process + /// internally via system subscriptions (e.g. internal subs). + /// Mirrors Go (s *Server) internalReceiveLoop in server/events.go. + /// + private void InternalReceiveLoop(IpQueue recvq) + { + while (EventsRunning()) + { + // Wait for a notification that items are ready. + if (!recvq.Ch.WaitToReadAsync(_quitCts.Token).AsTask() + .GetAwaiter() + .GetResult()) + { + return; + } + + var msgs = recvq.Pop(); + if (msgs is not null) + { + foreach (var m in msgs) + { + m.Cb?.Invoke(m.Sub, m.Client, m.Acc, m.Subject, m.Reply, m.Hdr, m.Msg); + } + recvq.Recycle(msgs); + } + } + } + + // ========================================================================= + // Group G: internalSendLoop + // Mirrors Go (s *Server) internalSendLoop in server/events.go. + // ========================================================================= + + /// + /// Background loop that serialises and dispatches all messages the server + /// wants to send through the system account. Runs as a long-lived goroutine. + /// Mirrors Go (s *Server) internalSendLoop in server/events.go. + /// + private void InternalSendLoop() + { + // Read snapshot of send queue and system client under the read lock. + // Mirrors Go's RESET: label pattern. + RESET: + _mu.EnterReadLock(); + if (_sys is null || _sys.SendQueue is null) + { + _mu.ExitReadLock(); + return; + } + var sysc = _sys.Client; + var resetCh = _sys.ResetChannel; + var sendq = _sys.SendQueue; + var id = _info.Id; + var host = _info.Host; + var svrName = _info.Name; + var domain = _info.Domain ?? string.Empty; + var seqRef = _sys; // holds ref so we can atomically increment Seq + var jsEnabled = _info.JetStream; + var cluster = _info.Cluster ?? string.Empty; + if (_gateway.Enabled) + cluster = GetGatewayName(); + _mu.ExitReadLock(); + + var opts = GetOpts(); + var tags = opts.Tags; + var metadata = opts.Metadata; + + while (EventsRunning()) + { + // Wait for items in the send queue OR a reset signal OR quit. + var sendTask = sendq.Ch.WaitToReadAsync(_quitCts.Token).AsTask(); + var resetTask = resetCh is not null + ? resetCh.Reader.WaitToReadAsync(_quitCts.Token).AsTask() + : Task.FromResult(false); + + var completed = Task.WhenAny(sendTask, resetTask, Task.Delay(Timeout.Infinite, _quitCts.Token)) + .GetAwaiter().GetResult(); + + if (_quitCts.IsCancellationRequested) + return; + + // If reset channel fired, re-read the snapshot. + if (completed == resetTask && resetTask.IsCompletedSuccessfully && resetTask.Result) + { + resetCh?.Reader.TryRead(out _); + goto RESET; + } + + if (!sendTask.IsCompletedSuccessfully || !sendTask.Result) + continue; + + var msgs = sendq.Pop(); + if (msgs is null) continue; + + foreach (var pm in msgs) + { + // Stamp ServerInfo advisory fields if requested. + if (pm.Si is { } si) + { + si.Name = svrName; + si.Domain = domain; + si.Host = host; + si.Cluster = cluster; + si.Id = id; + si.Seq = (ulong)Interlocked.Increment(ref seqRef.Seq); + si.Version = ServerConstants.Version; + si.Time = DateTime.UtcNow; + si.Tags = tags.Count > 0 ? [..tags] : null; + si.Metadata = metadata.Count > 0 ? new Dictionary(metadata) : null; + si.Capabilities = 0; + if (jsEnabled) + { + si.SetJetStreamEnabled(); + si.SetBinaryStreamSnapshot(); + // AccountNRG: stub, not yet tracked + } + } + + // Serialise payload. + byte[] body = []; + if (pm.Msg is not null) + { + body = pm.Msg switch + { + string s => Encoding.UTF8.GetBytes(s), + byte[] b => b, + _ => JsonSerializer.SerializeToUtf8Bytes(pm.Msg), + }; + } + + // Choose client. + var c = pm.Client ?? sysc; + if (c is null) { pm.ReturnToPool(); continue; } + + // Process the publish inline. + lock (c) + { + c.ParseCtx.Pa.Subject = Encoding.ASCII.GetBytes(pm.Subject); + c.ParseCtx.Pa.Reply = Encoding.ASCII.GetBytes(pm.Reply); + } + + // Append CRLF. + var payload = new byte[body.Length + 2]; + Buffer.BlockCopy(body, 0, payload, 0, body.Length); + payload[^2] = (byte)'\r'; + payload[^1] = (byte)'\n'; + + c.ProcessInboundClientMsg(payload); + + if (pm.Last) + { + // Final message (shutdown): flush in-place and exit. + c.FlushClients(long.MaxValue); + sendq.Recycle(msgs); + pm.ReturnToPool(); + return; + } + else + { + c.FlushClients(0); + } + + pm.ReturnToPool(); + } + sendq.Recycle(msgs); + } + } + + // ========================================================================= + // Group G: sendShutdownEvent + // Mirrors Go (s *Server) sendShutdownEvent in server/events.go. + // ========================================================================= + + /// + /// Queues the server shutdown event. Clears the send queue and reply + /// handlers so no further messages will be dispatched. + /// Mirrors Go (s *Server) sendShutdownEvent in server/events.go. + /// + internal void SendShutdownEvent() + { + _mu.EnterWriteLock(); + try + { + if (_sys is null || _sys.SendQueue is null) return; + + var subject = string.Format(SystemSubjects.ShutdownEventSubj, _info.Id); + var sendq = _sys.SendQueue; + + // Stop any more messages from queuing. + _sys.SendQueue = null; + // Unhook all reply handlers. + _sys.Replies.Clear(); + + var si = new ServerInfo(); + sendq.Push(EventHelpers.NewPubMsg(null, subject, string.Empty, si, null, si, + (int)CompressionType.None, false, true)); + } + finally + { + _mu.ExitWriteLock(); + } + } + + // ========================================================================= + // Group G: sendInternalAccountSysMsg + // Mirrors Go (s *Server) sendInternalAccountSysMsg in server/events.go. + // ========================================================================= + + /// + /// Sends an internal system message to a specific account using that account's + /// internal client. Acquires only the minimum needed locks. + /// Mirrors Go (s *Server) sendInternalAccountSysMsg in server/events.go. + /// + internal void SendInternalAccountSysMsg( + Account? account, + string subject, + ServerInfo? si, + object? msg, + int compressionType = (int)CompressionType.None) + { + _mu.EnterReadLock(); + if (_sys is null || _sys.SendQueue is null || account is null) + { + _mu.ExitReadLock(); + return; + } + var sendq = _sys.SendQueue; + _mu.ExitReadLock(); + + ClientConnection? c; + _mu.EnterWriteLock(); + try + { + c = account.InternalAccountClient(); + } + finally + { + _mu.ExitWriteLock(); + } + + sendq.Push(EventHelpers.NewPubMsg( + c, subject, string.Empty, si, null, msg, compressionType, false, false)); + } + + // ========================================================================= + // Group G: sendInternalMsgLocked / sendInternalMsg + // Mirrors Go (s *Server) sendInternalMsgLocked and + // (s *Server) sendInternalMsg in server/events.go. + // ========================================================================= + + /// + /// Queues an internal message, acquiring the read lock. + /// Mirrors Go (s *Server) sendInternalMsgLocked in server/events.go. + /// + internal void SendInternalMsgLocked( + string subject, + string reply, + ServerInfo? si, + object? msg) + { + _mu.EnterReadLock(); + try { SendInternalMsg(subject, reply, si, msg); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Queues an internal message. Lock must already be held. + /// Mirrors Go (s *Server) sendInternalMsg in server/events.go. + /// + internal void SendInternalMsg( + string subject, + string reply, + ServerInfo? si, + object? msg) + { + if (_sys is null || _sys.SendQueue is null) return; + _sys.SendQueue.Push(EventHelpers.NewPubMsg( + null, subject, reply, si, null, msg, (int)CompressionType.None, false, false)); + } + + /// + /// Queues an internal message from a specific client context. + /// Called by . + /// Mirrors Go (c *client) sendInternalMsg(...) in server/events.go. + /// + internal void SendInternalMsgFromClient( + ClientConnection client, + string subject, + string reply, + ServerInfo? si, + object? msg) + { + _mu.EnterReadLock(); + try + { + if (_sys is null || _sys.SendQueue is null) return; + _sys.SendQueue.Push(EventHelpers.NewPubMsg( + client, subject, reply, si, null, msg, (int)CompressionType.None, false, false)); + } + finally + { + _mu.ExitReadLock(); + } + } + + // ========================================================================= + // Group G: sendInternalResponse + // Mirrors Go (s *Server) sendInternalResponse in server/events.go. + // ========================================================================= + + /// + /// Sends a response to an internal server API request. + /// Mirrors Go (s *Server) sendInternalResponse in server/events.go. + /// + internal void SendInternalResponse(string subject, ServerApiResponse response) + { + _mu.EnterReadLock(); + try + { + if (_sys is null || _sys.SendQueue is null) return; + _sys.SendQueue.Push(EventHelpers.NewPubMsg( + null, subject, string.Empty, response.Server, null, + response, response.Compress, false, false)); + } + finally + { + _mu.ExitReadLock(); + } + } + + // ========================================================================= + // Group G: eventsRunning / EventsEnabled / eventsEnabled + // Mirrors Go counterparts in server/events.go. + // ========================================================================= + + /// + /// Returns true if the events system is running (server is running and events enabled). + /// Acquires the read lock internally. + /// Mirrors Go (s *Server) eventsRunning() bool in server/events.go. + /// + internal bool EventsRunning() + { + _mu.EnterReadLock(); + try { return IsRunning() && EventsEnabledLocked(); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Returns true if the server has the events system enabled via a system account. + /// Acquires the read lock internally. + /// Mirrors Go (s *Server) EventsEnabled() bool in server/events.go. + /// + public bool EventsEnabled() + { + _mu.EnterReadLock(); + try { return EventsEnabledLocked(); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Returns true if events are enabled. Lock must already be held. + /// Mirrors Go (s *Server) eventsEnabled() bool in server/events.go. + /// + private bool EventsEnabledLocked() => + _sys is not null && _sys.Client is not null && _sys.Account is not null; + + // ========================================================================= + // Group G: Node + // Mirrors Go (s *Server) Node() string in server/events.go. + // ========================================================================= + + /// + /// Returns the stable node hash (short hash of server name) used for JetStream + /// cluster targeting. Empty string if events not initialised. + /// Mirrors Go (s *Server) Node() string in server/events.go. + /// + public string Node() + { + _mu.EnterReadLock(); + try { return _sys?.ShortHash ?? string.Empty; } + finally { _mu.ExitReadLock(); } + } + + // ========================================================================= + // Group G: initEventTracking + // Mirrors Go (s *Server) initEventTracking() in server/events.go. + // ========================================================================= + + /// + /// Initialises the server-wide system subscription infrastructure: sets the + /// server's hash, subscribes to all internal system subjects, and starts the + /// send/receive loops. + /// Mirrors Go (s *Server) initEventTracking() in server/events.go. + /// + internal void InitEventTracking() + { + // Capture sys outside any lock to avoid deadlock. + _mu.EnterReadLock(); + var sys = _sys; + _mu.ExitReadLock(); + + if (sys is null || sys.Client is null || sys.Account is null) return; + + // Compute and store the server's short hash. + sys.ShortHash = EventHelpers.GetHash(_info.Name); + + // All-inbox subscription: $SYS._INBOX..* + var inboxSubject = string.Format(SystemSubjects.InboxRespSubj, sys.ShortHash, "*"); + if (SysSubscribe(inboxSubject, InboxReply) is { error: not null } inboxResult) + { + Errorf("Error setting up internal tracking: {0}", inboxResult.error); + return; + } + sys.InboxPrefix = inboxSubject; + + // Remote connections update (old-style subject). + var accConnsOld = string.Format(SystemSubjects.AccConnsEventSubjOld, "*"); + if (SysSubscribe(accConnsOld, NoInlineCallback(RemoteConnsUpdateStub)) is { error: not null } r1) + { + Errorf("Error setting up internal tracking for {0}: {1}", accConnsOld, r1.error); + return; + } + + // Connection responses for this server's ID. + var connsResp = string.Format(SystemSubjects.ConnsRespSubj, _info.Id); + if (SysSubscribe(connsResp, NoInlineCallback(RemoteConnsUpdateStub)) is { error: not null } r2) + { + Errorf("Error setting up internal tracking: {0}", r2.error); + return; + } + + // Subscription-count requests. + if (SysSubscribe(SystemSubjects.AccNumSubsReqSubj, NoInlineCallback(NsubsRequestStub)) is { error: not null } r3) + { + Errorf("Error setting up internal tracking: {0}", r3.error); + return; + } + + // Stats heartbeat from other servers. + var statsSubj = string.Format(SystemSubjects.ServerStatsSubj, "*"); + var statsSub = SysSubscribe(statsSubj, NoInlineCallback(RemoteServerUpdateStub)); + if (statsSub.error is not null) + { + Errorf("Error setting up internal tracking: {0}", statsSub.error); + return; + } + sys.RemoteStatsSub = statsSub.sub; + + // Shutdown events from other servers. + var shutdownSubj = string.Format(SystemSubjects.ShutdownEventSubj, "*"); + if (SysSubscribe(shutdownSubj, NoInlineCallback(RemoteServerShutdownStub)) is { error: not null } r4) + { + Errorf("Error setting up internal tracking: {0}", r4.error); + return; + } + + // Lame-duck events. + var lameDuckSubj = string.Format(SystemSubjects.LameDuckEventSubj, "*"); + if (SysSubscribe(lameDuckSubj, NoInlineCallback(RemoteServerShutdownStub)) is { error: not null } r5) + { + Errorf("Error setting up internal tracking: {0}", r5.error); + return; + } + + // Remote latency measurements for this server. + var latencySubj = string.Format(SystemSubjects.RemoteLatencyEventSubj, sys.ShortHash); + if (SysSubscribe(latencySubj, NoInlineCallback(RemoteLatencyUpdateStub)) is { error: not null } r6) + { + Errorf("Error setting up internal latency tracking: {0}", r6.error); + return; + } + + // Server reload request. + var reloadSubj = string.Format(SystemSubjects.ServerReloadReqSubj, _info.Id); + if (SysSubscribe(reloadSubj, NoInlineCallback(ReloadConfigStub)) is { error: not null } r7) + { + Errorf("Error setting up server reload handler: {0}", r7.error); + return; + } + + // Client kick/LDM requests. + var kickSubj = string.Format(SystemSubjects.ClientKickReqSubj, _info.Id); + if (SysSubscribe(kickSubj, NoInlineCallback(KickClientStub)) is { error: not null } r8) + { + Errorf("Error setting up client kick service: {0}", r8.error); + return; + } + + var ldmSubj = string.Format(SystemSubjects.ClientLdmReqSubj, _info.Id); + if (SysSubscribe(ldmSubj, NoInlineCallback(LdmClientStub)) is { error: not null } r9) + { + Errorf("Error setting up client LDM service: {0}", r9.error); + } + + // Account leaf-node connect events. + var leafConn = string.Format(SystemSubjects.LeafNodeConnectEventSubj, "*"); + if (SysSubscribe(leafConn, NoInlineCallback(LeafNodeConnectedStub)) is { error: not null } r10) + { + Errorf("Error setting up internal tracking: {0}", r10.error); + } + + // Debug subscriber service. + SysSubscribeInternal(SystemSubjects.AccSubsSubj, NoInlineCallback(DebugSubscribersStub)); + } + + // ------------------------------------------------------------------------- + // Stub handlers — full implementations live in other partial files (Events + // module). These are lightweight no-ops that satisfy the dispatch wiring so + // the build compiles and the event loops can run. + // ------------------------------------------------------------------------- + + private void RemoteConnsUpdateStub(Subscription sub, NatsClient c, Account acc, + string subject, string reply, byte[] hdr, byte[] msg) { } + + private void NsubsRequestStub(Subscription sub, NatsClient c, Account acc, + string subject, string reply, byte[] hdr, byte[] msg) { } + + private void RemoteServerUpdateStub(Subscription sub, NatsClient c, Account acc, + string subject, string reply, byte[] hdr, byte[] msg) { } + + private void RemoteServerShutdownStub(Subscription sub, NatsClient c, Account acc, + string subject, string reply, byte[] hdr, byte[] msg) { } + + private void RemoteLatencyUpdateStub(Subscription sub, NatsClient c, Account acc, + string subject, string reply, byte[] hdr, byte[] msg) { } + + private void ReloadConfigStub(Subscription sub, NatsClient c, Account acc, + string subject, string reply, byte[] hdr, byte[] msg) { } + + private void KickClientStub(Subscription sub, NatsClient c, Account acc, + string subject, string reply, byte[] hdr, byte[] msg) { } + + private void LdmClientStub(Subscription sub, NatsClient c, Account acc, + string subject, string reply, byte[] hdr, byte[] msg) { } + + private void LeafNodeConnectedStub(Subscription sub, NatsClient c, Account acc, + string subject, string reply, byte[] hdr, byte[] msg) { } + + private void DebugSubscribersStub(Subscription sub, NatsClient c, Account acc, + string subject, string reply, byte[] hdr, byte[] msg) { } + + // ========================================================================= + // Group G: filterRequest + // Mirrors Go (s *Server) filterRequest in server/events.go. + // ========================================================================= + + /// + /// Returns true if a system event request should be filtered (ignored) by + /// this server based on its name, host, cluster, tags, or domain. + /// Do NOT hold the server lock when calling this. + /// Mirrors Go (s *Server) filterRequest in server/events.go. + /// + internal bool FilterRequest(EventFilterOptions? fOpts) + { + if (fOpts is null) return false; + + var clusterName = ClusterName(); + var opts = GetOpts(); + + if (fOpts.ExactMatch) + { + if ((fOpts.Name != string.Empty && fOpts.Name != _info.Name) || + (fOpts.Host != string.Empty && fOpts.Host != _info.Host) || + (fOpts.Cluster != string.Empty && fOpts.Cluster != clusterName)) + { + return true; + } + } + else if ((fOpts.Name != string.Empty && !_info.Name.Contains(fOpts.Name, StringComparison.Ordinal)) || + (fOpts.Host != string.Empty && !_info.Host.Contains(fOpts.Host, StringComparison.Ordinal)) || + (fOpts.Cluster != string.Empty && !clusterName.Contains(fOpts.Cluster, StringComparison.Ordinal))) + { + return true; + } + + if (fOpts.Tags.Count > 0) + { + foreach (var tag in fOpts.Tags) + { + if (!opts.Tags.Contains(tag)) + return true; + } + } + + if (fOpts.Domain != string.Empty && opts.JetStreamDomain != fOpts.Domain) + return true; + + return false; + } + + // ========================================================================= + // Group G: noInlineCallback / noInlineCallbackStatsz / noInlineCallbackRecvQSelect + // Mirrors Go variants in server/events.go. + // ========================================================================= + + private const int RecvQMuxed = 1; + private const int RecvQStatsz = 2; + + /// + /// Wraps a so that it is always executed on the + /// internal receive queue (never inline with route/gateway processing). + /// Mirrors Go (s *Server) noInlineCallback in server/events.go. + /// + internal SysMsgHandler? NoInlineCallback(SysMsgHandler cb) => + NoInlineCallbackRecvQSelect(cb, RecvQMuxed); + + /// + /// Wraps a for the priority (statsz) receive queue. + /// Mirrors Go (s *Server) noInlineCallbackStatsz in server/events.go. + /// + internal SysMsgHandler? NoInlineCallbackStatsz(SysMsgHandler cb) => + NoInlineCallbackRecvQSelect(cb, RecvQStatsz); + + /// + /// Core wrapper implementation. Returns a handler that pushes messages onto + /// the selected internal receive queue rather than executing inline. + /// Mirrors Go (s *Server) noInlineCallbackRecvQSelect in server/events.go. + /// + internal SysMsgHandler? NoInlineCallbackRecvQSelect(SysMsgHandler cb, int recvQSelect) + { + _mu.EnterReadLock(); + if (!EventsEnabledLocked()) + { + _mu.ExitReadLock(); + return null; + } + + IpQueue recvq = recvQSelect == RecvQStatsz + ? (_sys!.RecvQueuePriority ?? _sys.RecvQueue!) + : _sys!.RecvQueue!; + _mu.ExitReadLock(); + + return (sub, c, acc, subj, rply, hdr, msg) => + { + var hdrCopy = hdr is { Length: > 0 } ? (byte[])hdr.Clone() : []; + var msgCopy = msg is { Length: > 0 } ? (byte[])msg.Clone() : []; + recvq.Push(new InSysMsg + { + Sub = sub, + Client = c, + Acc = acc, + Subject = subj, + Reply = rply, + Hdr = hdrCopy, + Msg = msgCopy, + Cb = cb, + }); + }; + } + + // ========================================================================= + // Group G: sysSubscribe / sysSubscribeQ / sysSubscribeInternal / systemSubscribe + // Mirrors Go variants in server/events.go. + // ========================================================================= + + /// + /// Creates an internal subscription on the system account. + /// Mirrors Go (s *Server) sysSubscribe in server/events.go. + /// + internal (Subscription? sub, Exception? error) SysSubscribe(string subject, SysMsgHandler? cb) => + SystemSubscribe(subject, string.Empty, false, null, cb); + + /// + /// Creates an internal subscription with a queue group on the system account. + /// Mirrors Go (s *Server) sysSubscribeQ in server/events.go. + /// + internal (Subscription? sub, Exception? error) SysSubscribeQ(string subject, string queue, SysMsgHandler? cb) => + SystemSubscribe(subject, queue, false, null, cb); + + /// + /// Creates an internal subscription that does NOT forward interest to routes/gateways. + /// Mirrors Go (s *Server) sysSubscribeInternal in server/events.go. + /// + internal (Subscription? sub, Exception? error) SysSubscribeInternal(string subject, SysMsgHandler? cb) => + SystemSubscribe(subject, string.Empty, true, null, cb); + + /// + /// Core subscription implementation used by all sysSubscribe* helpers. + /// Creates a subscription on the system account's internal client. + /// Mirrors Go (s *Server) systemSubscribe in server/events.go. + /// + internal (Subscription? sub, Exception? error) SystemSubscribe( + string subject, + string queue, + bool internalOnly, + ClientConnection? client, + SysMsgHandler? cb) + { + _mu.EnterWriteLock(); + if (!EventsEnabledLocked()) + { + _mu.ExitWriteLock(); + return (null, ServerErrors.ErrNoSysAccount); + } + + if (cb is null) + { + _mu.ExitWriteLock(); + return (null, new ArgumentNullException(nameof(cb), "undefined message handler")); + } + + var c = client ?? _sys!.Client!; + _sys!.Sid++; + var sid = _sys.Sid.ToString(); + _mu.ExitWriteLock(); + + var subBytes = Encoding.ASCII.GetBytes(subject); + var sidBytes = Encoding.ASCII.GetBytes(sid); + byte[]? qBytes = string.IsNullOrEmpty(queue) ? null : Encoding.ASCII.GetBytes(queue); + + // Map SysMsgHandler → the internal MsgHandler/subscription mechanism. + // The callback receives the raw message bytes split into hdr+msg. + SysMsgHandler capturedCb = cb; + var (sub, err) = c.ProcessSubEx(subBytes, qBytes, sidBytes, internalOnly, false, false); + + if (err is not null) + return (null, err); + + if (sub is not null) + { + // Attach the callback to the subscription via the InSysMsg dispatch. + // Store the callback in the subscription so the receive queue can call it. + sub.SysMsgCb = capturedCb; + } + + return (sub, null); + } + + // ========================================================================= + // Group G: sysUnsubscribe + // Mirrors Go (s *Server) sysUnsubscribe in server/events.go. + // ========================================================================= + + /// + /// Unsubscribes from a system subject by removing the given subscription. + /// Mirrors Go (s *Server) sysUnsubscribe in server/events.go. + /// + internal void SysUnsubscribe(Subscription? sub) + { + if (sub is null) return; + + _mu.EnterReadLock(); + if (!EventsEnabledLocked()) + { + _mu.ExitReadLock(); + return; + } + _mu.ExitReadLock(); + + // System subscriptions are always owned by the system client (_sys.Client). + // Retrieve it under a read lock, then unsubscribe outside the lock. + _mu.EnterReadLock(); + var sysClient = _sys?.Client; + _mu.ExitReadLock(); + + if (sub.Sid is not null && sysClient is not null) + sysClient.RemoveSubBySid(sub.Sid); + } + + // ========================================================================= + // Group G: inboxReply + // Mirrors Go (s *Server) inboxReply in server/events.go. + // ========================================================================= + + /// + /// Handles inbox replies without propagating supercluster-wide interest. + /// Dispatches to the registered reply handler if one exists for this subject. + /// Mirrors Go (s *Server) inboxReply in server/events.go. + /// + internal void InboxReply( + Subscription sub, + NatsClient c, + Account acc, + string subject, + string reply, + byte[] hdr, + byte[] msg) + { + _mu.EnterReadLock(); + if (!EventsEnabledLocked() || _sys!.Replies.Count == 0) + { + _mu.ExitReadLock(); + return; + } + _sys.Replies.TryGetValue(subject, out var handler); + _mu.ExitReadLock(); + + handler?.Invoke(sub, c, acc, subject, reply, hdr, msg); + } + + // ========================================================================= + // Group G: newRespInbox + // Mirrors Go (s *Server) newRespInbox() string in server/events.go. + // ========================================================================= + + /// + /// Generates a new unique response inbox subject using this server's hash prefix. + /// Format: $SYS._INBOX.{hash}.{8-char-random}. + /// Mirrors Go (s *Server) newRespInbox() string in server/events.go. + /// + internal string NewRespInbox() + { + const string digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + const int base62 = 62; + const int suffixLen = EventHelpers.ReplySuffixLen; + + var inboxPre = _sys?.InboxPrefix ?? string.Empty; + // Strip the trailing '*' from $SYS._INBOX.{hash}.* → use as prefix + var prefix = inboxPre.EndsWith(".*", StringComparison.Ordinal) + ? inboxPre[..^1] // strip '*', keep the dot + : inboxPre + "."; + + var suffix = new char[suffixLen]; + var rng = (long)Random.Shared.NextInt64(); + if (rng < 0) rng = -rng; + + for (var i = 0; i < suffixLen; i++) + { + suffix[i] = digits[(int)(rng % base62)]; + rng /= base62; + } + + return prefix + new string(suffix); + } + + // ========================================================================= + // Group G: wrapChk + // Mirrors Go (s *Server) wrapChk in server/events.go. + // ========================================================================= + + /// + /// Returns a wrapper function that acquires the server write lock, checks that + /// events are enabled, invokes , then releases the lock. + /// Mirrors Go (s *Server) wrapChk(f func()) func() in server/events.go. + /// + internal Action WrapChk(Action f) + { + return () => + { + _mu.EnterWriteLock(); + try + { + if (!EventsEnabledLocked()) return; + f(); + } + finally + { + _mu.ExitWriteLock(); + } + }; + } +} + +// ============================================================================ +// EventFilterOptions — common filter options for system event requests +// Mirrors Go EventFilterOptions in server/events.go. +// ============================================================================ + +/// +/// Filter criteria applied to system event requests (STATSZ, VARZ, CONNZ, etc.). +/// Mirrors Go EventFilterOptions in server/events.go. +/// +public sealed class EventFilterOptions +{ + /// Filter by server name (substring unless ExactMatch). + public string Name { get; set; } = string.Empty; + + /// Filter by cluster name (substring unless ExactMatch). + public string Cluster { get; set; } = string.Empty; + + /// Filter by host (substring unless ExactMatch). + public string Host { get; set; } = string.Empty; + + /// When true, use exact equality instead of substring matching. + public bool ExactMatch { get; set; } + + /// Filter by tags (all must match). + public List Tags { get; set; } = []; + + /// Filter by JetStream domain. + public string Domain { get; set; } = string.Empty; +} + +// ============================================================================ +// ServerApiResponse — standard API response envelope +// Mirrors Go ServerAPIResponse in server/events.go. +// ============================================================================ + +/// +/// Response envelope used by system API endpoints (varz, connz, statsz, etc.). +/// Mirrors Go ServerAPIResponse in server/events.go. +/// +public sealed class ServerApiResponse +{ + public ServerInfo? Server { get; set; } + public object? Data { get; set; } + public ApiError? Error { get; set; } + + /// Internal: compression type for this response. Not serialised. + internal int Compress { get; set; } +} + +/// +/// API error returned in . +/// Mirrors Go ApiError in server/events.go. +/// +public sealed class ApiError +{ + public int Code { get; set; } + public string Description { get; set; } = string.Empty; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs index f59d247..4ebfc97 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs @@ -88,6 +88,100 @@ public sealed class ServerInfo // LeafNode-specific [JsonPropertyName("leafnode_urls")] public string[]? LeafNodeUrls { get; set; } + // ========================================================================= + // Advisory / event-system fields + // Mirrors Go server/events.go ServerInfo advisory struct fields. + // These are populated by the internal send loop when publishing advisories. + // ========================================================================= + + /// + /// Sequence number for this server's advisory messages. + /// Mirrors Go Seq uint64 in ServerInfo (events.go). + /// + [JsonPropertyName("seq")] + public ulong Seq { get; set; } + + /// + /// UTC timestamp of this advisory message. + /// Mirrors Go Time time.Time in ServerInfo (events.go). + /// + [JsonPropertyName("time")] + public DateTime Time { get; set; } + + /// + /// Capability flags for this server (bitmask of ). + /// Mirrors Go Flags ServerCapability in ServerInfo (events.go). + /// + [JsonPropertyName("flags")] + public ulong Capabilities { get; set; } + + /// + /// Server tags from options. + /// Mirrors Go Tags []string in ServerInfo (events.go). + /// + [JsonPropertyName("tags")] + [JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public List? Tags { get; set; } + + /// + /// Server metadata from options. + /// Mirrors Go Metadata map[string]string in ServerInfo (events.go). + /// + [JsonPropertyName("metadata")] + [JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)] + public Dictionary? Metadata { get; set; } + + // ========================================================================= + // Capability helpers (Group B) + // Mirrors Go ServerInfo capability methods in server/events.go. + // ========================================================================= + + /// + /// Sets the JetStream capability bit and the legacy JetStream bool field. + /// Mirrors Go (si *ServerInfo) SetJetStreamEnabled() in server/events.go. + /// + public void SetJetStreamEnabled() + { + Capabilities |= (ulong)ServerCapability.JetStreamEnabled; + JetStream = true; + } + + /// + /// Returns whether this server has JetStream enabled, taking into account + /// both the capability flag and the legacy bool field. + /// Mirrors Go (si *ServerInfo) JetStreamEnabled() bool in server/events.go. + /// + public bool IsJetStreamEnabled() => + (Capabilities & (ulong)ServerCapability.JetStreamEnabled) != 0 || JetStream; + + /// + /// Sets the binary-stream-snapshot capability bit. + /// Mirrors Go (si *ServerInfo) SetBinaryStreamSnapshot() in server/events.go. + /// + public void SetBinaryStreamSnapshot() => + Capabilities |= (ulong)ServerCapability.BinaryStreamSnapshot; + + /// + /// Returns whether this server supports binary stream snapshots. + /// Mirrors Go (si *ServerInfo) BinaryStreamSnapshot() bool in server/events.go. + /// + public bool IsBinaryStreamSnapshot() => + (Capabilities & (ulong)ServerCapability.BinaryStreamSnapshot) != 0; + + /// + /// Sets the account-NRG capability bit. + /// Mirrors Go (si *ServerInfo) SetAccountNRG() in server/events.go. + /// + public void SetAccountNrg() => + Capabilities |= (ulong)ServerCapability.AccountNrg; + + /// + /// Returns whether this server supports moving NRG traffic into the asset account. + /// Mirrors Go (si *ServerInfo) AccountNRG() bool in server/events.go. + /// + public bool IsAccountNrg() => + (Capabilities & (ulong)ServerCapability.AccountNrg) != 0; + /// Returns a shallow clone of this . internal ServerInfo ShallowClone() => (ServerInfo)MemberwiseClone(); }