Files
natsnet/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs

2533 lines
85 KiB
C#

// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/accounts.go in the NATS server Go source.
using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
using System.Text;
namespace ZB.MOM.NatsNet.Server;
// ============================================================================
// Account — full implementation
// Mirrors Go `Account` struct in server/accounts.go lines 52-119.
// ============================================================================
/// <summary>
/// Represents a NATS account, tracking clients, subscriptions, imports, exports,
/// and subject mappings. Implements <see cref="INatsAccount"/> so that
/// <see cref="ClientConnection"/> can interact with it without a hard dependency.
/// Mirrors Go <c>Account</c> struct in server/accounts.go.
/// </summary>
public sealed class Account : INatsAccount
{
// -------------------------------------------------------------------------
// Constants
// -------------------------------------------------------------------------
/// <summary>
/// jwt.NoLimit equivalent: -1 means no limit applied.
/// Used as the default for <see cref="MaxPayload"/>, <see cref="MaxSubscriptions"/>,
/// <see cref="MaxConnections"/> and <see cref="MaxLeafNodes"/>, and compared against
/// by the limit checks to skip enforcement entirely.
/// </summary>
private const int NoLimit = -1;
// -------------------------------------------------------------------------
// Identity fields
// -------------------------------------------------------------------------
/// <summary>
/// Account name. Mirrors Go <c>Name string</c>.
/// This is the value returned by <see cref="ToString"/>, <see cref="String"/> and
/// <see cref="GetName"/>; defaults to the empty string.
/// </summary>
public string Name { get; set; } = string.Empty;
/// <summary>NKey public key. Mirrors Go <c>Nkey string</c>. Copied by <see cref="ShallowCopy"/>.</summary>
public string Nkey { get; set; } = string.Empty;
/// <summary>JWT issuer key. Mirrors Go <c>Issuer string</c>. Copied by <see cref="ShallowCopy"/>.</summary>
public string Issuer { get; set; } = string.Empty;
/// <summary>Raw JWT claim string. Mirrors Go <c>claimJWT string</c>. Defaults to the empty string.</summary>
internal string ClaimJwt { get; set; } = string.Empty;
/// <summary>
/// Time of last update from resolver. Mirrors Go <c>updated time.Time</c>.
/// Not initialised here, so it starts as <c>default(DateTime)</c>.
/// </summary>
internal DateTime Updated { get; set; }
// -------------------------------------------------------------------------
// Locks
// -------------------------------------------------------------------------
/// <summary>
/// Primary account read/write lock. Mirrors Go <c>mu sync.RWMutex</c>.
/// Created with <see cref="LockRecursionPolicy.NoRecursion"/>, so a thread that
/// already holds it must call the <c>*Locked</c> helper variants rather than the
/// public accessors that acquire the lock themselves.
/// </summary>
private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.NoRecursion);
/// <summary>
/// Send-queue mutex. Mirrors Go <c>sqmu sync.Mutex</c>.
/// Taken by <see cref="GetSendQueue"/> and <see cref="SetSendQueue"/>.
/// </summary>
private readonly object _sqmu = new();
/// <summary>
/// Leaf-node list lock. Mirrors Go <c>lmu sync.RWMutex</c>.
/// <see cref="UpdateRemoteServer"/> acquires it for reading while already holding
/// <see cref="_mu"/> for writing (lock order: <c>_mu</c> then <c>_lmu</c>).
/// </summary>
private readonly ReaderWriterLockSlim _lmu = new(LockRecursionPolicy.NoRecursion);
/// <summary>
/// Event ID mutex. Mirrors Go <c>eventIdsMu sync.Mutex</c>.
/// Taken by <see cref="NextEventId"/>.
/// </summary>
private readonly object _eventIdsMu = new();
/// <summary>JetStream migration/clear-observer mutex. Mirrors Go <c>jscmMu sync.Mutex</c>.</summary>
private readonly object _jscmMu = new();
// -------------------------------------------------------------------------
// Subscription index
// -------------------------------------------------------------------------
/// <summary>
/// Subscription trie for this account. Mirrors Go <c>sl *Sublist</c>.
/// Set by the server when the account is registered.
/// While it is null, <see cref="TotalSubs"/> and <see cref="Interest"/> report 0.
/// </summary>
internal SubscriptionIndex? Sublist { get; set; }
// -------------------------------------------------------------------------
// Internal client and send queue (stubs)
// -------------------------------------------------------------------------
/// <summary>
/// Internal account client. Mirrors Go <c>ic *client</c>.
/// TODO: session 12 — full internal client wiring.
/// </summary>
internal ClientConnection? InternalClient { get; set; }
/// <summary>
/// Per-account send queue. Mirrors Go <c>sq *sendq</c>.
/// Reading or writing this property directly does not take <c>_sqmu</c>;
/// use <see cref="GetSendQueue"/> / <see cref="SetSendQueue"/> for synchronised access.
/// </summary>
internal SendQueue? SendQueue { get; set; }
/// <summary>
/// Returns the current per-account send queue (possibly null), read while
/// holding the send-queue mutex.
/// </summary>
internal SendQueue? GetSendQueue()
{
    lock (_sqmu)
    {
        var current = this.SendQueue;
        return current;
    }
}
/// <summary>
/// Replaces the per-account send queue while holding the send-queue mutex.
/// Passing null clears it.
/// </summary>
internal void SetSendQueue(SendQueue? sendQueue)
{
    lock (_sqmu)
    {
        this.SendQueue = sendQueue;
    }
}
// -------------------------------------------------------------------------
// Eventing timers
// -------------------------------------------------------------------------
/// <summary>
/// Expiration timer. Mirrors Go <c>etmr *time.Timer</c>.
/// Cleared by <see cref="ClearEventing"/> under <see cref="_mu"/>.
/// </summary>
private Timer? _etmr;
/// <summary>
/// Connection-count timer. Mirrors Go <c>ctmr *time.Timer</c>.
/// Cleared by <see cref="ClearEventing"/> under <see cref="_mu"/>.
/// </summary>
private Timer? _ctmr;
// -------------------------------------------------------------------------
// Remote server tracking
// -------------------------------------------------------------------------
/// <summary>
/// Per-server connection and leaf-node counts.
/// Key is server ID. Mirrors Go <c>strack map[string]sconns</c>.
/// Created lazily by <see cref="UpdateRemoteServer"/> and reset to null by
/// <see cref="ClearEventing"/>.
/// </summary>
private Dictionary<string, SConns>? _strack;
/// <summary>
/// Remote client count (sum of strack[*].Conns).
/// Mirrors Go <c>nrclients int32</c>.
/// Protected by <see cref="_mu"/>.
/// Adjusted incrementally by <see cref="UpdateRemoteServer"/> and
/// <see cref="RemoveRemoteServer"/>; zeroed by <see cref="ClearEventing"/>.
/// </summary>
private int _nrclients;
/// <summary>
/// System client count.
/// Mirrors Go <c>sysclients int32</c>.
/// Protected by <see cref="_mu"/>.
/// Subtracted from the size of the client set wherever connection counts are computed.
/// </summary>
private int _sysclients;
/// <summary>
/// Local leaf-node count.
/// Mirrors Go <c>nleafs int32</c>.
/// Protected by <see cref="_mu"/>.
/// </summary>
private int _nleafs;
/// <summary>
/// Remote leaf-node count (sum of strack[*].Leafs).
/// Mirrors Go <c>nrleafs int32</c>.
/// Protected by <see cref="_mu"/>.
/// NOTE(review): <see cref="ClearEventing"/> drops the tracking table and zeroes
/// the remote client count but leaves this counter untouched — confirm that is intended.
/// </summary>
private int _nrleafs;
// -------------------------------------------------------------------------
// Client set
// -------------------------------------------------------------------------
/// <summary>
/// Active local clients. Mirrors Go <c>clients map[*client]struct{}</c>.
/// Protected by <see cref="_mu"/>.
/// May be null (it is reset to null by <see cref="ClearEventing"/>); the counting
/// helpers treat null as an empty set.
/// </summary>
private HashSet<ClientConnection>? _clients;
// -------------------------------------------------------------------------
// Route and leaf-queue maps
// -------------------------------------------------------------------------
/// <summary>
/// Route map: subject → reference count.
/// Mirrors Go <c>rm map[string]int32</c>.
/// Protected by <see cref="_mu"/>.
/// Its entry count is what <see cref="RoutedSubs"/> reports (0 while null).
/// </summary>
private Dictionary<string, int>? _rm;
/// <summary>
/// Leaf queue weights: subject → weight.
/// Mirrors Go <c>lqws map[string]int32</c>.
/// Protected by <see cref="_mu"/>.
/// </summary>
private Dictionary<string, int>? _lqws;
// -------------------------------------------------------------------------
// User revocations
// -------------------------------------------------------------------------
/// <summary>
/// Revoked user nkeys: key → revocation timestamp (Unix seconds).
/// Mirrors Go <c>usersRevoked map[string]int64</c>.
/// Protected by <see cref="_mu"/>.
/// </summary>
internal Dictionary<string, long>? UsersRevoked { get; set; }
// -------------------------------------------------------------------------
// Subject mappings
// -------------------------------------------------------------------------
/// <summary>
/// Ordered list of subject mappings. Mirrors Go <c>mappings []*mapping</c>.
/// Protected by <see cref="_mu"/>.
/// <see cref="ShallowCopy"/> assigns this list to the target account by reference,
/// so both accounts then share one list instance.
/// </summary>
private List<SubjectMapping> _mappings = [];
/// <summary>
/// Atomic flag: 1 when <see cref="_mappings"/> is non-empty.
/// Mirrors Go <c>hasMapped atomic.Bool</c>.
/// Written with <see cref="Interlocked.Exchange(ref int, int)"/> in <see cref="ShallowCopy"/>.
/// </summary>
private int _hasMapped; // 0 = false, 1 = true
// -------------------------------------------------------------------------
// Leaf nodes
// -------------------------------------------------------------------------
/// <summary>
/// Ordered list of local leaf-node clients.
/// Mirrors Go <c>lleafs []*client</c>.
/// Protected by <see cref="_lmu"/>.
/// When the leaf-node limit is exceeded, <see cref="UpdateRemoteServer"/> selects
/// entries from the tail of this list for disconnection.
/// </summary>
private List<ClientConnection> _lleafs = [];
/// <summary>
/// Cluster name → count of leaf-node connections from that cluster.
/// Mirrors Go <c>leafClusters map[string]uint64</c>.
/// Protected by <see cref="_mu"/>.
/// Created lazily by <see cref="RegisterLeafNodeCluster"/> with an ordinal key comparer.
/// </summary>
private Dictionary<string, ulong>? _leafClusters;
// -------------------------------------------------------------------------
// Import / export maps
// -------------------------------------------------------------------------
/// <summary>
/// Import tracking. Mirrors Go <c>imports importMap</c>.
/// <see cref="ShallowCopy"/> clones its stream and service entries into the target account.
/// </summary>
internal ImportMap Imports { get; set; } = new();
/// <summary>
/// Export tracking. Mirrors Go <c>exports exportMap</c>.
/// <see cref="ShallowCopy"/> clones its stream and service entries into the target account.
/// </summary>
internal ExportMap Exports { get; set; } = new();
// -------------------------------------------------------------------------
// JetStream (stubs)
// -------------------------------------------------------------------------
/// <summary>
/// JetStream account state. Mirrors Go <c>js *jsAccount</c>.
/// TODO: session 19 — JetStream implementation.
/// </summary>
internal object? JetStream { get; set; }
/// <summary>
/// Per-domain JetStream limits. Mirrors Go <c>jsLimits map[string]JetStreamAccountLimits</c>.
/// TODO: session 19 — JetStream implementation.
/// <see cref="ShallowCopy"/> currently shares this dictionary by reference.
/// </summary>
internal Dictionary<string, object>? JetStreamLimits { get; set; }
// -------------------------------------------------------------------------
// Misc identity fields
// -------------------------------------------------------------------------
/// <summary>
/// Name of the account used for this account's NRG traffic. Mirrors Go <c>nrgAccount string</c>.
/// NOTE(review): this was previously described as "non-routed gateway account name";
/// "NRG" in the NATS server presumably refers to Raft groups — confirm against accounts.go.
/// Copied by <see cref="ShallowCopy"/>.
/// </summary>
internal string NrgAccount { get; set; } = string.Empty;
// -------------------------------------------------------------------------
// Limits (embedded `limits` struct in Go)
// -------------------------------------------------------------------------
/// <summary>
/// Maximum payload size (-1 = unlimited). Mirrors Go embedded <c>limits.mpay int32</c>.
/// </summary>
internal int MaxPayload { get; set; } = NoLimit;
/// <summary>
/// Maximum subscriptions (-1 = unlimited). Mirrors Go embedded <c>limits.msubs int32</c>.
/// </summary>
internal int MaxSubscriptions { get; set; } = NoLimit;
/// <summary>
/// Maximum connections (-1 = unlimited). Mirrors Go embedded <c>limits.mconns int32</c>.
/// Enforced against local-minus-system plus remote client counts by
/// <see cref="MaxTotalConnectionsReached"/> and <see cref="UpdateRemoteServer"/>.
/// </summary>
internal int MaxConnections { get; set; } = NoLimit;
/// <summary>
/// Maximum leaf nodes (-1 = unlimited). Mirrors Go embedded <c>limits.mleafs int32</c>.
/// Enforced against local plus remote leaf-node counts by
/// <see cref="MaxTotalLeafNodesReachedLocked"/> and <see cref="UpdateRemoteServer"/>.
/// </summary>
internal int MaxLeafNodes { get; set; } = NoLimit;
/// <summary>
/// When true, bearer tokens are not allowed.
/// Mirrors Go embedded <c>limits.disallowBearer bool</c>.
/// </summary>
internal bool DisallowBearer { get; set; }
// -------------------------------------------------------------------------
// Expiration (atomic)
// -------------------------------------------------------------------------
/// <summary>
/// 1 when the account JWT has expired. Mirrors Go <c>expired atomic.Bool</c>.
/// Stored as an int so it can be used with interlocked operations.
/// </summary>
private int _expired; // 0 = not expired, 1 = expired
// -------------------------------------------------------------------------
// Miscellaneous
// -------------------------------------------------------------------------
/// <summary>
/// When true, this account's config could not be fully resolved.
/// Mirrors Go <c>incomplete bool</c>.
/// </summary>
internal bool Incomplete { get; set; }
/// <summary>
/// Signing keys for JWT validation.
/// Mirrors Go <c>signingKeys map[string]jwt.Scope</c>.
/// Value is <c>object?</c> because JWT Scope is not yet fully ported.
/// </summary>
internal Dictionary<string, object?>? SigningKeys { get; set; }
/// <summary>
/// External authorization configuration stub.
/// Mirrors Go <c>extAuth *jwt.ExternalAuthorization</c>.
/// TODO: session 11 — JWT full integration.
/// </summary>
internal object? ExternalAuth { get; set; }
/// <summary>
/// The server this account is registered with, or null if not yet registered.
/// Stored as <c>object?</c> to avoid circular reference.
/// Mirrors Go <c>srv *Server</c>.
/// </summary>
internal object? Server { get; set; }
/// <summary>
/// Loop detection subject for leaf nodes.
/// Mirrors Go <c>lds string</c>.
/// </summary>
internal string LoopDetectionSubject { get; set; } = string.Empty;
/// <summary>
/// Service reply prefix (wildcard subscription root).
/// Mirrors Go <c>siReply []byte</c>.
/// </summary>
internal byte[]? ServiceImportReply { get; set; }
/// <summary>
/// Subscription ID counter for internal use.
/// Mirrors Go <c>isid uint64</c>.
/// </summary>
private ulong _isid;
/// <summary>
/// Default permissions for users with no explicit permissions.
/// Mirrors Go <c>defaultPerms *Permissions</c>.
/// </summary>
internal Permissions? DefaultPerms { get; set; }
/// <summary>
/// Account tags from JWT. Mirrors Go <c>tags jwt.TagList</c>.
/// Stored as string array pending full JWT integration.
/// </summary>
internal string[] Tags { get; set; } = [];
/// <summary>
/// Human-readable name tag (distinct from <see cref="Name"/>).
/// Mirrors Go <c>nameTag string</c>.
/// When non-empty it takes precedence over <see cref="Name"/> in
/// <see cref="GetNameTagLocked"/>.
/// </summary>
internal string NameTag { get; set; } = string.Empty;
/// <summary>
/// Unix-nanosecond timestamp of last max-subscription-limit log.
/// Mirrors Go <c>lastLimErr int64</c>.
/// </summary>
private long _lastLimErr;
/// <summary>
/// Route pool index (-1 = dedicated, -2 = transitioning, ≥ 0 = shared).
/// Mirrors Go <c>routePoolIdx int</c>.
/// Has no initialiser here, so a new account starts at 0.
/// </summary>
internal int RoutePoolIdx { get; set; }
/// <summary>
/// Message-tracing destination subject.
/// Mirrors Go <c>traceDest string</c>.
/// Protected by <see cref="_mu"/>.
/// <see cref="SetMessageTraceDestination"/> stores the empty string in place of null.
/// </summary>
private string _traceDest = string.Empty;
/// <summary>
/// Tracing sampling percentage (0 = header-triggered, 1-100 = rate).
/// Mirrors Go <c>traceDestSampling int</c>.
/// Protected by <see cref="_mu"/>.
/// <see cref="SetMessageTraceSampling"/> stores the value as given, without range checking.
/// </summary>
private int _traceDestSampling;
/// <summary>
/// Stores the account-level message trace destination under the write lock.
/// A null <paramref name="subject"/> is recorded as the empty string.
/// Mirrors writes to Go <c>acc.traceDest</c> during config parsing.
/// </summary>
/// <param name="subject">Destination subject for trace messages.</param>
internal void SetMessageTraceDestination(string subject)
{
    var normalized = subject ?? string.Empty;

    _mu.EnterWriteLock();
    try { _traceDest = normalized; }
    finally { _mu.ExitWriteLock(); }
}
/// <summary>
/// Reads the account-level message trace destination under the read lock.
/// Mirrors reads of Go <c>acc.traceDest</c> during config parsing.
/// </summary>
/// <returns>The configured destination subject.</returns>
internal string GetMessageTraceDestination()
{
    string destination;

    _mu.EnterReadLock();
    try { destination = _traceDest; }
    finally { _mu.ExitReadLock(); }

    return destination;
}
/// <summary>
/// Stores the account-level message trace sampling percentage under the write lock.
/// The value is stored as given.
/// Mirrors writes to Go <c>acc.traceDestSampling</c> during config parsing.
/// </summary>
/// <param name="sampling">Sampling percentage to record.</param>
internal void SetMessageTraceSampling(int sampling)
{
    _mu.EnterWriteLock();
    try { _traceDestSampling = sampling; }
    finally { _mu.ExitWriteLock(); }
}
/// <summary>
/// Reads the account-level message trace sampling percentage under the read lock.
/// Mirrors reads of Go <c>acc.traceDestSampling</c> during config parsing.
/// </summary>
/// <returns>The configured sampling percentage.</returns>
internal int GetMessageTraceSampling()
{
    int sampling;

    _mu.EnterReadLock();
    try { sampling = _traceDestSampling; }
    finally { _mu.ExitReadLock(); }

    return sampling;
}
/// <summary>
/// Go-named alias that forwards to <see cref="SetMessageTraceDestination"/>.
/// Mirrors Go <c>(a *Account) setTraceDest(dest string)</c>.
/// </summary>
/// <param name="dest">Destination subject for trace messages.</param>
internal void SetTraceDest(string dest)
{
    SetMessageTraceDestination(dest);
}
/// <summary>
/// Reads the trace destination and sampling percentage together under one read lock.
/// Mirrors Go <c>(a *Account) getTraceDestAndSampling() (string, int)</c>.
/// </summary>
/// <returns>A tuple of the destination subject and the sampling percentage.</returns>
internal (string Destination, int Sampling) GetTraceDestAndSampling()
{
    (string Destination, int Sampling) snapshot;

    _mu.EnterReadLock();
    try { snapshot = (_traceDest, _traceDestSampling); }
    finally { _mu.ExitReadLock(); }

    return snapshot;
}
// -------------------------------------------------------------------------
// Factory
// -------------------------------------------------------------------------
/// <summary>
/// Builds an account named <paramref name="name"/> with every limit set to
/// <c>NoLimit</c> (payload, subscriptions, connections, leaf nodes).
/// Mirrors Go <c>NewAccount(name string) *Account</c>.
/// </summary>
/// <param name="name">Name to assign to the new account.</param>
/// <returns>The newly created account.</returns>
public static Account NewAccount(string name)
{
    var account = new Account { Name = name };

    // Explicitly mark all limits as unlimited.
    account.MaxPayload = NoLimit;
    account.MaxSubscriptions = NoLimit;
    account.MaxConnections = NoLimit;
    account.MaxLeafNodes = NoLimit;

    return account;
}
// -------------------------------------------------------------------------
// Object overrides
// -------------------------------------------------------------------------
/// <summary>
/// String representation of the account: its <see cref="Name"/>.
/// Mirrors Go <c>(a *Account) String() string</c>.
/// </summary>
public override string ToString()
{
    return Name;
}
/// <summary>
/// Go-named counterpart of <see cref="ToString"/>; returns <see cref="Name"/>.
/// Mirrors Go <c>(a *Account) String() string</c>.
/// </summary>
public string String()
{
    return Name;
}
// -------------------------------------------------------------------------
// Shallow copy for config reload
// -------------------------------------------------------------------------
/// <summary>
/// Copies identity and config fields from the options-struct account (<c>a</c>)
/// into the live server account (<c>na</c>). The write lock on <c>na</c> must
/// be held by the caller; <c>this</c> (the options account) requires no lock.
/// Mirrors Go <c>(a *Account) shallowCopy(na *Account)</c>.
/// </summary>
/// <remarks>
/// Import and export containers are rebuilt with a fresh entry object per item,
/// but each entry's members are copied by plain assignment, so reference-typed
/// members remain shared with the source. The mappings list and the JetStream
/// limits dictionary are shared by reference. Import/export collections that are
/// null on the source are left untouched on <paramref name="na"/>.
/// </remarks>
/// <param name="na">Target account that receives the copied state.</param>
internal void ShallowCopy(Account na)
{
na.Nkey = Nkey;
na.Issuer = Issuer;
// Trace settings are written straight to the backing fields: the caller already
// holds na's (non-recursive) write lock, so the locking setters cannot be used here.
na._traceDest = _traceDest;
na._traceDestSampling = _traceDestSampling;
na.NrgAccount = NrgAccount;
// Stream imports — shallow-clone each entry.
if (Imports.Streams != null)
{
na.Imports.Streams = new List<StreamImportEntry>(Imports.Streams.Count);
foreach (var si in Imports.Streams)
{
// Struct-style shallow copy via record-style clone.
na.Imports.Streams.Add(new StreamImportEntry
{
Account = si.Account,
From = si.From,
To = si.To,
Transform = si.Transform,
ReverseTransform = si.ReverseTransform,
Claim = si.Claim,
UsePublishedSubject = si.UsePublishedSubject,
Invalid = si.Invalid,
AllowTrace = si.AllowTrace,
});
}
}
// Service imports — shallow-clone each inner list.
if (Imports.Services != null)
{
na.Imports.Services = new Dictionary<string, List<ServiceImportEntry>>(Imports.Services.Count);
foreach (var (k, list) in Imports.Services)
{
// One new list per key, holding a fresh entry object for each source entry.
var cloned = new List<ServiceImportEntry>(list.Count);
foreach (var si in list)
{
cloned.Add(new ServiceImportEntry
{
Account = si.Account,
Claim = si.Claim,
ServiceExport = si.ServiceExport,
SubscriptionId = si.SubscriptionId,
From = si.From,
To = si.To,
Transform = si.Transform,
Timestamp = si.Timestamp,
ResponseType = si.ResponseType,
Latency = si.Latency,
M1 = si.M1,
RequestingClient = si.RequestingClient,
UsePublishedSubject = si.UsePublishedSubject,
IsResponse = si.IsResponse,
Invalid = si.Invalid,
Share = si.Share,
Tracking = si.Tracking,
DidDeliver = si.DidDeliver,
AllowTrace = si.AllowTrace,
TrackingHeader = si.TrackingHeader,
});
}
na.Imports.Services[k] = cloned;
}
}
// Stream exports — shallow-clone each entry.
if (Exports.Streams != null)
{
na.Exports.Streams = new Dictionary<string, StreamExport>(Exports.Streams.Count);
foreach (var (k, se) in Exports.Streams)
{
// A null export entry is carried over as null rather than being replaced by an empty object.
na.Exports.Streams[k] = se == null ? null! : new StreamExport
{
TokenRequired = se.TokenRequired,
AccountPosition = se.AccountPosition,
Approved = se.Approved,
ActivationsRevoked = se.ActivationsRevoked,
};
}
}
// Service exports — shallow-clone each entry.
if (Exports.Services != null)
{
na.Exports.Services = new Dictionary<string, ServiceExportEntry>(Exports.Services.Count);
foreach (var (k, se) in Exports.Services)
{
// A null export entry is carried over as null rather than being replaced by an empty object.
na.Exports.Services[k] = se == null ? null! : new ServiceExportEntry
{
Account = se.Account,
ResponseType = se.ResponseType,
Latency = se.Latency,
ResponseTimer = se.ResponseTimer,
ResponseThreshold = se.ResponseThreshold,
AllowTrace = se.AllowTrace,
TokenRequired = se.TokenRequired,
AccountPosition = se.AccountPosition,
Approved = se.Approved,
ActivationsRevoked = se.ActivationsRevoked,
};
}
}
// Mappings and limits — copy by reference / value.
// The list instance itself is shared: both accounts point at the same List after this.
na._mappings = _mappings;
Interlocked.Exchange(ref na._hasMapped, _mappings.Count > 0 ? 1 : 0);
// JetStream limits — shared reference.
// TODO: session 19 — deep copy JetStream limits when ported.
na.JetStreamLimits = JetStreamLimits;
// Server-config account limits.
na.MaxPayload = MaxPayload;
na.MaxSubscriptions = MaxSubscriptions;
na.MaxConnections = MaxConnections;
na.MaxLeafNodes = MaxLeafNodes;
na.DisallowBearer = DisallowBearer;
}
// -------------------------------------------------------------------------
// Event ID generation
// -------------------------------------------------------------------------
/// <summary>
/// Produces a new event identifier: a freshly generated GUID formatted as 32
/// hexadecimal digits without separators. Generation happens while holding
/// the dedicated event-ID mutex.
/// Mirrors Go <c>(a *Account) nextEventID() string</c>.
/// </summary>
/// <returns>The new identifier string.</returns>
internal string NextEventId()
{
    lock (_eventIdsMu)
    {
        var id = Guid.NewGuid();
        return id.ToString("N");
    }
}
// -------------------------------------------------------------------------
// Client accessors
// -------------------------------------------------------------------------
/// <summary>
/// Copies the current client set into a new list. Lock must be held by the caller.
/// Returns an empty list when no clients are tracked.
/// Mirrors Go <c>(a *Account) getClientsLocked() []*client</c>.
/// </summary>
/// <returns>A new list that the caller may freely modify.</returns>
internal List<ClientConnection> GetClientsLocked()
{
    if (_clients is { Count: > 0 } tracked)
        return new List<ClientConnection>(tracked);

    return new List<ClientConnection>();
}
/// <summary>
/// Takes the read lock and returns a snapshot list of the current clients.
/// Mirrors Go <c>(a *Account) getClients() []*client</c>.
/// </summary>
/// <returns>A new list containing the clients tracked at the time of the call.</returns>
internal List<ClientConnection> GetClients()
{
    List<ClientConnection> snapshot;

    _mu.EnterReadLock();
    try { snapshot = GetClientsLocked(); }
    finally { _mu.ExitReadLock(); }

    return snapshot;
}
/// <summary>
/// Builds a snapshot of the clients whose kind is not an internal one.
/// Lock must be held by the caller.
/// Mirrors Go <c>(a *Account) getExternalClientsLocked() []*client</c>.
/// </summary>
/// <returns>A new list of external clients; empty when none are tracked.</returns>
internal List<ClientConnection> GetExternalClientsLocked()
{
    if (_clients is not { Count: > 0 } tracked)
        return new List<ClientConnection>();

    var externals = new List<ClientConnection>(tracked.Count);
    foreach (var client in tracked)
    {
        if (IsInternalClientKind(client.Kind))
            continue;

        externals.Add(client);
    }

    return externals;
}
// -------------------------------------------------------------------------
// Remote server tracking
// -------------------------------------------------------------------------
/// <summary>
/// Updates the remote-server tracking table for this account based on an
/// incoming <see cref="AccountNumConns"/> message, and returns the set of
/// local clients that must be disconnected because a connection limit has
/// been exceeded (after accounting for remote connections).
/// Mirrors Go <c>(a *Account) updateRemoteServer(m *AccountNumConns) []*client</c>.
/// </summary>
/// <param name="m">Connection and leaf-node counts reported by a remote server.</param>
/// <returns>
/// Clients to disconnect: external clients when the connection limit is exceeded,
/// followed by trailing local leaf nodes when the leaf-node limit is exceeded.
/// Empty when neither limit is exceeded.
/// </returns>
internal List<ClientConnection> UpdateRemoteServer(AccountNumConns m)
{
_mu.EnterWriteLock();
try
{
_strack ??= new Dictionary<string, SConns>();
// Replace this server's entry and apply only the delta against its previous
// report, so repeated updates from the same server do not double count.
_strack.TryGetValue(m.Server.Id, out var prev);
_strack[m.Server.Id] = new SConns
{
Conns = m.Conns,
Leafs = m.LeafNodes,
};
_nrclients += m.Conns - (prev?.Conns ?? 0);
_nrleafs += m.LeafNodes - (prev?.Leafs ?? 0);
var localCount = _clients?.Count ?? 0;
// Check if total connections exceed the limit.
bool maxConnsExceeded = MaxConnections != NoLimit &&
(localCount - _sysclients + _nrclients) > MaxConnections;
List<ClientConnection> toDisconnect = [];
if (maxConnsExceeded)
{
var external = GetExternalClientsLocked();
// Intended order: newest connections first (reverse chronological).
// TODO: session 12 — sort by c.Start once ClientConnection has a Start field.
// Until then the snapshot is unsorted, and the first `over` entries of it
// (in whatever order the client set enumerates) are selected.
int over = (localCount - _sysclients + _nrclients) - MaxConnections;
if (over < external.Count)
toDisconnect.AddRange(external.GetRange(0, over));
else
toDisconnect.AddRange(external);
}
// Check if total leaf nodes exceed the limit.
bool maxLeafsExceeded = MaxLeafNodes != NoLimit &&
(_nleafs + _nrleafs) > MaxLeafNodes;
if (maxLeafsExceeded)
{
// The leaf list has its own lock; it is taken while _mu is still held.
_lmu.EnterReadLock();
try
{
int over = _nleafs + _nrleafs - MaxLeafNodes;
if (over > 0)
{
// Select the last `over` leaf nodes, or all of them if fewer exist.
int start = Math.Max(0, _lleafs.Count - over);
toDisconnect.AddRange(_lleafs.GetRange(start, _lleafs.Count - start));
}
}
finally
{
_lmu.ExitReadLock();
}
}
return toDisconnect;
}
finally
{
_mu.ExitWriteLock();
}
}
/// <summary>
/// Drops the tracking entry for remote server <paramref name="sid"/> and
/// subtracts its last reported connection and leaf-node counts from the
/// remote totals. Does nothing when the server is not tracked.
/// Mirrors Go <c>(a *Account) removeRemoteServer(sid string)</c>.
/// </summary>
/// <param name="sid">ID of the remote server that has shut down.</param>
internal void RemoveRemoteServer(string sid)
{
    _mu.EnterWriteLock();
    try
    {
        // Single lookup: remove the entry and capture its counts in one call.
        if (_strack is null || !_strack.Remove(sid, out var removed))
            return;

        _nrclients -= removed.Conns;
        _nrleafs -= removed.Leafs;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Counts the tracked remote servers that currently report at least one
/// connection or leaf node for this account.
/// Mirrors Go <c>(a *Account) expectedRemoteResponses() int32</c>.
/// </summary>
/// <returns>The number of such servers; 0 when nothing is tracked.</returns>
internal int ExpectedRemoteResponses()
{
    _mu.EnterReadLock();
    try
    {
        if (_strack is null)
            return 0;

        var active = 0;
        foreach (var (_, counts) in _strack)
        {
            // Skip servers that report neither connections nor leaf nodes.
            if (counts.Conns <= 0 && counts.Leafs <= 0)
                continue;

            active++;
        }

        return active;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// -------------------------------------------------------------------------
// Eventing
// -------------------------------------------------------------------------
/// <summary>
/// Resets eventing state under the write lock: zeroes the remote client count,
/// clears both eventing timers, and drops the client set and the remote-server
/// tracking table.
/// Mirrors Go <c>(a *Account) clearEventing()</c>.
/// </summary>
internal void ClearEventing()
{
    _mu.EnterWriteLock();
    try
    {
        _nrclients = 0;

        // Timers first, then the collections they relate to.
        ClearTimerLocked(ref _etmr);
        ClearTimerLocked(ref _ctmr);

        _clients = null;
        _strack = null;
    }
    finally { _mu.ExitWriteLock(); }
}
// -------------------------------------------------------------------------
// Name accessors
// -------------------------------------------------------------------------
/// <summary>
/// Reads <see cref="Name"/> while holding the read lock.
/// Mirrors Go <c>(a *Account) GetName() string</c>.
/// </summary>
/// <returns>The account name.</returns>
public string GetName()
{
    string name;

    _mu.EnterReadLock();
    try { name = Name; }
    finally { _mu.ExitReadLock(); }

    return name;
}
/// <summary>
/// Locking wrapper around <see cref="GetNameTagLocked"/>: yields the name tag
/// when one is set and the account name otherwise.
/// Mirrors Go <c>(a *Account) getNameTag() string</c>.
/// </summary>
/// <returns>The name tag, or the account name as a fallback.</returns>
internal string GetNameTag()
{
    string tag;

    _mu.EnterReadLock();
    try { tag = GetNameTagLocked(); }
    finally { _mu.ExitReadLock(); }

    return tag;
}
/// <summary>
/// Yields <see cref="NameTag"/> unless it is null or empty, in which case
/// <see cref="Name"/> is used instead. Lock must be held by the caller.
/// Mirrors Go <c>(a *Account) getNameTagLocked() string</c>.
/// </summary>
/// <returns>The name tag, or the account name as a fallback.</returns>
internal string GetNameTagLocked()
{
    if (string.IsNullOrEmpty(NameTag))
        return Name;

    return NameTag;
}
// -------------------------------------------------------------------------
// Connection counts
// -------------------------------------------------------------------------
/// <summary>
/// Computes the account-wide connection count: local clients, minus system
/// clients, plus clients reported by remote servers.
/// Mirrors Go <c>(a *Account) NumConnections() int</c>.
/// </summary>
/// <returns>The combined local and remote connection count.</returns>
public int NumConnections()
{
    int total;

    _mu.EnterReadLock();
    try
    {
        var local = _clients?.Count ?? 0;
        total = local - _sysclients + _nrclients;
    }
    finally { _mu.ExitReadLock(); }

    return total;
}
/// <summary>
/// Sums the client and leaf-node connections reported by remote servers.
/// Mirrors Go <c>(a *Account) NumRemoteConnections() int</c>.
/// </summary>
/// <returns>Remote clients plus remote leaf nodes.</returns>
public int NumRemoteConnections()
{
    int remote;

    _mu.EnterReadLock();
    try { remote = _nrclients + _nrleafs; }
    finally { _mu.ExitReadLock(); }

    return remote;
}
/// <summary>
/// Locking wrapper around <see cref="NumLocalConnectionsLocked"/>: the count of
/// clients on this server excluding system clients and leaf nodes.
/// Mirrors Go <c>(a *Account) NumLocalConnections() int</c>.
/// </summary>
/// <returns>The local non-system, non-leaf client count.</returns>
public int NumLocalConnections()
{
    int local;

    _mu.EnterReadLock();
    try { local = NumLocalConnectionsLocked(); }
    finally { _mu.ExitReadLock(); }

    return local;
}
/// <summary>
/// Size of the local client set minus system clients and local leaf nodes.
/// A missing client set counts as empty. Lock must be held.
/// Mirrors Go <c>(a *Account) numLocalConnections() int</c>.
/// </summary>
/// <returns>The local non-system, non-leaf client count.</returns>
internal int NumLocalConnectionsLocked()
{
    var tracked = _clients?.Count ?? 0;
    return tracked - _sysclients - _nleafs;
}
/// <summary>
/// Alias that forwards to <see cref="NumLocalConnectionsLocked"/>. Lock must be held.
/// Mirrors Go <c>(a *Account) numLocalConnections() int</c>.
/// </summary>
/// <returns>The local non-system, non-leaf client count.</returns>
internal int NumLocalConnectionsInternal()
{
    return NumLocalConnectionsLocked();
}
/// <summary>
/// Counts every local connection, leaf nodes included, excluding system clients.
/// Takes the read lock.
/// Mirrors Go <c>(a *Account) numLocalAndLeafConnections() int</c>.
/// </summary>
/// <returns>Size of the local client set minus the system client count.</returns>
internal int NumLocalAndLeafConnections()
{
    int count;

    _mu.EnterReadLock();
    try
    {
        var tracked = _clients?.Count ?? 0;
        count = tracked - _sysclients;
    }
    finally { _mu.ExitReadLock(); }

    return count;
}
/// <summary>
/// Returns the local leaf-node counter. This method does not acquire any lock itself.
/// Mirrors Go <c>(a *Account) numLocalLeafNodes() int</c>.
/// </summary>
/// <returns>The local leaf-node count.</returns>
internal int NumLocalLeafNodes()
{
    return _nleafs;
}
// -------------------------------------------------------------------------
// Connection limit checks
// -------------------------------------------------------------------------
/// <summary>
/// Reports whether local (minus system) plus remote clients have reached or
/// passed the configured connection limit. Always false when no limit is set.
/// Mirrors Go <c>(a *Account) MaxTotalConnectionsReached() bool</c>.
/// </summary>
/// <returns>True when the connection limit has been reached.</returns>
public bool MaxTotalConnectionsReached()
{
    _mu.EnterReadLock();
    try
    {
        return MaxConnections != NoLimit
            && (_clients?.Count ?? 0) - _sysclients + _nrclients >= MaxConnections;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Reads the configured connection limit under the read lock.
/// Mirrors Go <c>(a *Account) MaxActiveConnections() int</c>.
/// </summary>
/// <returns>The maximum connections setting.</returns>
public int MaxActiveConnections()
{
    int limit;

    _mu.EnterReadLock();
    try
    {
        limit = MaxConnections;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return limit;
}
// -------------------------------------------------------------------------
// Leaf-node limit checks
// -------------------------------------------------------------------------
/// <summary>
/// Locking wrapper around <see cref="MaxTotalLeafNodesReachedLocked"/>: reports
/// whether local plus remote leaf nodes have reached or passed the configured limit.
/// Mirrors Go <c>(a *Account) MaxTotalLeafNodesReached() bool</c>.
/// </summary>
/// <returns>True when the leaf-node limit has been reached.</returns>
public bool MaxTotalLeafNodesReached()
{
    bool reached;

    _mu.EnterReadLock();
    try
    {
        reached = MaxTotalLeafNodesReachedLocked();
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return reached;
}
/// <summary>
/// Reports whether local plus remote leaf nodes have reached or passed the
/// configured limit; always false when no limit is set.
/// Lock must be held by the caller.
/// Mirrors Go <c>(a *Account) maxTotalLeafNodesReached() bool</c>.
/// </summary>
/// <returns>True when the leaf-node limit has been reached.</returns>
internal bool MaxTotalLeafNodesReachedLocked()
{
    return MaxLeafNodes != NoLimit
        && _nleafs + _nrleafs >= MaxLeafNodes;
}
/// <summary>
/// Alias that forwards to <see cref="MaxTotalLeafNodesReachedLocked"/>.
/// Lock must be held by the caller.
/// Mirrors Go <c>(a *Account) maxTotalLeafNodesReached() bool</c>.
/// </summary>
/// <returns>True when the leaf-node limit has been reached.</returns>
internal bool MaxTotalLeafNodesReachedInternal()
{
    return MaxTotalLeafNodesReachedLocked();
}
/// <summary>
/// Sums the local and remote leaf-node counters under the read lock.
/// Mirrors Go <c>(a *Account) NumLeafNodes() int</c>.
/// </summary>
/// <returns>The total leaf-node count.</returns>
public int NumLeafNodes()
{
    int total;

    _mu.EnterReadLock();
    try
    {
        total = _nleafs + _nrleafs;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return total;
}
/// <summary>
/// Reads the remote leaf-node counter under the read lock.
/// Mirrors Go <c>(a *Account) NumRemoteLeafNodes() int</c>.
/// </summary>
/// <returns>The remote leaf-node count.</returns>
public int NumRemoteLeafNodes()
{
    int remote;

    _mu.EnterReadLock();
    try
    {
        remote = _nrleafs;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return remote;
}
/// <summary>
/// Reads the configured leaf-node limit under the read lock.
/// Mirrors Go <c>(a *Account) MaxActiveLeafNodes() int</c>.
/// </summary>
/// <returns>The maximum leaf-nodes setting.</returns>
public int MaxActiveLeafNodes()
{
    int limit;

    _mu.EnterReadLock();
    try
    {
        limit = MaxLeafNodes;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return limit;
}
// -------------------------------------------------------------------------
// Subscription counts
// -------------------------------------------------------------------------
/// <summary>
/// Counts the entries in the route map (subjects sent across routes);
/// 0 when the map has not been created.
/// Mirrors Go <c>(a *Account) RoutedSubs() int</c>.
/// </summary>
/// <returns>The number of route-map entries.</returns>
public int RoutedSubs()
{
    int routed;

    _mu.EnterReadLock();
    try
    {
        routed = _rm?.Count ?? 0;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return routed;
}
/// <summary>
/// Reports how many subscriptions the account's subscription index holds;
/// 0 when no index has been assigned.
/// Mirrors Go <c>(a *Account) TotalSubs() int</c>.
/// </summary>
/// <returns>The index's subscription count, converted to <c>int</c>.</returns>
public int TotalSubs()
{
    _mu.EnterReadLock();
    try
    {
        return Sublist == null ? 0 : (int)Sublist.Count();
    }
    finally { _mu.ExitReadLock(); }
}
/// <summary>
/// Reports whether <see cref="Interest"/> finds at least one subscription
/// matching <paramref name="subject"/>.
/// Mirrors Go <c>(a *Account) SubscriptionInterest(subject string) bool</c>.
/// </summary>
/// <param name="subject">Subject to test for interest.</param>
/// <returns>True when the interest count is greater than zero.</returns>
public bool SubscriptionInterest(string subject)
{
    int matches = Interest(subject);
    return matches > 0;
}
/// <summary>
/// Returns the sum of the two counts reported by the subscription index for
/// <paramref name="subject"/> (plain and queue subscriptions), or 0 when no
/// index is attached. Read under the account read lock.
/// Mirrors Go <c>(a *Account) Interest(subject string) int</c>.
/// </summary>
/// <param name="subject">Subject to look up.</param>
/// <returns>Plain plus queue subscription count.</returns>
public int Interest(string subject)
{
    _mu.EnterReadLock();
    try
    {
        if (Sublist != null)
        {
            var (plain, queued) = Sublist.NumInterest(subject);
            return plain + queued;
        }

        return 0;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Adds one to the leaf-node counter kept for <paramref name="cluster"/>,
/// creating the counter map on first use. Runs under the account write lock.
/// Mirrors Go <c>(a *Account) registerLeafNodeCluster(cluster string)</c>.
/// </summary>
/// <param name="cluster">Name of the cluster whose counter is incremented.</param>
internal void RegisterLeafNodeCluster(string cluster)
{
    _mu.EnterWriteLock();
    try
    {
        _leafClusters ??= new Dictionary<string, ulong>(StringComparer.Ordinal);

        // A missing key counts as zero.
        ulong existing = _leafClusters.TryGetValue(cluster, out var found) ? found : 0UL;
        _leafClusters[cluster] = existing + 1;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Reports whether the counter kept for <paramref name="cluster"/> exists and
/// is greater than zero. Read under the account read lock.
/// Mirrors Go <c>(a *Account) hasLeafNodeCluster(cluster string) bool</c>.
/// </summary>
/// <param name="cluster">Cluster name to look up.</param>
/// <returns>True when at least one leaf node is tracked for the cluster.</returns>
internal bool HasLeafNodeCluster(string cluster)
{
    _mu.EnterReadLock();
    try
    {
        if (_leafClusters == null)
            return false;

        if (!_leafClusters.TryGetValue(cluster, out var tracked))
            return false;

        return tracked > 0;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Reports whether <paramref name="cluster"/> accounts for every entry in
/// <c>_nleafs</c>: the counter map must hold at most one cluster and the
/// counter for <paramref name="cluster"/> must equal <c>_nleafs</c>.
/// An empty or null cluster name is never isolated.
/// Mirrors Go <c>(a *Account) isLeafNodeClusterIsolated(cluster string) bool</c>.
/// </summary>
/// <param name="cluster">Cluster name to test.</param>
/// <returns>True when the account is leaf-cluster isolated to the cluster.</returns>
internal bool IsLeafNodeClusterIsolated(string cluster)
{
    _mu.EnterReadLock();
    try
    {
        if (string.IsNullOrEmpty(cluster))
            return false;

        var clusters = _leafClusters;
        if (clusters == null || clusters.Count > 1)
            return false;

        if (!clusters.TryGetValue(cluster, out var tracked))
            return false;

        return tracked == (ulong)_nleafs;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// -------------------------------------------------------------------------
// Subscription limit error throttle
// -------------------------------------------------------------------------
/// <summary>
/// Returns true when it is appropriate to log a max-subscription-limit error.
/// Rate-limited to at most once per <see cref="AccountEventConstants.DefaultMaxSubLimitReportThreshold"/>.
/// Mirrors Go <c>(a *Account) shouldLogMaxSubErr() bool</c>.
/// </summary>
/// <remarks>
/// The last-report timestamp is first checked under the read lock and then
/// re-checked under the write lock before it is updated. This ensures that
/// only one of several concurrent callers crossing the threshold gets
/// <c>true</c>.
/// </remarks>
/// <returns>True when the caller should log; false while still throttled.</returns>
internal bool ShouldLogMaxSubErr()
{
    long last;
    _mu.EnterReadLock();
    try
    {
        last = Interlocked.Read(ref _lastLimErr);
    }
    finally
    {
        _mu.ExitReadLock();
    }

    // Timestamps are kept in nanoseconds (millisecond resolution scaled up).
    long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
    long threshold = (long)AccountEventConstants.DefaultMaxSubLimitReportThreshold.TotalMilliseconds * 1_000_000L;

    // Fast path: still inside the throttle window.
    if (now - last < threshold)
        return false;

    _mu.EnterWriteLock();
    try
    {
        // Re-check: another caller may have claimed this window between the
        // read above and acquiring the write lock.
        if (now - Interlocked.Read(ref _lastLimErr) < threshold)
            return false;

        Interlocked.Exchange(ref _lastLimErr, now);
        return true;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
// -------------------------------------------------------------------------
// Expiration
// -------------------------------------------------------------------------
/// <summary>
/// Reports whether the expired flag is set. The flag is read atomically via a
/// no-op compare-exchange; no lock is taken.
/// Mirrors Go <c>(a *Account) IsExpired() bool</c>.
/// </summary>
/// <returns>True when <c>_expired</c> equals 1.</returns>
public bool IsExpired()
{
    var flag = Interlocked.CompareExchange(ref _expired, 0, 0);
    return flag == 1;
}
/// <summary>
/// Reports whether this account carries a non-empty claim JWT.
/// Takes no lock itself; the caller is expected to hold the account lock.
/// Mirrors Go <c>(a *Account) isClaimAccount() bool</c>.
/// </summary>
/// <returns>True when <c>ClaimJwt</c> is neither null nor empty.</returns>
internal bool IsClaimAccount()
{
    return ClaimJwt is { Length: > 0 };
}
/// <summary>
/// Expiration-timer callback: sets the expired flag and walks the clients
/// returned by <c>GetClients()</c>. Notifying the non-internal clients is not
/// ported yet, so the loop currently has no effect.
/// Mirrors Go <c>(a *Account) expiredTimeout()</c>.
/// </summary>
private void ExpiredTimeout()
{
    Interlocked.Exchange(ref _expired, 1);

    foreach (var client in GetClients())
    {
        if (IsInternalClientKind(client.Kind))
            continue;

        // TODO: session 12 — call c.AccountAuthExpired() once fully ported.
    }
}
/// <summary>
/// Starts or resets the JWT expiration timer so that <see cref="ExpiredTimeout"/>
/// fires once after <paramref name="d"/>.
/// Any timer that was already installed is disposed after the new one is in
/// place. This prevents the replaced timer from leaking or firing later.
/// Mirrors Go <c>(a *Account) setExpirationTimer(d time.Duration)</c>.
/// </summary>
/// <param name="d">Delay before the account is marked expired.</param>
internal void SetExpirationTimer(TimeSpan d)
{
    var previous = _etmr;
    _etmr = new Timer(_ => ExpiredTimeout(), null, d, Timeout.InfiniteTimeSpan);

    // Release the replaced timer; without this a "reset" left the old timer armed.
    previous?.Dispose();
}
/// <summary>
/// Disposes the expiration timer, if any, and clears the field.
/// Takes no lock itself; the caller is expected to hold the account lock.
/// Mirrors Go <c>(a *Account) clearExpirationTimer() bool</c>.
/// </summary>
/// <returns>
/// Always true in this implementation, whether or not a timer was installed.
/// </returns>
internal bool ClearExpirationTimer()
{
    if (_etmr != null)
    {
        _etmr.Dispose();
        _etmr = null;
    }

    return true;
}
// -------------------------------------------------------------------------
// Subject mappings — public API
// -------------------------------------------------------------------------
/// <summary>
/// Adds a 1:1 subject mapping from <paramref name="src"/> to
/// <paramref name="dest"/> by delegating to <see cref="AddWeightedMappings"/>
/// with a single destination of weight 100.
/// Mirrors Go <c>(a *Account) AddMapping(src, dest string) error</c>.
/// </summary>
/// <param name="src">Source subject.</param>
/// <param name="dest">Destination subject.</param>
/// <returns>Null on success; otherwise the error describing the failure.</returns>
public Exception? AddMapping(string src, string dest)
{
    var only = MapDest.New(dest, 100);
    return AddWeightedMappings(src, only);
}
/// <summary>
/// Adds weighted subject mappings for one or more destinations.
/// Total weights must not exceed 100 per cluster group. If the total is
/// less than 100 and the source was not listed as a destination, the
/// remainder is automatically routed back to the source.
/// Weights are converted to cumulative form and sorted ascending so that
/// random selection can use a single linear scan.
/// Mirrors Go <c>(a *Account) AddWeightedMappings(src string, dests ...*MapDest) error</c>.
/// </summary>
/// <param name="src">Source subject; must be a valid subject.</param>
/// <param name="dests">
/// Destinations with their weights. A null array is treated as empty, and a
/// destination with a null cluster is treated like one with an empty cluster.
/// </param>
/// <returns>
/// Null on success; otherwise an error for an invalid subject, a duplicate
/// destination, a weight above 100, or a failing transform.
/// </returns>
public Exception? AddWeightedMappings(string src, params MapDest[] dests)
{
    // An explicit null array behaves like "no destinations" instead of crashing.
    dests ??= Array.Empty<MapDest>();

    _mu.EnterWriteLock();
    try
    {
        if (!SubscriptionIndex.IsValidSubject(src))
            return ServerErrors.ErrBadSubject;

        bool hasWildcard = SubscriptionIndex.SubjectHasWildcard(src);
        var m = new SubjectMapping
        {
            Source = src,
            HasWildcard = hasWildcard,
            Destinations = new List<Destination>(dests.Length + 1),
        };

        var seen = new HashSet<string>(dests.Length);
        var totals = new Dictionary<string, byte>(); // cluster → summed weight

        foreach (var d in dests)
        {
            if (!seen.Add(d.Subject))
                return new InvalidOperationException($"duplicate entry for \"{d.Subject}\"");
            if (d.Weight > 100)
                return new InvalidOperationException("individual weights need to be <= 100");

            // Dictionary keys cannot be null; normalise so a null cluster shares
            // the bucket of the empty (global) cluster.
            string cluster = d.Cluster ?? string.Empty;

            totals.TryGetValue(cluster, out byte tw);
            int next = tw + d.Weight;
            if (next > 100)
                return new InvalidOperationException("total weight needs to be <= 100");
            totals[cluster] = (byte)next;

            // Validate the destination before building its transform.
            var validateErr = ValidateMapping(src, d.Subject);
            if (validateErr != null)
                return validateErr;

            var (tr, trErr) = SubjectTransform.New(src, d.Subject);
            if (trErr != null)
                return trErr;

            if (cluster.Length == 0)
            {
                m.Destinations.Add(new Destination { Transform = tr, Weight = d.Weight });
            }
            else
            {
                m.ClusterDestinations ??= new Dictionary<string, List<Destination>>();
                if (!m.ClusterDestinations.TryGetValue(cluster, out var clusterList))
                {
                    clusterList = [];
                    m.ClusterDestinations[cluster] = clusterList;
                }
                clusterList.Add(new Destination { Transform = tr, Weight = d.Weight });
            }
        }

        // Process each destination list: fill remainder and convert to cumulative weights.
        var destErr = ProcessDestinations(src, hasWildcard, seen, m.Destinations);
        if (destErr != null)
            return destErr;

        if (m.ClusterDestinations != null)
        {
            // Iterate over a snapshot of the keys while the lists are processed.
            var clusterKeys = new List<string>(m.ClusterDestinations.Keys);
            foreach (var clusterKey in clusterKeys)
            {
                destErr = ProcessDestinations(src, hasWildcard, seen, m.ClusterDestinations[clusterKey]);
                if (destErr != null)
                    return destErr;
            }
        }

        // Replace an existing entry for the same source, or append a new one.
        int existing = -1;
        for (int i = 0; i < _mappings.Count; i++)
        {
            if (_mappings[i].Source == src)
            {
                existing = i;
                break;
            }
        }

        if (existing >= 0)
            _mappings[existing] = m;
        else
            _mappings.Add(m);

        // Refresh the flag on both paths so it always reflects the list.
        Interlocked.Exchange(ref _hasMapped, _mappings.Count > 0 ? 1 : 0);
        // TODO: session 15 — notify connected leaf nodes via lc.ForceAddToSmap(src).
        return null;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Removes the first mapping whose source equals <paramref name="src"/>.
/// The removed slot is filled with the last entry, so the order of the
/// remaining mappings may change. Runs under the account write lock and
/// refreshes the has-mappings flag.
/// Mirrors Go <c>(a *Account) RemoveMapping(src string) bool</c>.
/// </summary>
/// <param name="src">Source subject of the mapping to remove.</param>
/// <returns>True when a mapping was removed; false when none matched.</returns>
public bool RemoveMapping(string src)
{
    _mu.EnterWriteLock();
    try
    {
        int hit = -1;
        for (int i = 0; i < _mappings.Count; i++)
        {
            if (_mappings[i].Source == src)
            {
                hit = i;
                break;
            }
        }

        if (hit < 0)
            return false;

        // Overwrite with the tail element, then drop the tail (no shifting).
        int tail = _mappings.Count - 1;
        _mappings[hit] = _mappings[tail];
        _mappings.RemoveAt(tail);

        Interlocked.Exchange(ref _hasMapped, _mappings.Count > 0 ? 1 : 0);
        // TODO: session 15 — notify leaf nodes via lc.ForceRemoveFromSmap(src).
        return true;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Reports whether the has-mappings flag is set. The flag is read atomically
/// via a no-op compare-exchange; no lock is taken.
/// Mirrors Go <c>(a *Account) hasMappings() bool</c>.
/// </summary>
/// <returns>True when <c>_hasMapped</c> equals 1.</returns>
internal bool HasMappings()
{
    var flag = Interlocked.CompareExchange(ref _hasMapped, 0, 0);
    return flag == 1;
}
/// <summary>
/// Maps <paramref name="dest"/> through the first matching subject mapping,
/// picking among that mapping's destinations by weighted random selection.
/// Mirrors Go <c>(a *Account) selectMappedSubject(dest string) (string, bool)</c>.
/// </summary>
/// <param name="dest">Subject to map.</param>
/// <returns>
/// The mapped subject and true; or the unchanged <paramref name="dest"/> and
/// false when no mapping matches or no destination is selected.
/// </returns>
internal (string dest, bool mapped) SelectMappedSubject(string dest)
{
    if (!HasMappings())
        return (dest, false);

    _mu.EnterReadLock();
    try
    {
        // Tokens of dest, created only once a subset match is first needed.
        string[]? tokens = null;
        SubjectMapping? match = null;

        foreach (var candidate in _mappings)
        {
            bool exact = !candidate.HasWildcard && candidate.Source == dest;
            if (!exact)
            {
                tokens ??= TokenizeSubjectForMapping(dest);
                if (!SubjectTransform.IsSubsetMatch(tokens, candidate.Source))
                    continue;
            }

            match = candidate;
            break;
        }

        if (match == null)
            return (dest, false);

        // Cluster-scoped destinations win over the global list when present.
        List<Destination> pool = match.Destinations;
        if (match.ClusterDestinations != null && match.ClusterDestinations.Count > 0)
        {
            string clusterName = GetCachedClusterName();
            if (!string.IsNullOrEmpty(clusterName) &&
                match.ClusterDestinations.TryGetValue(clusterName, out var scoped))
            {
                pool = scoped;
            }
        }

        if (pool.Count == 0)
            return (dest, false);

        Destination? chosen = null;
        if (pool.Count == 1 && pool[0].Weight == 100)
        {
            // A single full-weight entry needs no random draw.
            chosen = pool[0];
        }
        else
        {
            // Weights are cumulative, so the first entry above the roll wins.
            byte roll = (byte)(Random.Shared.Next() % 100);
            for (int i = 0; i < pool.Count; i++)
            {
                if (roll < pool[i].Weight)
                {
                    chosen = pool[i];
                    break;
                }
            }
        }

        if (chosen == null)
            return (dest, false);

        if (chosen.Transform == null)
            return (dest, true);

        string mappedSubject = tokens != null
            ? chosen.Transform.TransformTokenizedSubject(tokens)
            : chosen.Transform.TransformSubject(dest);
        return (mappedSubject, true);
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// -------------------------------------------------------------------------
// Service export configuration
// -------------------------------------------------------------------------
/// <summary>
/// Exports <paramref name="subject"/> as a service with singleton responses
/// and no account-position requirement.
/// Mirrors Go <c>(a *Account) AddServiceExport(subject string, accounts []*Account) error</c>.
/// </summary>
/// <param name="subject">Service subject to export.</param>
/// <param name="accounts">Optional accounts passed on for export authorization.</param>
/// <returns>Null on success; otherwise the error describing the failure.</returns>
public Exception? AddServiceExport(string subject, IReadOnlyList<Account>? accounts = null)
{
    return AddServiceExportWithResponseAndAccountPos(subject, ServiceRespType.Singleton, accounts, 0);
}
/// <summary>
/// Exports <paramref name="subject"/> as a service with singleton responses,
/// passing <paramref name="accountPos"/> on for account-position authorization.
/// Mirrors Go <c>(a *Account) addServiceExportWithAccountPos(...)</c>.
/// </summary>
/// <param name="subject">Service subject to export.</param>
/// <param name="accounts">Optional accounts passed on for export authorization.</param>
/// <param name="accountPos">Account token position; 0 means none.</param>
/// <returns>Null on success; otherwise the error describing the failure.</returns>
public Exception? AddServiceExportWithAccountPos(string subject, IReadOnlyList<Account>? accounts, uint accountPos)
{
    return AddServiceExportWithResponseAndAccountPos(subject, ServiceRespType.Singleton, accounts, accountPos);
}
/// <summary>
/// Exports <paramref name="subject"/> as a service with the given response
/// type and no account-position requirement.
/// Mirrors Go <c>(a *Account) AddServiceExportWithResponse(...)</c>.
/// </summary>
/// <param name="subject">Service subject to export.</param>
/// <param name="respType">Response type for the export.</param>
/// <param name="accounts">Optional accounts passed on for export authorization.</param>
/// <returns>Null on success; otherwise the error describing the failure.</returns>
public Exception? AddServiceExportWithResponse(string subject, ServiceRespType respType, IReadOnlyList<Account>? accounts = null)
{
    return AddServiceExportWithResponseAndAccountPos(subject, respType, accounts, 0);
}
/// <summary>
/// Creates or updates the service export for <paramref name="subject"/>.
/// An existing entry is reused; its response type is only overwritten when
/// <paramref name="respType"/> is not singleton. Authorization is applied
/// when accounts are supplied or an account position is set.
/// Mirrors Go <c>(a *Account) addServiceExportWithResponseAndAccountPos(...)</c>.
/// </summary>
/// <param name="subject">Service subject to export; must be a valid subject.</param>
/// <param name="respType">Response type for the export.</param>
/// <param name="accounts">Optional accounts passed to <c>SetExportAuth</c>.</param>
/// <param name="accountPos">Account token position; 0 means none.</param>
/// <returns>
/// Null on success; <c>ServerErrors.ErrBadSubject</c> for an invalid subject,
/// or the error returned by <c>SetExportAuth</c>.
/// </returns>
public Exception? AddServiceExportWithResponseAndAccountPos(string subject, ServiceRespType respType, IReadOnlyList<Account>? accounts, uint accountPos)
{
    if (!SubscriptionIndex.IsValidSubject(subject))
        return ServerErrors.ErrBadSubject;

    _mu.EnterWriteLock();
    try
    {
        Exports.Services ??= new Dictionary<string, ServiceExportEntry>(StringComparer.Ordinal);

        // A missing key and a null stored entry both get a fresh entry.
        Exports.Services.TryGetValue(subject, out var entry);
        entry ??= new ServiceExportEntry();

        if (respType != ServiceRespType.Singleton)
            entry.ResponseType = respType;

        bool needsAuth = accounts != null || accountPos > 0;
        if (needsAuth)
        {
            var authErr = SetExportAuth(entry, subject, accounts, accountPos);
            if (authErr != null)
                return authErr;
        }

        entry.Account = this;
        entry.ResponseThreshold = ServerConstants.DefaultServiceExportResponseThreshold;
        Exports.Services[subject] = entry;
        return null;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Enables latency tracking for <paramref name="service"/> using
/// <c>ServerConstants.DefaultServiceLatencySampling</c>.
/// Mirrors Go <c>(a *Account) TrackServiceExport(service, results string) error</c>.
/// </summary>
/// <param name="service">Exported service subject to track.</param>
/// <param name="results">Subject that latency results are published to.</param>
/// <returns>Null on success; otherwise the error describing the failure.</returns>
public Exception? TrackServiceExport(string service, string results)
{
    return TrackServiceExportWithSampling(service, results, ServerConstants.DefaultServiceLatencySampling);
}
/// <summary>
/// Enables latency tracking for the exported <paramref name="service"/> and
/// copies the latency settings onto this account's matching service imports.
/// Mirrors Go <c>(a *Account) TrackServiceExportWithSampling(...)</c>.
/// </summary>
/// <param name="service">Exported service subject to track.</param>
/// <param name="results">
/// Subject for latency results; must be a valid publish subject and must not
/// itself be an exported service.
/// </param>
/// <param name="sampling">Sampling value; accepted values are 0 through 100.</param>
/// <returns>
/// Null on success; otherwise <c>ErrBadSampling</c>, <c>ErrBadPublishSubject</c>,
/// <c>ErrMissingService</c> or <c>ErrBadServiceType</c>.
/// </returns>
public Exception? TrackServiceExportWithSampling(string service, string results, int sampling)
{
    // 0 is accepted; any other value must fall within 1..100.
    if (sampling is < 0 or > 100)
        return ServerErrors.ErrBadSampling;

    if (!SubscriptionIndex.IsValidPublishSubject(results) || IsExportService(results))
        return ServerErrors.ErrBadPublishSubject;

    _mu.EnterWriteLock();
    try
    {
        var services = Exports.Services;
        if (services == null || !services.TryGetValue(service, out var entry))
            return ServerErrors.ErrMissingService;

        entry ??= new ServiceExportEntry();
        if (entry.ResponseType != ServiceRespType.Singleton)
            return ServerErrors.ErrBadServiceType;

        entry.Latency = new InternalServiceLatency
        {
            Sampling = sampling,
            Subject = results,
        };
        services[service] = entry;

        var importGroups = Imports.Services;
        if (importGroups == null)
            return null;

        foreach (var group in importGroups.Values)
        {
            foreach (var import in group)
            {
                // Only imports that point back at this account are updated.
                if (import?.Account?.Name != Name)
                    continue;
                if (!SubjectTransform.IsSubsetMatch(SubjectTransform.TokenizeSubject(import.To), service))
                    continue;

                import.Latency = entry.Latency;
            }
        }

        return null;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Turns latency tracking off for the exported <paramref name="service"/> and
/// clears the latency settings on this account's matching service imports.
/// Does nothing when the service is not exported or is not being tracked.
/// Mirrors Go <c>(a *Account) UnTrackServiceExport(service string)</c>.
/// </summary>
/// <param name="service">Exported service subject to stop tracking.</param>
public void UnTrackServiceExport(string service)
{
    _mu.EnterWriteLock();
    try
    {
        var services = Exports.Services;
        if (services == null || !services.TryGetValue(service, out var entry))
            return;
        if (entry?.Latency == null)
            return;

        entry.Latency = null;

        var importGroups = Imports.Services;
        if (importGroups == null)
            return;

        foreach (var group in importGroups.Values)
        {
            foreach (var import in group)
            {
                // Only imports that point back at this account are touched.
                if (import?.Account?.Name != Name)
                    continue;
                if (!SubjectTransform.IsSubsetMatch(SubjectTransform.TokenizeSubject(import.To), service))
                    continue;

                import.Latency = null;
                import.M1 = null;
            }
        }
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
// -------------------------------------------------------------------------
// Export checks
// -------------------------------------------------------------------------
/// <summary>
/// Reports whether <paramref name="service"/> is covered by a service export,
/// either as an exact key or through a subset match against an exported
/// subject. Read under the account read lock.
/// Mirrors Go <c>(a *Account) IsExportService(service string) bool</c>.
/// </summary>
/// <param name="service">Service subject to test.</param>
/// <returns>True when an export matches; false otherwise.</returns>
public bool IsExportService(string service)
{
    _mu.EnterReadLock();
    try
    {
        var services = Exports.Services;
        if (services == null)
            return false;

        // Exact key first; fall back to scanning every exported subject.
        bool found = services.ContainsKey(service);
        if (!found)
        {
            var tokens = SubjectTransform.TokenizeSubject(service);
            foreach (var exported in services.Keys)
            {
                if (SubjectTransform.IsSubsetMatch(tokens, exported))
                {
                    found = true;
                    break;
                }
            }
        }

        return found;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Reports whether latency tracking is enabled for <paramref name="service"/>.
/// An exact key with a null entry yields false. An exact key with latency set
/// yields true. Otherwise every export is scanned for a subset match that has
/// latency set. Read under the account read lock.
/// Mirrors Go <c>(a *Account) IsExportServiceTracking(service string) bool</c>.
/// </summary>
/// <param name="service">Service subject to test.</param>
/// <returns>True when a matching export has latency tracking enabled.</returns>
public bool IsExportServiceTracking(string service)
{
    _mu.EnterReadLock();
    try
    {
        var services = Exports.Services;
        if (services == null)
            return false;

        if (services.TryGetValue(service, out var direct))
        {
            if (direct == null)
                return false;
            if (direct.Latency != null)
                return true;
        }

        var tokens = SubjectTransform.TokenizeSubject(service);
        foreach (var pair in services)
        {
            if (pair.Value?.Latency != null && SubjectTransform.IsSubsetMatch(tokens, pair.Key))
                return true;
        }

        return false;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Decides whether <paramref name="account"/> may import the stream export
/// matching <paramref name="subject"/>. An exact key is tried first, then the
/// first export whose subject subset-matches. A null export entry counts as
/// approved; otherwise the decision is delegated to <c>CheckAuth</c>.
/// Takes no lock itself; the caller is expected to hold at least the read lock.
/// Mirrors Go <c>(a *Account) checkStreamExportApproved(...) bool</c>.
/// </summary>
/// <param name="account">Importing account.</param>
/// <param name="subject">Subject being imported.</param>
/// <param name="imClaim">Import claim handed through to <c>CheckAuth</c>.</param>
/// <returns>True when the import is approved; false when denied or no export matches.</returns>
internal bool CheckStreamExportApproved(Account account, string subject, object? imClaim)
{
    var streams = Exports.Streams;
    if (streams == null)
        return false;

    if (streams.TryGetValue(subject, out var exact))
        return exact == null || CheckAuth(exact, account, imClaim, null);

    var tokens = SubjectTransform.TokenizeSubject(subject);
    foreach (var pair in streams)
    {
        if (!SubjectTransform.IsSubsetMatch(tokens, pair.Key))
            continue;

        // Only the first matching export is consulted.
        return pair.Value == null || CheckAuth(pair.Value, account, imClaim, tokens);
    }

    return false;
}
/// <summary>
/// Decides whether <paramref name="account"/> may import the service export
/// matching <paramref name="subject"/>. An exact key is tried first, then the
/// first export whose subject subset-matches. A null export entry counts as
/// approved; otherwise the decision is delegated to <c>CheckAuth</c>.
/// Takes no lock itself; the caller is expected to hold at least the read lock.
/// Mirrors Go <c>(a *Account) checkServiceExportApproved(...) bool</c>.
/// </summary>
/// <param name="account">Importing account.</param>
/// <param name="subject">Subject being imported.</param>
/// <param name="imClaim">Import claim handed through to <c>CheckAuth</c>.</param>
/// <returns>True when the import is approved; false when denied or no export matches.</returns>
internal bool CheckServiceExportApproved(Account account, string subject, object? imClaim)
{
    var services = Exports.Services;
    if (services == null)
        return false;

    if (services.TryGetValue(subject, out var exact))
        return exact == null || CheckAuth(exact, account, imClaim, null);

    var tokens = SubjectTransform.TokenizeSubject(subject);
    foreach (var pair in services)
    {
        if (!SubjectTransform.IsSubsetMatch(tokens, pair.Key))
            continue;

        // Only the first matching export is consulted.
        return pair.Value == null || CheckAuth(pair.Value, account, imClaim, tokens);
    }

    return false;
}
// -------------------------------------------------------------------------
// User revocation check
// -------------------------------------------------------------------------
/// <summary>
/// Reports whether the user <paramref name="nkey"/> is revoked for a
/// credential issued at <paramref name="issuedAt"/>, by consulting
/// <c>UsersRevoked</c> through <see cref="IsRevoked"/> under the read lock.
/// Mirrors Go <c>(a *Account) checkUserRevoked(nkey string, issuedAt int64) bool</c>.
/// </summary>
/// <param name="nkey">User key to look up.</param>
/// <param name="issuedAt">Issue timestamp of the user's credential.</param>
/// <returns>True when the user has been revoked.</returns>
public bool CheckUserRevoked(string nkey, long issuedAt)
{
    _mu.EnterReadLock();
    try
    {
        bool revoked = IsRevoked(UsersRevoked, nkey, issuedAt);
        return revoked;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// -------------------------------------------------------------------------
// Config-reload comparison helpers
// -------------------------------------------------------------------------
/// <summary>
/// Returns true if this account's stream imports equal <paramref name="b"/>'s.
/// Two imports correspond when account name, from-subject and to-subject all
/// match; corresponding imports must also agree on <c>AllowTrace</c>.
/// Acquires this account's read lock; <paramref name="b"/> must not be
/// concurrently accessed.
/// Mirrors Go <c>(a *Account) checkStreamImportsEqual(b *Account) bool</c>.
/// </summary>
/// <param name="b">Account to compare against.</param>
/// <returns>True when both import lists describe the same imports.</returns>
internal bool CheckStreamImportsEqual(Account b)
{
    // The three parts are joined with a separator so that distinct triples
    // cannot produce the same key (plain concatenation made "ab"+"c" collide
    // with "a"+"bc").
    static string KeyOf(StreamImportEntry import) =>
        string.Join('\n', import.Account?.Name ?? string.Empty, import.From, import.To);

    _mu.EnterReadLock();
    try
    {
        var aStreams = Imports.Streams;
        var bStreams = b.Imports.Streams;
        int aLen = aStreams?.Count ?? 0;
        int bLen = bStreams?.Count ?? 0;
        if (aLen != bLen)
            return false;
        if (aLen == 0)
            return true;

        // Index b's imports by key for constant-time lookup.
        var bIndex = new Dictionary<string, StreamImportEntry>(bLen, StringComparer.Ordinal);
        foreach (var bim in bStreams!)
            bIndex[KeyOf(bim)] = bim;

        foreach (var aim in aStreams!)
        {
            if (!bIndex.TryGetValue(KeyOf(aim), out var bim))
                return false;
            if (aim.AllowTrace != bim.AllowTrace)
                return false;
        }

        return true;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Compares this account's stream exports with those of <paramref name="b"/>.
/// Both sides must hold the same number of exports, and every subject here
/// must exist in <paramref name="b"/> with an entry that
/// <c>IsStreamExportEqual</c> accepts.
/// Takes this account's read lock only; <paramref name="b"/> must not be
/// accessed concurrently.
/// Mirrors Go <c>(a *Account) checkStreamExportsEqual(b *Account) bool</c>.
/// </summary>
/// <param name="b">Account to compare against.</param>
/// <returns>True when the stream exports are equal.</returns>
internal bool CheckStreamExportsEqual(Account b)
{
    _mu.EnterReadLock();
    try
    {
        var mine = Exports.Streams;
        var theirs = b.Exports.Streams;

        int mineCount = mine?.Count ?? 0;
        if (mineCount != (theirs?.Count ?? 0))
            return false;
        if (mineCount == 0)
            return true;

        foreach (var pair in mine!)
        {
            bool same = theirs!.TryGetValue(pair.Key, out var other)
                && IsStreamExportEqual(pair.Value, other);
            if (!same)
                return false;
        }

        return true;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Compares this account's service exports with those of <paramref name="b"/>.
/// Both sides must hold the same number of exports, and every subject here
/// must exist in <paramref name="b"/> with an entry that
/// <c>IsServiceExportEqual</c> accepts.
/// Takes this account's read lock only; <paramref name="b"/> must not be
/// accessed concurrently.
/// Mirrors Go <c>(a *Account) checkServiceExportsEqual(b *Account) bool</c>.
/// </summary>
/// <param name="b">Account to compare against.</param>
/// <returns>True when the service exports are equal.</returns>
internal bool CheckServiceExportsEqual(Account b)
{
    _mu.EnterReadLock();
    try
    {
        var mine = Exports.Services;
        var theirs = b.Exports.Services;

        int mineCount = mine?.Count ?? 0;
        if (mineCount != (theirs?.Count ?? 0))
            return false;
        if (mineCount == 0)
            return true;

        foreach (var pair in mine!)
        {
            bool same = theirs!.TryGetValue(pair.Key, out var other)
                && IsServiceExportEqual(pair.Value, other);
            if (!same)
                return false;
        }

        return true;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// -------------------------------------------------------------------------
// Leaf-node helpers
// -------------------------------------------------------------------------
/// <summary>
/// Records a subscription change and signals the account's leaf nodes.
/// Adds <paramref name="delta"/> to the per-subject counter in <c>_rm</c>.
/// For queue subscriptions it also adjusts the "subject queue" weight in
/// <c>_lqws</c>. Entries that drop to zero or below are removed. Afterwards
/// <c>FlushSignal()</c> is called on a snapshot of the leaf list.
/// Marked as a stub in this port — the full implementation is planned for session 15.
/// Mirrors Go <c>(a *Account) updateLeafNodes(sub, delta)</c>.
/// </summary>
/// <param name="sub">
/// The changed subscription; ignored unless it is a <c>Subscription</c> with
/// a non-empty subject.
/// </param>
/// <param name="delta">Signed change in subscription count; 0 is a no-op.</param>
internal void UpdateLeafNodes(object sub, int delta)
{
    if (delta == 0)
        return;
    if (sub is not Subscription s || s.Subject.Length == 0)
        return;

    var subject = Encoding.UTF8.GetString(s.Subject);
    var queue = string.Empty;
    if (s.Queue is { Length: > 0 })
        queue = Encoding.UTF8.GetString(s.Queue);

    _mu.EnterWriteLock();
    try
    {
        _rm ??= new Dictionary<string, int>(StringComparer.Ordinal);

        // A missing key starts from zero.
        _rm.TryGetValue(subject, out var routed);
        routed += delta;
        if (routed > 0)
            _rm[subject] = routed;
        else
            _rm.Remove(subject);

        if (!string.IsNullOrEmpty(queue))
        {
            _lqws ??= new Dictionary<string, int>(StringComparer.Ordinal);
            var key = $"{subject} {queue}";

            // A queue weight of 0 is treated as 1.
            var weight = s.Qw == 0 ? 1 : s.Qw;

            _lqws.TryGetValue(key, out var queued);
            queued += delta * weight;
            if (queued > 0)
                _lqws[key] = queued;
            else
                _lqws.Remove(key);
        }
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    // Copy the leaf list under its own lock, then signal outside any lock.
    List<ClientConnection> snapshot;
    _lmu.EnterReadLock();
    try
    {
        snapshot = [.. _lleafs];
    }
    finally
    {
        _lmu.ExitReadLock();
    }

    foreach (var leaf in snapshot)
    {
        leaf.FlushSignal();
    }
}
// -------------------------------------------------------------------------
// addClient / removeClient
// -------------------------------------------------------------------------
/// <summary>
/// Adds <paramref name="c"/> to the account's client set and bumps the
/// matching counter: <c>_sysclients</c> for internal kinds, <c>_nleafs</c>
/// for leaf connections. Leaf connections are also appended to the leaf
/// list, which is guarded by its own lock. A client that is already
/// registered changes nothing.
/// Mirrors Go <c>(a *Account) addClient(c *client) int</c>.
/// </summary>
/// <param name="c">Client connection to register.</param>
/// <returns>The number of registered clients before this call.</returns>
private int AddClientInternal(ClientConnection c)
{
    int before;

    _mu.EnterWriteLock();
    try
    {
        _clients ??= new HashSet<ClientConnection>();
        before = _clients.Count;

        bool added = _clients.Add(c);
        if (!added)
        {
            // Already registered — nothing else to update.
            return before;
        }

        if (IsInternalClientKind(c.Kind))
            _sysclients++;
        else if (c.Kind == ClientKind.Leaf)
            _nleafs++;
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    // The leaf list has its own lock, so it is updated after _mu is released.
    if (c.Kind == ClientKind.Leaf)
    {
        _lmu.EnterWriteLock();
        try
        {
            _lleafs.Add(c);
        }
        finally
        {
            _lmu.ExitWriteLock();
        }
    }

    // TODO: session 12 — notify server via c.srv.accConnsUpdate(a).
    return before;
}
/// <summary>
/// Removes <paramref name="c"/> from the account's client set and lowers the
/// matching counter: <c>_sysclients</c> for internal kinds, <c>_nleafs</c>
/// for leaf connections. Leaf connections are also removed from the leaf
/// list. A client that is not registered changes nothing.
/// Mirrors Go <c>(a *Account) removeClient(c *client) int</c>.
/// </summary>
/// <param name="c">Client connection to unregister.</param>
/// <returns>The number of registered clients before this call.</returns>
private int RemoveClientInternal(ClientConnection c)
{
    int before;
    bool removedLeaf = false;

    _mu.EnterWriteLock();
    try
    {
        before = _clients?.Count ?? 0;

        bool removed = _clients != null && _clients.Remove(c);
        if (!removed)
            return before;

        if (IsInternalClientKind(c.Kind))
        {
            _sysclients--;
        }
        else if (c.Kind == ClientKind.Leaf)
        {
            _nleafs--;
            removedLeaf = true;

            // Cluster accounting for hub leaf nodes.
            if (c.IsHubLeafNode())
            {
                // TODO: session 15 — c.RemoteCluster() for cluster accounting.
            }
        }
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    // The leaf list has its own lock, so it is updated after _mu is released.
    if (removedLeaf)
        RemoveLeafNode(c);

    // TODO: session 12 — notify server via c.srv.accConnsUpdate(a).
    return before;
}
/// <summary>
/// Removes <paramref name="c"/> from the leaf list under <see cref="_lmu"/>.
/// The freed slot is filled with the last element, so list order is not
/// preserved. Does nothing when the client is not in the list.
/// Mirrors Go <c>(a *Account) removeLeafNode(c *client)</c>.
/// </summary>
/// <param name="c">Leaf-node client to remove.</param>
private void RemoveLeafNode(ClientConnection c)
{
    _lmu.EnterWriteLock();
    try
    {
        int position = _lleafs.IndexOf(c);
        if (position >= 0)
        {
            // Swap-with-last removal avoids shifting the remaining elements.
            int tail = _lleafs.Count - 1;
            _lleafs[position] = _lleafs[tail];
            _lleafs.RemoveAt(tail);
        }
    }
    finally
    {
        _lmu.ExitWriteLock();
    }
}
// -------------------------------------------------------------------------
// INatsAccount implementation
// -------------------------------------------------------------------------
/// <summary>
/// True while the account has not expired; the negation of <see cref="IsExpired"/>.
/// Mirrors Go <c>INatsAccount.IsValid</c>.
/// </summary>
bool INatsAccount.IsValid
{
    get { return !IsExpired(); }
}
/// <summary>
/// Explicit interface implementation that forwards to
/// <see cref="MaxTotalConnectionsReached"/>.
/// Mirrors Go <c>INatsAccount.MaxTotalConnectionsReached()</c>.
/// </summary>
/// <returns>The result of <see cref="MaxTotalConnectionsReached"/>.</returns>
bool INatsAccount.MaxTotalConnectionsReached()
{
    return MaxTotalConnectionsReached();
}
/// <summary>
/// Explicit interface implementation that forwards to
/// <see cref="MaxTotalLeafNodesReached"/>.
/// Mirrors Go <c>INatsAccount.MaxTotalLeafNodesReached()</c>.
/// </summary>
/// <returns>The result of <see cref="MaxTotalLeafNodesReached"/>.</returns>
bool INatsAccount.MaxTotalLeafNodesReached()
{
    return MaxTotalLeafNodesReached();
}
/// <summary>
/// Explicit interface implementation that registers <paramref name="c"/> via
/// <c>AddClientInternal</c>.
/// Mirrors Go <c>INatsAccount.AddClient(c)</c>.
/// </summary>
/// <param name="c">Client connection to register.</param>
/// <returns>The client count before the call.</returns>
int INatsAccount.AddClient(ClientConnection c)
{
    return AddClientInternal(c);
}
/// <summary>
/// Explicit interface implementation that unregisters <paramref name="c"/>
/// via <c>RemoveClientInternal</c>.
/// Mirrors Go <c>INatsAccount.RemoveClient(c)</c>.
/// </summary>
/// <param name="c">Client connection to unregister.</param>
/// <returns>The client count before the call.</returns>
int INatsAccount.RemoveClient(ClientConnection c)
{
    return RemoveClientInternal(c);
}
// -------------------------------------------------------------------------
// Static helpers
// -------------------------------------------------------------------------
/// <summary>
/// Reports whether a credential for <paramref name="subject"/> issued at
/// <paramref name="issuedAt"/> is revoked. A revocation applies when its
/// timestamp is at or after <paramref name="issuedAt"/>. Both the entry for
/// <paramref name="subject"/> and the wildcard entry <c>"*"</c> (jwt.All) are
/// consulted.
/// Mirrors Go package-level <c>isRevoked(...) bool</c>.
/// </summary>
/// <param name="revocations">Revocation timestamps by key; null or empty means nothing is revoked.</param>
/// <param name="subject">Key to look up.</param>
/// <param name="issuedAt">Issue timestamp of the credential being checked.</param>
/// <returns>True when a specific or wildcard revocation applies.</returns>
internal static bool IsRevoked(
    Dictionary<string, long>? revocations,
    string subject,
    long issuedAt)
{
    if (revocations is not { Count: > 0 } table)
        return false;

    // True when `key` has a revocation stamped at or after issuedAt.
    bool Applies(string key) =>
        table.TryGetValue(key, out long revokedAt) && revokedAt >= issuedAt;

    return Applies(subject) || Applies("*");
}
/// <summary>
/// Reports whether <paramref name="reply"/> is a tracked reply, i.e. it ends
/// with <c>.T</c> (as in <c>.&lt;hash&gt;.T</c>) and is at least five bytes long.
/// Mirrors Go package-level <c>isTrackedReply(reply []byte) bool</c>.
/// </summary>
/// <param name="reply">Raw reply subject bytes.</param>
/// <returns>True when the reply carries the tracking suffix.</returns>
internal static bool IsTrackedReply(ReadOnlySpan<byte> reply)
{
    if (reply.Length <= 4)
        return false;

    return reply[^2] == (byte)'.' && reply[^1] == (byte)'T';
}
/// <summary>
/// Validates the destination subject of a mapping. Each dot-separated token
/// must be non-empty, nothing may follow a full wildcard (<c>&gt;</c>), and a
/// token written as <c>{{...}}</c> must be a recognised mapping function.
/// Other tokens must not contain whitespace. Finally the complete transform
/// from <paramref name="src"/> to <paramref name="dest"/> must be constructible.
/// Mirrors Go <c>ValidateMapping(src, dest string) error</c> in sublist.go.
/// </summary>
/// <param name="src">Source subject of the mapping.</param>
/// <param name="dest">Destination subject; null or empty is accepted as valid.</param>
/// <returns>Null on success; an exception describing the problem on failure.</returns>
internal static Exception? ValidateMapping(string src, string dest)
{
    if (string.IsNullOrEmpty(dest))
        return null;

    bool sawFullWildcard = false;
    string[] tokens = dest.Split('.');
    for (int i = 0; i < tokens.Length; i++)
    {
        string token = tokens[i];

        // Empty tokens, and any token after ">", are invalid.
        if (token.Length == 0 || sawFullWildcard)
            return new MappingDestinationException(token, ServerErrors.ErrInvalidMappingDestinationSubject);

        bool looksLikeFunction = token.Length > 4
            && token.StartsWith("{{", StringComparison.Ordinal)
            && token.EndsWith("}}", StringComparison.Ordinal);
        if (looksLikeFunction)
        {
            var (kind, _, _, _, parseErr) = SubjectTransform.IndexPlaceHolders(token);
            if (parseErr != null)
                return parseErr;
            if (kind == TransformType.BadTransform)
                return new MappingDestinationException(token, ServerErrors.ErrUnknownMappingDestinationFunction);
            continue;
        }

        if (token == ">")
            sawFullWildcard = true;
        else if (token.IndexOfAny(['\t', '\n', '\f', '\r', ' ']) >= 0)
            return ServerErrors.ErrInvalidMappingDestinationSubject;
    }

    // The token checks passed; make sure the whole transform can be built.
    var (_, buildErr) = SubjectTransform.New(src, dest);
    return buildErr;
}
/// <summary>
/// Reports whether <paramref name="kind"/> is one of the internal client
/// kinds: System, JetStream or Account.
/// Mirrors Go <c>isInternalClient(kind int) bool</c>.
/// </summary>
/// <param name="kind">Client kind to classify.</param>
/// <returns>True for internal kinds; false for every other kind.</returns>
private static bool IsInternalClientKind(ClientKind kind)
{
    switch (kind)
    {
        case ClientKind.System:
        case ClientKind.JetStream:
        case ClientKind.Account:
            return true;
        default:
            return false;
    }
}
// -------------------------------------------------------------------------
// Private helpers
// -------------------------------------------------------------------------
/// <summary>
/// Finalises one destination list in place. When the raw weights do not add
/// up to 100 and <paramref name="src"/> was not listed explicitly, a
/// pass-through destination is appended for the remainder. The list is then
/// sorted by ascending weight and the weights are rewritten as running totals.
/// Mirrors Go <c>processDestinations(dests []*destination) ([]*destination, error)</c>.
/// </summary>
/// <param name="src">Source subject of the mapping.</param>
/// <param name="hasWildcard">Whether <paramref name="src"/> contains a wildcard.</param>
/// <param name="seen">Destination subjects listed explicitly by the caller.</param>
/// <param name="dests">Destination list to finalise; modified in place.</param>
/// <returns>Null on success; the transform error when the pass-through cannot be built.</returns>
private static Exception? ProcessDestinations(
    string src,
    bool hasWildcard,
    HashSet<string> seen,
    List<Destination> dests)
{
    byte rawTotal = 0;
    for (int i = 0; i < dests.Count; i++)
        rawTotal += dests[i].Weight;

    // Route the unassigned share back to the source unless it was listed explicitly.
    if (rawTotal != 100 && !seen.Contains(src))
    {
        string passThrough = hasWildcard ? SubjectTransform.TransformTokenize(src) : src;

        var (transform, buildErr) = SubjectTransform.New(src, passThrough);
        if (buildErr != null)
            return buildErr;

        byte fill = dests.Count == 0 ? (byte)100 : (byte)(100 - rawTotal);
        dests.Add(new Destination { Transform = transform, Weight = fill });
    }

    // Ascending raw weights first, then turn them into running totals.
    dests.Sort((left, right) => left.Weight.CompareTo(right.Weight));

    byte running = 0;
    for (int i = 0; i < dests.Count; i++)
    {
        running += dests[i].Weight;
        dests[i].Weight = running;
    }

    return null;
}
/// <summary>
/// Splits <paramref name="subject"/> on <c>'.'</c> into its tokens. Empty
/// tokens are kept, so an empty subject yields a single empty token. This is
/// the same result as the <c>btsep</c>-based tokenisation in the Go source.
/// </summary>
/// <param name="subject">Subject to tokenise.</param>
/// <returns>The tokens in order of appearance.</returns>
private static string[] TokenizeSubjectForMapping(string subject)
{
    return subject.Split('.');
}
/// <summary>
/// Returns the cluster name used for cluster-scoped mapping selection.
/// Currently a stub that always returns the empty string.
/// Mirrors Go <c>a.srv.cachedClusterName()</c>.
/// TODO: session 09 — wire up Server.CachedClusterName().
/// </summary>
/// <returns>The empty string until the server lookup is wired up.</returns>
private string GetCachedClusterName() =>
    // TODO: session 09 — Server.CachedClusterName().
    string.Empty;
/// <summary>
/// Disposes the timer referenced by <paramref name="t"/> (if any) and resets
/// the reference to <c>null</c>. The caller is expected to hold the lock.
/// Mirrors Go <c>clearTimer(t **time.Timer)</c>.
/// </summary>
/// <param name="t">Timer slot to clear; <c>null</c> on return.</param>
private static void ClearTimerLocked(ref Timer? t)
{
    if (t is { } active)
    {
        active.Dispose();
    }

    t = null;
}
/// <summary>
/// Checks whether <paramref name="account"/> is authorised to use the export
/// described by <paramref name="ea"/>. The rules are applied in this order:
/// <list type="number">
/// <item>An account present in the approved list is always authorised.</item>
/// <item>If a token is required, authorisation currently depends only on
/// <paramref name="imClaim"/> being non-null (activation is not validated yet).</item>
/// <item>If an account position is set, the subject token at that 1-based
/// position must equal the account name. This check is authoritative, so an
/// export without an approved list is not treated as public when it is set.</item>
/// <item>Otherwise the export is public only when it has no approved list.</item>
/// </list>
/// Mirrors Go <c>(a *Account) checkAuth(...) bool</c>.
/// TODO: session 11 — full JWT activation check.
/// </summary>
/// <param name="ea">Authorisation settings of the export.</param>
/// <param name="account">Account requesting access.</param>
/// <param name="imClaim">Import claim placeholder; only checked for null.</param>
/// <param name="tokens">Tokens of the imported subject, used for the account-position check.</param>
/// <returns><c>true</c> when the account may use the export.</returns>
private static bool CheckAuth(
    ExportAuth ea,
    Account account,
    object? imClaim,
    string[]? tokens)
{
    if (ea.Approved != null && ea.Approved.ContainsKey(account.Name))
        return true;

    if (ea.TokenRequired)
    {
        // TODO: session 11 — validate activation token in imClaim.
        return imClaim != null;
    }

    // Account-position restriction must be evaluated before the public-export
    // fallback; otherwise an export with a position but no approved list
    // would be open to every account.
    if (ea.AccountPosition > 0)
    {
        // Unsigned comparison avoids a negative index for oversized positions.
        if (tokens == null || ea.AccountPosition > (uint)tokens.Length)
            return false;
        return tokens[(int)ea.AccountPosition - 1] == account.Name;
    }

    // No position, no token requirement: public only without an approved list.
    return ea.Approved == null;
}
/// <summary>
/// Configures the authorization fields of an export descriptor.
/// Stores <paramref name="accountPos"/> on <paramref name="auth"/> and replaces
/// its approved list: <c>null</c> when <paramref name="accounts"/> is null or
/// empty, otherwise a name-keyed dictionary of the given accounts (null
/// entries are skipped; a later duplicate name overwrites an earlier one).
/// Mirrors Go <c>setExportAuth(&amp;se.exportAuth, ...)</c>.
/// </summary>
/// <param name="auth">Export authorization to update.</param>
/// <param name="subject">Export subject; must be a valid subject.</param>
/// <param name="accounts">Accounts to approve, or null/empty for none.</param>
/// <param name="accountPos">Account position to record on the export.</param>
/// <returns>
/// <c>ServerErrors.ErrBadSubject</c> when the subject is invalid (nothing is
/// modified in that case); otherwise <c>null</c>.
/// </returns>
private static Exception? SetExportAuth(ExportAuth auth, string subject, IReadOnlyList<Account>? accounts, uint accountPos)
{
    if (!SubscriptionIndex.IsValidSubject(subject))
    {
        return ServerErrors.ErrBadSubject;
    }

    auth.AccountPosition = accountPos;

    Dictionary<string, Account>? approvedByName = null;
    if (accounts != null && accounts.Count > 0)
    {
        approvedByName = new Dictionary<string, Account>(accounts.Count, StringComparer.Ordinal);
        for (int i = 0; i < accounts.Count; i++)
        {
            var candidate = accounts[i];
            if (candidate != null)
            {
                approvedByName[candidate.Name] = candidate;
            }
        }
    }

    auth.Approved = approvedByName;
    return null;
}
// -------------------------------------------------------------------------
// Export equality helpers
// -------------------------------------------------------------------------
/// <summary>
/// Compares two stream exports. Two nulls are equal, a null and a non-null
/// are not, and two non-null exports are compared via
/// <c>IsExportAuthEqual</c>.
/// </summary>
private static bool IsStreamExportEqual(StreamExport? a, StreamExport? b)
{
    if (a == null)
    {
        return b == null;
    }

    return b != null && IsExportAuthEqual(a, b);
}
/// <summary>
/// Compares two service exports. Two nulls are equal; a null and a non-null
/// are not. Non-null exports are equal when their export-auth settings,
/// owning account name (a missing account counts as an empty name), response
/// type, trace flag and latency settings (sampling and subject) all match.
/// </summary>
private static bool IsServiceExportEqual(ServiceExportEntry? a, ServiceExportEntry? b)
{
    if (a == null || b == null)
    {
        return a == null && b == null;
    }

    if (!IsExportAuthEqual(a, b))
    {
        return false;
    }

    var ownerA = a.Account?.Name ?? string.Empty;
    var ownerB = b.Account?.Name ?? string.Empty;
    bool sameBasics =
        ownerA == ownerB &&
        a.ResponseType == b.ResponseType &&
        a.AllowTrace == b.AllowTrace;
    if (!sameBasics)
    {
        return false;
    }

    // Latency: both absent is a match; otherwise both must be present and agree.
    var latencyA = a.Latency;
    var latencyB = b.Latency;
    if (latencyA == null || latencyB == null)
    {
        return latencyA == null && latencyB == null;
    }

    return latencyA.Sampling == latencyB.Sampling
        && latencyA.Subject == latencyB.Subject;
}
/// <summary>
/// Compares the authorization settings of two exports: token requirement,
/// account position, the approved accounts (same keys, same account names)
/// and the revoked activations (same keys, same values). A null collection
/// is treated the same as an empty one.
/// </summary>
private static bool IsExportAuthEqual(ExportAuth a, ExportAuth b)
{
    if (a.TokenRequired != b.TokenRequired || a.AccountPosition != b.AccountPosition)
    {
        return false;
    }

    // Approved accounts: sizes must match, then every entry of a must exist in b.
    if ((a.Approved?.Count ?? 0) != (b.Approved?.Count ?? 0))
    {
        return false;
    }

    if (a.Approved != null)
    {
        // Equal sizes guarantee b.Approved is non-null whenever this loop runs.
        foreach (var entry in a.Approved)
        {
            bool matched = b.Approved!.TryGetValue(entry.Key, out var other)
                && entry.Value.Name == other.Name;
            if (!matched)
            {
                return false;
            }
        }
    }

    // Revoked activations: same procedure, comparing the stored values directly.
    if ((a.ActivationsRevoked?.Count ?? 0) != (b.ActivationsRevoked?.Count ?? 0))
    {
        return false;
    }

    if (a.ActivationsRevoked != null)
    {
        foreach (var entry in a.ActivationsRevoked)
        {
            bool matched = b.ActivationsRevoked!.TryGetValue(entry.Key, out var other)
                && entry.Value == other;
            if (!matched)
            {
                return false;
            }
        }
    }

    return true;
}
}