Files
natsnet/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs
Joseph Doherty dff3000461 feat(batch44): implement events core dispatch — Groups A-G from events.go
Port 46 deferred features from server/events.go covering the internal event
system foundation:

- Group A (EventHelpers.cs): NewPubMsg/pool, RouteStat, GetHash/GetHashSize,
  GetAcceptEncoding, RemoteLatencySubjectForResponse, TotalSubs, AccForClient,
  IssuerForClient, ClearTimer; CompressionType enum
- Group B (NatsServerTypes.cs): ServerInfo advisory fields (Seq, Time,
  Capabilities, Tags, Metadata) + capability methods (SetJetStreamEnabled,
  IsJetStreamEnabled, SetBinaryStreamSnapshot, IsBinaryStreamSnapshot,
  SetAccountNrg, IsAccountNrg)
- Group C (ClientTypes.cs): ForAssignmentSnap, ForProposal, ForAdvisory on
  ClientInfo
- Group D (EventTypes.cs): PubMsg.ReturnToPool; PubMsg.Client typed as
  ClientConnection?; InternalState.Client typed as ClientConnection?;
  InternalState.Seq changed to long field for Interlocked.Increment
- Group E (ClientConnection.Events.cs): SendInternalMsg delegates to server
- Group F (Account.Events.cs): AccountTrafficStats/Set + Account.Statz()
- Group G (NatsServer.Events.cs): InternalReceiveLoop, InternalSendLoop,
  SendShutdownEvent, SendInternalAccountSysMsg, SendInternalMsgLocked,
  SendInternalMsg, SendInternalMsgFromClient, SendInternalResponse,
  EventsRunning, EventsEnabled, Node, InitEventTracking, FilterRequest,
  NoInlineCallback*, SysSubscribe*, SystemSubscribe, SysUnsubscribe,
  InboxReply, NewRespInbox, WrapChk; EventFilterOptions, ServerApiResponse,
  ApiError types
- Subscription.SysMsgCb field added for system subscription dispatch
2026-03-01 09:01:03 -05:00

950 lines
36 KiB
C#

// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/events.go in the NATS server Go source.
// Batch 44: Events Core & Dispatch.
using System.Text;
using System.Text.Json;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
// =========================================================================
// Constants (mirrors Go sysHashLen = 8, InboxPrefix consts)
// =========================================================================
/// <summary>
/// Number of characters in the response-inbox prefix used for this server's
/// internal replies: the inbox prefix length, plus the system hash length,
/// plus one.
/// </summary>
private int RespInboxPrefixLen
{
    get { return EventHelpers.InboxPrefixLen + EventHelpers.SysHashLen + 1; }
}
// =========================================================================
// Group G: internalReceiveLoop
// Mirrors Go <c>(s *Server) internalReceiveLoop</c> in server/events.go.
// =========================================================================
/// <summary>
/// Background loop that drains <paramref name="recvq"/> and invokes the callback
/// attached to every queued internal system message, in queue order.
/// Mirrors Go <c>(s *Server) internalReceiveLoop</c> in server/events.go.
/// </summary>
/// <param name="recvq">Internal receive queue to drain.</param>
/// <remarks>
/// Blocks the calling thread while waiting for work. Returns when events are no
/// longer running, when the queue's channel reports that no more items will
/// arrive, or when the quit token is cancelled.
/// </remarks>
private void InternalReceiveLoop(IpQueue<InSysMsg> recvq)
{
    while (EventsRunning())
    {
        // Wait for a notification that items are ready.
        bool ready;
        try
        {
            ready = recvq.Ch.WaitToReadAsync(_quitCts.Token).AsTask()
                .GetAwaiter()
                .GetResult();
        }
        catch (OperationCanceledException)
        {
            // The quit token fired while we were waiting. That is a normal
            // shutdown, so leave quietly instead of letting the exception
            // escape from the loop.
            return;
        }

        if (!ready)
        {
            return;
        }

        var msgs = recvq.Pop();
        if (msgs is not null)
        {
            foreach (var m in msgs)
            {
                m.Cb?.Invoke(m.Sub, m.Client, m.Acc, m.Subject, m.Reply, m.Hdr, m.Msg);
            }
            recvq.Recycle(msgs);
        }
    }
}
// =========================================================================
// Group G: internalSendLoop
// Mirrors Go <c>(s *Server) internalSendLoop</c> in server/events.go.
// =========================================================================
/// <summary>
/// Background loop that serialises and dispatches all messages the server
/// wants to send through the system account. It blocks the calling thread
/// while waiting, so it is meant to run on its own long-lived thread or task.
/// Mirrors Go <c>(s *Server) internalSendLoop</c> in server/events.go.
/// </summary>
/// <remarks>
/// Returns when there is no system state or send queue, when events stop
/// running, when the quit token is cancelled, or after a message flagged
/// <c>Last</c> has been sent and flushed.
/// </remarks>
private void InternalSendLoop()
{
    // One quit-wait task for the whole lifetime of the loop. Creating a fresh
    // Task.Delay(Infinite, token) per iteration would leave one more pending
    // task registered on the quit token after every batch.
    var quitTask = Task.Delay(Timeout.Infinite, _quitCts.Token);

    // Read snapshot of send queue and system client under the read lock.
    // Mirrors Go's RESET: label pattern.
RESET:
    _mu.EnterReadLock();
    if (_sys is null || _sys.SendQueue is null)
    {
        _mu.ExitReadLock();
        return;
    }
    var sysc = _sys.Client;
    var resetCh = _sys.ResetChannel;
    var sendq = _sys.SendQueue;
    var id = _info.Id;
    var host = _info.Host;
    var svrName = _info.Name;
    var domain = _info.Domain ?? string.Empty;
    var seqRef = _sys; // holds ref so we can atomically increment Seq
    var jsEnabled = _info.JetStream;
    var cluster = _info.Cluster ?? string.Empty;
    if (_gateway.Enabled)
        cluster = GetGatewayName();
    _mu.ExitReadLock();
    var opts = GetOpts();
    var tags = opts.Tags;
    var metadata = opts.Metadata;

    // Pending waits are kept across iterations and only re-created once they
    // have completed, so unfinished waiters do not pile up on the channels.
    Task<bool>? sendTask = null;
    Task<bool>? resetTask = null;

    while (EventsRunning())
    {
        // Wait for items in the send queue OR a reset signal OR quit.
        sendTask ??= sendq.Ch.WaitToReadAsync(_quitCts.Token).AsTask();
        if (resetTask is null && resetCh is not null)
            resetTask = resetCh.Reader.WaitToReadAsync(_quitCts.Token).AsTask();

        // When there is no reset channel to watch, wait on the quit task in
        // its place. An already-completed placeholder here would make WhenAny
        // return immediately and turn this loop into a busy spin.
        Task resetWait = resetTask is null ? quitTask : resetTask;
        var completed = Task.WhenAny(sendTask, resetWait, quitTask)
            .GetAwaiter().GetResult();
        if (_quitCts.IsCancellationRequested)
            return;

        // If reset channel fired, re-read the snapshot.
        if (resetTask is not null && completed == resetTask
            && resetTask.IsCompletedSuccessfully && resetTask.Result)
        {
            resetCh?.Reader.TryRead(out _);
            goto RESET;
        }

        // A reset wait that finished without a signal will never deliver one;
        // stop watching it rather than re-arming a wait that completes at once.
        if (resetTask is not null && resetTask.IsCompleted
            && !(resetTask.IsCompletedSuccessfully && resetTask.Result))
        {
            resetCh = null;
            resetTask = null;
        }

        if (!sendTask.IsCompletedSuccessfully || !sendTask.Result)
        {
            // Re-arm only a wait that has actually finished; a still-pending
            // wait is reused on the next iteration.
            if (sendTask.IsCompleted)
                sendTask = null;
            continue;
        }
        sendTask = null;

        var msgs = sendq.Pop();
        if (msgs is null) continue;
        foreach (var pm in msgs)
        {
            // Stamp ServerInfo advisory fields if requested.
            if (pm.Si is { } si)
            {
                si.Name = svrName;
                si.Domain = domain;
                si.Host = host;
                si.Cluster = cluster;
                si.Id = id;
                si.Seq = (ulong)Interlocked.Increment(ref seqRef.Seq);
                si.Version = ServerConstants.Version;
                si.Time = DateTime.UtcNow;
                si.Tags = tags.Count > 0 ? [..tags] : null;
                si.Metadata = metadata.Count > 0 ? new Dictionary<string, string>(metadata) : null;
                si.Capabilities = 0;
                if (jsEnabled)
                {
                    si.SetJetStreamEnabled();
                    si.SetBinaryStreamSnapshot();
                    // AccountNRG: stub, not yet tracked
                }
            }
            // Serialise payload: strings as UTF-8, byte arrays as-is,
            // anything else as JSON.
            byte[] body = [];
            if (pm.Msg is not null)
            {
                body = pm.Msg switch
                {
                    string s => Encoding.UTF8.GetBytes(s),
                    byte[] b => b,
                    _ => JsonSerializer.SerializeToUtf8Bytes(pm.Msg),
                };
            }
            // Choose client: the message's own client, else the system client.
            var c = pm.Client ?? sysc;
            if (c is null) { pm.ReturnToPool(); continue; }
            // Process the publish inline.
            lock (c)
            {
                c.ParseCtx.Pa.Subject = Encoding.ASCII.GetBytes(pm.Subject);
                c.ParseCtx.Pa.Reply = Encoding.ASCII.GetBytes(pm.Reply);
            }
            // Append CRLF.
            var payload = new byte[body.Length + 2];
            Buffer.BlockCopy(body, 0, payload, 0, body.Length);
            payload[^2] = (byte)'\r';
            payload[^1] = (byte)'\n';
            c.ProcessInboundClientMsg(payload);
            if (pm.Last)
            {
                // Final message (shutdown): flush in-place and exit.
                c.FlushClients(long.MaxValue);
                sendq.Recycle(msgs);
                pm.ReturnToPool();
                return;
            }
            else
            {
                c.FlushClients(0);
            }
            pm.ReturnToPool();
        }
        sendq.Recycle(msgs);
    }
}
// =========================================================================
// Group G: sendShutdownEvent
// Mirrors Go <c>(s *Server) sendShutdownEvent</c> in server/events.go.
// =========================================================================
/// <summary>
/// Queues the server shutdown event as the final message on the send queue.
/// Under the write lock it detaches the send queue from the system state (so
/// nothing else can be queued through it) and clears all reply handlers
/// before pushing the event. Does nothing when there is no system state or
/// no send queue.
/// Mirrors Go <c>(s *Server) sendShutdownEvent</c> in server/events.go.
/// </summary>
internal void SendShutdownEvent()
{
    _mu.EnterWriteLock();
    try
    {
        if (_sys is not { SendQueue: { } queue } sys)
        {
            return;
        }

        var subject = string.Format(SystemSubjects.ShutdownEventSubj, _info.Id);

        // Stop any more messages from queuing.
        sys.SendQueue = null;
        // Unhook all reply handlers.
        sys.Replies.Clear();

        // The same ServerInfo instance is passed both as the advisory info
        // and as the message body; the trailing 'true' marks it as the last message.
        var info = new ServerInfo();
        var shutdownMsg = EventHelpers.NewPubMsg(
            null, subject, string.Empty, info, null, info,
            (int)CompressionType.None, false, true);
        queue.Push(shutdownMsg);
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
// =========================================================================
// Group G: sendInternalAccountSysMsg
// Mirrors Go <c>(s *Server) sendInternalAccountSysMsg</c> in server/events.go.
// =========================================================================
/// <summary>
/// Queues an internal system message that is published through the given
/// account's internal client. Does nothing when <paramref name="account"/> is
/// null or when there is no system state or send queue.
/// Mirrors Go <c>(s *Server) sendInternalAccountSysMsg</c> in server/events.go.
/// </summary>
/// <param name="account">Account whose internal client publishes the message.</param>
/// <param name="subject">Subject to publish on.</param>
/// <param name="si">Optional server info passed along with the message.</param>
/// <param name="msg">Message body; may be null.</param>
/// <param name="compressionType">Compression type forwarded to the queued message.</param>
internal void SendInternalAccountSysMsg(
    Account? account,
    string subject,
    ServerInfo? si,
    object? msg,
    int compressionType = (int)CompressionType.None)
{
    // Snapshot the send queue under the read lock.
    _mu.EnterReadLock();
    var queue = _sys?.SendQueue;
    _mu.ExitReadLock();

    if (queue is null || account is null)
    {
        return;
    }

    // NOTE(review): the internal client is fetched under the server write
    // lock here; confirm that this is the intended lock for the account.
    ClientConnection? internalClient;
    _mu.EnterWriteLock();
    try
    {
        internalClient = account.InternalAccountClient();
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    var pubMsg = EventHelpers.NewPubMsg(
        internalClient, subject, string.Empty, si, null, msg, compressionType, false, false);
    queue.Push(pubMsg);
}
// =========================================================================
// Group G: sendInternalMsgLocked / sendInternalMsg
// Mirrors Go <c>(s *Server) sendInternalMsgLocked</c> and
// <c>(s *Server) sendInternalMsg</c> in server/events.go.
// =========================================================================
/// <summary>
/// Queues an internal message. Despite the "Locked" suffix, this variant
/// takes the server read lock itself and then delegates to
/// <see cref="SendInternalMsg"/>.
/// Mirrors Go <c>(s *Server) sendInternalMsgLocked</c> in server/events.go.
/// </summary>
/// <param name="subject">Subject to publish on.</param>
/// <param name="reply">Reply subject.</param>
/// <param name="si">Optional server info passed along with the message.</param>
/// <param name="msg">Message body; may be null.</param>
internal void SendInternalMsgLocked(string subject, string reply, ServerInfo? si, object? msg)
{
    _mu.EnterReadLock();
    try
    {
        SendInternalMsg(subject, reply, si, msg);
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Queues an internal message on the system send queue without taking any
/// lock; the caller must already hold the server lock. Does nothing when
/// there is no system state or no send queue.
/// Mirrors Go <c>(s *Server) sendInternalMsg</c> in server/events.go.
/// </summary>
/// <param name="subject">Subject to publish on.</param>
/// <param name="reply">Reply subject.</param>
/// <param name="si">Optional server info passed along with the message.</param>
/// <param name="msg">Message body; may be null.</param>
internal void SendInternalMsg(string subject, string reply, ServerInfo? si, object? msg)
{
    if (_sys is not { SendQueue: { } queue })
    {
        return;
    }

    // A null client is passed here, so no specific client is attached to the message.
    var pubMsg = EventHelpers.NewPubMsg(
        null, subject, reply, si, null, msg, (int)CompressionType.None, false, false);
    queue.Push(pubMsg);
}
/// <summary>
/// Queues an internal message that is attached to the supplied client.
/// Takes the server read lock; does nothing when there is no system state or
/// no send queue.
/// Called by <see cref="ClientConnection.SendInternalMsg"/>.
/// Mirrors Go <c>(c *client) sendInternalMsg(...)</c> in server/events.go.
/// </summary>
/// <param name="client">Client the queued message is attached to.</param>
/// <param name="subject">Subject to publish on.</param>
/// <param name="reply">Reply subject.</param>
/// <param name="si">Optional server info passed along with the message.</param>
/// <param name="msg">Message body; may be null.</param>
internal void SendInternalMsgFromClient(
    ClientConnection client,
    string subject,
    string reply,
    ServerInfo? si,
    object? msg)
{
    _mu.EnterReadLock();
    try
    {
        if (_sys is not { SendQueue: { } queue })
        {
            return;
        }

        var pubMsg = EventHelpers.NewPubMsg(
            client, subject, reply, si, null, msg, (int)CompressionType.None, false, false);
        queue.Push(pubMsg);
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// =========================================================================
// Group G: sendInternalResponse
// Mirrors Go <c>(s *Server) sendInternalResponse</c> in server/events.go.
// =========================================================================
/// <summary>
/// Queues <paramref name="response"/> as the reply to an internal server API
/// request. The response's <c>Server</c> is passed as the server info and its
/// <c>Compress</c> value as the compression type. Takes the server read lock;
/// does nothing when there is no system state or no send queue.
/// Mirrors Go <c>(s *Server) sendInternalResponse</c> in server/events.go.
/// </summary>
/// <param name="subject">Subject the response is published on.</param>
/// <param name="response">Response envelope to send.</param>
internal void SendInternalResponse(string subject, ServerApiResponse response)
{
    _mu.EnterReadLock();
    try
    {
        if (_sys is not { SendQueue: { } queue })
        {
            return;
        }

        var pubMsg = EventHelpers.NewPubMsg(
            null, subject, string.Empty, response.Server, null,
            response, response.Compress, false, false);
        queue.Push(pubMsg);
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// =========================================================================
// Group G: eventsRunning / EventsEnabled / eventsEnabled
// Mirrors Go counterparts in server/events.go.
// =========================================================================
/// <summary>
/// Reports whether the events system is running, i.e. the server is running
/// and events are enabled. Takes the server read lock for the check.
/// Mirrors Go <c>(s *Server) eventsRunning() bool</c> in server/events.go.
/// </summary>
/// <returns><c>true</c> when the server is running and events are enabled.</returns>
internal bool EventsRunning()
{
    _mu.EnterReadLock();
    try
    {
        if (!IsRunning())
        {
            return false;
        }

        return EventsEnabledLocked();
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Reports whether the events system is enabled for this server.
/// Takes the server read lock for the check.
/// Mirrors Go <c>(s *Server) EventsEnabled() bool</c> in server/events.go.
/// </summary>
/// <returns><c>true</c> when events are enabled.</returns>
public bool EventsEnabled()
{
    _mu.EnterReadLock();
    try
    {
        var enabled = EventsEnabledLocked();
        return enabled;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Lock-free check used by the locking wrappers: events count as enabled when
/// the system state exists and has both a client and an account.
/// The caller must already hold the server lock.
/// Mirrors Go <c>(s *Server) eventsEnabled() bool</c> in server/events.go.
/// </summary>
private bool EventsEnabledLocked()
{
    return _sys is { Client: not null, Account: not null };
}
// =========================================================================
// Group G: Node
// Mirrors Go <c>(s *Server) Node() string</c> in server/events.go.
// =========================================================================
/// <summary>
/// Returns the short hash stored on the system state, read under the server
/// read lock. Returns an empty string when there is no system state or no
/// hash has been set.
/// Mirrors Go <c>(s *Server) Node() string</c> in server/events.go.
/// </summary>
public string Node()
{
    _mu.EnterReadLock();
    try
    {
        var hash = _sys?.ShortHash;
        return hash is null ? string.Empty : hash;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// =========================================================================
// Group G: initEventTracking
// Mirrors Go <c>(s *Server) initEventTracking()</c> in server/events.go.
// =========================================================================
/// <summary>
/// Initialises the server-wide system subscription infrastructure: computes
/// and stores the server's short hash, then subscribes to the internal system
/// subjects (inbox replies, remote connection updates, stats, shutdown and
/// lame-duck events, latency, reload, client kick/LDM, leaf-node connects and
/// the debug subscriber service).
/// Mirrors Go <c>(s *Server) initEventTracking()</c> in server/events.go.
/// </summary>
/// <remarks>
/// Does nothing when the system state, its client or its account is missing.
/// Any subscription failure is logged and aborts the remaining setup, so a
/// failure is handled the same way no matter which subscription fails.
/// </remarks>
internal void InitEventTracking()
{
    // Snapshot _sys under the read lock and release it before subscribing:
    // SystemSubscribe takes the write lock itself.
    _mu.EnterReadLock();
    var sys = _sys;
    _mu.ExitReadLock();
    if (sys is null || sys.Client is null || sys.Account is null) return;
    // Compute and store the server's short hash.
    sys.ShortHash = EventHelpers.GetHash(_info.Name);
    // All-inbox subscription: $SYS._INBOX.<hash>.*
    var inboxSubject = string.Format(SystemSubjects.InboxRespSubj, sys.ShortHash, "*");
    if (SysSubscribe(inboxSubject, InboxReply) is { error: not null } inboxResult)
    {
        Errorf("Error setting up internal tracking: {0}", inboxResult.error);
        return;
    }
    sys.InboxPrefix = inboxSubject;
    // Remote connections update (old-style subject).
    var accConnsOld = string.Format(SystemSubjects.AccConnsEventSubjOld, "*");
    if (SysSubscribe(accConnsOld, NoInlineCallback(RemoteConnsUpdateStub)) is { error: not null } r1)
    {
        Errorf("Error setting up internal tracking for {0}: {1}", accConnsOld, r1.error);
        return;
    }
    // Connection responses for this server's ID.
    var connsResp = string.Format(SystemSubjects.ConnsRespSubj, _info.Id);
    if (SysSubscribe(connsResp, NoInlineCallback(RemoteConnsUpdateStub)) is { error: not null } r2)
    {
        Errorf("Error setting up internal tracking: {0}", r2.error);
        return;
    }
    // Subscription-count requests.
    if (SysSubscribe(SystemSubjects.AccNumSubsReqSubj, NoInlineCallback(NsubsRequestStub)) is { error: not null } r3)
    {
        Errorf("Error setting up internal tracking: {0}", r3.error);
        return;
    }
    // Stats heartbeat from other servers. The subscription is kept on the
    // system state so it can be referenced later.
    var statsSubj = string.Format(SystemSubjects.ServerStatsSubj, "*");
    var statsSub = SysSubscribe(statsSubj, NoInlineCallback(RemoteServerUpdateStub));
    if (statsSub.error is not null)
    {
        Errorf("Error setting up internal tracking: {0}", statsSub.error);
        return;
    }
    sys.RemoteStatsSub = statsSub.sub;
    // Shutdown events from other servers.
    var shutdownSubj = string.Format(SystemSubjects.ShutdownEventSubj, "*");
    if (SysSubscribe(shutdownSubj, NoInlineCallback(RemoteServerShutdownStub)) is { error: not null } r4)
    {
        Errorf("Error setting up internal tracking: {0}", r4.error);
        return;
    }
    // Lame-duck events (handled by the same callback as shutdown events).
    var lameDuckSubj = string.Format(SystemSubjects.LameDuckEventSubj, "*");
    if (SysSubscribe(lameDuckSubj, NoInlineCallback(RemoteServerShutdownStub)) is { error: not null } r5)
    {
        Errorf("Error setting up internal tracking: {0}", r5.error);
        return;
    }
    // Remote latency measurements for this server.
    var latencySubj = string.Format(SystemSubjects.RemoteLatencyEventSubj, sys.ShortHash);
    if (SysSubscribe(latencySubj, NoInlineCallback(RemoteLatencyUpdateStub)) is { error: not null } r6)
    {
        Errorf("Error setting up internal latency tracking: {0}", r6.error);
        return;
    }
    // Server reload request.
    var reloadSubj = string.Format(SystemSubjects.ServerReloadReqSubj, _info.Id);
    if (SysSubscribe(reloadSubj, NoInlineCallback(ReloadConfigStub)) is { error: not null } r7)
    {
        Errorf("Error setting up server reload handler: {0}", r7.error);
        return;
    }
    // Client kick/LDM requests.
    var kickSubj = string.Format(SystemSubjects.ClientKickReqSubj, _info.Id);
    if (SysSubscribe(kickSubj, NoInlineCallback(KickClientStub)) is { error: not null } r8)
    {
        Errorf("Error setting up client kick service: {0}", r8.error);
        return;
    }
    var ldmSubj = string.Format(SystemSubjects.ClientLdmReqSubj, _info.Id);
    if (SysSubscribe(ldmSubj, NoInlineCallback(LdmClientStub)) is { error: not null } r9)
    {
        Errorf("Error setting up client LDM service: {0}", r9.error);
        return;
    }
    // Account leaf-node connect events.
    var leafConn = string.Format(SystemSubjects.LeafNodeConnectEventSubj, "*");
    if (SysSubscribe(leafConn, NoInlineCallback(LeafNodeConnectedStub)) is { error: not null } r10)
    {
        Errorf("Error setting up internal tracking: {0}", r10.error);
        return;
    }
    // Debug subscriber service (internal-only subscription). A failure here
    // is logged like every other subscription instead of being dropped.
    if (SysSubscribeInternal(SystemSubjects.AccSubsSubj, NoInlineCallback(DebugSubscribersStub)) is { error: not null } r11)
    {
        Errorf("Error setting up internal debug service for subscribers: {0}", r11.error);
    }
}
// -------------------------------------------------------------------------
// Stub handlers. Every method below has an empty body: it accepts a system
// message and ignores it. They exist so InitEventTracking has a handler to
// wire to each system subject, which lets the build compile and the event
// loops run.
// NOTE(review): the full handler implementations are expected to live in
// other partial files of the Events module; confirm, and swap each stub for
// its real handler once that handler is available.
// -------------------------------------------------------------------------
private void RemoteConnsUpdateStub(Subscription sub, NatsClient c, Account acc,
string subject, string reply, byte[] hdr, byte[] msg) { }
private void NsubsRequestStub(Subscription sub, NatsClient c, Account acc,
string subject, string reply, byte[] hdr, byte[] msg) { }
private void RemoteServerUpdateStub(Subscription sub, NatsClient c, Account acc,
string subject, string reply, byte[] hdr, byte[] msg) { }
private void RemoteServerShutdownStub(Subscription sub, NatsClient c, Account acc,
string subject, string reply, byte[] hdr, byte[] msg) { }
private void RemoteLatencyUpdateStub(Subscription sub, NatsClient c, Account acc,
string subject, string reply, byte[] hdr, byte[] msg) { }
private void ReloadConfigStub(Subscription sub, NatsClient c, Account acc,
string subject, string reply, byte[] hdr, byte[] msg) { }
private void KickClientStub(Subscription sub, NatsClient c, Account acc,
string subject, string reply, byte[] hdr, byte[] msg) { }
private void LdmClientStub(Subscription sub, NatsClient c, Account acc,
string subject, string reply, byte[] hdr, byte[] msg) { }
private void LeafNodeConnectedStub(Subscription sub, NatsClient c, Account acc,
string subject, string reply, byte[] hdr, byte[] msg) { }
private void DebugSubscribersStub(Subscription sub, NatsClient c, Account acc,
string subject, string reply, byte[] hdr, byte[] msg) { }
// =========================================================================
// Group G: filterRequest
// Mirrors Go <c>(s *Server) filterRequest</c> in server/events.go.
// =========================================================================
/// <summary>
/// Decides whether this server should ignore a system event request, based
/// on the request's name, host, cluster, tag and domain filters.
/// Do NOT hold the server lock when calling this.
/// Mirrors Go <c>(s *Server) filterRequest</c> in server/events.go.
/// </summary>
/// <param name="fOpts">Filter options from the request; null means "no filter".</param>
/// <returns>
/// <c>true</c> when the request does not match this server and must be
/// ignored; <c>false</c> when the server should answer it.
/// </returns>
/// <remarks>
/// A filter field that is null is treated exactly like an empty one (i.e. it
/// is not applied), so options that arrive with explicit nulls cannot cause
/// a null-reference or argument exception here.
/// </remarks>
internal bool FilterRequest(EventFilterOptions? fOpts)
{
    if (fOpts is null) return false;
    var clusterName = ClusterName();
    var opts = GetOpts();
    var exact = fOpts.ExactMatch;

    // Name, host and cluster: equality in exact mode, otherwise an ordinal
    // substring match. Evaluated in that order, stopping at the first mismatch.
    if (Mismatch(fOpts.Name, _info.Name) ||
        Mismatch(fOpts.Host, _info.Host) ||
        Mismatch(fOpts.Cluster, clusterName))
    {
        return true;
    }

    // Every requested tag must be present on this server.
    if (fOpts.Tags is { Count: > 0 } requestedTags)
    {
        foreach (var tag in requestedTags)
        {
            if (!opts.Tags.Contains(tag))
                return true;
        }
    }

    if (!string.IsNullOrEmpty(fOpts.Domain) && opts.JetStreamDomain != fOpts.Domain)
        return true;

    return false;

    // True when a filter value is set and the server's value does not satisfy it.
    bool Mismatch(string? wanted, string actual) =>
        !string.IsNullOrEmpty(wanted) &&
        (exact
            ? wanted != actual
            : !actual.Contains(wanted, StringComparison.Ordinal));
}
// =========================================================================
// Group G: noInlineCallback / noInlineCallbackStatsz / noInlineCallbackRecvQSelect
// Mirrors Go variants in server/events.go.
// =========================================================================
// Receive-queue selectors passed to NoInlineCallbackRecvQSelect.
// RecvQMuxed selects the regular internal receive queue.
private const int RecvQMuxed = 1;
// RecvQStatsz selects the priority receive queue, falling back to the
// regular queue when no priority queue is set.
private const int RecvQStatsz = 2;
/// <summary>
/// Wraps <paramref name="cb"/> so that incoming messages are queued on the
/// regular internal receive queue instead of being handled inline by the caller.
/// Mirrors Go <c>(s *Server) noInlineCallback</c> in server/events.go.
/// </summary>
/// <param name="cb">Handler to run from the receive queue.</param>
/// <returns>The wrapping handler, or null when events are not enabled.</returns>
internal SysMsgHandler? NoInlineCallback(SysMsgHandler cb)
{
    return NoInlineCallbackRecvQSelect(cb, RecvQMuxed);
}
/// <summary>
/// Wraps <paramref name="cb"/> so that incoming messages are queued on the
/// priority (statsz) receive queue instead of being handled inline.
/// Mirrors Go <c>(s *Server) noInlineCallbackStatsz</c> in server/events.go.
/// </summary>
/// <param name="cb">Handler to run from the receive queue.</param>
/// <returns>The wrapping handler, or null when events are not enabled.</returns>
internal SysMsgHandler? NoInlineCallbackStatsz(SysMsgHandler cb)
{
    return NoInlineCallbackRecvQSelect(cb, RecvQStatsz);
}
/// <summary>
/// Shared implementation behind the <c>NoInlineCallback*</c> helpers. Picks the
/// receive queue under the read lock and returns a handler that copies the
/// header and payload and pushes the message, together with
/// <paramref name="cb"/>, onto that queue instead of running
/// <paramref name="cb"/> inline.
/// Mirrors Go <c>(s *Server) noInlineCallbackRecvQSelect</c> in server/events.go.
/// </summary>
/// <param name="cb">Handler stored with each queued message.</param>
/// <param name="recvQSelect">
/// <c>RecvQStatsz</c> selects the priority queue (falling back to the regular
/// queue when none is set); any other value selects the regular queue.
/// </param>
/// <returns>The wrapping handler, or null when events are not enabled.</returns>
internal SysMsgHandler? NoInlineCallbackRecvQSelect(SysMsgHandler cb, int recvQSelect)
{
    IpQueue<InSysMsg> target;
    _mu.EnterReadLock();
    try
    {
        if (!EventsEnabledLocked())
        {
            return null;
        }

        var sys = _sys!;
        target = recvQSelect == RecvQStatsz
            ? (sys.RecvQueuePriority ?? sys.RecvQueue!)
            : sys.RecvQueue!;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return (sub, c, acc, subj, rply, hdr, msg) =>
    {
        // Copy the buffers before queuing so the queued message owns its data.
        byte[] headerCopy = hdr?.Length > 0 ? (byte[])hdr.Clone() : [];
        byte[] payloadCopy = msg?.Length > 0 ? (byte[])msg.Clone() : [];

        var queued = new InSysMsg
        {
            Sub = sub,
            Client = c,
            Acc = acc,
            Subject = subj,
            Reply = rply,
            Hdr = headerCopy,
            Msg = payloadCopy,
            Cb = cb,
        };
        target.Push(queued);
    };
}
// =========================================================================
// Group G: sysSubscribe / sysSubscribeQ / sysSubscribeInternal / systemSubscribe
// Mirrors Go variants in server/events.go.
// =========================================================================
/// <summary>
/// Creates an internal subscription on <paramref name="subject"/> with no
/// queue group, using the system client.
/// Mirrors Go <c>(s *Server) sysSubscribe</c> in server/events.go.
/// </summary>
/// <param name="subject">Subject to subscribe to.</param>
/// <param name="cb">Handler for matching messages.</param>
/// <returns>The subscription and a null error, or a null subscription and the error.</returns>
internal (Subscription? sub, Exception? error) SysSubscribe(string subject, SysMsgHandler? cb)
{
    return SystemSubscribe(subject, string.Empty, internalOnly: false, client: null, cb: cb);
}
/// <summary>
/// Creates an internal subscription on <paramref name="subject"/> that is a
/// member of queue group <paramref name="queue"/>, using the system client.
/// Mirrors Go <c>(s *Server) sysSubscribeQ</c> in server/events.go.
/// </summary>
/// <param name="subject">Subject to subscribe to.</param>
/// <param name="queue">Queue group name.</param>
/// <param name="cb">Handler for matching messages.</param>
/// <returns>The subscription and a null error, or a null subscription and the error.</returns>
internal (Subscription? sub, Exception? error) SysSubscribeQ(string subject, string queue, SysMsgHandler? cb)
{
    return SystemSubscribe(subject, queue, internalOnly: false, client: null, cb: cb);
}
/// <summary>
/// Creates an internal subscription on <paramref name="subject"/> with the
/// internal-only flag set, so interest is not forwarded to routes/gateways.
/// Mirrors Go <c>(s *Server) sysSubscribeInternal</c> in server/events.go.
/// </summary>
/// <param name="subject">Subject to subscribe to.</param>
/// <param name="cb">Handler for matching messages.</param>
/// <returns>The subscription and a null error, or a null subscription and the error.</returns>
internal (Subscription? sub, Exception? error) SysSubscribeInternal(string subject, SysMsgHandler? cb)
{
    return SystemSubscribe(subject, string.Empty, internalOnly: true, client: null, cb: cb);
}
/// <summary>
/// Core subscription routine behind all <c>SysSubscribe*</c> helpers. Under
/// the write lock it validates the request, picks the owning client and
/// allocates the next subscription id; it then registers the subscription on
/// that client outside the lock and attaches <paramref name="cb"/> to it.
/// Mirrors Go <c>(s *Server) systemSubscribe</c> in server/events.go.
/// </summary>
/// <param name="subject">Subject to subscribe to.</param>
/// <param name="queue">Queue group name; null or empty means no queue group.</param>
/// <param name="internalOnly">Forwarded to the client's subscribe call.</param>
/// <param name="client">Client that owns the subscription; null selects the system client.</param>
/// <param name="cb">Handler stored on the created subscription.</param>
/// <returns>
/// The subscription and a null error on success. Otherwise a null
/// subscription with <c>ServerErrors.ErrNoSysAccount</c> when events are not
/// enabled, an <see cref="ArgumentNullException"/> when <paramref name="cb"/>
/// is null, or the error reported by the client's subscribe call.
/// </returns>
internal (Subscription? sub, Exception? error) SystemSubscribe(
    string subject,
    string queue,
    bool internalOnly,
    ClientConnection? client,
    SysMsgHandler? cb)
{
    ClientConnection owner;
    string sid;

    _mu.EnterWriteLock();
    try
    {
        if (!EventsEnabledLocked())
        {
            return (null, ServerErrors.ErrNoSysAccount);
        }

        if (cb is null)
        {
            return (null, new ArgumentNullException(nameof(cb), "undefined message handler"));
        }

        var sys = _sys!;
        owner = client ?? sys.Client!;

        // Allocate the next subscription id while still holding the lock.
        sys.Sid++;
        sid = sys.Sid.ToString();
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    var subjectBytes = Encoding.ASCII.GetBytes(subject);
    var sidBytes = Encoding.ASCII.GetBytes(sid);
    byte[]? queueBytes = null;
    if (!string.IsNullOrEmpty(queue))
    {
        queueBytes = Encoding.ASCII.GetBytes(queue);
    }

    var (created, failure) = owner.ProcessSubEx(subjectBytes, queueBytes, sidBytes, internalOnly, false, false);
    if (failure is not null)
    {
        return (null, failure);
    }

    if (created is not null)
    {
        // Store the callback on the subscription so it is available to
        // whatever dispatches messages for this subscription.
        created.SysMsgCb = cb;
    }

    return (created, null);
}
// =========================================================================
// Group G: sysUnsubscribe
// Mirrors Go <c>(s *Server) sysUnsubscribe</c> in server/events.go.
// =========================================================================
/// <summary>
/// Removes a system subscription by asking the system client to drop the
/// subscription's sid. Does nothing when <paramref name="sub"/> is null, when
/// events are not enabled, or when the subscription has no sid.
/// Mirrors Go <c>(s *Server) sysUnsubscribe</c> in server/events.go.
/// </summary>
/// <param name="sub">Subscription to remove; may be null.</param>
internal void SysUnsubscribe(Subscription? sub)
{
    if (sub is null) return;

    // Check that events are enabled and capture the system client in a single
    // read-lock section, so both observations come from the same state.
    ClientConnection? sysClient;
    _mu.EnterReadLock();
    try
    {
        if (!EventsEnabledLocked()) return;
        sysClient = _sys?.Client;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    // Unsubscribe outside the lock.
    // NOTE(review): this always goes through the system client; a
    // subscription created via SystemSubscribe with an explicit client may
    // need to be removed through that client instead. Confirm against callers.
    if (sub.Sid is not null && sysClient is not null)
        sysClient.RemoveSubBySid(sub.Sid);
}
// =========================================================================
// Group G: inboxReply
// Mirrors Go <c>(s *Server) inboxReply</c> in server/events.go.
// =========================================================================
/// <summary>
/// Handler for the server's inbox subscription. Looks up the reply handler
/// registered for <paramref name="subject"/> under the read lock and, if one
/// exists, invokes it after the lock has been released. Does nothing when
/// events are not enabled or no reply handlers are registered.
/// Mirrors Go <c>(s *Server) inboxReply</c> in server/events.go.
/// </summary>
internal void InboxReply(
    Subscription sub,
    NatsClient c,
    Account acc,
    string subject,
    string reply,
    byte[] hdr,
    byte[] msg)
{
    _mu.EnterReadLock();
    var hasReplies = EventsEnabledLocked() && _sys!.Replies.Count != 0;
    if (!hasReplies)
    {
        _mu.ExitReadLock();
        return;
    }

    _sys!.Replies.TryGetValue(subject, out var replyHandler);
    _mu.ExitReadLock();

    // Run the handler outside the lock.
    if (replyHandler is not null)
    {
        replyHandler.Invoke(sub, c, acc, subject, reply, hdr, msg);
    }
}
// =========================================================================
// Group G: newRespInbox
// Mirrors Go <c>(s *Server) newRespInbox() string</c> in server/events.go.
// =========================================================================
/// <summary>
/// Generates a new response inbox subject: this server's inbox prefix
/// followed by a random base-62 suffix of <c>EventHelpers.ReplySuffixLen</c>
/// characters.
/// Mirrors Go <c>(s *Server) newRespInbox() string</c> in server/events.go.
/// </summary>
/// <returns>
/// The new inbox subject. The stored prefix is used with its trailing
/// <c>*</c> removed when it ends in <c>.*</c>; otherwise a <c>.</c> is
/// appended to whatever prefix is stored (empty when there is no system state).
/// </returns>
internal string NewRespInbox()
{
    const string digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
    const int base62 = 62;
    const int suffixLen = EventHelpers.ReplySuffixLen;

    var inboxPre = _sys?.InboxPrefix ?? string.Empty;
    // Strip the trailing '*' from $SYS._INBOX.{hash}.* → use as prefix
    var prefix = inboxPre.EndsWith(".*", StringComparison.Ordinal)
        ? inboxPre[..^1] // strip '*', keep the dot
        : inboxPre + ".";

    // NextInt64() already returns a non-negative value, so no cast or
    // sign fix-up is needed before taking base-62 digits from it.
    var rng = Random.Shared.NextInt64();
    var suffix = new char[suffixLen];
    for (var i = 0; i < suffixLen; i++)
    {
        suffix[i] = digits[(int)(rng % base62)];
        rng /= base62;
    }

    return prefix + new string(suffix);
}
// =========================================================================
// Group G: wrapChk
// Mirrors Go <c>(s *Server) wrapChk</c> in server/events.go.
// =========================================================================
/// <summary>
/// Builds an action that runs <paramref name="f"/> under the server write
/// lock, but only if events are enabled at the moment it is invoked. The lock
/// is released even when <paramref name="f"/> throws.
/// Mirrors Go <c>(s *Server) wrapChk(f func()) func()</c> in server/events.go.
/// </summary>
/// <param name="f">Action to run while holding the write lock.</param>
/// <returns>The guarded wrapper around <paramref name="f"/>.</returns>
internal Action WrapChk(Action f)
{
    return Guarded;

    void Guarded()
    {
        _mu.EnterWriteLock();
        try
        {
            if (EventsEnabledLocked())
            {
                f();
            }
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }
}
}
// ============================================================================
// EventFilterOptions — common filter options for system event requests
// Mirrors Go <c>EventFilterOptions</c> in server/events.go.
// ============================================================================
/// <summary>
/// Filter criteria carried by system event requests (STATSZ, VARZ, CONNZ, etc.).
/// An empty string or empty tag list means that criterion is not applied.
/// Mirrors Go <c>EventFilterOptions</c> in server/events.go.
/// </summary>
public sealed class EventFilterOptions
{
    /// <summary>Server name to match; a substring match unless <see cref="ExactMatch"/> is set.</summary>
    public string Name { get; set; } = "";

    /// <summary>Cluster name to match; a substring match unless <see cref="ExactMatch"/> is set.</summary>
    public string Cluster { get; set; } = "";

    /// <summary>Host to match; a substring match unless <see cref="ExactMatch"/> is set.</summary>
    public string Host { get; set; } = "";

    /// <summary>Switches name, cluster and host matching from substring to exact equality.</summary>
    public bool ExactMatch { get; set; }

    /// <summary>Tags the server must carry; every listed tag has to be present.</summary>
    public List<string> Tags { get; set; } = new List<string>();

    /// <summary>JetStream domain to match.</summary>
    public string Domain { get; set; } = "";
}
// ============================================================================
// ServerApiResponse — standard API response envelope
// Mirrors Go <c>ServerAPIResponse</c> in server/events.go.
// ============================================================================
/// <summary>
/// Response envelope used by system API endpoints (varz, connz, statsz, etc.).
/// Mirrors Go <c>ServerAPIResponse</c> in server/events.go.
/// </summary>
public sealed class ServerApiResponse
{
/// <summary>Server info attached to the response; may be null.</summary>
public ServerInfo? Server { get; set; }
/// <summary>Response payload; may be null.</summary>
public object? Data { get; set; }
/// <summary>Error details; may be null.</summary>
public ApiError? Error { get; set; }
/// <summary>Internal: compression type for this response. Not serialised.</summary>
internal int Compress { get; set; }
}
/// <summary>
/// Error details carried in <see cref="ServerApiResponse.Error"/>.
/// Mirrors Go <c>ApiError</c> in server/events.go.
/// </summary>
public sealed class ApiError
{
    /// <summary>Numeric error code; defaults to 0.</summary>
    public int Code { get; set; }

    /// <summary>Error description; defaults to an empty string.</summary>
    public string Description { get; set; } = "";
}