feat(batch44): implement events core dispatch — Groups A-G from events.go
Port 46 deferred features from server/events.go covering the internal event system foundation: - Group A (EventHelpers.cs): NewPubMsg/pool, RouteStat, GetHash/GetHashSize, GetAcceptEncoding, RemoteLatencySubjectForResponse, TotalSubs, AccForClient, IssuerForClient, ClearTimer; CompressionType enum - Group B (NatsServerTypes.cs): ServerInfo advisory fields (Seq, Time, Capabilities, Tags, Metadata) + capability methods (SetJetStreamEnabled, IsJetStreamEnabled, SetBinaryStreamSnapshot, IsBinaryStreamSnapshot, SetAccountNrg, IsAccountNrg) - Group C (ClientTypes.cs): ForAssignmentSnap, ForProposal, ForAdvisory on ClientInfo - Group D (EventTypes.cs): PubMsg.ReturnToPool; PubMsg.Client typed as ClientConnection?; InternalState.Client typed as ClientConnection?; InternalState.Seq changed to long field for Interlocked.Increment - Group E (ClientConnection.Events.cs): SendInternalMsg delegates to server - Group F (Account.Events.cs): AccountTrafficStats/Set + Account.Statz() - Group G (NatsServer.Events.cs): InternalReceiveLoop, InternalSendLoop, SendShutdownEvent, SendInternalAccountSysMsg, SendInternalMsgLocked, SendInternalMsg, SendInternalMsgFromClient, SendInternalResponse, EventsRunning, EventsEnabled, Node, InitEventTracking, FilterRequest, NoInlineCallback*, SysSubscribe*, SystemSubscribe, SysUnsubscribe, InboxReply, NewRespInbox, WrapChk; EventFilterOptions, ServerApiResponse, ApiError types - Subscription.SysMsgCb field added for system subscription dispatch
This commit is contained in:
143
dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.Events.cs
Normal file
143
dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.Events.cs
Normal file
@@ -0,0 +1,143 @@
|
||||
// Copyright 2018-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/accounts.go (statz) and server/events.go in the NATS server Go source.
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
// ============================================================================
|
||||
// AccountTrafficStats — per-kind traffic counters for an account
|
||||
// Mirrors the anonymous embedded `stats` struct in Go's Account.stats.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Per-kind (total, gateway, route, leaf) message and byte traffic counters
|
||||
/// for an account. Protected by the owning <see cref="AccountTrafficStatSet"/> lock.
|
||||
/// Mirrors Go <c>stats</c> struct (the small one embedded in Account.stats).
|
||||
/// </summary>
|
||||
internal sealed class AccountTrafficStats
|
||||
{
|
||||
public long InMsgs;
|
||||
public long OutMsgs;
|
||||
public long InBytes;
|
||||
public long OutBytes;
|
||||
public long SlowConsumers;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Aggregated traffic statistics for an account, including breakdowns by
|
||||
/// gateway, route, and leaf-node traffic.
|
||||
/// Mirrors Go's embedded anonymous <c>stats struct</c> in <c>Account</c>.
|
||||
/// </summary>
|
||||
internal sealed class AccountTrafficStatSet
|
||||
{
|
||||
private readonly object _mu = new();
|
||||
|
||||
public AccountTrafficStats Total { get; } = new();
|
||||
public AccountTrafficStats Gateways { get; } = new();
|
||||
public AccountTrafficStats Routes { get; } = new();
|
||||
public AccountTrafficStats Leafs { get; } = new();
|
||||
|
||||
public void Lock() => Monitor.Enter(_mu);
|
||||
public void Unlock() => Monitor.Exit(_mu);
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Account partial — events / statz
|
||||
// ============================================================================
|
||||
|
||||
public sealed partial class Account
|
||||
{
|
||||
// -------------------------------------------------------------------------
|
||||
// Traffic statistics (mirrors Go Account.stats embedded struct)
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Aggregated account traffic statistics.
|
||||
/// Mirrors Go's <c>stats struct { sync.Mutex; stats; gw stats; rt stats; ln stats }</c>
|
||||
/// embedded in <c>Account</c>.
|
||||
/// </summary>
|
||||
internal readonly AccountTrafficStatSet TrafficStats = new();
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Group F: Account.statz
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Computes and returns a snapshot of the account's current statistics.
|
||||
/// Lock (account _mu write lock) should be held on entry.
|
||||
/// Mirrors Go <c>(a *Account) statz() *AccountStat</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal AccountStat Statz()
|
||||
{
|
||||
var localConns = NumLocalConnectionsLocked();
|
||||
var leafConns = NumLocalLeafNodes();
|
||||
|
||||
TrafficStats.Lock();
|
||||
var received = new DataStats
|
||||
{
|
||||
Msgs = TrafficStats.Total.InMsgs,
|
||||
Bytes = TrafficStats.Total.InBytes,
|
||||
Gateways = new MsgBytes
|
||||
{
|
||||
Msgs = TrafficStats.Gateways.InMsgs,
|
||||
Bytes = TrafficStats.Gateways.InBytes,
|
||||
},
|
||||
Routes = new MsgBytes
|
||||
{
|
||||
Msgs = TrafficStats.Routes.InMsgs,
|
||||
Bytes = TrafficStats.Routes.InBytes,
|
||||
},
|
||||
Leafs = new MsgBytes
|
||||
{
|
||||
Msgs = TrafficStats.Leafs.InMsgs,
|
||||
Bytes = TrafficStats.Leafs.InBytes,
|
||||
},
|
||||
};
|
||||
var sent = new DataStats
|
||||
{
|
||||
Msgs = TrafficStats.Total.OutMsgs,
|
||||
Bytes = TrafficStats.Total.OutBytes,
|
||||
Gateways = new MsgBytes
|
||||
{
|
||||
Msgs = TrafficStats.Gateways.OutMsgs,
|
||||
Bytes = TrafficStats.Gateways.OutBytes,
|
||||
},
|
||||
Routes = new MsgBytes
|
||||
{
|
||||
Msgs = TrafficStats.Routes.OutMsgs,
|
||||
Bytes = TrafficStats.Routes.OutBytes,
|
||||
},
|
||||
Leafs = new MsgBytes
|
||||
{
|
||||
Msgs = TrafficStats.Leafs.OutMsgs,
|
||||
Bytes = TrafficStats.Leafs.OutBytes,
|
||||
},
|
||||
};
|
||||
var slowConsumers = TrafficStats.Total.SlowConsumers;
|
||||
TrafficStats.Unlock();
|
||||
|
||||
return new AccountStat
|
||||
{
|
||||
Account = Name,
|
||||
Name = GetNameTagLocked(),
|
||||
Conns = localConns,
|
||||
LeafNodes = leafConns,
|
||||
TotalConns = localConns + leafConns,
|
||||
NumSubs = Sublist?.Count() ?? 0,
|
||||
Received = received,
|
||||
Sent = sent,
|
||||
SlowConsumers = slowConsumers,
|
||||
};
|
||||
}
|
||||
}
|
||||
40
dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Events.cs
Normal file
40
dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Events.cs
Normal file
@@ -0,0 +1,40 @@
|
||||
// Copyright 2012-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/events.go (client.sendInternalMsg) in the NATS server Go source.
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
public sealed partial class ClientConnection
|
||||
{
|
||||
// =========================================================================
|
||||
// Group E: sendInternalMsg from a client context
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Sends an internal message from this client's context, avoiding no-echo issues
|
||||
/// when the message originates from a non-system client.
|
||||
/// Mirrors Go <c>(c *client) sendInternalMsg(...)</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal void SendInternalMsg(
|
||||
string subject,
|
||||
string reply,
|
||||
ServerInfo? si,
|
||||
object? msg)
|
||||
{
|
||||
var srv = Server as NatsServer;
|
||||
if (srv is null) return;
|
||||
|
||||
srv.SendInternalMsgFromClient(this, subject, reply, si, msg);
|
||||
}
|
||||
}
|
||||
@@ -329,6 +329,53 @@ public sealed class ClientInfo
|
||||
/// </summary>
|
||||
public string ServiceAccount() =>
|
||||
string.IsNullOrWhiteSpace(ServiceName) ? Account : ServiceName;
|
||||
|
||||
// =========================================================================
|
||||
// Group C: ClientInfo projection methods
|
||||
// Mirrors Go methods on *ClientInfo in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Returns the minimum amount of ClientInfo needed for assignment snapshots
|
||||
/// (account, service, cluster only).
|
||||
/// Mirrors Go <c>(ci *ClientInfo) forAssignmentSnap() *ClientInfo</c> in server/events.go.
|
||||
/// </summary>
|
||||
public ClientInfo ForAssignmentSnap() => new()
|
||||
{
|
||||
Account = Account,
|
||||
ServiceName = ServiceName,
|
||||
Cluster = Cluster,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Returns a copy of this ClientInfo with JWT and issuer key stripped,
|
||||
/// suitable for raft proposals.
|
||||
/// Mirrors Go <c>(ci *ClientInfo) forProposal() *ClientInfo</c> in server/events.go.
|
||||
/// </summary>
|
||||
public ClientInfo? ForProposal()
|
||||
{
|
||||
var copy = (ClientInfo)MemberwiseClone();
|
||||
copy.Jwt = string.Empty;
|
||||
copy.IssuerKey = string.Empty;
|
||||
// Deep-copy the Tags list to avoid sharing.
|
||||
copy.Tags = [..Tags];
|
||||
copy.Alternates = [..Alternates];
|
||||
return copy;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns a copy of this ClientInfo with JWT stripped and Alternates nulled,
|
||||
/// suitable for JetStream advisory events.
|
||||
/// Mirrors Go <c>(ci *ClientInfo) forAdvisory() *ClientInfo</c> in server/events.go.
|
||||
/// </summary>
|
||||
public ClientInfo? ForAdvisory()
|
||||
{
|
||||
var copy = (ClientInfo)MemberwiseClone();
|
||||
copy.Jwt = string.Empty;
|
||||
copy.Alternates = [];
|
||||
copy.Tags = [..Tags];
|
||||
return copy;
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
|
||||
278
dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs
Normal file
278
dotnet/src/ZB.MOM.NatsNet.Server/Events/EventHelpers.cs
Normal file
@@ -0,0 +1,278 @@
|
||||
// Copyright 2018-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/events.go in the NATS server Go source.
|
||||
|
||||
using System.Collections.Concurrent;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using ZB.MOM.NatsNet.Server.Internal;
|
||||
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
/// <summary>
|
||||
/// Static helpers used by the event system.
|
||||
/// Mirrors various package-level helper functions in server/events.go.
|
||||
/// </summary>
|
||||
internal static class EventHelpers
|
||||
{
|
||||
// =========================================================================
|
||||
// Base-62 alphabet used by NATS hash functions.
|
||||
// Mirrors Go package-level const in server/accounts.go.
|
||||
// =========================================================================
|
||||
|
||||
private const string Digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
|
||||
private const int Base = 62;
|
||||
|
||||
// =========================================================================
|
||||
// System hash length (mirrors Go sysHashLen = 8)
|
||||
// =========================================================================
|
||||
|
||||
internal const int SysHashLen = 8;
|
||||
|
||||
// =========================================================================
|
||||
// Inbox constants (mirrors Go InboxPrefix / related consts)
|
||||
// =========================================================================
|
||||
|
||||
internal const string InboxPrefix = "$SYS._INBOX.";
|
||||
internal const int InboxPrefixLen = 12; // len("$SYS._INBOX.")
|
||||
internal const int ReplySuffixLen = 8; // Gives us 62^8 unique values
|
||||
|
||||
// =========================================================================
|
||||
// PubMsg pool
|
||||
// =========================================================================
|
||||
|
||||
private static readonly ConcurrentBag<PubMsg> PubMsgPool = [];
|
||||
|
||||
/// <summary>
|
||||
/// Factory for an internally-queued outbound publish message, using a pool
|
||||
/// to amortise allocations.
|
||||
/// Mirrors Go <c>newPubMsg</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal static PubMsg NewPubMsg(
|
||||
ClientConnection? client,
|
||||
string subject,
|
||||
string reply,
|
||||
ServerInfo? si,
|
||||
byte[]? hdr,
|
||||
object? msg,
|
||||
int oct,
|
||||
bool echo,
|
||||
bool last)
|
||||
{
|
||||
if (!PubMsgPool.TryTake(out var pm))
|
||||
pm = new PubMsg();
|
||||
|
||||
pm.Client = client;
|
||||
pm.Subject = subject;
|
||||
pm.Reply = reply;
|
||||
pm.Si = si;
|
||||
pm.Hdr = hdr;
|
||||
pm.Msg = msg;
|
||||
pm.Oct = oct;
|
||||
pm.Echo = echo;
|
||||
pm.Last = last;
|
||||
return pm;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns a <see cref="PubMsg"/> to the pool after all fields have been cleared.
|
||||
/// Called by <see cref="PubMsg.ReturnToPool"/>.
|
||||
/// </summary>
|
||||
internal static void ReturnPubMsg(PubMsg pm) => PubMsgPool.Add(pm);
|
||||
|
||||
// =========================================================================
|
||||
// Group A: helpers
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Generate a RouteStat for a statz update from a route client.
|
||||
/// Mirrors Go <c>routeStat</c> in server/events.go.
|
||||
/// Note: full in/out message counters require ClientConnection to carry those fields
|
||||
/// (added in a later batch). Until then, returns zero-valued counters.
|
||||
/// </summary>
|
||||
internal static RouteStat? RouteStat(ClientConnection? route)
|
||||
{
|
||||
if (route is null) return null;
|
||||
|
||||
lock (route)
|
||||
{
|
||||
var rs = new RouteStat
|
||||
{
|
||||
Id = route.Cid,
|
||||
Sent = new DataStats { Msgs = 0, Bytes = 0 },
|
||||
Received = new DataStats { Msgs = 0, Bytes = 0 },
|
||||
Pending = (int)(route.OutPb),
|
||||
};
|
||||
if (route.Route != null)
|
||||
rs.Name = route.Route.RemoteName;
|
||||
return rs;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Computes an 8-character base-62 hash of <paramref name="name"/>.
|
||||
/// Mirrors Go <c>getHash(name string) string</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal static string GetHash(string name) => GetHashSize(name, SysHashLen);
|
||||
|
||||
/// <summary>
|
||||
/// Computes a hash of <paramref name="size"/> characters for <paramref name="name"/>.
|
||||
/// Uses SHA-256, then maps the first <paramref name="size"/> bytes through the
|
||||
/// base-62 digit alphabet — identical to Go's <c>getHashSize</c>.
|
||||
/// Mirrors Go <c>getHashSize(name string, size int) string</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal static string GetHashSize(string name, int size)
|
||||
{
|
||||
var hash = SHA256.HashData(Encoding.UTF8.GetBytes(name));
|
||||
var result = new char[size];
|
||||
for (var i = 0; i < size; i++)
|
||||
result[i] = Digits[(int)(hash[i] % Base)];
|
||||
return new string(result);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parses an Accept-Encoding header to determine which compression type
|
||||
/// the remote supports (snappy / gzip / none).
|
||||
/// Mirrors Go <c>getAcceptEncoding(hdr []byte) compressionType</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal static CompressionType GetAcceptEncoding(byte[]? hdr)
|
||||
{
|
||||
if (hdr is null || hdr.Length == 0) return CompressionType.None;
|
||||
|
||||
// Extract the Accept-Encoding header value.
|
||||
const string headerName = "Accept-Encoding";
|
||||
var ae = GetHeaderValue(headerName, hdr)?.ToLowerInvariant() ?? string.Empty;
|
||||
|
||||
if (string.IsNullOrEmpty(ae)) return CompressionType.None;
|
||||
if (ae.Contains("snappy") || ae.Contains("s2")) return CompressionType.Snappy;
|
||||
if (ae.Contains("gzip")) return CompressionType.Gzip;
|
||||
return CompressionType.Unsupported;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Generate the tracking subject for remote latency from a response subject.
|
||||
/// Mirrors Go <c>remoteLatencySubjectForResponse</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal static string RemoteLatencySubjectForResponse(ReadOnlySpan<byte> subject)
|
||||
{
|
||||
if (!Account.IsTrackedReply(subject)) return string.Empty;
|
||||
|
||||
// Split on '.' and take second-to-last token.
|
||||
var str = Encoding.ASCII.GetString(subject);
|
||||
var toks = str.Split('.');
|
||||
if (toks.Length < 2) return string.Empty;
|
||||
|
||||
return string.Format(SystemSubjects.RemoteLatencyEventSubj, toks[^2]);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sum subscription counts from a sublist result for clients and hub leaf nodes.
|
||||
/// Mirrors Go <c>totalSubs</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal static int TotalSubs(SubscriptionIndexResult? rr, byte[]? queueGroup)
|
||||
{
|
||||
if (rr is null) return 0;
|
||||
|
||||
var count = 0;
|
||||
|
||||
bool Matches(Subscription sub)
|
||||
{
|
||||
if (queueGroup != null && !sub.Queue.AsSpan().SequenceEqual(queueGroup))
|
||||
return false;
|
||||
return sub.Client?.Kind == ClientKind.Client || (sub.Client?.IsHubLeafNode() ?? false);
|
||||
}
|
||||
|
||||
if (queueGroup is null)
|
||||
{
|
||||
foreach (var sub in rr.PSubs)
|
||||
if (Matches(sub)) count++;
|
||||
}
|
||||
|
||||
foreach (var qsubs in rr.QSubs)
|
||||
foreach (var sub in qsubs)
|
||||
if (Matches(sub)) count++;
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the account name for a client (or "N/A" if none).
|
||||
/// Mirrors Go <c>accForClient(c *client) string</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal static string AccForClient(ClientConnection c)
|
||||
{
|
||||
var acc = c._account as Account;
|
||||
return acc?.Name ?? "N/A";
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the issuer key for a client (empty if none).
|
||||
/// Mirrors Go <c>issuerForClient(c *client) string</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal static string IssuerForClient(ClientConnection c)
|
||||
{
|
||||
if (c.Perms is null) return string.Empty;
|
||||
// Stub: full user/signing-key lookup requires auth integration.
|
||||
return string.Empty;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Safely stops and nulls a timer reference.
|
||||
/// Mirrors Go <c>clearTimer(tp **time.Timer)</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal static void ClearTimer(ref System.Threading.Timer? timer)
|
||||
{
|
||||
var t = timer;
|
||||
timer = null;
|
||||
t?.Dispose();
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Header parsing helper
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Extracts the value of a named header from a NATS wire-format header block.
|
||||
/// </summary>
|
||||
private static string? GetHeaderValue(string headerName, byte[] hdr)
|
||||
{
|
||||
var text = Encoding.ASCII.GetString(hdr);
|
||||
var search = headerName + ":";
|
||||
var idx = text.IndexOf(search, StringComparison.OrdinalIgnoreCase);
|
||||
if (idx < 0) return null;
|
||||
|
||||
var start = idx + search.Length;
|
||||
var end = text.IndexOf('\r', start);
|
||||
if (end < 0) end = text.Length;
|
||||
|
||||
return text[start..end].Trim();
|
||||
}
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// CompressionType — mirrors Go compressionType enum in events.go
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Compression type for internal message publishing.
|
||||
/// Mirrors Go <c>compressionType</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal enum CompressionType : int
|
||||
{
|
||||
None = 0,
|
||||
Gzip = 1,
|
||||
Snappy = 2,
|
||||
Unsupported = 3,
|
||||
}
|
||||
@@ -196,8 +196,8 @@ internal sealed class InternalState
|
||||
{
|
||||
// ---- identity / sequencing ----
|
||||
public Account? Account { get; set; }
|
||||
public NatsClient? Client { get; set; }
|
||||
public ulong Seq { get; set; }
|
||||
public ClientConnection? Client { get; set; }
|
||||
public long Seq; // accessed via Interlocked.Increment
|
||||
public int Sid { get; set; }
|
||||
|
||||
// ---- remote server tracking ----
|
||||
@@ -289,7 +289,7 @@ internal sealed class ServerUpdate
|
||||
/// </summary>
|
||||
internal sealed class PubMsg
|
||||
{
|
||||
public NatsClient? Client { get; set; }
|
||||
public ClientConnection? Client { get; set; }
|
||||
public string Subject { get; set; } = string.Empty;
|
||||
public string Reply { get; set; } = string.Empty;
|
||||
public ServerInfo? Si { get; set; }
|
||||
@@ -302,7 +302,24 @@ internal sealed class PubMsg
|
||||
public bool Echo { get; set; }
|
||||
public bool Last { get; set; }
|
||||
|
||||
// TODO: session 12 — add pool return helper (returnToPool).
|
||||
/// <summary>
|
||||
/// Clears all fields and returns this instance to the pool held in
|
||||
/// <see cref="EventHelpers.NewPubMsg"/>.
|
||||
/// Mirrors Go <c>(pm *pubMsg) returnToPool()</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal void ReturnToPool()
|
||||
{
|
||||
Client = null;
|
||||
Subject = string.Empty;
|
||||
Reply = string.Empty;
|
||||
Si = null;
|
||||
Hdr = null;
|
||||
Msg = null;
|
||||
Oct = 0;
|
||||
Echo = false;
|
||||
Last = false;
|
||||
EventHelpers.ReturnPubMsg(this);
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
|
||||
@@ -13,6 +13,8 @@
|
||||
//
|
||||
// Adapted from server/client.go (subscription struct) in the NATS server Go source.
|
||||
|
||||
using ZB.MOM.NatsNet.Server;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server.Internal;
|
||||
|
||||
/// <summary>
|
||||
@@ -40,6 +42,13 @@ public sealed class Subscription
|
||||
/// <summary>The client that owns this subscription. Null in test/stub scenarios.</summary>
|
||||
public NatsClient? Client { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// System message callback. Set on subscriptions created via <c>systemSubscribe</c>
|
||||
/// so that the internal receive queue can dispatch to the right handler.
|
||||
/// Mirrors Go <c>subscription.icb sysMsgHandler</c> in server/events.go.
|
||||
/// </summary>
|
||||
public SysMsgHandler? SysMsgCb { get; set; }
|
||||
|
||||
/// <summary>Marks this subscription as closed.</summary>
|
||||
public void Close() => Interlocked.Exchange(ref _closed, 1);
|
||||
|
||||
|
||||
949
dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs
Normal file
949
dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Events.cs
Normal file
@@ -0,0 +1,949 @@
|
||||
// Copyright 2018-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/events.go in the NATS server Go source.
|
||||
// Batch 44: Events Core & Dispatch.
|
||||
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using ZB.MOM.NatsNet.Server.Internal;
|
||||
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
public sealed partial class NatsServer
|
||||
{
|
||||
// =========================================================================
|
||||
// Constants (mirrors Go sysHashLen = 8, InboxPrefix consts)
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>Length of the response-inbox prefix used for this server's internal replies.</summary>
|
||||
private int RespInboxPrefixLen => EventHelpers.InboxPrefixLen + EventHelpers.SysHashLen + 1;
|
||||
|
||||
// =========================================================================
|
||||
// Group G: internalReceiveLoop
|
||||
// Mirrors Go <c>(s *Server) internalReceiveLoop</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Background loop that dispatches all messages the server needs to process
|
||||
/// internally via system subscriptions (e.g. internal subs).
|
||||
/// Mirrors Go <c>(s *Server) internalReceiveLoop</c> in server/events.go.
|
||||
/// </summary>
|
||||
private void InternalReceiveLoop(IpQueue<InSysMsg> recvq)
|
||||
{
|
||||
while (EventsRunning())
|
||||
{
|
||||
// Wait for a notification that items are ready.
|
||||
if (!recvq.Ch.WaitToReadAsync(_quitCts.Token).AsTask()
|
||||
.GetAwaiter()
|
||||
.GetResult())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var msgs = recvq.Pop();
|
||||
if (msgs is not null)
|
||||
{
|
||||
foreach (var m in msgs)
|
||||
{
|
||||
m.Cb?.Invoke(m.Sub, m.Client, m.Acc, m.Subject, m.Reply, m.Hdr, m.Msg);
|
||||
}
|
||||
recvq.Recycle(msgs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: internalSendLoop
|
||||
// Mirrors Go <c>(s *Server) internalSendLoop</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Background loop that serialises and dispatches all messages the server
|
||||
/// wants to send through the system account. Runs as a long-lived goroutine.
|
||||
/// Mirrors Go <c>(s *Server) internalSendLoop</c> in server/events.go.
|
||||
/// </summary>
|
||||
private void InternalSendLoop()
|
||||
{
|
||||
// Read snapshot of send queue and system client under the read lock.
|
||||
// Mirrors Go's RESET: label pattern.
|
||||
RESET:
|
||||
_mu.EnterReadLock();
|
||||
if (_sys is null || _sys.SendQueue is null)
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
return;
|
||||
}
|
||||
var sysc = _sys.Client;
|
||||
var resetCh = _sys.ResetChannel;
|
||||
var sendq = _sys.SendQueue;
|
||||
var id = _info.Id;
|
||||
var host = _info.Host;
|
||||
var svrName = _info.Name;
|
||||
var domain = _info.Domain ?? string.Empty;
|
||||
var seqRef = _sys; // holds ref so we can atomically increment Seq
|
||||
var jsEnabled = _info.JetStream;
|
||||
var cluster = _info.Cluster ?? string.Empty;
|
||||
if (_gateway.Enabled)
|
||||
cluster = GetGatewayName();
|
||||
_mu.ExitReadLock();
|
||||
|
||||
var opts = GetOpts();
|
||||
var tags = opts.Tags;
|
||||
var metadata = opts.Metadata;
|
||||
|
||||
while (EventsRunning())
|
||||
{
|
||||
// Wait for items in the send queue OR a reset signal OR quit.
|
||||
var sendTask = sendq.Ch.WaitToReadAsync(_quitCts.Token).AsTask();
|
||||
var resetTask = resetCh is not null
|
||||
? resetCh.Reader.WaitToReadAsync(_quitCts.Token).AsTask()
|
||||
: Task.FromResult(false);
|
||||
|
||||
var completed = Task.WhenAny(sendTask, resetTask, Task.Delay(Timeout.Infinite, _quitCts.Token))
|
||||
.GetAwaiter().GetResult();
|
||||
|
||||
if (_quitCts.IsCancellationRequested)
|
||||
return;
|
||||
|
||||
// If reset channel fired, re-read the snapshot.
|
||||
if (completed == resetTask && resetTask.IsCompletedSuccessfully && resetTask.Result)
|
||||
{
|
||||
resetCh?.Reader.TryRead(out _);
|
||||
goto RESET;
|
||||
}
|
||||
|
||||
if (!sendTask.IsCompletedSuccessfully || !sendTask.Result)
|
||||
continue;
|
||||
|
||||
var msgs = sendq.Pop();
|
||||
if (msgs is null) continue;
|
||||
|
||||
foreach (var pm in msgs)
|
||||
{
|
||||
// Stamp ServerInfo advisory fields if requested.
|
||||
if (pm.Si is { } si)
|
||||
{
|
||||
si.Name = svrName;
|
||||
si.Domain = domain;
|
||||
si.Host = host;
|
||||
si.Cluster = cluster;
|
||||
si.Id = id;
|
||||
si.Seq = (ulong)Interlocked.Increment(ref seqRef.Seq);
|
||||
si.Version = ServerConstants.Version;
|
||||
si.Time = DateTime.UtcNow;
|
||||
si.Tags = tags.Count > 0 ? [..tags] : null;
|
||||
si.Metadata = metadata.Count > 0 ? new Dictionary<string, string>(metadata) : null;
|
||||
si.Capabilities = 0;
|
||||
if (jsEnabled)
|
||||
{
|
||||
si.SetJetStreamEnabled();
|
||||
si.SetBinaryStreamSnapshot();
|
||||
// AccountNRG: stub, not yet tracked
|
||||
}
|
||||
}
|
||||
|
||||
// Serialise payload.
|
||||
byte[] body = [];
|
||||
if (pm.Msg is not null)
|
||||
{
|
||||
body = pm.Msg switch
|
||||
{
|
||||
string s => Encoding.UTF8.GetBytes(s),
|
||||
byte[] b => b,
|
||||
_ => JsonSerializer.SerializeToUtf8Bytes(pm.Msg),
|
||||
};
|
||||
}
|
||||
|
||||
// Choose client.
|
||||
var c = pm.Client ?? sysc;
|
||||
if (c is null) { pm.ReturnToPool(); continue; }
|
||||
|
||||
// Process the publish inline.
|
||||
lock (c)
|
||||
{
|
||||
c.ParseCtx.Pa.Subject = Encoding.ASCII.GetBytes(pm.Subject);
|
||||
c.ParseCtx.Pa.Reply = Encoding.ASCII.GetBytes(pm.Reply);
|
||||
}
|
||||
|
||||
// Append CRLF.
|
||||
var payload = new byte[body.Length + 2];
|
||||
Buffer.BlockCopy(body, 0, payload, 0, body.Length);
|
||||
payload[^2] = (byte)'\r';
|
||||
payload[^1] = (byte)'\n';
|
||||
|
||||
c.ProcessInboundClientMsg(payload);
|
||||
|
||||
if (pm.Last)
|
||||
{
|
||||
// Final message (shutdown): flush in-place and exit.
|
||||
c.FlushClients(long.MaxValue);
|
||||
sendq.Recycle(msgs);
|
||||
pm.ReturnToPool();
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
c.FlushClients(0);
|
||||
}
|
||||
|
||||
pm.ReturnToPool();
|
||||
}
|
||||
sendq.Recycle(msgs);
|
||||
}
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: sendShutdownEvent
|
||||
// Mirrors Go <c>(s *Server) sendShutdownEvent</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Queues the server shutdown event. Clears the send queue and reply
|
||||
/// handlers so no further messages will be dispatched.
|
||||
/// Mirrors Go <c>(s *Server) sendShutdownEvent</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal void SendShutdownEvent()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (_sys is null || _sys.SendQueue is null) return;
|
||||
|
||||
var subject = string.Format(SystemSubjects.ShutdownEventSubj, _info.Id);
|
||||
var sendq = _sys.SendQueue;
|
||||
|
||||
// Stop any more messages from queuing.
|
||||
_sys.SendQueue = null;
|
||||
// Unhook all reply handlers.
|
||||
_sys.Replies.Clear();
|
||||
|
||||
var si = new ServerInfo();
|
||||
sendq.Push(EventHelpers.NewPubMsg(null, subject, string.Empty, si, null, si,
|
||||
(int)CompressionType.None, false, true));
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: sendInternalAccountSysMsg
|
||||
// Mirrors Go <c>(s *Server) sendInternalAccountSysMsg</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Sends an internal system message to a specific account using that account's
|
||||
/// internal client. Acquires only the minimum needed locks.
|
||||
/// Mirrors Go <c>(s *Server) sendInternalAccountSysMsg</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal void SendInternalAccountSysMsg(
|
||||
Account? account,
|
||||
string subject,
|
||||
ServerInfo? si,
|
||||
object? msg,
|
||||
int compressionType = (int)CompressionType.None)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
if (_sys is null || _sys.SendQueue is null || account is null)
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
return;
|
||||
}
|
||||
var sendq = _sys.SendQueue;
|
||||
_mu.ExitReadLock();
|
||||
|
||||
ClientConnection? c;
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
c = account.InternalAccountClient();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
|
||||
sendq.Push(EventHelpers.NewPubMsg(
|
||||
c, subject, string.Empty, si, null, msg, compressionType, false, false));
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: sendInternalMsgLocked / sendInternalMsg
|
||||
// Mirrors Go <c>(s *Server) sendInternalMsgLocked</c> and
|
||||
// <c>(s *Server) sendInternalMsg</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Queues an internal message, acquiring the read lock.
|
||||
/// Mirrors Go <c>(s *Server) sendInternalMsgLocked</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal void SendInternalMsgLocked(
|
||||
string subject,
|
||||
string reply,
|
||||
ServerInfo? si,
|
||||
object? msg)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try { SendInternalMsg(subject, reply, si, msg); }
|
||||
finally { _mu.ExitReadLock(); }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Queues an internal message. Lock must already be held.
|
||||
/// Mirrors Go <c>(s *Server) sendInternalMsg</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal void SendInternalMsg(
|
||||
string subject,
|
||||
string reply,
|
||||
ServerInfo? si,
|
||||
object? msg)
|
||||
{
|
||||
if (_sys is null || _sys.SendQueue is null) return;
|
||||
_sys.SendQueue.Push(EventHelpers.NewPubMsg(
|
||||
null, subject, reply, si, null, msg, (int)CompressionType.None, false, false));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Queues an internal message from a specific client context.
|
||||
/// Called by <see cref="ClientConnection.SendInternalMsg"/>.
|
||||
/// Mirrors Go <c>(c *client) sendInternalMsg(...)</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal void SendInternalMsgFromClient(
|
||||
ClientConnection client,
|
||||
string subject,
|
||||
string reply,
|
||||
ServerInfo? si,
|
||||
object? msg)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (_sys is null || _sys.SendQueue is null) return;
|
||||
_sys.SendQueue.Push(EventHelpers.NewPubMsg(
|
||||
client, subject, reply, si, null, msg, (int)CompressionType.None, false, false));
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: sendInternalResponse
|
||||
// Mirrors Go <c>(s *Server) sendInternalResponse</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Sends a response to an internal server API request.
|
||||
/// Mirrors Go <c>(s *Server) sendInternalResponse</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal void SendInternalResponse(string subject, ServerApiResponse response)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (_sys is null || _sys.SendQueue is null) return;
|
||||
_sys.SendQueue.Push(EventHelpers.NewPubMsg(
|
||||
null, subject, string.Empty, response.Server, null,
|
||||
response, response.Compress, false, false));
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: eventsRunning / EventsEnabled / eventsEnabled
|
||||
// Mirrors Go counterparts in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if the events system is running (server is running and events enabled).
|
||||
/// Acquires the read lock internally.
|
||||
/// Mirrors Go <c>(s *Server) eventsRunning() bool</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal bool EventsRunning()
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try { return IsRunning() && EventsEnabledLocked(); }
|
||||
finally { _mu.ExitReadLock(); }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if the server has the events system enabled via a system account.
|
||||
/// Acquires the read lock internally.
|
||||
/// Mirrors Go <c>(s *Server) EventsEnabled() bool</c> in server/events.go.
|
||||
/// </summary>
|
||||
public bool EventsEnabled()
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try { return EventsEnabledLocked(); }
|
||||
finally { _mu.ExitReadLock(); }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if events are enabled. Lock must already be held.
|
||||
/// Mirrors Go <c>(s *Server) eventsEnabled() bool</c> in server/events.go.
|
||||
/// </summary>
|
||||
private bool EventsEnabledLocked() =>
|
||||
_sys is not null && _sys.Client is not null && _sys.Account is not null;
|
||||
|
||||
// =========================================================================
|
||||
// Group G: Node
|
||||
// Mirrors Go <c>(s *Server) Node() string</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Returns the stable node hash (short hash of server name) used for JetStream
|
||||
/// cluster targeting. Empty string if events not initialised.
|
||||
/// Mirrors Go <c>(s *Server) Node() string</c> in server/events.go.
|
||||
/// </summary>
|
||||
public string Node()
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try { return _sys?.ShortHash ?? string.Empty; }
|
||||
finally { _mu.ExitReadLock(); }
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: initEventTracking
|
||||
// Mirrors Go <c>(s *Server) initEventTracking()</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Initialises the server-wide system subscription infrastructure: sets the
|
||||
/// server's hash, subscribes to all internal system subjects, and starts the
|
||||
/// send/receive loops.
|
||||
/// Mirrors Go <c>(s *Server) initEventTracking()</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal void InitEventTracking()
|
||||
{
|
||||
// Capture sys outside any lock to avoid deadlock.
|
||||
_mu.EnterReadLock();
|
||||
var sys = _sys;
|
||||
_mu.ExitReadLock();
|
||||
|
||||
if (sys is null || sys.Client is null || sys.Account is null) return;
|
||||
|
||||
// Compute and store the server's short hash.
|
||||
sys.ShortHash = EventHelpers.GetHash(_info.Name);
|
||||
|
||||
// All-inbox subscription: $SYS._INBOX.<hash>.*
|
||||
var inboxSubject = string.Format(SystemSubjects.InboxRespSubj, sys.ShortHash, "*");
|
||||
if (SysSubscribe(inboxSubject, InboxReply) is { error: not null } inboxResult)
|
||||
{
|
||||
Errorf("Error setting up internal tracking: {0}", inboxResult.error);
|
||||
return;
|
||||
}
|
||||
sys.InboxPrefix = inboxSubject;
|
||||
|
||||
// Remote connections update (old-style subject).
|
||||
var accConnsOld = string.Format(SystemSubjects.AccConnsEventSubjOld, "*");
|
||||
if (SysSubscribe(accConnsOld, NoInlineCallback(RemoteConnsUpdateStub)) is { error: not null } r1)
|
||||
{
|
||||
Errorf("Error setting up internal tracking for {0}: {1}", accConnsOld, r1.error);
|
||||
return;
|
||||
}
|
||||
|
||||
// Connection responses for this server's ID.
|
||||
var connsResp = string.Format(SystemSubjects.ConnsRespSubj, _info.Id);
|
||||
if (SysSubscribe(connsResp, NoInlineCallback(RemoteConnsUpdateStub)) is { error: not null } r2)
|
||||
{
|
||||
Errorf("Error setting up internal tracking: {0}", r2.error);
|
||||
return;
|
||||
}
|
||||
|
||||
// Subscription-count requests.
|
||||
if (SysSubscribe(SystemSubjects.AccNumSubsReqSubj, NoInlineCallback(NsubsRequestStub)) is { error: not null } r3)
|
||||
{
|
||||
Errorf("Error setting up internal tracking: {0}", r3.error);
|
||||
return;
|
||||
}
|
||||
|
||||
// Stats heartbeat from other servers.
|
||||
var statsSubj = string.Format(SystemSubjects.ServerStatsSubj, "*");
|
||||
var statsSub = SysSubscribe(statsSubj, NoInlineCallback(RemoteServerUpdateStub));
|
||||
if (statsSub.error is not null)
|
||||
{
|
||||
Errorf("Error setting up internal tracking: {0}", statsSub.error);
|
||||
return;
|
||||
}
|
||||
sys.RemoteStatsSub = statsSub.sub;
|
||||
|
||||
// Shutdown events from other servers.
|
||||
var shutdownSubj = string.Format(SystemSubjects.ShutdownEventSubj, "*");
|
||||
if (SysSubscribe(shutdownSubj, NoInlineCallback(RemoteServerShutdownStub)) is { error: not null } r4)
|
||||
{
|
||||
Errorf("Error setting up internal tracking: {0}", r4.error);
|
||||
return;
|
||||
}
|
||||
|
||||
// Lame-duck events.
|
||||
var lameDuckSubj = string.Format(SystemSubjects.LameDuckEventSubj, "*");
|
||||
if (SysSubscribe(lameDuckSubj, NoInlineCallback(RemoteServerShutdownStub)) is { error: not null } r5)
|
||||
{
|
||||
Errorf("Error setting up internal tracking: {0}", r5.error);
|
||||
return;
|
||||
}
|
||||
|
||||
// Remote latency measurements for this server.
|
||||
var latencySubj = string.Format(SystemSubjects.RemoteLatencyEventSubj, sys.ShortHash);
|
||||
if (SysSubscribe(latencySubj, NoInlineCallback(RemoteLatencyUpdateStub)) is { error: not null } r6)
|
||||
{
|
||||
Errorf("Error setting up internal latency tracking: {0}", r6.error);
|
||||
return;
|
||||
}
|
||||
|
||||
// Server reload request.
|
||||
var reloadSubj = string.Format(SystemSubjects.ServerReloadReqSubj, _info.Id);
|
||||
if (SysSubscribe(reloadSubj, NoInlineCallback(ReloadConfigStub)) is { error: not null } r7)
|
||||
{
|
||||
Errorf("Error setting up server reload handler: {0}", r7.error);
|
||||
return;
|
||||
}
|
||||
|
||||
// Client kick/LDM requests.
|
||||
var kickSubj = string.Format(SystemSubjects.ClientKickReqSubj, _info.Id);
|
||||
if (SysSubscribe(kickSubj, NoInlineCallback(KickClientStub)) is { error: not null } r8)
|
||||
{
|
||||
Errorf("Error setting up client kick service: {0}", r8.error);
|
||||
return;
|
||||
}
|
||||
|
||||
var ldmSubj = string.Format(SystemSubjects.ClientLdmReqSubj, _info.Id);
|
||||
if (SysSubscribe(ldmSubj, NoInlineCallback(LdmClientStub)) is { error: not null } r9)
|
||||
{
|
||||
Errorf("Error setting up client LDM service: {0}", r9.error);
|
||||
}
|
||||
|
||||
// Account leaf-node connect events.
|
||||
var leafConn = string.Format(SystemSubjects.LeafNodeConnectEventSubj, "*");
|
||||
if (SysSubscribe(leafConn, NoInlineCallback(LeafNodeConnectedStub)) is { error: not null } r10)
|
||||
{
|
||||
Errorf("Error setting up internal tracking: {0}", r10.error);
|
||||
}
|
||||
|
||||
// Debug subscriber service.
|
||||
SysSubscribeInternal(SystemSubjects.AccSubsSubj, NoInlineCallback(DebugSubscribersStub));
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Stub handlers — full implementations live in other partial files (Events
|
||||
// module). These are lightweight no-ops that satisfy the dispatch wiring so
|
||||
// the build compiles and the event loops can run.
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private void RemoteConnsUpdateStub(Subscription sub, NatsClient c, Account acc,
|
||||
string subject, string reply, byte[] hdr, byte[] msg) { }
|
||||
|
||||
private void NsubsRequestStub(Subscription sub, NatsClient c, Account acc,
|
||||
string subject, string reply, byte[] hdr, byte[] msg) { }
|
||||
|
||||
private void RemoteServerUpdateStub(Subscription sub, NatsClient c, Account acc,
|
||||
string subject, string reply, byte[] hdr, byte[] msg) { }
|
||||
|
||||
private void RemoteServerShutdownStub(Subscription sub, NatsClient c, Account acc,
|
||||
string subject, string reply, byte[] hdr, byte[] msg) { }
|
||||
|
||||
private void RemoteLatencyUpdateStub(Subscription sub, NatsClient c, Account acc,
|
||||
string subject, string reply, byte[] hdr, byte[] msg) { }
|
||||
|
||||
private void ReloadConfigStub(Subscription sub, NatsClient c, Account acc,
|
||||
string subject, string reply, byte[] hdr, byte[] msg) { }
|
||||
|
||||
private void KickClientStub(Subscription sub, NatsClient c, Account acc,
|
||||
string subject, string reply, byte[] hdr, byte[] msg) { }
|
||||
|
||||
private void LdmClientStub(Subscription sub, NatsClient c, Account acc,
|
||||
string subject, string reply, byte[] hdr, byte[] msg) { }
|
||||
|
||||
private void LeafNodeConnectedStub(Subscription sub, NatsClient c, Account acc,
|
||||
string subject, string reply, byte[] hdr, byte[] msg) { }
|
||||
|
||||
private void DebugSubscribersStub(Subscription sub, NatsClient c, Account acc,
|
||||
string subject, string reply, byte[] hdr, byte[] msg) { }
|
||||
|
||||
// =========================================================================
|
||||
// Group G: filterRequest
|
||||
// Mirrors Go <c>(s *Server) filterRequest</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if a system event request should be filtered (ignored) by
|
||||
/// this server based on its name, host, cluster, tags, or domain.
|
||||
/// Do NOT hold the server lock when calling this.
|
||||
/// Mirrors Go <c>(s *Server) filterRequest</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal bool FilterRequest(EventFilterOptions? fOpts)
|
||||
{
|
||||
if (fOpts is null) return false;
|
||||
|
||||
var clusterName = ClusterName();
|
||||
var opts = GetOpts();
|
||||
|
||||
if (fOpts.ExactMatch)
|
||||
{
|
||||
if ((fOpts.Name != string.Empty && fOpts.Name != _info.Name) ||
|
||||
(fOpts.Host != string.Empty && fOpts.Host != _info.Host) ||
|
||||
(fOpts.Cluster != string.Empty && fOpts.Cluster != clusterName))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
else if ((fOpts.Name != string.Empty && !_info.Name.Contains(fOpts.Name, StringComparison.Ordinal)) ||
|
||||
(fOpts.Host != string.Empty && !_info.Host.Contains(fOpts.Host, StringComparison.Ordinal)) ||
|
||||
(fOpts.Cluster != string.Empty && !clusterName.Contains(fOpts.Cluster, StringComparison.Ordinal)))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (fOpts.Tags.Count > 0)
|
||||
{
|
||||
foreach (var tag in fOpts.Tags)
|
||||
{
|
||||
if (!opts.Tags.Contains(tag))
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (fOpts.Domain != string.Empty && opts.JetStreamDomain != fOpts.Domain)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: noInlineCallback / noInlineCallbackStatsz / noInlineCallbackRecvQSelect
|
||||
// Mirrors Go variants in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
private const int RecvQMuxed = 1;
|
||||
private const int RecvQStatsz = 2;
|
||||
|
||||
/// <summary>
|
||||
/// Wraps a <see cref="SysMsgHandler"/> so that it is always executed on the
|
||||
/// internal receive queue (never inline with route/gateway processing).
|
||||
/// Mirrors Go <c>(s *Server) noInlineCallback</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal SysMsgHandler? NoInlineCallback(SysMsgHandler cb) =>
|
||||
NoInlineCallbackRecvQSelect(cb, RecvQMuxed);
|
||||
|
||||
/// <summary>
|
||||
/// Wraps a <see cref="SysMsgHandler"/> for the priority (statsz) receive queue.
|
||||
/// Mirrors Go <c>(s *Server) noInlineCallbackStatsz</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal SysMsgHandler? NoInlineCallbackStatsz(SysMsgHandler cb) =>
|
||||
NoInlineCallbackRecvQSelect(cb, RecvQStatsz);
|
||||
|
||||
/// <summary>
|
||||
/// Core wrapper implementation. Returns a handler that pushes messages onto
|
||||
/// the selected internal receive queue rather than executing inline.
|
||||
/// Mirrors Go <c>(s *Server) noInlineCallbackRecvQSelect</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal SysMsgHandler? NoInlineCallbackRecvQSelect(SysMsgHandler cb, int recvQSelect)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
if (!EventsEnabledLocked())
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
return null;
|
||||
}
|
||||
|
||||
IpQueue<InSysMsg> recvq = recvQSelect == RecvQStatsz
|
||||
? (_sys!.RecvQueuePriority ?? _sys.RecvQueue!)
|
||||
: _sys!.RecvQueue!;
|
||||
_mu.ExitReadLock();
|
||||
|
||||
return (sub, c, acc, subj, rply, hdr, msg) =>
|
||||
{
|
||||
var hdrCopy = hdr is { Length: > 0 } ? (byte[])hdr.Clone() : [];
|
||||
var msgCopy = msg is { Length: > 0 } ? (byte[])msg.Clone() : [];
|
||||
recvq.Push(new InSysMsg
|
||||
{
|
||||
Sub = sub,
|
||||
Client = c,
|
||||
Acc = acc,
|
||||
Subject = subj,
|
||||
Reply = rply,
|
||||
Hdr = hdrCopy,
|
||||
Msg = msgCopy,
|
||||
Cb = cb,
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: sysSubscribe / sysSubscribeQ / sysSubscribeInternal / systemSubscribe
|
||||
// Mirrors Go variants in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Creates an internal subscription on the system account.
|
||||
/// Mirrors Go <c>(s *Server) sysSubscribe</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal (Subscription? sub, Exception? error) SysSubscribe(string subject, SysMsgHandler? cb) =>
|
||||
SystemSubscribe(subject, string.Empty, false, null, cb);
|
||||
|
||||
/// <summary>
|
||||
/// Creates an internal subscription with a queue group on the system account.
|
||||
/// Mirrors Go <c>(s *Server) sysSubscribeQ</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal (Subscription? sub, Exception? error) SysSubscribeQ(string subject, string queue, SysMsgHandler? cb) =>
|
||||
SystemSubscribe(subject, queue, false, null, cb);
|
||||
|
||||
/// <summary>
|
||||
/// Creates an internal subscription that does NOT forward interest to routes/gateways.
|
||||
/// Mirrors Go <c>(s *Server) sysSubscribeInternal</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal (Subscription? sub, Exception? error) SysSubscribeInternal(string subject, SysMsgHandler? cb) =>
|
||||
SystemSubscribe(subject, string.Empty, true, null, cb);
|
||||
|
||||
/// <summary>
|
||||
/// Core subscription implementation used by all <c>sysSubscribe*</c> helpers.
|
||||
/// Creates a subscription on the system account's internal client.
|
||||
/// Mirrors Go <c>(s *Server) systemSubscribe</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal (Subscription? sub, Exception? error) SystemSubscribe(
|
||||
string subject,
|
||||
string queue,
|
||||
bool internalOnly,
|
||||
ClientConnection? client,
|
||||
SysMsgHandler? cb)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
if (!EventsEnabledLocked())
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
return (null, ServerErrors.ErrNoSysAccount);
|
||||
}
|
||||
|
||||
if (cb is null)
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
return (null, new ArgumentNullException(nameof(cb), "undefined message handler"));
|
||||
}
|
||||
|
||||
var c = client ?? _sys!.Client!;
|
||||
_sys!.Sid++;
|
||||
var sid = _sys.Sid.ToString();
|
||||
_mu.ExitWriteLock();
|
||||
|
||||
var subBytes = Encoding.ASCII.GetBytes(subject);
|
||||
var sidBytes = Encoding.ASCII.GetBytes(sid);
|
||||
byte[]? qBytes = string.IsNullOrEmpty(queue) ? null : Encoding.ASCII.GetBytes(queue);
|
||||
|
||||
// Map SysMsgHandler → the internal MsgHandler/subscription mechanism.
|
||||
// The callback receives the raw message bytes split into hdr+msg.
|
||||
SysMsgHandler capturedCb = cb;
|
||||
var (sub, err) = c.ProcessSubEx(subBytes, qBytes, sidBytes, internalOnly, false, false);
|
||||
|
||||
if (err is not null)
|
||||
return (null, err);
|
||||
|
||||
if (sub is not null)
|
||||
{
|
||||
// Attach the callback to the subscription via the InSysMsg dispatch.
|
||||
// Store the callback in the subscription so the receive queue can call it.
|
||||
sub.SysMsgCb = capturedCb;
|
||||
}
|
||||
|
||||
return (sub, null);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: sysUnsubscribe
|
||||
// Mirrors Go <c>(s *Server) sysUnsubscribe</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Unsubscribes from a system subject by removing the given subscription.
|
||||
/// Mirrors Go <c>(s *Server) sysUnsubscribe</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal void SysUnsubscribe(Subscription? sub)
|
||||
{
|
||||
if (sub is null) return;
|
||||
|
||||
_mu.EnterReadLock();
|
||||
if (!EventsEnabledLocked())
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
return;
|
||||
}
|
||||
_mu.ExitReadLock();
|
||||
|
||||
// System subscriptions are always owned by the system client (_sys.Client).
|
||||
// Retrieve it under a read lock, then unsubscribe outside the lock.
|
||||
_mu.EnterReadLock();
|
||||
var sysClient = _sys?.Client;
|
||||
_mu.ExitReadLock();
|
||||
|
||||
if (sub.Sid is not null && sysClient is not null)
|
||||
sysClient.RemoveSubBySid(sub.Sid);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: inboxReply
|
||||
// Mirrors Go <c>(s *Server) inboxReply</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Handles inbox replies without propagating supercluster-wide interest.
|
||||
/// Dispatches to the registered reply handler if one exists for this subject.
|
||||
/// Mirrors Go <c>(s *Server) inboxReply</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal void InboxReply(
|
||||
Subscription sub,
|
||||
NatsClient c,
|
||||
Account acc,
|
||||
string subject,
|
||||
string reply,
|
||||
byte[] hdr,
|
||||
byte[] msg)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
if (!EventsEnabledLocked() || _sys!.Replies.Count == 0)
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
return;
|
||||
}
|
||||
_sys.Replies.TryGetValue(subject, out var handler);
|
||||
_mu.ExitReadLock();
|
||||
|
||||
handler?.Invoke(sub, c, acc, subject, reply, hdr, msg);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: newRespInbox
|
||||
// Mirrors Go <c>(s *Server) newRespInbox() string</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Generates a new unique response inbox subject using this server's hash prefix.
|
||||
/// Format: $SYS._INBOX.{hash}.{8-char-random}.
|
||||
/// Mirrors Go <c>(s *Server) newRespInbox() string</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal string NewRespInbox()
|
||||
{
|
||||
const string digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
|
||||
const int base62 = 62;
|
||||
const int suffixLen = EventHelpers.ReplySuffixLen;
|
||||
|
||||
var inboxPre = _sys?.InboxPrefix ?? string.Empty;
|
||||
// Strip the trailing '*' from $SYS._INBOX.{hash}.* → use as prefix
|
||||
var prefix = inboxPre.EndsWith(".*", StringComparison.Ordinal)
|
||||
? inboxPre[..^1] // strip '*', keep the dot
|
||||
: inboxPre + ".";
|
||||
|
||||
var suffix = new char[suffixLen];
|
||||
var rng = (long)Random.Shared.NextInt64();
|
||||
if (rng < 0) rng = -rng;
|
||||
|
||||
for (var i = 0; i < suffixLen; i++)
|
||||
{
|
||||
suffix[i] = digits[(int)(rng % base62)];
|
||||
rng /= base62;
|
||||
}
|
||||
|
||||
return prefix + new string(suffix);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Group G: wrapChk
|
||||
// Mirrors Go <c>(s *Server) wrapChk</c> in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Returns a wrapper function that acquires the server write lock, checks that
|
||||
/// events are enabled, invokes <paramref name="f"/>, then releases the lock.
|
||||
/// Mirrors Go <c>(s *Server) wrapChk(f func()) func()</c> in server/events.go.
|
||||
/// </summary>
|
||||
internal Action WrapChk(Action f)
|
||||
{
|
||||
return () =>
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (!EventsEnabledLocked()) return;
|
||||
f();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// EventFilterOptions — common filter options for system event requests
|
||||
// Mirrors Go <c>EventFilterOptions</c> in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Filter criteria applied to system event requests (STATSZ, VARZ, CONNZ, etc.).
|
||||
/// Mirrors Go <c>EventFilterOptions</c> in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class EventFilterOptions
|
||||
{
|
||||
/// <summary>Filter by server name (substring unless ExactMatch).</summary>
|
||||
public string Name { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Filter by cluster name (substring unless ExactMatch).</summary>
|
||||
public string Cluster { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Filter by host (substring unless ExactMatch).</summary>
|
||||
public string Host { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>When true, use exact equality instead of substring matching.</summary>
|
||||
public bool ExactMatch { get; set; }
|
||||
|
||||
/// <summary>Filter by tags (all must match).</summary>
|
||||
public List<string> Tags { get; set; } = [];
|
||||
|
||||
/// <summary>Filter by JetStream domain.</summary>
|
||||
public string Domain { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ServerApiResponse — standard API response envelope
|
||||
// Mirrors Go <c>ServerAPIResponse</c> in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Response envelope used by system API endpoints (varz, connz, statsz, etc.).
|
||||
/// Mirrors Go <c>ServerAPIResponse</c> in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class ServerApiResponse
|
||||
{
|
||||
public ServerInfo? Server { get; set; }
|
||||
public object? Data { get; set; }
|
||||
public ApiError? Error { get; set; }
|
||||
|
||||
/// <summary>Internal: compression type for this response. Not serialised.</summary>
|
||||
internal int Compress { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// API error returned in <see cref="ServerApiResponse.Error"/>.
|
||||
/// Mirrors Go <c>ApiError</c> in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class ApiError
|
||||
{
|
||||
public int Code { get; set; }
|
||||
public string Description { get; set; } = string.Empty;
|
||||
}
|
||||
@@ -88,6 +88,100 @@ public sealed class ServerInfo
|
||||
// LeafNode-specific
|
||||
[JsonPropertyName("leafnode_urls")] public string[]? LeafNodeUrls { get; set; }
|
||||
|
||||
// =========================================================================
|
||||
// Advisory / event-system fields
|
||||
// Mirrors Go server/events.go ServerInfo advisory struct fields.
|
||||
// These are populated by the internal send loop when publishing advisories.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Sequence number for this server's advisory messages.
|
||||
/// Mirrors Go <c>Seq uint64</c> in ServerInfo (events.go).
|
||||
/// </summary>
|
||||
[JsonPropertyName("seq")]
|
||||
public ulong Seq { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// UTC timestamp of this advisory message.
|
||||
/// Mirrors Go <c>Time time.Time</c> in ServerInfo (events.go).
|
||||
/// </summary>
|
||||
[JsonPropertyName("time")]
|
||||
public DateTime Time { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Capability flags for this server (bitmask of <see cref="ServerCapability"/>).
|
||||
/// Mirrors Go <c>Flags ServerCapability</c> in ServerInfo (events.go).
|
||||
/// </summary>
|
||||
[JsonPropertyName("flags")]
|
||||
public ulong Capabilities { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Server tags from options.
|
||||
/// Mirrors Go <c>Tags []string</c> in ServerInfo (events.go).
|
||||
/// </summary>
|
||||
[JsonPropertyName("tags")]
|
||||
[JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
|
||||
public List<string>? Tags { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Server metadata from options.
|
||||
/// Mirrors Go <c>Metadata map[string]string</c> in ServerInfo (events.go).
|
||||
/// </summary>
|
||||
[JsonPropertyName("metadata")]
|
||||
[JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
|
||||
public Dictionary<string, string>? Metadata { get; set; }
|
||||
|
||||
// =========================================================================
|
||||
// Capability helpers (Group B)
|
||||
// Mirrors Go ServerInfo capability methods in server/events.go.
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Sets the JetStream capability bit and the legacy JetStream bool field.
|
||||
/// Mirrors Go <c>(si *ServerInfo) SetJetStreamEnabled()</c> in server/events.go.
|
||||
/// </summary>
|
||||
public void SetJetStreamEnabled()
|
||||
{
|
||||
Capabilities |= (ulong)ServerCapability.JetStreamEnabled;
|
||||
JetStream = true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns whether this server has JetStream enabled, taking into account
|
||||
/// both the capability flag and the legacy bool field.
|
||||
/// Mirrors Go <c>(si *ServerInfo) JetStreamEnabled() bool</c> in server/events.go.
|
||||
/// </summary>
|
||||
public bool IsJetStreamEnabled() =>
|
||||
(Capabilities & (ulong)ServerCapability.JetStreamEnabled) != 0 || JetStream;
|
||||
|
||||
/// <summary>
|
||||
/// Sets the binary-stream-snapshot capability bit.
|
||||
/// Mirrors Go <c>(si *ServerInfo) SetBinaryStreamSnapshot()</c> in server/events.go.
|
||||
/// </summary>
|
||||
public void SetBinaryStreamSnapshot() =>
|
||||
Capabilities |= (ulong)ServerCapability.BinaryStreamSnapshot;
|
||||
|
||||
/// <summary>
|
||||
/// Returns whether this server supports binary stream snapshots.
|
||||
/// Mirrors Go <c>(si *ServerInfo) BinaryStreamSnapshot() bool</c> in server/events.go.
|
||||
/// </summary>
|
||||
public bool IsBinaryStreamSnapshot() =>
|
||||
(Capabilities & (ulong)ServerCapability.BinaryStreamSnapshot) != 0;
|
||||
|
||||
/// <summary>
|
||||
/// Sets the account-NRG capability bit.
|
||||
/// Mirrors Go <c>(si *ServerInfo) SetAccountNRG()</c> in server/events.go.
|
||||
/// </summary>
|
||||
public void SetAccountNrg() =>
|
||||
Capabilities |= (ulong)ServerCapability.AccountNrg;
|
||||
|
||||
/// <summary>
|
||||
/// Returns whether this server supports moving NRG traffic into the asset account.
|
||||
/// Mirrors Go <c>(si *ServerInfo) AccountNRG() bool</c> in server/events.go.
|
||||
/// </summary>
|
||||
public bool IsAccountNrg() =>
|
||||
(Capabilities & (ulong)ServerCapability.AccountNrg) != 0;
|
||||
|
||||
/// <summary>Returns a shallow clone of this <see cref="ServerInfo"/>.</summary>
|
||||
internal ServerInfo ShallowClone() => (ServerInfo)MemberwiseClone();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user