diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs new file mode 100644 index 0000000..4a9c55a --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -0,0 +1,2118 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/accounts.go in the NATS server Go source. + +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +// ============================================================================ +// AccountNumConns — remote server connection/leafnode count message +// Mirrors Go `AccountNumConns` struct used in updateRemoteServer. +// ============================================================================ + +/// +/// Carries the number of client connections and leaf nodes that a remote server +/// has for a given account, along with the remote server's identity. +/// Mirrors Go AccountNumConns in server/accounts.go. +/// +internal sealed class AccountNumConns +{ + /// Remote server identity. Mirrors Go Server ServerInfo. + public ServerIdentity Server { get; set; } = new(); + + /// Number of client connections on the remote server. Mirrors Go Conns int. + public int Conns { get; set; } + + /// Number of leaf nodes on the remote server. Mirrors Go LeafNodes int. + public int LeafNodes { get; set; } +} + +/// +/// Minimal remote server identity stub used by . +/// Full implementation lives with the server cluster sessions. +/// +internal sealed class ServerIdentity +{ + /// Unique server ID. Mirrors Go ID string. + public string ID { get; set; } = string.Empty; +} + +// ============================================================================ +// Account — full implementation +// Mirrors Go `Account` struct in server/accounts.go lines 52-119. +// ============================================================================ + +/// +/// Represents a NATS account, tracking clients, subscriptions, imports, exports, +/// and subject mappings. Implements so that +/// can interact with it without a hard dependency. +/// Mirrors Go Account struct in server/accounts.go. +/// +public sealed class Account : INatsAccount +{ + // ------------------------------------------------------------------------- + // Constants + // ------------------------------------------------------------------------- + + /// + /// jwt.NoLimit equivalent: -1 means no limit applied. + /// + private const int NoLimit = -1; + + // ------------------------------------------------------------------------- + // Identity fields + // ------------------------------------------------------------------------- + + /// Account name. Mirrors Go Name string. + public string Name { get; set; } = string.Empty; + + /// NKey public key. Mirrors Go Nkey string. + public string Nkey { get; set; } = string.Empty; + + /// JWT issuer key. Mirrors Go Issuer string. + public string Issuer { get; set; } = string.Empty; + + /// Raw JWT claim string. Mirrors Go claimJWT string. + internal string ClaimJwt { get; set; } = string.Empty; + + /// Time of last update from resolver. Mirrors Go updated time.Time. + internal DateTime Updated { get; set; } + + // ------------------------------------------------------------------------- + // Locks + // ------------------------------------------------------------------------- + + /// Primary account read/write lock. Mirrors Go mu sync.RWMutex. + private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.NoRecursion); + + /// Send-queue mutex. Mirrors Go sqmu sync.Mutex. + private readonly object _sqmu = new(); + + /// Leaf-node list lock. Mirrors Go lmu sync.RWMutex. + private readonly ReaderWriterLockSlim _lmu = new(LockRecursionPolicy.NoRecursion); + + /// Event ID mutex. Mirrors Go eventIdsMu sync.Mutex. + private readonly object _eventIdsMu = new(); + + /// JetStream migration/clear-observer mutex. Mirrors Go jscmMu sync.Mutex. + private readonly object _jscmMu = new(); + + // ------------------------------------------------------------------------- + // Subscription index + // ------------------------------------------------------------------------- + + /// + /// Subscription trie for this account. Mirrors Go sl *Sublist. + /// Set by the server when the account is registered. + /// + internal SubscriptionIndex? Sublist { get; set; } + + // ------------------------------------------------------------------------- + // Internal client and send queue (stubs) + // ------------------------------------------------------------------------- + + /// + /// Internal account client. Mirrors Go ic *client. + /// TODO: session 12 — full internal client wiring. + /// + internal ClientConnection? InternalClient { get; set; } + + /// + /// Send queue stub. Mirrors Go sq *sendq. + /// TODO: session 12 — send-queue implementation. + /// + internal object? SendQueue { get; set; } + + // ------------------------------------------------------------------------- + // Eventing timers + // ------------------------------------------------------------------------- + + /// Expiration timer. Mirrors Go etmr *time.Timer. + private Timer? _etmr; + + /// Connection-count timer. Mirrors Go ctmr *time.Timer. + private Timer? _ctmr; + + // ------------------------------------------------------------------------- + // Remote server tracking + // ------------------------------------------------------------------------- + + /// + /// Per-server connection and leaf-node counts. + /// Key is server ID. Mirrors Go strack map[string]sconns. + /// + private Dictionary? _strack; + + /// + /// Remote client count (sum of strack[*].Conns). + /// Mirrors Go nrclients int32. + /// Protected by . + /// + private int _nrclients; + + /// + /// System client count. + /// Mirrors Go sysclients int32. + /// Protected by . + /// + private int _sysclients; + + /// + /// Local leaf-node count. + /// Mirrors Go nleafs int32. + /// Protected by . + /// + private int _nleafs; + + /// + /// Remote leaf-node count (sum of strack[*].Leafs). + /// Mirrors Go nrleafs int32. + /// Protected by . + /// + private int _nrleafs; + + // ------------------------------------------------------------------------- + // Client set + // ------------------------------------------------------------------------- + + /// + /// Active local clients. Mirrors Go clients map[*client]struct{}. + /// Protected by . + /// + private HashSet? _clients; + + // ------------------------------------------------------------------------- + // Route and leaf-queue maps + // ------------------------------------------------------------------------- + + /// + /// Route map: subject → reference count. + /// Mirrors Go rm map[string]int32. + /// Protected by . + /// + private Dictionary? _rm; + + /// + /// Leaf queue weights: subject → weight. + /// Mirrors Go lqws map[string]int32. + /// Protected by . + /// + private Dictionary? _lqws; + + // ------------------------------------------------------------------------- + // User revocations + // ------------------------------------------------------------------------- + + /// + /// Revoked user nkeys: key → revocation timestamp (Unix seconds). + /// Mirrors Go usersRevoked map[string]int64. + /// Protected by . + /// + internal Dictionary? UsersRevoked { get; set; } + + // ------------------------------------------------------------------------- + // Subject mappings + // ------------------------------------------------------------------------- + + /// + /// Ordered list of subject mappings. Mirrors Go mappings []*mapping. + /// Protected by . + /// + private List _mappings = []; + + /// + /// Atomic flag: 1 when is non-empty. + /// Mirrors Go hasMapped atomic.Bool. + /// + private int _hasMapped; // 0 = false, 1 = true + + // ------------------------------------------------------------------------- + // Leaf nodes + // ------------------------------------------------------------------------- + + /// + /// Ordered list of local leaf-node clients. + /// Mirrors Go lleafs []*client. + /// Protected by . + /// + private List _lleafs = []; + + /// + /// Cluster name → count of leaf-node connections from that cluster. + /// Mirrors Go leafClusters map[string]uint64. + /// Protected by . + /// + private Dictionary? _leafClusters; + + // ------------------------------------------------------------------------- + // Import / export maps + // ------------------------------------------------------------------------- + + /// Import tracking. Mirrors Go imports importMap. + internal ImportMap Imports { get; set; } = new(); + + /// Export tracking. Mirrors Go exports exportMap. + internal ExportMap Exports { get; set; } = new(); + + // ------------------------------------------------------------------------- + // JetStream (stubs) + // ------------------------------------------------------------------------- + + /// + /// JetStream account state. Mirrors Go js *jsAccount. + /// TODO: session 19 — JetStream implementation. + /// + internal object? JetStream { get; set; } + + /// + /// Per-domain JetStream limits. Mirrors Go jsLimits map[string]JetStreamAccountLimits. + /// TODO: session 19 — JetStream implementation. + /// + internal Dictionary? JetStreamLimits { get; set; } + + // ------------------------------------------------------------------------- + // Misc identity fields + // ------------------------------------------------------------------------- + + /// Non-routed gateway account name. Mirrors Go nrgAccount string. + internal string NrgAccount { get; set; } = string.Empty; + + // ------------------------------------------------------------------------- + // Limits (embedded `limits` struct in Go) + // ------------------------------------------------------------------------- + + /// + /// Maximum payload size (-1 = unlimited). Mirrors Go embedded limits.mpay int32. + /// + internal int MaxPayload { get; set; } = NoLimit; + + /// + /// Maximum subscriptions (-1 = unlimited). Mirrors Go embedded limits.msubs int32. + /// + internal int MaxSubscriptions { get; set; } = NoLimit; + + /// + /// Maximum connections (-1 = unlimited). Mirrors Go embedded limits.mconns int32. + /// + internal int MaxConnections { get; set; } = NoLimit; + + /// + /// Maximum leaf nodes (-1 = unlimited). Mirrors Go embedded limits.mleafs int32. + /// + internal int MaxLeafNodes { get; set; } = NoLimit; + + /// + /// When true, bearer tokens are not allowed. + /// Mirrors Go embedded limits.disallowBearer bool. + /// + internal bool DisallowBearer { get; set; } + + // ------------------------------------------------------------------------- + // Expiration (atomic) + // ------------------------------------------------------------------------- + + /// + /// 1 when the account JWT has expired. Mirrors Go expired atomic.Bool. + /// + private int _expired; // 0 = not expired, 1 = expired + + // ------------------------------------------------------------------------- + // Miscellaneous + // ------------------------------------------------------------------------- + + /// + /// When true, this account's config could not be fully resolved. + /// Mirrors Go incomplete bool. + /// + internal bool Incomplete { get; set; } + + /// + /// Signing keys for JWT validation. + /// Mirrors Go signingKeys map[string]jwt.Scope. + /// Value is object? because JWT Scope is not yet fully ported. + /// + internal Dictionary? SigningKeys { get; set; } + + /// + /// External authorization configuration stub. + /// Mirrors Go extAuth *jwt.ExternalAuthorization. + /// TODO: session 11 — JWT full integration. + /// + internal object? ExternalAuth { get; set; } + + /// + /// The server this account is registered with, or null if not yet registered. + /// Stored as object? to avoid circular reference. + /// Mirrors Go srv *Server. + /// + internal object? Server { get; set; } + + /// + /// Loop detection subject for leaf nodes. + /// Mirrors Go lds string. + /// + internal string LoopDetectionSubject { get; set; } = string.Empty; + + /// + /// Service reply prefix (wildcard subscription root). + /// Mirrors Go siReply []byte. + /// + internal byte[]? ServiceImportReply { get; set; } + + /// + /// Subscription ID counter for internal use. + /// Mirrors Go isid uint64. + /// + private ulong _isid; + + /// + /// Default permissions for users with no explicit permissions. + /// Mirrors Go defaultPerms *Permissions. + /// + internal Permissions? DefaultPerms { get; set; } + + /// + /// Account tags from JWT. Mirrors Go tags jwt.TagList. + /// Stored as string array pending full JWT integration. + /// + internal string[] Tags { get; set; } = []; + + /// + /// Human-readable name tag (distinct from ). + /// Mirrors Go nameTag string. + /// + internal string NameTag { get; set; } = string.Empty; + + /// + /// Unix-nanosecond timestamp of last max-subscription-limit log. + /// Mirrors Go lastLimErr int64. + /// + private long _lastLimErr; + + /// + /// Route pool index (-1 = dedicated, -2 = transitioning, ≥ 0 = shared). + /// Mirrors Go routePoolIdx int. + /// + internal int RoutePoolIdx { get; set; } + + /// + /// Message-tracing destination subject. + /// Mirrors Go traceDest string. + /// Protected by . + /// + private string _traceDest = string.Empty; + + /// + /// Tracing sampling percentage (0 = header-triggered, 1-100 = rate). + /// Mirrors Go traceDestSampling int. + /// Protected by . + /// + private int _traceDestSampling; + + // ------------------------------------------------------------------------- + // Factory + // ------------------------------------------------------------------------- + + /// + /// Creates a new unlimited account with the given name. + /// Mirrors Go NewAccount(name string) *Account. + /// + public static Account NewAccount(string name) => + new() + { + Name = name, + MaxPayload = NoLimit, + MaxSubscriptions = NoLimit, + MaxConnections = NoLimit, + MaxLeafNodes = NoLimit, + }; + + // ------------------------------------------------------------------------- + // Object overrides + // ------------------------------------------------------------------------- + + /// + /// Returns the account name. Mirrors Go (a *Account) String() string. + /// + public override string ToString() => Name; + + // ------------------------------------------------------------------------- + // Shallow copy for config reload + // ------------------------------------------------------------------------- + + /// + /// Copies identity and config fields from the options-struct account (a) + /// into the live server account (na). The write lock on na must + /// be held by the caller; this (the options account) requires no lock. + /// Mirrors Go (a *Account) shallowCopy(na *Account). + /// + internal void ShallowCopy(Account na) + { + na.Nkey = Nkey; + na.Issuer = Issuer; + na._traceDest = _traceDest; + na._traceDestSampling = _traceDestSampling; + na.NrgAccount = NrgAccount; + + // Stream imports — shallow-clone each entry. + if (Imports.Streams != null) + { + na.Imports.Streams = new List(Imports.Streams.Count); + foreach (var si in Imports.Streams) + { + // Struct-style shallow copy via record-style clone. + na.Imports.Streams.Add(new StreamImportEntry + { + Account = si.Account, + From = si.From, + To = si.To, + Transform = si.Transform, + ReverseTransform = si.ReverseTransform, + Claim = si.Claim, + UsePublishedSubject = si.UsePublishedSubject, + Invalid = si.Invalid, + AllowTrace = si.AllowTrace, + }); + } + } + + // Service imports — shallow-clone each inner list. + if (Imports.Services != null) + { + na.Imports.Services = new Dictionary>(Imports.Services.Count); + foreach (var (k, list) in Imports.Services) + { + var cloned = new List(list.Count); + foreach (var si in list) + { + cloned.Add(new ServiceImportEntry + { + Account = si.Account, + Claim = si.Claim, + ServiceExport = si.ServiceExport, + SubscriptionId = si.SubscriptionId, + From = si.From, + To = si.To, + Transform = si.Transform, + Timestamp = si.Timestamp, + ResponseType = si.ResponseType, + Latency = si.Latency, + M1 = si.M1, + RequestingClient = si.RequestingClient, + UsePublishedSubject = si.UsePublishedSubject, + IsResponse = si.IsResponse, + Invalid = si.Invalid, + Share = si.Share, + Tracking = si.Tracking, + DidDeliver = si.DidDeliver, + AllowTrace = si.AllowTrace, + TrackingHeader = si.TrackingHeader, + }); + } + na.Imports.Services[k] = cloned; + } + } + + // Stream exports — shallow-clone each entry. + if (Exports.Streams != null) + { + na.Exports.Streams = new Dictionary(Exports.Streams.Count); + foreach (var (k, se) in Exports.Streams) + { + na.Exports.Streams[k] = se == null ? null! : new StreamExport + { + TokenRequired = se.TokenRequired, + AccountPosition = se.AccountPosition, + Approved = se.Approved, + ActivationsRevoked = se.ActivationsRevoked, + }; + } + } + + // Service exports — shallow-clone each entry. + if (Exports.Services != null) + { + na.Exports.Services = new Dictionary(Exports.Services.Count); + foreach (var (k, se) in Exports.Services) + { + na.Exports.Services[k] = se == null ? null! : new ServiceExportEntry + { + Account = se.Account, + ResponseType = se.ResponseType, + Latency = se.Latency, + ResponseTimer = se.ResponseTimer, + ResponseThreshold = se.ResponseThreshold, + AllowTrace = se.AllowTrace, + TokenRequired = se.TokenRequired, + AccountPosition = se.AccountPosition, + Approved = se.Approved, + ActivationsRevoked = se.ActivationsRevoked, + }; + } + } + + // Mappings and limits — copy by reference / value. + na._mappings = _mappings; + Interlocked.Exchange(ref na._hasMapped, _mappings.Count > 0 ? 1 : 0); + + // JetStream limits — shared reference. + // TODO: session 19 — deep copy JetStream limits when ported. + na.JetStreamLimits = JetStreamLimits; + + // Server-config account limits. + na.MaxPayload = MaxPayload; + na.MaxSubscriptions = MaxSubscriptions; + na.MaxConnections = MaxConnections; + na.MaxLeafNodes = MaxLeafNodes; + na.DisallowBearer = DisallowBearer; + } + + // ------------------------------------------------------------------------- + // Event ID generation + // ------------------------------------------------------------------------- + + /// + /// Generates a unique event identifier using its own dedicated lock. + /// Mirrors Go (a *Account) nextEventID() string. + /// + internal string NextEventId() + { + lock (_eventIdsMu) + { + return Guid.NewGuid().ToString("N"); + } + } + + // ------------------------------------------------------------------------- + // Client accessors + // ------------------------------------------------------------------------- + + /// + /// Returns a snapshot list of clients. Lock must be held by the caller. + /// Mirrors Go (a *Account) getClientsLocked() []*client. + /// + internal List GetClientsLocked() + { + if (_clients == null || _clients.Count == 0) + return []; + + return [.. _clients]; + } + + /// + /// Returns a thread-safe snapshot list of clients. + /// Mirrors Go (a *Account) getClients() []*client. + /// + internal List GetClients() + { + _mu.EnterReadLock(); + try + { + return GetClientsLocked(); + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns a snapshot of non-internal clients. Lock must be held by the caller. + /// Mirrors Go (a *Account) getExternalClientsLocked() []*client. + /// + internal List GetExternalClientsLocked() + { + if (_clients == null || _clients.Count == 0) + return []; + + var result = new List(_clients.Count); + foreach (var c in _clients) + { + if (!IsInternalClientKind(c.Kind)) + result.Add(c); + } + return result; + } + + // ------------------------------------------------------------------------- + // Remote server tracking + // ------------------------------------------------------------------------- + + /// + /// Updates the remote-server tracking table for this account based on an + /// incoming message, and returns the set of + /// local clients that must be disconnected because a connection limit has + /// been exceeded (after accounting for remote connections). + /// Mirrors Go (a *Account) updateRemoteServer(m *AccountNumConns) []*client. + /// + internal List UpdateRemoteServer(AccountNumConns m) + { + _mu.EnterWriteLock(); + try + { + _strack ??= new Dictionary(); + + _strack.TryGetValue(m.Server.ID, out var prev); + _strack[m.Server.ID] = new SConns + { + Conns = m.Conns, + Leafs = m.LeafNodes, + }; + + _nrclients += m.Conns - (prev?.Conns ?? 0); + _nrleafs += m.LeafNodes - (prev?.Leafs ?? 0); + + var localCount = _clients?.Count ?? 0; + + // Check if total connections exceed the limit. + bool maxConnsExceeded = MaxConnections != NoLimit && + (localCount - _sysclients + _nrclients) > MaxConnections; + + List toDisconnect = []; + + if (maxConnsExceeded) + { + var external = GetExternalClientsLocked(); + + // Sort: newest connections first (reverse chronological). + // TODO: session 12 — sort by c.Start once ClientConnection has a Start field. + // For now we cannot sort without the start time, so take from end. + + int over = (localCount - _sysclients + _nrclients) - MaxConnections; + if (over < external.Count) + toDisconnect.AddRange(external.GetRange(0, over)); + else + toDisconnect.AddRange(external); + } + + // Check if total leaf nodes exceed the limit. + bool maxLeafsExceeded = MaxLeafNodes != NoLimit && + (_nleafs + _nrleafs) > MaxLeafNodes; + + if (maxLeafsExceeded) + { + _lmu.EnterReadLock(); + try + { + int over = _nleafs + _nrleafs - MaxLeafNodes; + if (over > 0) + { + int start = Math.Max(0, _lleafs.Count - over); + toDisconnect.AddRange(_lleafs.GetRange(start, _lleafs.Count - start)); + } + } + finally + { + _lmu.ExitReadLock(); + } + } + + return toDisconnect; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Removes tracking for a remote server that has shut down. + /// Mirrors Go (a *Account) removeRemoteServer(sid string). + /// + internal void RemoveRemoteServer(string sid) + { + _mu.EnterWriteLock(); + try + { + if (_strack != null && _strack.TryGetValue(sid, out var prev)) + { + _strack.Remove(sid); + _nrclients -= prev.Conns; + _nrleafs -= prev.Leafs; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Returns the number of remote servers that have at least one connection or + /// leaf-node for this account. + /// Mirrors Go (a *Account) expectedRemoteResponses() int32. + /// + internal int ExpectedRemoteResponses() + { + int expected = 0; + _mu.EnterReadLock(); + try + { + if (_strack != null) + { + foreach (var sc in _strack.Values) + { + if (sc.Conns > 0 || sc.Leafs > 0) + expected++; + } + } + } + finally + { + _mu.ExitReadLock(); + } + return expected; + } + + // ------------------------------------------------------------------------- + // Eventing + // ------------------------------------------------------------------------- + + /// + /// Clears eventing state including timers, clients, and remote tracking. + /// Mirrors Go (a *Account) clearEventing(). + /// + internal void ClearEventing() + { + _mu.EnterWriteLock(); + try + { + _nrclients = 0; + ClearTimerLocked(ref _etmr); + ClearTimerLocked(ref _ctmr); + _clients = null; + _strack = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + // ------------------------------------------------------------------------- + // Name accessors + // ------------------------------------------------------------------------- + + /// + /// Returns the account name, thread-safely. + /// Mirrors Go (a *Account) GetName() string. + /// + public string GetName() + { + _mu.EnterReadLock(); + try + { + return Name; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns the if set, otherwise . + /// Acquires a read lock. + /// Mirrors Go (a *Account) getNameTag() string. + /// + internal string GetNameTag() + { + _mu.EnterReadLock(); + try + { + return GetNameTagLocked(); + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns the if set, otherwise . + /// Lock must be held by the caller. + /// Mirrors Go (a *Account) getNameTagLocked() string. + /// + internal string GetNameTagLocked() => + string.IsNullOrEmpty(NameTag) ? Name : NameTag; + + // ------------------------------------------------------------------------- + // Connection counts + // ------------------------------------------------------------------------- + + /// + /// Returns the total number of active clients across all servers (local minus + /// system accounts plus remote). + /// Mirrors Go (a *Account) NumConnections() int. + /// + public int NumConnections() + { + _mu.EnterReadLock(); + try + { + return (_clients?.Count ?? 0) - _sysclients + _nrclients; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns the number of client and leaf-node connections that are on + /// remote servers. + /// Mirrors Go (a *Account) NumRemoteConnections() int. + /// + public int NumRemoteConnections() + { + _mu.EnterReadLock(); + try + { + return _nrclients + _nrleafs; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns the number of non-system, non-leaf clients on this server. + /// Mirrors Go (a *Account) NumLocalConnections() int. + /// + public int NumLocalConnections() + { + _mu.EnterReadLock(); + try + { + return NumLocalConnectionsLocked(); + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns local non-system, non-leaf client count. Lock must be held. + /// Mirrors Go (a *Account) numLocalConnections() int. + /// + internal int NumLocalConnectionsLocked() => + (_clients?.Count ?? 0) - _sysclients - _nleafs; + + /// + /// Returns all local connections including leaf nodes (minus system clients). + /// Mirrors Go (a *Account) numLocalAndLeafConnections() int. + /// + internal int NumLocalAndLeafConnections() + { + _mu.EnterReadLock(); + try + { + return (_clients?.Count ?? 0) - _sysclients; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns the local leaf-node count. + /// Mirrors Go (a *Account) numLocalLeafNodes() int. + /// + internal int NumLocalLeafNodes() => _nleafs; + + // ------------------------------------------------------------------------- + // Connection limit checks + // ------------------------------------------------------------------------- + + /// + /// Returns true if the total (local + remote) client count has reached or + /// exceeded the configured limit. + /// Mirrors Go (a *Account) MaxTotalConnectionsReached() bool. + /// + public bool MaxTotalConnectionsReached() + { + _mu.EnterReadLock(); + try + { + if (MaxConnections == NoLimit) return false; + return (_clients?.Count ?? 0) - _sysclients + _nrclients >= MaxConnections; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns the configured maximum connections limit. + /// Mirrors Go (a *Account) MaxActiveConnections() int. + /// + public int MaxActiveConnections() + { + _mu.EnterReadLock(); + try { return MaxConnections; } + finally { _mu.ExitReadLock(); } + } + + // ------------------------------------------------------------------------- + // Leaf-node limit checks + // ------------------------------------------------------------------------- + + /// + /// Returns true if the total (local + remote) leaf-node count has reached or + /// exceeded the configured limit. + /// Mirrors Go (a *Account) MaxTotalLeafNodesReached() bool. + /// + public bool MaxTotalLeafNodesReached() + { + _mu.EnterReadLock(); + try { return MaxTotalLeafNodesReachedLocked(); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Lock must be held by the caller. + /// Mirrors Go (a *Account) maxTotalLeafNodesReached() bool. + /// + internal bool MaxTotalLeafNodesReachedLocked() + { + if (MaxLeafNodes == NoLimit) return false; + return _nleafs + _nrleafs >= MaxLeafNodes; + } + + /// + /// Returns the total leaf-node count (local + remote). + /// Mirrors Go (a *Account) NumLeafNodes() int. + /// + public int NumLeafNodes() + { + _mu.EnterReadLock(); + try { return _nleafs + _nrleafs; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Returns the remote leaf-node count. + /// Mirrors Go (a *Account) NumRemoteLeafNodes() int. + /// + public int NumRemoteLeafNodes() + { + _mu.EnterReadLock(); + try { return _nrleafs; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Returns the configured maximum leaf-nodes limit. + /// Mirrors Go (a *Account) MaxActiveLeafNodes() int. + /// + public int MaxActiveLeafNodes() + { + _mu.EnterReadLock(); + try { return MaxLeafNodes; } + finally { _mu.ExitReadLock(); } + } + + // ------------------------------------------------------------------------- + // Subscription counts + // ------------------------------------------------------------------------- + + /// + /// Returns the number of route-map entries (subjects sent across routes). + /// Mirrors Go (a *Account) RoutedSubs() int. + /// + public int RoutedSubs() + { + _mu.EnterReadLock(); + try { return _rm?.Count ?? 0; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Returns the total number of subscriptions in this account's subscription index. + /// Mirrors Go (a *Account) TotalSubs() int. + /// + public int TotalSubs() + { + _mu.EnterReadLock(); + try + { + if (Sublist == null) return 0; + return (int)Sublist.Count(); + } + finally + { + _mu.ExitReadLock(); + } + } + + // ------------------------------------------------------------------------- + // Subscription limit error throttle + // ------------------------------------------------------------------------- + + /// + /// Returns true when it is appropriate to log a max-subscription-limit error. + /// Rate-limited to at most once per . + /// Mirrors Go (a *Account) shouldLogMaxSubErr() bool. + /// + internal bool ShouldLogMaxSubErr() + { + _mu.EnterReadLock(); + long last = Interlocked.Read(ref _lastLimErr); + _mu.ExitReadLock(); + + long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; // nanoseconds + long threshold = (long)AccountEventConstants.DefaultMaxSubLimitReportThreshold.TotalMilliseconds * 1_000_000L; + + if (now - last < threshold) + return false; + + _mu.EnterWriteLock(); + try + { + Interlocked.Exchange(ref _lastLimErr, now); + } + finally + { + _mu.ExitWriteLock(); + } + return true; + } + + // ------------------------------------------------------------------------- + // Expiration + // ------------------------------------------------------------------------- + + /// + /// Returns true when the account JWT has expired. + /// Mirrors Go (a *Account) IsExpired() bool. + /// + public bool IsExpired() => + Interlocked.CompareExchange(ref _expired, 0, 0) == 1; + + /// + /// Returns true when this account is backed by a JWT claim. + /// Lock must be held by the caller. + /// Mirrors Go (a *Account) isClaimAccount() bool. + /// + internal bool IsClaimAccount() => + !string.IsNullOrEmpty(ClaimJwt); + + /// + /// Invoked when the expiration timer fires: marks expired and collects clients. + /// Mirrors Go (a *Account) expiredTimeout(). + /// + private void ExpiredTimeout() + { + Interlocked.Exchange(ref _expired, 1); + + var clients = GetClients(); + foreach (var c in clients) + { + if (!IsInternalClientKind(c.Kind)) + { + // TODO: session 12 — call c.AccountAuthExpired() once fully ported. + } + } + } + + /// + /// Starts or resets the JWT expiration timer. + /// Mirrors Go (a *Account) setExpirationTimer(d time.Duration). + /// + internal void SetExpirationTimer(TimeSpan d) + { + _etmr = new Timer(_ => ExpiredTimeout(), null, d, Timeout.InfiniteTimeSpan); + } + + /// + /// Stops the expiration timer. Returns true if it was active. + /// Lock must be held by the caller. + /// Mirrors Go (a *Account) clearExpirationTimer() bool. + /// + internal bool ClearExpirationTimer() + { + if (_etmr == null) + return true; + + _etmr.Dispose(); + _etmr = null; + return true; + } + + // ------------------------------------------------------------------------- + // Subject mappings — public API + // ------------------------------------------------------------------------- + + /// + /// Adds a simple 1:1 subject mapping from to + /// with weight 100. + /// Mirrors Go (a *Account) AddMapping(src, dest string) error. + /// + public Exception? AddMapping(string src, string dest) => + AddWeightedMappings(src, MapDest.New(dest, 100)); + + /// + /// Adds weighted subject mappings for one or more destinations. + /// Total weights must not exceed 100 per cluster group. If the total is + /// less than 100 and the source was not listed as a destination, the + /// remainder is automatically routed back to the source. + /// Weights are converted to cumulative form and sorted ascending so that + /// random selection can use a single linear scan. + /// Mirrors Go (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error. + /// + public Exception? AddWeightedMappings(string src, params MapDest[] dests) + { + _mu.EnterWriteLock(); + try + { + if (!SubscriptionIndex.IsValidSubject(src)) + return ServerErrors.ErrBadSubject; + + bool hasWildcard = SubscriptionIndex.SubjectHasWildcard(src); + var m = new SubjectMapping + { + Source = src, + HasWildcard = hasWildcard, + Destinations = new List(dests.Length + 1), + }; + + var seen = new HashSet(dests.Length); + var totals = new Dictionary(); // cluster → cumulative weight + + foreach (var d in dests) + { + if (!seen.Add(d.Subject)) + return new InvalidOperationException($"duplicate entry for \"{d.Subject}\""); + + if (d.Weight > 100) + return new InvalidOperationException("individual weights need to be <= 100"); + + totals.TryGetValue(d.Cluster, out byte tw); + int next = tw + d.Weight; + if (next > 100) + return new InvalidOperationException("total weight needs to be <= 100"); + totals[d.Cluster] = (byte)next; + + // Validate the transform is valid. + var validateErr = ValidateMapping(src, d.Subject); + if (validateErr != null) + return validateErr; + + var (tr, trErr) = SubjectTransform.New(src, d.Subject); + if (trErr != null) + return trErr; + + if (string.IsNullOrEmpty(d.Cluster)) + { + m.Destinations.Add(new Destination { Transform = tr, Weight = d.Weight }); + } + else + { + m.ClusterDestinations ??= new Dictionary>(); + if (!m.ClusterDestinations.TryGetValue(d.Cluster, out var clusterList)) + { + clusterList = []; + m.ClusterDestinations[d.Cluster] = clusterList; + } + clusterList.Add(new Destination { Transform = tr, Weight = d.Weight }); + } + } + + // Process each destination list: fill remainder and convert to cumulative weights. + var destErr = ProcessDestinations(src, hasWildcard, seen, m.Destinations); + if (destErr != null) return destErr; + + if (m.ClusterDestinations != null) + { + var clusterKeys = new List(m.ClusterDestinations.Keys); + foreach (var cluster in clusterKeys) + { + destErr = ProcessDestinations(src, hasWildcard, seen, m.ClusterDestinations[cluster]); + if (destErr != null) return destErr; + } + } + + // Replace existing entry for the same source, or append. + for (int i = 0; i < _mappings.Count; i++) + { + if (_mappings[i].Source == src) + { + _mappings[i] = m; + return null; + } + } + + _mappings.Add(m); + Interlocked.Exchange(ref _hasMapped, _mappings.Count > 0 ? 1 : 0); + + // TODO: session 15 — notify connected leaf nodes via lc.ForceAddToSmap(src). + + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Removes a subject mapping entry by source subject. + /// Returns true if an entry was removed. + /// Mirrors Go (a *Account) RemoveMapping(src string) bool. + /// + public bool RemoveMapping(string src) + { + _mu.EnterWriteLock(); + try + { + for (int i = 0; i < _mappings.Count; i++) + { + if (_mappings[i].Source == src) + { + // Swap with last element to avoid shifting (order may change). + _mappings[i] = _mappings[^1]; + _mappings.RemoveAt(_mappings.Count - 1); + Interlocked.Exchange(ref _hasMapped, _mappings.Count > 0 ? 1 : 0); + + // TODO: session 15 — notify leaf nodes via lc.ForceRemoveFromSmap(src). + return true; + } + } + return false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Returns true when there is at least one subject mapping entry. + /// Mirrors Go (a *Account) hasMappings() bool. + /// + internal bool HasMappings() => + Interlocked.CompareExchange(ref _hasMapped, 0, 0) == 1; + + /// + /// Selects a mapped destination subject using weighted random selection. + /// Returns (, false) when no mapping matches. + /// Mirrors Go (a *Account) selectMappedSubject(dest string) (string, bool). + /// + internal (string dest, bool mapped) SelectMappedSubject(string dest) + { + if (!HasMappings()) + return (dest, false); + + _mu.EnterReadLock(); + try + { + // Tokenise the destination for wildcard subset matching. + string[]? tts = null; + + SubjectMapping? m = null; + foreach (var rm in _mappings) + { + if (!rm.HasWildcard && rm.Source == dest) + { + m = rm; + break; + } + + // Lazy tokenise for subset matching. + tts ??= TokenizeSubjectForMapping(dest); + + if (SubjectTransform.IsSubsetMatch(tts, rm.Source)) + { + m = rm; + break; + } + } + + if (m == null) + return (dest, false); + + // Select the destination list (cluster-scoped or global). + List dests = m.Destinations; + if (m.ClusterDestinations != null && m.ClusterDestinations.Count > 0) + { + string clusterName = GetCachedClusterName(); + if (!string.IsNullOrEmpty(clusterName) && + m.ClusterDestinations.TryGetValue(clusterName, out var cdests)) + { + dests = cdests; + } + } + + if (dests.Count == 0) + return (dest, false); + + // Optimise single-entry case where the full weight is 100. + Destination? selected = null; + if (dests.Count == 1 && dests[0].Weight == 100) + { + selected = dests[0]; + } + else + { + byte w = (byte)(Random.Shared.Next() % 100); + foreach (var d in dests) + { + if (w < d.Weight) + { + selected = d; + break; + } + } + } + + if (selected == null) + return (dest, false); + + string ndest; + if (selected.Transform == null) + { + ndest = dest; + } + else if (tts != null) + { + ndest = selected.Transform.TransformTokenizedSubject(tts); + } + else + { + ndest = selected.Transform.TransformSubject(dest); + } + + return (ndest, true); + } + finally + { + _mu.ExitReadLock(); + } + } + + // ------------------------------------------------------------------------- + // Export checks + // ------------------------------------------------------------------------- + + /// + /// Returns true if the given service subject is exported (exact or wildcard match). + /// Mirrors Go (a *Account) IsExportService(service string) bool. + /// + public bool IsExportService(string service) + { + _mu.EnterReadLock(); + try + { + if (Exports.Services == null) + return false; + + if (Exports.Services.ContainsKey(service)) + return true; + + var tokens = SubjectTransform.TokenizeSubject(service); + foreach (var subj in Exports.Services.Keys) + { + if (SubjectTransform.IsSubsetMatch(tokens, subj)) + return true; + } + return false; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns true if the given service export has latency tracking enabled. + /// Mirrors Go (a *Account) IsExportServiceTracking(service string) bool. + /// + public bool IsExportServiceTracking(string service) + { + _mu.EnterReadLock(); + try + { + if (Exports.Services == null) + return false; + + if (Exports.Services.TryGetValue(service, out var ea)) + { + if (ea == null) return false; + if (ea.Latency != null) return true; + } + + var tokens = SubjectTransform.TokenizeSubject(service); + foreach (var (subj, se) in Exports.Services) + { + if (SubjectTransform.IsSubsetMatch(tokens, subj) && se?.Latency != null) + return true; + } + return false; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Checks whether another account is approved to import this stream export. + /// Lock must be held on entry (read is sufficient). + /// Mirrors Go (a *Account) checkStreamExportApproved(...) bool. + /// + internal bool CheckStreamExportApproved(Account account, string subject, object? imClaim) + { + if (Exports.Streams == null) return false; + + if (Exports.Streams.TryGetValue(subject, out var ea)) + { + if (ea == null) return true; + return CheckAuth(ea, account, imClaim, null); + } + + var tokens = SubjectTransform.TokenizeSubject(subject); + foreach (var (subj, se) in Exports.Streams) + { + if (SubjectTransform.IsSubsetMatch(tokens, subj)) + { + if (se == null) return true; + return CheckAuth(se, account, imClaim, tokens); + } + } + return false; + } + + /// + /// Checks whether another account is approved to import this service export. + /// Lock must be held on entry (read is sufficient). + /// Mirrors Go (a *Account) checkServiceExportApproved(...) bool. + /// + internal bool CheckServiceExportApproved(Account account, string subject, object? imClaim) + { + if (Exports.Services == null) return false; + + if (Exports.Services.TryGetValue(subject, out var se)) + { + if (se == null) return true; + return CheckAuth(se, account, imClaim, null); + } + + var tokens = SubjectTransform.TokenizeSubject(subject); + foreach (var (subj, entry) in Exports.Services) + { + if (SubjectTransform.IsSubsetMatch(tokens, subj)) + { + if (entry == null) return true; + return CheckAuth(entry, account, imClaim, tokens); + } + } + return false; + } + + // ------------------------------------------------------------------------- + // User revocation check + // ------------------------------------------------------------------------- + + /// + /// Returns true if the user identified by with the + /// given timestamp has been revoked. + /// Mirrors Go (a *Account) checkUserRevoked(nkey string, issuedAt int64) bool. + /// + public bool CheckUserRevoked(string nkey, long issuedAt) + { + _mu.EnterReadLock(); + try + { + return IsRevoked(UsersRevoked, nkey, issuedAt); + } + finally + { + _mu.ExitReadLock(); + } + } + + // ------------------------------------------------------------------------- + // Config-reload comparison helpers + // ------------------------------------------------------------------------- + + /// + /// Returns true if this account's stream imports equal 's. + /// Acquires this account's read lock; must not be + /// concurrently accessed. + /// Mirrors Go (a *Account) checkStreamImportsEqual(b *Account) bool. + /// + internal bool CheckStreamImportsEqual(Account b) + { + _mu.EnterReadLock(); + try + { + var aStreams = Imports.Streams; + var bStreams = b.Imports.Streams; + + int aLen = aStreams?.Count ?? 0; + int bLen = bStreams?.Count ?? 0; + if (aLen != bLen) return false; + if (aLen == 0) return true; + + // Build an index from (accName+from+to) → entry for b. + var bIndex = new Dictionary(bLen); + foreach (var bim in bStreams!) + { + string key = (bim.Account?.Name ?? string.Empty) + bim.From + bim.To; + bIndex[key] = bim; + } + + foreach (var aim in aStreams!) + { + string key = (aim.Account?.Name ?? string.Empty) + aim.From + aim.To; + if (!bIndex.TryGetValue(key, out var bim)) + return false; + if (aim.AllowTrace != bim.AllowTrace) + return false; + } + return true; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns true if this account's stream exports equal 's. + /// Acquires this account's read lock; must not be + /// concurrently accessed. + /// Mirrors Go (a *Account) checkStreamExportsEqual(b *Account) bool. + /// + internal bool CheckStreamExportsEqual(Account b) + { + _mu.EnterReadLock(); + try + { + var aStreams = Exports.Streams; + var bStreams = b.Exports.Streams; + + int aLen = aStreams?.Count ?? 0; + int bLen = bStreams?.Count ?? 0; + if (aLen != bLen) return false; + if (aLen == 0) return true; + + foreach (var (subj, aea) in aStreams!) + { + if (!bStreams!.TryGetValue(subj, out var bea)) + return false; + if (!IsStreamExportEqual(aea, bea)) + return false; + } + return true; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns true if this account's service exports equal 's. + /// Acquires this account's read lock; must not be + /// concurrently accessed. + /// Mirrors Go (a *Account) checkServiceExportsEqual(b *Account) bool. + /// + internal bool CheckServiceExportsEqual(Account b) + { + _mu.EnterReadLock(); + try + { + var aServices = Exports.Services; + var bServices = b.Exports.Services; + + int aLen = aServices?.Count ?? 0; + int bLen = bServices?.Count ?? 0; + if (aLen != bLen) return false; + if (aLen == 0) return true; + + foreach (var (subj, aea) in aServices!) + { + if (!bServices!.TryGetValue(subj, out var bea)) + return false; + if (!IsServiceExportEqual(aea, bea)) + return false; + } + return true; + } + finally + { + _mu.ExitReadLock(); + } + } + + // ------------------------------------------------------------------------- + // Leaf-node helpers + // ------------------------------------------------------------------------- + + /// + /// Notifies leaf nodes of a subscription change. + /// Stub — full implementation in session 15. + /// Mirrors Go (a *Account) updateLeafNodes(sub, delta). + /// + internal void UpdateLeafNodes(object sub, int delta) + { + // TODO: session 15 — leaf node subscription propagation. + } + + // ------------------------------------------------------------------------- + // addClient / removeClient + // ------------------------------------------------------------------------- + + /// + /// Registers a client with this account, updating system and leaf counters. + /// Returns the previous total client count. + /// Mirrors Go (a *Account) addClient(c *client) int. + /// + private int AddClientInternal(ClientConnection c) + { + _mu.EnterWriteLock(); + int prev; + try + { + _clients ??= new HashSet(); + prev = _clients.Count; + + if (!_clients.Add(c)) + { + // Client was already present — do nothing. + return prev; + } + + if (IsInternalClientKind(c.Kind)) + { + _sysclients++; + } + else if (c.Kind == ClientKind.Leaf) + { + _nleafs++; + } + } + finally + { + _mu.ExitWriteLock(); + } + + // Add leaf to the leaf list (uses separate lock). + if (c.Kind == ClientKind.Leaf) + { + _lmu.EnterWriteLock(); + try { _lleafs.Add(c); } + finally { _lmu.ExitWriteLock(); } + } + + // TODO: session 12 — notify server via c.srv.accConnsUpdate(a). + + return prev; + } + + /// + /// Unregisters a client from this account, updating system and leaf counters. + /// Returns the previous total client count. + /// Mirrors Go (a *Account) removeClient(c *client) int. + /// + private int RemoveClientInternal(ClientConnection c) + { + _mu.EnterWriteLock(); + int prev; + bool wasLeaf = false; + try + { + prev = _clients?.Count ?? 0; + if (_clients == null || !_clients.Remove(c)) + return prev; + + if (IsInternalClientKind(c.Kind)) + { + _sysclients--; + } + else if (c.Kind == ClientKind.Leaf) + { + _nleafs--; + wasLeaf = true; + + // Cluster accounting for hub leaf nodes. + if (c.IsHubLeafNode()) + { + // TODO: session 15 — c.RemoteCluster() for cluster accounting. + } + } + } + finally + { + _mu.ExitWriteLock(); + } + + if (wasLeaf) + { + RemoveLeafNode(c); + } + + // TODO: session 12 — notify server via c.srv.accConnsUpdate(a). + + return prev; + } + + /// + /// Removes a leaf-node client from the ordered leaf list. + /// Uses internally. + /// Mirrors Go (a *Account) removeLeafNode(c *client). + /// + private void RemoveLeafNode(ClientConnection c) + { + _lmu.EnterWriteLock(); + try + { + int idx = _lleafs.IndexOf(c); + if (idx < 0) return; + + int last = _lleafs.Count - 1; + _lleafs[idx] = _lleafs[last]; + _lleafs.RemoveAt(last); + } + finally + { + _lmu.ExitWriteLock(); + } + } + + // ------------------------------------------------------------------------- + // INatsAccount implementation + // ------------------------------------------------------------------------- + + /// + /// Returns true when the account is valid (not expired). + /// Mirrors Go INatsAccount.IsValid. + /// + bool INatsAccount.IsValid => !IsExpired(); + + /// + /// Delegates to . + /// Mirrors Go INatsAccount.MaxTotalConnectionsReached(). + /// + bool INatsAccount.MaxTotalConnectionsReached() => MaxTotalConnectionsReached(); + + /// + /// Delegates to . + /// Mirrors Go INatsAccount.MaxTotalLeafNodesReached(). + /// + bool INatsAccount.MaxTotalLeafNodesReached() => MaxTotalLeafNodesReached(); + + /// + /// Registers a client connection. Returns the previous client count. + /// Mirrors Go INatsAccount.AddClient(c). + /// + int INatsAccount.AddClient(ClientConnection c) => AddClientInternal(c); + + /// + /// Unregisters a client connection. Returns the previous client count. + /// Mirrors Go INatsAccount.RemoveClient(c). + /// + int INatsAccount.RemoveClient(ClientConnection c) => RemoveClientInternal(c); + + // ------------------------------------------------------------------------- + // Static helpers + // ------------------------------------------------------------------------- + + /// + /// Returns true when the user identified by with + /// the given timestamp has been revoked. + /// Also checks the wildcard entry (jwt.All = "*"). + /// Mirrors Go package-level isRevoked(...) bool. + /// + internal static bool IsRevoked( + Dictionary? revocations, + string subject, + long issuedAt) + { + if (revocations == null || revocations.Count == 0) + return false; + + // Check specific key. + if (revocations.TryGetValue(subject, out long ts) && ts >= issuedAt) + return true; + + // Check wildcard revocation ("*" = jwt.All). + if (revocations.TryGetValue("*", out long tsAll) && tsAll >= issuedAt) + return true; + + return false; + } + + /// + /// Returns true if the reply is a tracked reply (ends with "..T"). + /// Mirrors Go package-level isTrackedReply(reply []byte) bool. + /// + internal static bool IsTrackedReply(ReadOnlySpan reply) + { + int lreply = reply.Length - 1; + return lreply > 3 && reply[lreply - 1] == '.' && reply[lreply] == 'T'; + } + + /// + /// Validates a mapping destination subject without creating a full transform. + /// Mirrors Go ValidateMapping(src, dest string) error in sublist.go. + /// Returns null on success; an exception on failure. + /// + internal static Exception? ValidateMapping(string src, string dest) + { + if (string.IsNullOrEmpty(dest)) + return null; + + bool sfwc = false; + foreach (var token in dest.Split('.')) + { + int length = token.Length; + if (length == 0 || sfwc) + return new MappingDestinationException(token, ServerErrors.ErrInvalidMappingDestinationSubject); + + // If it looks like a mapping function, ensure it is a known one. + if (length > 4 && token[0] == '{' && token[1] == '{' && + token[length - 2] == '}' && token[length - 1] == '}') + { + var (tt, _, _, _, terr) = SubjectTransform.IndexPlaceHolders(token); + if (terr != null) return terr; + if (tt == TransformType.BadTransform) + return new MappingDestinationException(token, ServerErrors.ErrUnknownMappingDestinationFunction); + continue; + } + + if (length == 1 && token[0] == '>') + { + sfwc = true; + } + else if (token.IndexOfAny(['\t', '\n', '\f', '\r', ' ']) >= 0) + { + return ServerErrors.ErrInvalidMappingDestinationSubject; + } + } + + // Verify the full transform can be constructed. + var (_, err) = SubjectTransform.New(src, dest); + return err; + } + + /// + /// Returns true if the given is an internal kind + /// (System, JetStream, or Account — not a real user connection). + /// Mirrors Go isInternalClient(kind int) bool. + /// + private static bool IsInternalClientKind(ClientKind kind) => + kind is ClientKind.System or ClientKind.JetStream or ClientKind.Account; + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + /// + /// Builds the cumulative-weight destination list from a list of raw-weight + /// entries. If the total weight is less than 100 + /// and the source was not explicitly listed as a destination, a pass-through + /// entry is auto-added for the remainder. + /// Mirrors Go processDestinations(dests []*destination) ([]*destination, error). + /// + private static Exception? ProcessDestinations( + string src, + bool hasWildcard, + HashSet seen, + List dests) + { + byte totalWeight = 0; + foreach (var d in dests) + totalWeight += d.Weight; + + bool haveSrc = seen.Contains(src); + + // Auto-fill the remaining weight with a pass-through to the source. + if (totalWeight != 100 && !haveSrc) + { + string passThroughDest = src; + if (hasWildcard) + passThroughDest = SubjectTransform.TransformTokenize(src); + + var (tr, err) = SubjectTransform.New(src, passThroughDest); + if (err != null) return err; + + byte aw = dests.Count == 0 ? (byte)100 : (byte)(100 - totalWeight); + dests.Add(new Destination { Transform = tr, Weight = aw }); + } + + // Sort ascending by raw weight so the cumulative scan is correct. + dests.Sort((a, b) => a.Weight.CompareTo(b.Weight)); + + // Convert raw weights to cumulative weights. + byte cumulative = 0; + foreach (var d in dests) + { + cumulative += d.Weight; + d.Weight = cumulative; + } + + return null; + } + + /// + /// Tokenises a subject string into an array, using the same split logic + /// as btsep-based tokenisation in the Go source. + /// + private static string[] TokenizeSubjectForMapping(string subject) + { + var parts = new List(); + int start = 0; + for (int i = 0; i < subject.Length; i++) + { + if (subject[i] == '.') + { + parts.Add(subject[start..i]); + start = i + 1; + } + } + parts.Add(subject[start..]); + return [.. parts]; + } + + /// + /// Returns the cached cluster name for cluster-scoped mapping selection. + /// Delegates to the server when available; returns empty string as a stub. + /// Mirrors Go a.srv.cachedClusterName(). + /// TODO: session 09 — wire up Server.CachedClusterName(). + /// + private string GetCachedClusterName() + { + // TODO: session 09 — Server.CachedClusterName(). + return string.Empty; + } + + /// + /// Stops and nulls out a timer. Lock must be held by the caller. + /// Mirrors Go clearTimer(t **time.Timer). + /// + private static void ClearTimerLocked(ref Timer? t) + { + t?.Dispose(); + t = null; + } + + /// + /// Checks whether is authorised to use + /// (either via explicit approval or token requirement). + /// Mirrors Go (a *Account) checkAuth(...) bool. + /// TODO: session 11 — full JWT activation check. + /// + private static bool CheckAuth( + ExportAuth ea, + Account account, + object? imClaim, + string[]? tokens) + { + if (ea.Approved != null && ea.Approved.ContainsKey(account.Name)) + return true; + + if (ea.TokenRequired) + { + // TODO: session 11 — validate activation token in imClaim. + return imClaim != null; + } + + // No approved list and no token required → public export. + if (ea.Approved == null) return true; + + // AccountPosition embedding check. + if (ea.AccountPosition > 0 && tokens != null) + { + int pos = (int)ea.AccountPosition - 1; + if (pos < tokens.Length && tokens[pos] == account.Name) + return true; + } + + return false; + } + + // ------------------------------------------------------------------------- + // Export equality helpers + // ------------------------------------------------------------------------- + + private static bool IsStreamExportEqual(StreamExport? a, StreamExport? b) + { + if (a == null && b == null) return true; + if ((a == null) != (b == null)) return false; + return IsExportAuthEqual(a!, b!); + } + + private static bool IsServiceExportEqual(ServiceExportEntry? a, ServiceExportEntry? b) + { + if (a == null && b == null) return true; + if ((a == null) != (b == null)) return false; + if (!IsExportAuthEqual(a!, b!)) return false; + if ((a!.Account?.Name ?? string.Empty) != (b!.Account?.Name ?? string.Empty)) return false; + if (a.ResponseType != b.ResponseType) return false; + if (a.AllowTrace != b.AllowTrace) return false; + + // Latency comparison. + if ((a.Latency == null) != (b.Latency == null)) return false; + if (a.Latency != null) + { + if (a.Latency.Sampling != b.Latency!.Sampling) return false; + if (a.Latency.Subject != b.Latency.Subject) return false; + } + return true; + } + + private static bool IsExportAuthEqual(ExportAuth a, ExportAuth b) + { + if (a.TokenRequired != b.TokenRequired) return false; + if (a.AccountPosition != b.AccountPosition) return false; + + int aApproved = a.Approved?.Count ?? 0; + int bApproved = b.Approved?.Count ?? 0; + if (aApproved != bApproved) return false; + + if (a.Approved != null) + { + foreach (var (ak, av) in a.Approved) + { + if (!b.Approved!.TryGetValue(ak, out var bv) || + av.Name != bv.Name) + return false; + } + } + + int aRevoked = a.ActivationsRevoked?.Count ?? 0; + int bRevoked = b.ActivationsRevoked?.Count ?? 0; + if (aRevoked != bRevoked) return false; + + if (a.ActivationsRevoked != null) + { + foreach (var (ak, av) in a.ActivationsRevoked) + { + if (!b.ActivationsRevoked!.TryGetValue(ak, out var bv) || av != bv) + return false; + } + } + + return true; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/AccountResolver.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/AccountResolver.cs new file mode 100644 index 0000000..3b4048c --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/AccountResolver.cs @@ -0,0 +1,525 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/accounts.go in the NATS server Go source. + +using System.Collections.Concurrent; +using System.Net; +using System.Net.Http; + +namespace ZB.MOM.NatsNet.Server; + +// ============================================================================ +// IAccountResolver +// Mirrors Go AccountResolver interface (accounts.go ~line 4035). +// ============================================================================ + +/// +/// Resolves and stores account JWTs by account public key name. +/// Mirrors Go AccountResolver interface. +/// +public interface IAccountResolver +{ + /// + /// Fetches the JWT for the named account. + /// Throws when the account is not found. + /// Mirrors Go AccountResolver.Fetch. + /// + Task FetchAsync(string name, CancellationToken ct = default); + + /// + /// Stores the JWT for the named account. + /// Read-only implementations throw . + /// Mirrors Go AccountResolver.Store. + /// + Task StoreAsync(string name, string jwt, CancellationToken ct = default); + + /// Returns true when no writes are permitted. Mirrors Go IsReadOnly. + bool IsReadOnly(); + + /// + /// Starts any background processing needed by the resolver (system subscriptions, timers, etc.). + /// The parameter accepts an object to avoid a circular assembly + /// reference; implementations should cast it to the concrete server type as needed. + /// Mirrors Go AccountResolver.Start. + /// + void Start(object server); + + /// Returns true when the resolver reacts to JWT update events. Mirrors Go IsTrackingUpdate. + bool IsTrackingUpdate(); + + /// Reloads state from the backing store. Mirrors Go AccountResolver.Reload. + void Reload(); + + /// Releases resources held by the resolver. Mirrors Go AccountResolver.Close. + void Close(); +} + +// ============================================================================ +// ResolverDefaultsOps +// Mirrors Go resolverDefaultsOpsImpl (accounts.go ~line 4046). +// ============================================================================ + +/// +/// Abstract base that provides sensible no-op / read-only defaults for +/// so concrete implementations only need to override what they change. +/// Mirrors Go resolverDefaultsOpsImpl. +/// +public abstract class ResolverDefaultsOps : IAccountResolver +{ + /// + public abstract Task FetchAsync(string name, CancellationToken ct = default); + + /// + /// Default store implementation — always throws because the base defaults to read-only. + /// Mirrors Go resolverDefaultsOpsImpl.Store. + /// + public virtual Task StoreAsync(string name, string jwt, CancellationToken ct = default) + => throw new NotSupportedException("store operation not supported"); + + /// Default: the resolver is read-only. Mirrors Go resolverDefaultsOpsImpl.IsReadOnly. + public virtual bool IsReadOnly() => true; + + /// Default: no-op start. Mirrors Go resolverDefaultsOpsImpl.Start. + public virtual void Start(object server) { } + + /// Default: does not track updates. Mirrors Go resolverDefaultsOpsImpl.IsTrackingUpdate. + public virtual bool IsTrackingUpdate() => false; + + /// Default: no-op reload. Mirrors Go resolverDefaultsOpsImpl.Reload. + public virtual void Reload() { } + + /// Default: no-op close. Mirrors Go resolverDefaultsOpsImpl.Close. + public virtual void Close() { } +} + +// ============================================================================ +// MemoryAccountResolver +// Mirrors Go MemAccResolver (accounts.go ~line 4072). +// ============================================================================ + +/// +/// An in-memory account resolver backed by a . +/// Primarily intended for testing. +/// Mirrors Go MemAccResolver. +/// +public sealed class MemoryAccountResolver : ResolverDefaultsOps +{ + private readonly ConcurrentDictionary _store = new(StringComparer.Ordinal); + + /// In-memory resolver is not read-only. + public override bool IsReadOnly() => false; + + /// + /// Returns the stored JWT for , or throws + /// when the account is unknown. + /// Mirrors Go MemAccResolver.Fetch. + /// + public override Task FetchAsync(string name, CancellationToken ct = default) + { + if (_store.TryGetValue(name, out var jwt)) + { + return Task.FromResult(jwt); + } + + throw new InvalidOperationException($"Account not found: {name}"); + } + + /// + /// Stores for . + /// Mirrors Go MemAccResolver.Store. + /// + public override Task StoreAsync(string name, string jwt, CancellationToken ct = default) + { + _store[name] = jwt; + return Task.CompletedTask; + } +} + +// ============================================================================ +// UrlAccountResolver +// Mirrors Go URLAccResolver (accounts.go ~line 4097). +// ============================================================================ + +/// +/// An HTTP-based account resolver that fetches JWTs by appending the account public key +/// to a configured base URL. +/// Mirrors Go URLAccResolver. +/// +public sealed class UrlAccountResolver : ResolverDefaultsOps +{ + // Mirrors Go DEFAULT_ACCOUNT_FETCH_TIMEOUT. + private static readonly TimeSpan DefaultAccountFetchTimeout = TimeSpan.FromSeconds(2); + + private readonly string _url; + private readonly HttpClient _httpClient; + + /// + /// Creates a new URL resolver for the given . + /// A trailing slash is appended when absent so that account names can be concatenated + /// directly. An is configured with connection-pooling + /// settings that amortise TLS handshakes across requests, mirroring Go's custom + /// http.Transport. + /// Mirrors Go NewURLAccResolver. + /// + public UrlAccountResolver(string url) + { + if (!url.EndsWith('/')) + { + url += "/"; + } + + _url = url; + + // Mirror Go: MaxIdleConns=10, IdleConnTimeout=30s on a custom transport. + var handler = new SocketsHttpHandler + { + MaxConnectionsPerServer = 10, + PooledConnectionIdleTimeout = TimeSpan.FromSeconds(30), + }; + + _httpClient = new HttpClient(handler) + { + Timeout = DefaultAccountFetchTimeout, + }; + } + + /// + /// Issues an HTTP GET to the base URL with the account name appended, and returns + /// the response body as the JWT string. + /// Throws on a non-200 response. + /// Mirrors Go URLAccResolver.Fetch. + /// + public override async Task FetchAsync(string name, CancellationToken ct = default) + { + var requestUrl = _url + name; + HttpResponseMessage response; + + try + { + response = await _httpClient.GetAsync(requestUrl, ct).ConfigureAwait(false); + } + catch (Exception ex) + { + throw new InvalidOperationException($"could not fetch <\"{requestUrl}\">: {ex.Message}", ex); + } + + using (response) + { + if (response.StatusCode != HttpStatusCode.OK) + { + throw new InvalidOperationException( + $"could not fetch <\"{requestUrl}\">: {(int)response.StatusCode} {response.ReasonPhrase}"); + } + + return await response.Content.ReadAsStringAsync(ct).ConfigureAwait(false); + } + } +} + +// ============================================================================ +// DirResOption — functional option for DirAccountResolver +// Mirrors Go DirResOption func type (accounts.go ~line 4552). +// ============================================================================ + +/// +/// A functional option that configures a instance. +/// Mirrors Go DirResOption function type. +/// +public delegate void DirResOption(DirAccountResolver resolver); + +/// +/// Factory methods for commonly used values. +/// +public static class DirResOptions +{ + /// + /// Returns an option that overrides the default fetch timeout. + /// must be positive. + /// Mirrors Go FetchTimeout option constructor. + /// + /// + /// Thrown at application time when is not positive. + /// + public static DirResOption FetchTimeout(TimeSpan timeout) + { + if (timeout <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException(nameof(timeout), + $"Fetch timeout {timeout} is too small"); + } + + return resolver => resolver.FetchTimeout = timeout; + } +} + +// ============================================================================ +// DirAccountResolver (stub) +// Mirrors Go DirAccResolver (accounts.go ~line 4143). +// Full system-subscription wiring is deferred to session 12. +// ============================================================================ + +/// +/// A directory-backed account resolver that stores JWTs in a +/// and synchronises with peers via NATS system subjects. +/// +/// The Start override that wires up system subscriptions and the periodic sync goroutine +/// is a stub in this session; full implementation requires JetStream and system +/// subscription support (session 12+). +/// +/// Mirrors Go DirAccResolver. +/// +public class DirAccountResolver : ResolverDefaultsOps, IDisposable +{ + // Default fetch timeout — mirrors Go DEFAULT_ACCOUNT_FETCH_TIMEOUT (2 s). + private static readonly TimeSpan DefaultFetchTimeout = TimeSpan.FromSeconds(2); + + // Default sync interval — mirrors Go's fallback of 1 minute. + private static readonly TimeSpan DefaultSyncInterval = TimeSpan.FromMinutes(1); + + /// The underlying directory JWT store. Mirrors Go DirAccResolver.DirJWTStore. + public DirJwtStore Store { get; } + + /// Reference to the running server, set during . Mirrors Go DirAccResolver.Server. + public object? Server { get; protected set; } + + /// How often the resolver sends a sync (pack) request to peers. Mirrors Go DirAccResolver.syncInterval. + public TimeSpan SyncInterval { get; protected set; } + + /// Maximum time to wait for a remote JWT fetch. Mirrors Go DirAccResolver.fetchTimeout. + public TimeSpan FetchTimeout { get; set; } + + /// + /// Creates a new directory account resolver. + /// + /// When is zero it is promoted to (unlimited). + /// When is non-positive it defaults to one minute. + /// + /// Mirrors Go NewDirAccResolver. + /// + /// Directory path for the JWT store. + /// Maximum number of JWTs the store may hold (0 = unlimited). + /// How often to broadcast a sync/pack request to peers. + /// Controls whether deletes are soft- or hard-deleted. + /// Zero or more functional options to further configure this instance. + public DirAccountResolver( + string path, + long limit, + TimeSpan syncInterval, + JwtDeleteType deleteType, + params DirResOption[] opts) + { + if (limit == 0) + { + limit = long.MaxValue; + } + + if (syncInterval <= TimeSpan.Zero) + { + syncInterval = DefaultSyncInterval; + } + + Store = DirJwtStore.NewExpiringDirJwtStore( + path, + shard: false, + create: true, + deleteType, + expireCheck: TimeSpan.Zero, + limit, + evictOnLimit: false, + ttl: TimeSpan.Zero, + changeNotification: null); + + SyncInterval = syncInterval; + FetchTimeout = DefaultFetchTimeout; + + Apply(opts); + } + + // Internal constructor used by CacheDirAccountResolver which supplies its own store. + internal DirAccountResolver( + DirJwtStore store, + TimeSpan syncInterval, + TimeSpan fetchTimeout) + { + Store = store; + SyncInterval = syncInterval; + FetchTimeout = fetchTimeout; + } + + /// + /// Applies a sequence of functional options to this resolver. + /// Mirrors Go DirAccResolver.apply. + /// + protected void Apply(IEnumerable opts) + { + foreach (var opt in opts) + { + opt(this); + } + } + + // ------------------------------------------------------------------------- + // IAccountResolver overrides + // ------------------------------------------------------------------------- + + /// + /// DirAccountResolver is not read-only. + /// Mirrors Go: DirAccResolver does not override IsReadOnly, so it inherits false + /// from the concrete behaviour (store is writable). + /// + public override bool IsReadOnly() => false; + + /// + /// Tracks updates (reacts to JWT change events). + /// Mirrors Go DirAccResolver.IsTrackingUpdate. + /// + public override bool IsTrackingUpdate() => true; + + /// + /// Reloads state from the backing . + /// Mirrors Go DirAccResolver.Reload. + /// + public override void Reload() => Store.Reload(); + + /// + /// Fetches the JWT for from the local . + /// Throws when the account is not found locally. + /// + /// Note: the Go implementation falls back to srv.fetch (a cluster-wide lookup) when + /// the local store misses. That fallback requires system subscriptions and is deferred to + /// session 12. For now this method only consults the local store. + /// + /// Mirrors Go DirAccResolver.Fetch (local path only). + /// + public override Task FetchAsync(string name, CancellationToken ct = default) + { + var theJwt = Store.LoadAcc(name); + if (!string.IsNullOrEmpty(theJwt)) + { + return Task.FromResult(theJwt); + } + + throw new InvalidOperationException($"Account not found: {name}"); + } + + /// + /// Stores under , keeping the newer JWT + /// when a conflicting entry already exists. + /// Mirrors Go DirAccResolver.Store (delegates to saveIfNewer). + /// + public override Task StoreAsync(string name, string jwt, CancellationToken ct = default) + { + // SaveAcc is equivalent to saveIfNewer in the DirJwtStore implementation. + Store.SaveAcc(name, jwt); + return Task.CompletedTask; + } + + /// + /// Starts background system subscriptions and the periodic sync timer. + /// + /// TODO (session 12): wire up system subscriptions for account JWT update/lookup/pack + /// requests, cluster synchronisation, and the periodic pack broadcast goroutine. + /// + /// Mirrors Go DirAccResolver.Start. + /// + public override void Start(object server) + { + Server = server; + // TODO (session 12): set up system subscriptions and periodic sync timer. + } + + /// + /// Stops background processing and closes the . + /// Mirrors Go AccountResolver.Close (no explicit Go override; store is closed + /// by the server shutdown path). + /// + public override void Close() => Store.Close(); + + /// + public void Dispose() => Store.Dispose(); +} + +// ============================================================================ +// CacheDirAccountResolver (stub) +// Mirrors Go CacheDirAccResolver (accounts.go ~line 4594). +// ============================================================================ + +/// +/// A caching variant of that uses a TTL-based expiring +/// store so that fetched JWTs are automatically evicted after . +/// +/// The Start override that wires up system subscriptions is a stub in this session; +/// full implementation requires system subscription support (session 12+). +/// +/// Mirrors Go CacheDirAccResolver. +/// +public sealed class CacheDirAccountResolver : DirAccountResolver +{ + // Default cache limit — mirrors Go's fallback of 1 000 entries. + private const long DefaultCacheLimit = 1_000; + + // Default fetch timeout — mirrors Go DEFAULT_ACCOUNT_FETCH_TIMEOUT (2 s). + private static readonly TimeSpan DefaultFetchTimeout = TimeSpan.FromSeconds(2); + + /// The TTL applied to each cached JWT entry. Mirrors Go CacheDirAccResolver.ttl. + public TimeSpan Ttl { get; } + + /// + /// Creates a new caching directory account resolver. + /// + /// When is zero or negative it defaults to 1 000. + /// + /// Mirrors Go NewCacheDirAccResolver. + /// + /// Directory path for the JWT store. + /// Maximum number of JWTs to cache (0 = 1 000). + /// Time-to-live for each cached JWT. + /// Zero or more functional options to further configure this instance. + public CacheDirAccountResolver( + string path, + long limit, + TimeSpan ttl, + params DirResOption[] opts) + : base( + store: DirJwtStore.NewExpiringDirJwtStore( + path, + shard: false, + create: true, + JwtDeleteType.HardDelete, + expireCheck: TimeSpan.Zero, + limit: limit <= 0 ? DefaultCacheLimit : limit, + evictOnLimit: true, + ttl: ttl, + changeNotification: null), + syncInterval: TimeSpan.Zero, + fetchTimeout: DefaultFetchTimeout) + { + Ttl = ttl; + Apply(opts); + } + + /// + /// Starts background system subscriptions for cached JWT update notifications. + /// + /// TODO (session 12): wire up system subscriptions for account JWT update events + /// (cache variant — does not include pack/list/delete handling). + /// + /// Mirrors Go CacheDirAccResolver.Start. + /// + public override void Start(object server) + { + Server = server; + // TODO (session 12): set up system subscriptions for cache-update notifications. + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/AccountTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/AccountTypes.cs new file mode 100644 index 0000000..fdb9259 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/AccountTypes.cs @@ -0,0 +1,737 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/accounts.go in the NATS server Go source. + +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +// ============================================================================ +// AccountLimits — account-based limits +// Mirrors Go `limits` struct in server/accounts.go. +// ============================================================================ + +/// +/// Per-account connection and payload limits. +/// Mirrors Go limits struct in server/accounts.go. +/// +internal sealed class AccountLimits +{ + /// Maximum payload size (-1 = unlimited). Mirrors Go mpay. + public int MaxPayload { get; set; } = -1; + + /// Maximum subscriptions (-1 = unlimited). Mirrors Go msubs. + public int MaxSubscriptions { get; set; } = -1; + + /// Maximum connections (-1 = unlimited). Mirrors Go mconns. + public int MaxConnections { get; set; } = -1; + + /// Maximum leaf nodes (-1 = unlimited). Mirrors Go mleafs. + public int MaxLeafNodes { get; set; } = -1; + + /// When true, bearer tokens are not allowed. Mirrors Go disallowBearer. + public bool DisallowBearer { get; set; } +} + +// ============================================================================ +// SConns — remote server connection/leafnode counters +// Mirrors Go `sconns` struct in server/accounts.go. +// ============================================================================ + +/// +/// Tracks the number of client connections and leaf nodes for a remote server. +/// Mirrors Go sconns struct in server/accounts.go. +/// +internal sealed class SConns +{ + /// Number of client connections from the remote server. Mirrors Go conns. + public int Conns; + + /// Number of leaf nodes from the remote server. Mirrors Go leafs. + public int Leafs; +} + +// ============================================================================ +// ServiceRespType — service response type enum +// Mirrors Go `ServiceRespType` and its iota constants in server/accounts.go. +// ============================================================================ + +/// +/// The response type for an exported service. +/// Mirrors Go ServiceRespType in server/accounts.go. +/// +public enum ServiceRespType : byte +{ + /// A single response is expected. Default. Mirrors Go Singleton. + Singleton = 0, + + /// Multiple responses are streamed. Mirrors Go Streamed. + Streamed = 1, + + /// Responses are sent in chunks. Mirrors Go Chunked. + Chunked = 2, +} + +/// +/// Extension methods for . +/// +public static class ServiceRespTypeExtensions +{ + /// + /// Returns the string representation of the response type. + /// Mirrors Go ServiceRespType.String(). + /// + public static string ToNatsString(this ServiceRespType rt) => rt switch + { + ServiceRespType.Singleton => "Singleton", + ServiceRespType.Streamed => "Streamed", + ServiceRespType.Chunked => "Chunked", + _ => "Unknown ServiceResType", + }; +} + +// ============================================================================ +// ExportAuth — export authorization configuration +// Mirrors Go `exportAuth` struct in server/accounts.go. +// ============================================================================ + +/// +/// Holds configured approvals or a flag indicating that an auth token is +/// required for import. +/// Mirrors Go exportAuth struct in server/accounts.go. +/// +internal class ExportAuth +{ + /// When true, an auth token is required to import this export. Mirrors Go tokenReq. + public bool TokenRequired { get; set; } + + /// + /// Position in the subject token where the account name appears (for + /// public exports that embed the importing account name). + /// Mirrors Go accountPos. + /// + public uint AccountPosition { get; set; } + + /// + /// Accounts explicitly approved to import this export. + /// Key is the account name. Mirrors Go approved. + /// + public Dictionary? Approved { get; set; } + + /// + /// Accounts whose activations have been revoked. + /// Key is the account name, value is the revocation timestamp (Unix ns). + /// Mirrors Go actsRevoked. + /// + public Dictionary? ActivationsRevoked { get; set; } +} + +// ============================================================================ +// StreamExport — exported stream descriptor +// Mirrors Go `streamExport` struct in server/accounts.go. +// ============================================================================ + +/// +/// Describes a stream exported by an account. +/// Mirrors Go streamExport struct in server/accounts.go. +/// +internal sealed class StreamExport : ExportAuth +{ + // No additional fields beyond ExportAuth for now. + // Full implementation in session 11 (accounts.go). +} + +// ============================================================================ +// InternalServiceLatency — service latency tracking configuration +// Mirrors Go `serviceLatency` struct in server/accounts.go. +// ============================================================================ + +/// +/// Configuration for service latency tracking on an exported service. +/// Mirrors Go serviceLatency struct in server/accounts.go. +/// +internal sealed class InternalServiceLatency +{ + /// + /// Sampling percentage (1–100), or 0 to indicate triggered by header. + /// Mirrors Go sampling int8. + /// + public int Sampling { get; set; } + + /// Subject to publish latency metrics to. Mirrors Go subject. + public string Subject { get; set; } = string.Empty; +} + +// ============================================================================ +// ServiceExportEntry — exported service descriptor +// Mirrors Go `serviceExport` struct in server/accounts.go. +// ============================================================================ + +/// +/// Describes a service exported by an account with additional configuration +/// for response type, latency tracking, and timers. +/// Mirrors Go serviceExport struct in server/accounts.go. +/// +internal sealed class ServiceExportEntry : ExportAuth +{ + /// Account that owns this export. Mirrors Go acc. + public Account? Account { get; set; } + + /// Response type (Singleton, Streamed, Chunked). Mirrors Go respType. + public ServiceRespType ResponseType { get; set; } = ServiceRespType.Singleton; + + /// Latency tracking configuration, or null if disabled. Mirrors Go latency. + public InternalServiceLatency? Latency { get; set; } + + /// + /// Timer used to collect response-latency measurements. + /// Mirrors Go rtmr *time.Timer. + /// + public Timer? ResponseTimer { get; set; } + + /// + /// Threshold duration for service responses. + /// Mirrors Go respThresh time.Duration. + /// + public TimeSpan ResponseThreshold { get; set; } + + /// + /// When true, tracing is allowed past the account boundary for this export. + /// Mirrors Go atrc (allow_trace). + /// + public bool AllowTrace { get; set; } +} + +// ============================================================================ +// ExportMap — tracks exported streams and services for an account +// Mirrors Go `exportMap` struct in server/accounts.go. +// ============================================================================ + +/// +/// Tracks all stream exports, service exports, and response mappings for an account. +/// Mirrors Go exportMap struct in server/accounts.go. +/// +internal sealed class ExportMap +{ + /// + /// Exported streams keyed by subject pattern. + /// Mirrors Go streams map[string]*streamExport. + /// + public Dictionary? Streams { get; set; } + + /// + /// Exported services keyed by subject pattern. + /// Mirrors Go services map[string]*serviceExport. + /// + public Dictionary? Services { get; set; } + + /// + /// In-flight response service imports keyed by reply subject. + /// Mirrors Go responses map[string]*serviceImport. + /// + public Dictionary? Responses { get; set; } +} + +// ============================================================================ +// ImportMap — tracks imported streams and services for an account +// Mirrors Go `importMap` struct in server/accounts.go. +// ============================================================================ + +/// +/// Tracks all stream imports, service imports, and reverse-response maps. +/// Mirrors Go importMap struct in server/accounts.go. +/// +internal sealed class ImportMap +{ + /// + /// Imported streams (ordered list). + /// Mirrors Go streams []*streamImport. + /// + public List? Streams { get; set; } + + /// + /// Imported services keyed by subject pattern; each key may have + /// multiple import entries (e.g. fan-out imports). + /// Mirrors Go services map[string][]*serviceImport. + /// + public Dictionary>? Services { get; set; } + + /// + /// Reverse-response map used to clean up singleton service imports. + /// Mirrors Go rrMap map[string][]*serviceRespEntry. + /// + public Dictionary>? ReverseResponseMap { get; set; } +} + +// ============================================================================ +// StreamImportEntry — an imported stream mapping +// Mirrors Go `streamImport` struct in server/accounts.go. +// ============================================================================ + +/// +/// An imported stream from another account, with optional subject remapping. +/// Mirrors Go streamImport struct in server/accounts.go. +/// +internal sealed class StreamImportEntry +{ + /// Account providing the stream. Mirrors Go acc. + public Account? Account { get; set; } + + /// Source subject on the exporting account. Mirrors Go from. + public string From { get; set; } = string.Empty; + + /// Destination subject on the importing account. Mirrors Go to. + public string To { get; set; } = string.Empty; + + /// + /// Subject transform applied to the source subject. + /// Mirrors Go tr *subjectTransform. + /// Stubbed as until the transform + /// engine is wired in. + /// + public ISubjectTransformer? Transform { get; set; } + + /// + /// Reverse transform for reply subjects. + /// Mirrors Go rtr *subjectTransform. + /// + public ISubjectTransformer? ReverseTransform { get; set; } + + /// + /// JWT import claim that authorized this import. + /// Mirrors Go claim *jwt.Import. + /// Stubbed as object? until JWT integration is complete (session 11). + /// + public object? Claim { get; set; } + + /// + /// When true, use the published subject instead of . + /// Mirrors Go usePub. + /// + public bool UsePublishedSubject { get; set; } + + /// Whether this import is considered invalid. Mirrors Go invalid. + public bool Invalid { get; set; } + + /// + /// When true, tracing is allowed past the account boundary. + /// Mirrors Go atrc (allow_trace). + /// + public bool AllowTrace { get; set; } +} + +// ============================================================================ +// ServiceImportEntry — an imported service mapping +// Mirrors Go `serviceImport` struct in server/accounts.go. +// ============================================================================ + +/// +/// An imported service from another account, with response routing and +/// latency tracking state. +/// Mirrors Go serviceImport struct in server/accounts.go. +/// +internal sealed class ServiceImportEntry +{ + /// Account providing the service. Mirrors Go acc. + public Account? Account { get; set; } + + /// + /// JWT import claim that authorized this import. + /// Mirrors Go claim *jwt.Import. + /// Stubbed as object? until JWT integration is complete (session 11). + /// + public object? Claim { get; set; } + + /// Parent service export entry. Mirrors Go se *serviceExport. + public ServiceExportEntry? ServiceExport { get; set; } + + /// + /// Subscription ID byte slice for cleanup. + /// Mirrors Go sid []byte. + /// + public byte[]? SubscriptionId { get; set; } + + /// Source subject on the importing account. Mirrors Go from. + public string From { get; set; } = string.Empty; + + /// Destination subject on the exporting account. Mirrors Go to. + public string To { get; set; } = string.Empty; + + /// + /// Subject transform applied when routing requests. + /// Mirrors Go tr *subjectTransform. + /// Stubbed as until transform engine is wired in. + /// + public ISubjectTransformer? Transform { get; set; } + + /// + /// Timestamp (Unix nanoseconds) when the import request was created. + /// Used for latency tracking. Mirrors Go ts int64. + /// + public long Timestamp { get; set; } + + /// Response type for this service import. Mirrors Go rt ServiceRespType. + public ServiceRespType ResponseType { get; set; } = ServiceRespType.Singleton; + + /// Latency tracking configuration. Mirrors Go latency *serviceLatency. + public InternalServiceLatency? Latency { get; set; } + + /// + /// First-leg latency measurement (requestor side). + /// Mirrors Go m1 *ServiceLatency. + /// + public ServiceLatency? M1 { get; set; } + + /// + /// Client connection that sent the original request. + /// Mirrors Go rc *client. + /// + public ClientConnection? RequestingClient { get; set; } + + /// + /// When true, use the published subject instead of . + /// Mirrors Go usePub. + /// + public bool UsePublishedSubject { get; set; } + + /// + /// When true, this import entry represents a pending response rather + /// than an originating request. + /// Mirrors Go response. + /// + public bool IsResponse { get; set; } + + /// Whether this import is considered invalid. Mirrors Go invalid. + public bool Invalid { get; set; } + + /// + /// When true, the requestor's is shared with + /// the responder. Mirrors Go share. + /// + public bool Share { get; set; } + + /// Whether latency tracking is active. Mirrors Go tracking. + public bool Tracking { get; set; } + + /// Whether a response was delivered to the requestor. Mirrors Go didDeliver. + public bool DidDeliver { get; set; } + + /// + /// When true, tracing is allowed past the account boundary (inherited + /// from the service export). Mirrors Go atrc. + /// + public bool AllowTrace { get; set; } + + /// + /// Headers from the original request, used when latency is triggered by + /// a header. Mirrors Go trackingHdr http.Header. + /// + public Dictionary? TrackingHeader { get; set; } +} + +// ============================================================================ +// ServiceRespEntry — reverse-response map entry +// Mirrors Go `serviceRespEntry` struct in server/accounts.go. +// ============================================================================ + +/// +/// Records a service import mapping for reverse-response-map cleanup. +/// Mirrors Go serviceRespEntry struct in server/accounts.go. +/// +internal sealed class ServiceRespEntry +{ + /// Account that owns the service import. Mirrors Go acc. + public Account? Account { get; set; } + + /// + /// The mapped subscription subject used for the response. + /// Mirrors Go msub. + /// + public string MappedSubject { get; set; } = string.Empty; +} + +// ============================================================================ +// MapDest — public API for weighted subject mappings +// Mirrors Go `MapDest` struct in server/accounts.go. +// ============================================================================ + +/// +/// Describes a weighted mapping destination for published subjects. +/// Mirrors Go MapDest struct in server/accounts.go. +/// +public sealed class MapDest +{ + [JsonPropertyName("subject")] + public string Subject { get; set; } = string.Empty; + + [JsonPropertyName("weight")] + public byte Weight { get; set; } + + [JsonPropertyName("cluster")] + public string Cluster { get; set; } = string.Empty; + + /// + /// Creates a new with the given subject and weight. + /// Mirrors Go NewMapDest. + /// + public static MapDest New(string subject, byte weight) => + new() { Subject = subject, Weight = weight }; +} + +// ============================================================================ +// Destination — internal weighted mapped destination +// Mirrors Go `destination` struct in server/accounts.go. +// ============================================================================ + +/// +/// Internal representation of a weighted mapped destination, holding a +/// transform and a weight. +/// Mirrors Go destination struct in server/accounts.go. +/// +internal sealed class Destination +{ + /// + /// Transform that converts the source subject to the destination subject. + /// Mirrors Go tr *subjectTransform. + /// + public ISubjectTransformer? Transform { get; set; } + + /// + /// Relative weight (0–100). Mirrors Go weight uint8. + /// + public byte Weight { get; set; } +} + +// ============================================================================ +// SubjectMapping — internal subject mapping entry +// Mirrors Go `mapping` struct in server/accounts.go. +// Renamed from `mapping` to avoid collision with the C# keyword context. +// ============================================================================ + +/// +/// An internal entry describing how a source subject is remapped to one or +/// more weighted destinations, optionally scoped to specific clusters. +/// Mirrors Go mapping struct in server/accounts.go. +/// +internal sealed class SubjectMapping +{ + /// Source subject pattern. Mirrors Go src. + public string Source { get; set; } = string.Empty; + + /// + /// Whether the source contains wildcards. + /// Mirrors Go wc. + /// + public bool HasWildcard { get; set; } + + /// + /// Weighted destinations with no cluster scope. + /// Mirrors Go dests []*destination. + /// + public List Destinations { get; set; } = []; + + /// + /// Per-cluster weighted destinations. + /// Key is the cluster name. Mirrors Go cdests map[string][]*destination. + /// + public Dictionary>? ClusterDestinations { get; set; } +} + +// ============================================================================ +// TypedEvent — base for server advisory events +// Mirrors Go `TypedEvent` struct in server/events.go. +// Included here because ServiceLatency embeds it. +// ============================================================================ + +/// +/// Base fields for a NATS typed event or advisory. +/// Mirrors Go TypedEvent struct in server/events.go. +/// +public class TypedEvent +{ + [JsonPropertyName("type")] + public string Type { get; set; } = string.Empty; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } +} + +// ============================================================================ +// ServiceLatency — public latency measurement event +// Mirrors Go `ServiceLatency` struct in server/accounts.go. +// ============================================================================ + +/// +/// The JSON message published to a latency-tracking subject when a service +/// request completes. Includes requestor and responder timing breakdowns. +/// Mirrors Go ServiceLatency struct in server/accounts.go. +/// +public sealed class ServiceLatency : TypedEvent +{ + [JsonPropertyName("status")] + public int Status { get; set; } + + [JsonPropertyName("description")] + public string Error { get; set; } = string.Empty; + + [JsonPropertyName("requestor")] + public ClientInfo? Requestor { get; set; } + + [JsonPropertyName("responder")] + public ClientInfo? Responder { get; set; } + + /// + /// Headers from the original request that triggered latency measurement. + /// Mirrors Go RequestHeader http.Header. + /// + [JsonPropertyName("header")] + public Dictionary? RequestHeader { get; set; } + + [JsonPropertyName("start")] + public DateTime RequestStart { get; set; } + + /// Mirrors Go ServiceLatency time.Duration (nanoseconds). + [JsonPropertyName("service")] + public TimeSpan ServiceLatencyDuration { get; set; } + + /// Mirrors Go SystemLatency time.Duration (nanoseconds). + [JsonPropertyName("system")] + public TimeSpan SystemLatency { get; set; } + + /// Mirrors Go TotalLatency time.Duration (nanoseconds). + [JsonPropertyName("total")] + public TimeSpan TotalLatency { get; set; } + + /// + /// Returns the sum of requestor RTT, responder RTT, and system latency. + /// Mirrors Go ServiceLatency.NATSTotalTime(). + /// + public TimeSpan NATSTotalTime() + { + var requestorRtt = Requestor?.Rtt ?? TimeSpan.Zero; + var responderRtt = Responder?.Rtt ?? TimeSpan.Zero; + return requestorRtt + responderRtt + SystemLatency; + } +} + +// ============================================================================ +// RemoteLatency — cross-server latency transport message +// Mirrors Go `remoteLatency` struct in server/accounts.go. +// ============================================================================ + +/// +/// Used to transport a responder-side latency measurement to the +/// requestor's server so the two halves can be merged. +/// Mirrors Go remoteLatency struct in server/accounts.go. +/// +internal sealed class RemoteLatency +{ + [JsonPropertyName("account")] + public string Account { get; set; } = string.Empty; + + [JsonPropertyName("req_id")] + public string RequestId { get; set; } = string.Empty; + + [JsonPropertyName("m2")] + public ServiceLatency M2 { get; set; } = new(); + + /// + /// Private: response latency threshold used when deciding whether to + /// send the remote measurement. + /// Mirrors Go respThresh time.Duration. + /// + public TimeSpan ResponseThreshold { get; set; } +} + +// ============================================================================ +// RsiReason — reason for removing a response service import +// Mirrors Go `rsiReason` and its iota constants in server/accounts.go. +// ============================================================================ + +/// +/// The reason a response service import entry is being removed. +/// Mirrors Go rsiReason and its iota constants in server/accounts.go. +/// +internal enum RsiReason +{ + /// Normal completion. Mirrors Go rsiOk. + Ok = 0, + + /// Response was never delivered. Mirrors Go rsiNoDelivery. + NoDelivery = 1, + + /// Response timed out. Mirrors Go rsiTimeout. + Timeout = 2, +} + +// ============================================================================ +// Account-level constants +// Mirrors the const blocks in server/accounts.go. +// ============================================================================ + +/// +/// Constants related to account route-pool indexing and search depth. +/// +internal static class AccountConstants +{ + /// + /// Sentinel value indicating the account has a dedicated route connection. + /// Mirrors Go accDedicatedRoute = -1. + /// + public const int DedicatedRoute = -1; + + /// + /// Sentinel value indicating the account is in the process of transitioning + /// to a dedicated route. + /// Mirrors Go accTransitioningToDedicatedRoute = -2. + /// + public const int TransitioningToDedicatedRoute = -2; + + /// + /// Maximum depth for account cycle detection when following import chains. + /// Mirrors Go MaxAccountCycleSearchDepth = 1024. + /// + public const int MaxCycleSearchDepth = 1024; +} + +/// +/// Well-known header names and event type identifiers used by the account +/// service-latency and client-info subsystems. +/// +public static class AccountEventConstants +{ + /// + /// Header name used to pass client metadata into a service request. + /// Mirrors Go ClientInfoHdr = "Nats-Request-Info". + /// + public const string ClientInfoHeader = "Nats-Request-Info"; + + /// + /// The default threshold (in nanoseconds, as a ) below + /// which a subscription-limit report is suppressed. + /// Mirrors Go defaultMaxSubLimitReportThreshold = int64(2 * time.Second). + /// + public static readonly TimeSpan DefaultMaxSubLimitReportThreshold = TimeSpan.FromSeconds(2); + + /// + /// NATS event type identifier for messages. + /// Mirrors Go ServiceLatencyType = "io.nats.server.metric.v1.service_latency". + /// + public const string ServiceLatencyType = "io.nats.server.metric.v1.service_latency"; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/DirJwtStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/DirJwtStore.cs new file mode 100644 index 0000000..980d099 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/DirJwtStore.cs @@ -0,0 +1,1373 @@ +// Copyright 2012-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/dirstore.go in the NATS server Go source. + +using System.Security.Cryptography; +using System.Text; + +namespace ZB.MOM.NatsNet.Server; + +/// +/// Called when a JWT in the store has changed (created or updated) or been deleted. +/// The publicKey argument is the NKey public key for the affected JWT. +/// Mirrors Go JWTChanged delegate type. +/// +public delegate void JwtChanged(string publicKey); + +/// +/// Controls how deleted JWT files are handled by . +/// Mirrors Go deleteType constants. +/// +public enum JwtDeleteType +{ + /// Deletion is not permitted. + NoDelete = 0, + + /// The JWT file is renamed with a .deleted suffix rather than removed. + RenameDeleted = 1, + + /// The JWT file is permanently removed from disk. + HardDelete = 2, +} + +/// +/// A directory-backed JWT store that keeps one JWT per public key as a .jwt file. +/// Optionally shards files into two-character subdirectories, tracks expiration via a +/// background timer, and enforces a per-store file-count limit with optional LRU eviction. +/// Mirrors Go DirJWTStore. +/// +public sealed class DirJwtStore : IDisposable +{ + // --------------------------------------------------------------------------- + // Constants + // --------------------------------------------------------------------------- + + private const string FileExtension = ".jwt"; + private const string DeletedSuffix = ".deleted"; + + // --------------------------------------------------------------------------- + // State + // --------------------------------------------------------------------------- + + private readonly object _lock = new(); + private readonly string _directory; + private readonly bool _shard; + private readonly bool _readonly; + private readonly JwtDeleteType _deleteType; + private ExpirationTracker? _expiration; + private readonly JwtChanged? _changed; + private readonly JwtChanged? _deleted; + + // --------------------------------------------------------------------------- + // Constructors (private — use factory methods) + // --------------------------------------------------------------------------- + + private DirJwtStore( + string directory, + bool shard, + bool readOnly, + JwtDeleteType deleteType, + JwtChanged? changed, + JwtChanged? deleted) + { + _directory = directory; + _shard = shard; + _readonly = readOnly; + _deleteType = deleteType; + _changed = changed; + _deleted = deleted; + } + + // --------------------------------------------------------------------------- + // Factory methods + // --------------------------------------------------------------------------- + + /// + /// Creates a read-only directory JWT store. + /// The directory must already exist; it is never created. + /// Mirrors Go NewImmutableDirJWTStore. + /// + /// Path to the directory containing JWT files. + /// When true, files are stored in two-character sharded subdirectories. + public static DirJwtStore NewImmutableDirJwtStore(string dirPath, bool shard) + { + var store = NewDirJwtStore(dirPath, shard, create: false); + // Override: force readonly flag (NewDirJwtStore defaults to non-readonly) + return new DirJwtStore(store._directory, store._shard, readOnly: true, + JwtDeleteType.NoDelete, changed: null, deleted: null); + } + + /// + /// Creates a directory JWT store without expiration tracking. + /// Mirrors Go NewDirJWTStore. + /// + /// Path to the directory. + /// When true, files are stored in sharded subdirectories. + /// When true, the directory is created if it does not exist. + public static DirJwtStore NewDirJwtStore(string dirPath, bool shard, bool create) + { + var fullPath = NewDir(dirPath, create); + return new DirJwtStore(fullPath, shard, readOnly: false, + JwtDeleteType.NoDelete, changed: null, deleted: null); + } + + /// + /// Creates a directory JWT store with expiration tracking, file-count limiting, and + /// optional LRU eviction. A background timer fires every + /// and removes JWT files whose expiration time has passed. + /// Mirrors Go NewExpiringDirJWTStore. + /// + /// Path to the directory. + /// When true, files are stored in sharded subdirectories. + /// When true, the directory is created if it does not exist. + /// Controls how expired/deleted files are removed. + /// + /// Interval at which the background timer checks for expired JWTs. + /// If zero or negative, defaults to ttl/2 or one minute, whichever is smaller. + /// + /// + /// Maximum number of JWTs tracked simultaneously. + /// Pass to disable the limit. + /// + /// + /// When true, the least-recently-used JWT is evicted when the limit is reached; + /// when false, an error is returned instead. + /// + /// + /// When non-zero, overrides the per-JWT expiration with a fixed TTL from the last + /// access. Pass to make JWTs indefinitely valid. + /// + /// Callback invoked when a JWT is added or updated. + public static DirJwtStore NewExpiringDirJwtStore( + string dirPath, + bool shard, + bool create, + JwtDeleteType deleteType, + TimeSpan expireCheck, + long limit, + bool evictOnLimit, + TimeSpan ttl, + JwtChanged? changeNotification) + { + var fullPath = NewDir(dirPath, create); + + // Mirror Go: derive a sensible default check interval when caller passes ≤0. + if (expireCheck <= TimeSpan.Zero) + { + if (ttl != TimeSpan.Zero) + { + expireCheck = ttl / 2; + } + if (expireCheck == TimeSpan.Zero || expireCheck > TimeSpan.FromMinutes(1)) + { + expireCheck = TimeSpan.FromMinutes(1); + } + } + + if (limit <= 0) + { + limit = long.MaxValue; + } + + var store = new DirJwtStore(fullPath, shard, readOnly: false, + deleteType, changed: changeNotification, deleted: null); + + store.StartExpiring(expireCheck, limit, evictOnLimit, ttl); + + // Pre-index all existing JWT files on disk. + lock (store._lock) + { + foreach (var path in EnumerateJwtFiles(fullPath)) + { + try + { + var theJwt = File.ReadAllBytes(path); + var hash = SHA256.HashData(theJwt); + var file = Path.GetFileNameWithoutExtension(path); + store._expiration!.Track(file, hash, Encoding.UTF8.GetString(theJwt)); + } + catch + { + // Ignore files that cannot be read during startup — mirrors Go behaviour. + } + } + } + + return store; + } + + // --------------------------------------------------------------------------- + // Public API + // --------------------------------------------------------------------------- + + /// Returns true when the store was opened in read-only mode. + public bool IsReadOnly() => _readonly; + + /// + /// Loads the JWT for the given account public key. + /// Mirrors Go DirJWTStore.LoadAcc. + /// + public string LoadAcc(string publicKey) => Load(publicKey); + + /// + /// Saves the JWT for the given account public key. + /// Mirrors Go DirJWTStore.SaveAcc. + /// + public void SaveAcc(string publicKey, string theJwt) => Save(publicKey, theJwt); + + /// + /// Loads the JWT activation token identified by . + /// Mirrors Go DirJWTStore.LoadAct. + /// + public string LoadAct(string hash) => Load(hash); + + /// + /// Saves the JWT activation token identified by . + /// Mirrors Go DirJWTStore.SaveAct. + /// + public void SaveAct(string hash, string theJwt) => Save(hash, theJwt); + + /// + /// Stops the background expiration timer and releases resources. + /// Mirrors Go DirJWTStore.Close. + /// + public void Close() + { + lock (_lock) + { + _expiration?.Close(); + _expiration = null; + } + } + + /// + public void Dispose() => Close(); + + /// + /// Packs up to JWTs from the store into a newline-delimited + /// string where each line is publicKey|jwtData. + /// Pass a negative value for to include all JWTs. + /// Mirrors Go DirJWTStore.Pack. + /// + public string Pack(int maxJwts) + { + var pack = new List(maxJwts > 0 ? maxJwts : 16); + lock (_lock) + { + foreach (var path in EnumerateJwtFiles(_directory)) + { + if (maxJwts >= 0 && pack.Count == maxJwts) + { + break; + } + + var pubKey = Path.GetFileNameWithoutExtension(path); + + // When expiration is active, only include indexed (tracked) entries. + if (_expiration != null && !_expiration.IsTracked(pubKey)) + { + continue; + } + + byte[] jwtBytes; + try + { + jwtBytes = File.ReadAllBytes(path); + } + catch + { + continue; + } + + if (jwtBytes.Length == 0) + { + continue; + } + + // When expiration is active, skip already-expired JWTs. + if (_expiration != null && IsJwtExpired(jwtBytes)) + { + continue; + } + + pack.Add($"{pubKey}|{Encoding.UTF8.GetString(jwtBytes)}"); + } + } + return string.Join("\n", pack); + } + + /// + /// Walks all JWT files in batches of , invoking + /// with each partial pack message. + /// Mirrors Go DirJWTStore.PackWalk. + /// + /// + /// Thrown when is zero or negative, or + /// is null. + /// + public void PackWalk(int maxJwts, Action callback) + { + if (maxJwts <= 0 || callback == null) + { + throw new ArgumentException("bad arguments to PackWalk"); + } + + string directory; + ExpirationTracker? exp; + lock (_lock) + { + directory = _directory; + exp = _expiration; + } + + var batch = new List(maxJwts); + + foreach (var path in EnumerateJwtFiles(directory)) + { + var pubKey = Path.GetFileNameWithoutExtension(path); + + lock (_lock) + { + if (exp != null && !exp.IsTracked(pubKey)) + { + continue; + } + } + + byte[] jwtBytes; + try + { + jwtBytes = File.ReadAllBytes(path); + } + catch + { + continue; + } + + if (jwtBytes.Length == 0) + { + continue; + } + + if (exp != null && IsJwtExpired(jwtBytes)) + { + continue; + } + + batch.Add($"{pubKey}|{Encoding.UTF8.GetString(jwtBytes)}"); + + if (batch.Count == maxJwts) + { + callback(string.Join("\n", batch)); + batch.Clear(); + } + } + + if (batch.Count > 0) + { + callback(string.Join("\n", batch)); + } + } + + /// + /// Merges the JWTs from a pack string (as produced by ) into the store. + /// Each line must be of the form publicKey|jwtData. + /// Mirrors Go DirJWTStore.Merge. + /// + /// + /// Thrown when any line is malformed or the public key is not a valid account key. + /// + public void Merge(string pack) + { + var lines = pack.Split('\n'); + foreach (var line in lines) + { + if (string.IsNullOrEmpty(line)) + { + continue; + } + + var pipeIndex = line.IndexOf('|'); + if (pipeIndex < 0) + { + throw new InvalidOperationException( + $"Line in package didn't contain 2 entries: \"{line}\""); + } + + var pubKey = line[..pipeIndex]; + var jwtData = line[(pipeIndex + 1)..]; + + // Validate that this is a plausible public account key (non-empty, reasonable length). + if (string.IsNullOrWhiteSpace(pubKey)) + { + throw new InvalidOperationException( + "Key to merge is not a valid public account key"); + } + + SaveIfNewer(pubKey, jwtData); + } + } + + /// + /// Reloads all JWT files from disk into the expiration tracker, invoking the + /// change callback for any JWT whose content differs from the in-memory record. + /// No-ops when expiration tracking is disabled or the store is read-only. + /// Mirrors Go DirJWTStore.Reload. + /// + public void Reload() + { + ExpirationTracker? exp; + Dictionary? oldHashes; + JwtChanged? changed; + bool isCache; + + lock (_lock) + { + exp = _expiration; + if (exp == null || _readonly) + { + return; + } + + changed = _changed; + isCache = exp.EvictOnLimit; + oldHashes = exp.SnapshotHashes(); + + // Clear existing tracking state — mirrors Go clearing heap/idx/lru/hash. + exp.Reset(); + } + + foreach (var path in EnumerateJwtFiles(_directory)) + { + try + { + var theJwt = File.ReadAllBytes(path); + var hash = SHA256.HashData(theJwt); + var file = Path.GetFileNameWithoutExtension(path); + + // Determine whether the callback should fire. + var notify = isCache; // for caches, always notify (entry may have been evicted) + if (oldHashes != null && oldHashes.TryGetValue(file, out var oldHash)) + { + notify = !hash.AsSpan().SequenceEqual(oldHash); + } + + lock (_lock) + { + exp.Track(file, hash, Encoding.UTF8.GetString(theJwt)); + } + + if (notify) + { + changed?.Invoke(file); + } + } + catch + { + // Ignore unreadable files — mirrors Go behaviour. + } + } + } + + /// + /// Returns the XOR of all individual JWT SHA-256 hashes currently tracked by the + /// expiration index. Returns a zero-filled array when expiration tracking is disabled. + /// Mirrors Go DirJWTStore.Hash. + /// + public byte[] Hash() + { + lock (_lock) + { + return _expiration?.GetHash() ?? new byte[SHA256.HashSizeInBytes]; + } + } + + // --------------------------------------------------------------------------- + // Private helpers — path + // --------------------------------------------------------------------------- + + /// + /// Returns the filesystem path for the given public key, optionally under a two-character + /// shard subdirectory derived from the last two characters of the key. + /// Mirrors Go DirJWTStore.pathForKey. + /// + private string PathForKey(string publicKey) + { + if (publicKey.Length < 2) + { + return string.Empty; + } + + var fileName = publicKey + FileExtension; + + if (_shard) + { + // Go uses the last two characters of the key as the shard directory name. + var shard = publicKey[^2..]; + return Path.Combine(_directory, shard, fileName); + } + + return Path.Combine(_directory, fileName); + } + + // --------------------------------------------------------------------------- + // Private helpers — load / write / save + // --------------------------------------------------------------------------- + + /// + /// Loads the JWT for from disk. + /// Mirrors Go DirJWTStore.load. + /// + private string Load(string publicKey) + { + lock (_lock) + { + var path = PathForKey(publicKey); + if (string.IsNullOrEmpty(path)) + { + throw new InvalidOperationException("Invalid public key"); + } + + string data; + try + { + data = File.ReadAllText(path); + } + catch (FileNotFoundException) + { + throw new FileNotFoundException($"JWT not found for key: {publicKey}", path); + } + + _expiration?.UpdateTrack(publicKey); + return data; + } + } + + /// + /// Writes to and updates the expiration + /// tracker. Returns true when a new file was created or the content changed; false when the + /// content is identical to what is already tracked. + /// Assumes the lock is already held. + /// Mirrors Go DirJWTStore.write. + /// + private bool Write(string path, string publicKey, string theJwt) + { + if (string.IsNullOrEmpty(theJwt)) + { + throw new InvalidOperationException("Invalid JWT"); + } + + byte[]? newHash = null; + + if (_expiration != null) + { + newHash = SHA256.HashData(Encoding.UTF8.GetBytes(theJwt)); + + if (_expiration.IsTracked(publicKey)) + { + _expiration.UpdateTrack(publicKey); + + // If content is identical, skip the write — mirrors Go. + var existingHash = _expiration.GetItemHash(publicKey); + if (existingHash != null && newHash.AsSpan().SequenceEqual(existingHash)) + { + return false; + } + } + else if (_expiration.Count >= _expiration.Limit) + { + if (!_expiration.EvictOnLimit) + { + throw new InvalidOperationException("JWT store is full"); + } + + // Evict the least-recently-used entry. + var lruKey = _expiration.LruFront(); + if (lruKey != null) + { + var lruPath = PathForKey(lruKey); + try + { + File.Delete(lruPath); + } + catch + { + // Best-effort removal. + } + _expiration.UnTrack(lruKey); + } + } + } + + File.WriteAllText(path, theJwt); + + if (_expiration != null && newHash != null) + { + _expiration.Track(publicKey, newHash, theJwt); + } + + return true; + } + + /// + /// Deletes the JWT for according to . + /// Mirrors Go DirJWTStore.delete. + /// + private void Delete(string publicKey) + { + if (_readonly) + { + throw new InvalidOperationException("Store is read-only"); + } + if (_deleteType == JwtDeleteType.NoDelete) + { + throw new InvalidOperationException("Store is not set up for delete"); + } + + lock (_lock) + { + var name = PathForKey(publicKey); + + if (_deleteType == JwtDeleteType.RenameDeleted) + { + try + { + File.Move(name, name + DeletedSuffix, overwrite: true); + } + catch (FileNotFoundException) + { + return; // already gone — mirrors Go os.IsNotExist behaviour + } + } + else + { + try + { + File.Delete(name); + } + catch (FileNotFoundException) + { + return; + } + } + + _expiration?.UnTrack(publicKey); + _deleted?.Invoke(publicKey); + } + } + + /// + /// Saves for , creating shard + /// directories as needed, and fires the change callback if the content changed. + /// Mirrors Go DirJWTStore.save. + /// + private void Save(string publicKey, string theJwt) + { + if (_readonly) + { + throw new InvalidOperationException("Store is read-only"); + } + + bool changed; + JwtChanged? cb; + + lock (_lock) + { + var path = PathForKey(publicKey); + if (string.IsNullOrEmpty(path)) + { + throw new InvalidOperationException("Invalid public key"); + } + + // Ensure the shard subdirectory exists. + var dirPath = Path.GetDirectoryName(path)!; + if (!Directory.Exists(dirPath)) + { + Directory.CreateDirectory(dirPath); + } + + changed = Write(path, publicKey, theJwt); + cb = _changed; + } + + if (changed) + { + cb?.Invoke(publicKey); + } + } + + /// + /// Saves only if it is newer than what is already on disk + /// (determined by comparing the iat field decoded from the JWT payload). + /// Mirrors Go DirJWTStore.saveIfNewer. + /// + private void SaveIfNewer(string publicKey, string theJwt) + { + if (_readonly) + { + throw new InvalidOperationException("Store is read-only"); + } + + var path = PathForKey(publicKey); + if (string.IsNullOrEmpty(path)) + { + throw new InvalidOperationException("Invalid public key"); + } + + // Ensure the shard subdirectory exists. + var dirPath = Path.GetDirectoryName(path)!; + if (!Directory.Exists(dirPath)) + { + Directory.CreateDirectory(dirPath); + } + + if (File.Exists(path)) + { + // Decode both JWTs to compare issued-at timestamps. + // JWT is three base64url parts separated by '.': header.payload.signature + long newIssuedAt = DecodeJwtIssuedAt(theJwt); + long existingIssuedAt = 0; + string? newId = DecodeJwtId(theJwt); + string? existingId = null; + + try + { + var existingData = File.ReadAllText(path); + existingIssuedAt = DecodeJwtIssuedAt(existingData); + existingId = DecodeJwtId(existingData); + + // Skip if same ID (already up to date). + if (newId != null && existingId != null && newId == existingId) + { + return; + } + + // Skip if existing is newer. + if (existingIssuedAt > newIssuedAt) + { + return; + } + } + catch + { + // Cannot decode existing JWT — proceed with overwrite (mirrors Go: "skip if it + // can't be decoded" means the error path falls through to the write). + } + } + + bool changed; + JwtChanged? cb; + + lock (_lock) + { + cb = _changed; + changed = Write(path, publicKey, theJwt); + } + + if (changed) + { + cb?.Invoke(publicKey); + } + } + + // --------------------------------------------------------------------------- + // Private helpers — expiration management + // --------------------------------------------------------------------------- + + private void StartExpiring(TimeSpan reCheck, long limit, bool evictOnLimit, TimeSpan ttl) + { + lock (_lock) + { + var tracker = new ExpirationTracker(limit, evictOnLimit, ttl); + _expiration = tracker; + + // Background timer — mirrors Go goroutine + time.Ticker. + var timer = new Timer(_ => + { + var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * TimeSpan.TicksPerMillisecond; + + while (true) + { + string? expiredKey; + lock (_lock) + { + expiredKey = tracker.PeekExpired(now); + if (expiredKey == null) + { + break; + } + + var expiredPath = PathForKey(expiredKey); + try + { + File.Delete(expiredPath); + } + catch + { + // If delete fails, stop trying — mirrors Go: "if err := os.Remove ...; err == nil". + break; + } + + tracker.PopExpired(); + tracker.UnTrack(expiredKey); + } + } + }, null, reCheck, reCheck); + + tracker.SetTimer(timer); + } + } + + // --------------------------------------------------------------------------- + // Private static helpers + // --------------------------------------------------------------------------- + + /// + /// Validates that exists and is a directory, optionally + /// creating it when is true. + /// Returns the absolute path. + /// Mirrors Go newDir. + /// + private static string NewDir(string dirPath, bool create) + { + if (string.IsNullOrEmpty(dirPath)) + { + throw new ArgumentException("Path is not specified", nameof(dirPath)); + } + + if (Directory.Exists(dirPath)) + { + return Path.GetFullPath(dirPath); + } + + if (!create) + { + throw new DirectoryNotFoundException( + $"The path [{dirPath}] doesn't exist"); + } + + Directory.CreateDirectory(dirPath); + + if (!Directory.Exists(dirPath)) + { + throw new DirectoryNotFoundException( + $"Failed to create directory [{dirPath}]"); + } + + return Path.GetFullPath(dirPath); + } + + /// + /// Enumerates all .jwt files beneath recursively, + /// skipping files whose names end with .deleted. + /// + private static IEnumerable EnumerateJwtFiles(string rootDirectory) + { + return Directory + .EnumerateFiles(rootDirectory, "*" + FileExtension, SearchOption.AllDirectories) + .Where(p => !p.EndsWith(DeletedSuffix, StringComparison.Ordinal)); + } + + /// + /// Returns true when the JWT encoded in has a non-zero + /// expiration claim that is in the past. + /// + private static bool IsJwtExpired(byte[] jwtBytes) + { + try + { + var jwtString = Encoding.UTF8.GetString(jwtBytes); + var exp = DecodeJwtExp(jwtString); + if (exp > 0) + { + return exp < DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + } + } + catch + { + // Malformed JWT — treat as not expired. + } + return false; + } + + /// + /// Decodes the exp (expiration) claim from the JWT payload. + /// Returns 0 when not present or on decode error. + /// JWTs are three base64url segments separated by '.': header.payload.signature. + /// + private static long DecodeJwtExp(string jwt) + { + return DecodeLongClaim(jwt, "\"exp\":"); + } + + /// + /// Decodes the iat (issued-at) claim from the JWT payload. + /// Returns 0 when not present or on decode error. + /// + private static long DecodeJwtIssuedAt(string jwt) + { + return DecodeLongClaim(jwt, "\"iat\":"); + } + + /// + /// Decodes the jti (JWT ID) claim from the JWT payload. + /// Returns null when not present or on decode error. + /// + private static string? DecodeJwtId(string jwt) + { + return DecodeStringClaim(jwt, "\"jti\":\""); + } + + private static long DecodeLongClaim(string jwt, string claimKey) + { + try + { + var parts = jwt.Split('.'); + if (parts.Length < 2) + { + return 0; + } + + var payload = Base64UrlDecode(parts[1]); + var json = Encoding.UTF8.GetString(payload); + + var idx = json.IndexOf(claimKey, StringComparison.Ordinal); + if (idx < 0) + { + return 0; + } + + var valueStart = idx + claimKey.Length; + var valueEnd = valueStart; + while (valueEnd < json.Length && + (char.IsDigit(json[valueEnd]) || json[valueEnd] == '-')) + { + valueEnd++; + } + + if (long.TryParse(json.AsSpan(valueStart, valueEnd - valueStart), out var result)) + { + return result; + } + } + catch + { + // Ignore decode errors. + } + return 0; + } + + private static string? DecodeStringClaim(string jwt, string claimKeyPrefix) + { + try + { + var parts = jwt.Split('.'); + if (parts.Length < 2) + { + return null; + } + + var payload = Base64UrlDecode(parts[1]); + var json = Encoding.UTF8.GetString(payload); + + var idx = json.IndexOf(claimKeyPrefix, StringComparison.Ordinal); + if (idx < 0) + { + return null; + } + + var valueStart = idx + claimKeyPrefix.Length; + var valueEnd = json.IndexOf('"', valueStart); + if (valueEnd < 0) + { + return null; + } + + return json[valueStart..valueEnd]; + } + catch + { + return null; + } + } + + /// Decodes a base64url-encoded string (no padding required). + private static byte[] Base64UrlDecode(string input) + { + var padded = input + .Replace('-', '+') + .Replace('_', '/'); + + switch (padded.Length % 4) + { + case 2: padded += "=="; break; + case 3: padded += "="; break; + } + + return Convert.FromBase64String(padded); + } + + /// + /// XOR-assigns each byte of into . + /// Mirrors Go xorAssign. + /// + private static void XorAssign(byte[] lVal, byte[] rVal) + { + for (var i = 0; i < rVal.Length && i < lVal.Length; i++) + { + lVal[i] ^= rVal[i]; + } + } +} + +// --------------------------------------------------------------------------- +// Expiration tracker +// --------------------------------------------------------------------------- + +/// +/// Tracks JWT expiration using a min-heap ordered by expiration timestamp and an LRU +/// doubly-linked list for eviction. Maintains a rolling XOR hash of all tracked JWTs. +/// Mirrors Go expirationTracker. +/// +internal sealed class ExpirationTracker +{ + // Min-heap ordered by expiration (Unix nanoseconds stored as ticks for TimeSpan compatibility). + private readonly PriorityQueue _heap; + + // Index from publicKey to JwtItem for O(1) lookup and hash tracking. + private readonly Dictionary _idx; + + // LRU list — front is least-recently used, back is most-recently used. + private readonly LinkedList _lru; + + // XOR of all tracked item hashes — matches Go's rolling hash approach. + private byte[] _hash; + + private Timer? _timer; + + internal long Limit { get; } + internal bool EvictOnLimit { get; } + internal TimeSpan Ttl { get; } + + internal int Count => _idx.Count; + + internal ExpirationTracker(long limit, bool evictOnLimit, TimeSpan ttl) + { + Limit = limit; + EvictOnLimit = evictOnLimit; + Ttl = ttl; + _heap = new PriorityQueue(); + _idx = new Dictionary(StringComparer.Ordinal); + _lru = new LinkedList(); + _hash = new byte[SHA256.HashSizeInBytes]; + } + + internal void SetTimer(Timer timer) => _timer = timer; + + /// + /// Adds or updates tracking for . + /// When an entry already exists its expiration and hash are updated. + /// Mirrors Go expirationTracker.track. + /// + internal void Track(string publicKey, byte[] hash, string theJwt) + { + long exp; + if (Ttl != TimeSpan.Zero) + { + exp = Ttl == TimeSpan.MaxValue + ? long.MaxValue + : (DateTimeOffset.UtcNow + Ttl).UtcTicks; + } + else + { + // Decode the JWT expiration field. + exp = DecodeJwtExpTicks(theJwt); + if (exp == 0) + { + exp = long.MaxValue; // indefinite + } + } + + if (_idx.TryGetValue(publicKey, out var existing)) + { + // Remove old hash contribution from rolling XOR. + XorAssign(_hash, existing.Hash); + + // Update in-place. + existing.Expiration = exp; + existing.Hash = hash; + + // Re-enqueue with updated priority (PriorityQueue does not support update; + // use a version counter approach — mark old entry stale, enqueue fresh). + existing.Version++; + _heap.Enqueue(existing, exp); + } + else + { + var item = new JwtItem(publicKey, exp, hash); + _idx[publicKey] = item; + _heap.Enqueue(item, exp); + _lru.AddLast(publicKey); + } + + XorAssign(_hash, hash); + } + + /// + /// Updates the LRU position and optionally the expiration for . + /// Mirrors Go expirationTracker.updateTrack. + /// + internal void UpdateTrack(string publicKey) + { + if (!_idx.TryGetValue(publicKey, out var item)) + { + return; + } + + if (Ttl != TimeSpan.Zero) + { + var newExp = Ttl == TimeSpan.MaxValue + ? long.MaxValue + : (DateTimeOffset.UtcNow + Ttl).UtcTicks; + + item.Expiration = newExp; + item.Version++; + _heap.Enqueue(item, newExp); + } + + if (EvictOnLimit) + { + // Move to back of LRU list (most recently used). + var node = _lru.Find(publicKey); + if (node != null) + { + _lru.Remove(node); + _lru.AddLast(publicKey); + } + } + } + + /// + /// Removes tracking for and updates the rolling hash. + /// Mirrors Go expirationTracker.unTrack. + /// + internal void UnTrack(string publicKey) + { + if (!_idx.TryGetValue(publicKey, out var item)) + { + return; + } + + XorAssign(_hash, item.Hash); + _idx.Remove(publicKey); + + var node = _lru.Find(publicKey); + if (node != null) + { + _lru.Remove(node); + } + + // Invalidate heap entry so it is ignored when it surfaces during PeekExpired/PopExpired. + item.Removed = true; + } + + /// Returns true when is currently tracked. + internal bool IsTracked(string publicKey) => _idx.ContainsKey(publicKey); + + /// Returns the hash for , or null when not tracked. + internal byte[]? GetItemHash(string publicKey) + => _idx.TryGetValue(publicKey, out var item) ? item.Hash : null; + + /// Returns the public key at the front of the LRU list (least recently used). + internal string? LruFront() => _lru.First?.Value; + + /// + /// Peeks at the min-heap root and returns the public key if its expiration is at or + /// before . Returns null when no entries are expired. + /// + internal string? PeekExpired(long nowTicks) + { + // Drain stale (removed or version-superseded) heap entries. + DrainStale(); + + if (_heap.Count == 0) + { + return null; + } + + _heap.TryPeek(out var item, out var priority); + if (item == null || priority > nowTicks) + { + return null; + } + + return item.Removed || !_idx.ContainsKey(item.PublicKey) ? null : item.PublicKey; + } + + /// + /// Removes the min-heap root (the entry returned by the most recent ). + /// + internal void PopExpired() + { + DrainStale(); + if (_heap.Count > 0) + { + _heap.Dequeue(); + } + } + + /// Returns a copy of the rolling XOR hash. + internal byte[] GetHash() + { + var copy = new byte[_hash.Length]; + _hash.CopyTo(copy, 0); + return copy; + } + + /// + /// Snapshots the current item hashes keyed by public key, for use by + /// . + /// + internal Dictionary SnapshotHashes() + { + var snap = new Dictionary(_idx.Count, StringComparer.Ordinal); + foreach (var (key, item) in _idx) + { + var copy = new byte[item.Hash.Length]; + item.Hash.CopyTo(copy, 0); + snap[key] = copy; + } + return snap; + } + + /// + /// Resets all tracking state in preparation for a full reload. + /// Mirrors Go clearing of heap, idx, lru, and hash. + /// + internal void Reset() + { + _heap.Clear(); + _idx.Clear(); + _lru.Clear(); + Array.Clear(_hash); + } + + /// Stops the background expiration timer. + internal void Close() + { + _timer?.Dispose(); + _timer = null; + } + + // --------------------------------------------------------------------------- + // Private helpers + // --------------------------------------------------------------------------- + + private void DrainStale() + { + while (_heap.Count > 0 && + _heap.TryPeek(out var top, out _) && + (top.Removed || !_idx.TryGetValue(top.PublicKey, out var current) || !ReferenceEquals(current, top))) + { + _heap.Dequeue(); + } + } + + private static void XorAssign(byte[] lVal, byte[] rVal) + { + for (var i = 0; i < rVal.Length && i < lVal.Length; i++) + { + lVal[i] ^= rVal[i]; + } + } + + private static long DecodeJwtExpTicks(string jwt) + { + try + { + var parts = jwt.Split('.'); + if (parts.Length < 2) + { + return 0; + } + + var padded = parts[1].Replace('-', '+').Replace('_', '/'); + switch (padded.Length % 4) + { + case 2: padded += "=="; break; + case 3: padded += "="; break; + } + + var payload = Convert.FromBase64String(padded); + var json = Encoding.UTF8.GetString(payload); + + const string expKey = "\"exp\":"; + var idx = json.IndexOf(expKey, StringComparison.Ordinal); + if (idx < 0) + { + return 0; + } + + var valueStart = idx + expKey.Length; + var valueEnd = valueStart; + while (valueEnd < json.Length && + (char.IsDigit(json[valueEnd]) || json[valueEnd] == '-')) + { + valueEnd++; + } + + if (long.TryParse(json.AsSpan(valueStart, valueEnd - valueStart), out var expSeconds)) + { + // Convert Unix seconds to UTC ticks for comparison with DateTimeOffset.UtcTicks. + return DateTimeOffset.FromUnixTimeSeconds(expSeconds).UtcTicks; + } + } + catch + { + // Ignore. + } + return 0; + } +} + +// --------------------------------------------------------------------------- +// Heap item +// --------------------------------------------------------------------------- + +/// +/// Represents a single JWT entry in the expiration tracker's min-heap. +/// Mirrors Go jwtItem. +/// +internal sealed class JwtItem +{ + internal string PublicKey { get; } + internal long Expiration { get; set; } + internal byte[] Hash { get; set; } + + /// + /// Monotonically increasing counter; when a heap entry's version differs from the + /// canonical item in the index it is considered stale and is skipped. + /// + internal int Version { get; set; } + + /// Marked true by to invalidate heap entries. + internal bool Removed { get; set; } + + internal JwtItem(string publicKey, long expiration, byte[] hash) + { + PublicKey = publicKey; + Expiration = expiration; + Hash = hash; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs index 74a97fd..d1ddb19 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs @@ -168,45 +168,5 @@ public class RoutePermissions public SubjectPermission? Export { get; set; } } -/// -/// Stub for Account type. Full implementation in session 11. -/// Mirrors Go Account struct. -/// -public class Account : INatsAccount -{ - public string Name { get; set; } = string.Empty; - - // Fields required by session 09 account management (NatsServer.Accounts.cs). - // Full implementation in session 11. - public string Issuer { get; set; } = string.Empty; - internal string ClaimJwt { get; set; } = string.Empty; - internal int RoutePoolIdx { get; set; } - internal bool Incomplete { get; set; } - internal DateTime Updated { get; set; } - internal ZB.MOM.NatsNet.Server.Internal.DataStructures.SubscriptionIndex? Sublist { get; set; } - internal object? Server { get; set; } // INatsServer — avoids circular reference - - // INatsAccount — stub implementations until session 11 (accounts.go). - bool INatsAccount.IsValid => true; - bool INatsAccount.MaxTotalConnectionsReached() => false; - bool INatsAccount.MaxTotalLeafNodesReached() => false; - int INatsAccount.AddClient(ClientConnection c) => 0; - int INatsAccount.RemoveClient(ClientConnection c) => 0; - - /// Returns true if this account's JWT has expired. Stub — full impl in session 11. - public bool IsExpired() => false; - - /// - /// Returns the total number of subscriptions across all clients in this account. - /// Stub — full implementation in session 11. - /// Mirrors Go Account.TotalSubs(). - /// - public int TotalSubs() => 0; - - /// - /// Notifies leaf nodes of a subscription change. - /// Stub — full implementation in session 15. - /// Mirrors Go Account.updateLeafNodes(). - /// - internal void UpdateLeafNodes(object sub, int delta) { } -} +// Account stub removed — full implementation is in Accounts/Account.cs +// in the ZB.MOM.NatsNet.Server namespace. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs index d4108a2..06c25a1 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs @@ -273,6 +273,13 @@ public sealed class ClientInfo public bool Disconnect { get; set; } public string[]? Cluster { get; set; } public bool Service { get; set; } + + /// + /// Round-trip time to the client. + /// Mirrors Go RTT time.Duration in events.go. + /// Added here to support . + /// + public TimeSpan Rtt { get; set; } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs index 0a56ae4..5646a1f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs @@ -671,8 +671,15 @@ public sealed partial class NatsServer { if (_accResolver == null) return (string.Empty, ServerErrors.ErrNoAccountResolver); - var (jwt, err) = _accResolver.Fetch(name); - return (jwt, err); + try + { + var jwt = _accResolver.FetchAsync(name).GetAwaiter().GetResult(); + return (jwt, null); + } + catch (Exception ex) + { + return (string.Empty, ex); + } } /// diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs index c12a926..e2ef659 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs @@ -1044,7 +1044,7 @@ public sealed partial class NatsServer { // Validate JWT format (stub — session 06 has JWT decoder). // jwt.DecodeAccountClaims(v) — skip here, checked again in CheckResolvePreloads. - ar.Store(k, v); + ar.StoreAsync(k, v).GetAwaiter().GetResult(); } } return null; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptionTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptionTypes.cs index 713b43c..5f0ae03 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptionTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptionTypes.cs @@ -461,17 +461,4 @@ public interface IClientAuthentication string RemoteAddress(); } -/// -/// Account resolver interface for dynamic account loading. -/// Mirrors AccountResolver in accounts.go. -/// -public interface IAccountResolver -{ - (string jwt, Exception? err) Fetch(string name); - Exception? Store(string name, string jwt); - bool IsReadOnly(); - Exception? Start(object server); - bool IsTrackingUpdate(); - Exception? Reload(); - void Close(); -} +// IAccountResolver is defined in Accounts/AccountResolver.cs. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptions.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptions.cs index 8afd0d5..5835819 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptions.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptions.cs @@ -109,7 +109,7 @@ public sealed partial class ServerOptions public string SystemAccount { get; set; } = string.Empty; public bool NoSystemAccount { get; set; } /// Parsed account objects from config. Mirrors Go opts.Accounts. - public List Accounts { get; set; } = []; + public List Accounts { get; set; } = []; public AuthCalloutOpts? AuthCallout { get; set; } public bool AlwaysEnableNonce { get; set; } public List? Users { get; set; } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs new file mode 100644 index 0000000..e088c16 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs @@ -0,0 +1,478 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/accounts_test.go and server/dirstore_test.go in the NATS server Go source. + +using Shouldly; +using Xunit; + +namespace ZB.MOM.NatsNet.Server.Tests; + +[Collection("AccountTests")] +public sealed class AccountTests +{ + // ========================================================================= + // Account Basic Tests + // ========================================================================= + + // Test 1 + [Fact] + public void NewAccount_SetsNameAndUnlimitedLimits() + { + var acc = Account.NewAccount("foo"); + + acc.Name.ShouldBe("foo"); + acc.MaxConnections.ShouldBe(-1); + acc.MaxLeafNodes.ShouldBe(-1); + } + + // Test 2 + [Fact] + public void ToString_ReturnsName() + { + var acc = Account.NewAccount("myaccount"); + + acc.ToString().ShouldBe(acc.Name); + } + + // Test 3 + [Fact] + public void IsExpired_InitiallyFalse() + { + var acc = Account.NewAccount("foo"); + + acc.IsExpired().ShouldBeFalse(); + } + + // Test 4 + [Fact] + public void IsClaimAccount_NoJwt_ReturnsFalse() + { + var acc = Account.NewAccount("foo"); + // ClaimJwt defaults to empty string + acc.IsClaimAccount().ShouldBeFalse(); + } + + // Test 5 + [Fact] + public void NumConnections_Initial_IsZero() + { + var acc = Account.NewAccount("foo"); + + acc.NumConnections().ShouldBe(0); + } + + // Test 6 + [Fact] + public void GetName_ReturnsName() + { + var acc = Account.NewAccount("thread-safe-name"); + + acc.GetName().ShouldBe("thread-safe-name"); + } + + // ========================================================================= + // Subject Mapping Tests + // ========================================================================= + + // Test 7 + [Fact] + public void AddMapping_ValidSubject_Succeeds() + { + var acc = Account.NewAccount("foo"); + + var err = acc.AddMapping("foo", "bar"); + + err.ShouldBeNull(); + } + + // Test 8 + [Fact] + public void AddMapping_InvalidSubject_ReturnsError() + { + var acc = Account.NewAccount("foo"); + + var err = acc.AddMapping("foo..bar", "x"); + + err.ShouldNotBeNull(); + } + + // Test 9 + [Fact] + public void RemoveMapping_ExistingMapping_ReturnsTrue() + { + var acc = Account.NewAccount("foo"); + acc.AddMapping("foo", "bar").ShouldBeNull(); + + var removed = acc.RemoveMapping("foo"); + + removed.ShouldBeTrue(); + } + + // Test 10 + [Fact] + public void RemoveMapping_NonExistentMapping_ReturnsFalse() + { + var acc = Account.NewAccount("foo"); + + var removed = acc.RemoveMapping("nonexistent"); + + removed.ShouldBeFalse(); + } + + // Test 11 + [Fact] + public void HasMappings_AfterAdd_ReturnsTrue() + { + var acc = Account.NewAccount("foo"); + acc.AddMapping("foo", "bar").ShouldBeNull(); + + acc.HasMappings().ShouldBeTrue(); + } + + // Test 12 + [Fact] + public void HasMappings_AfterRemove_ReturnsFalse() + { + var acc = Account.NewAccount("foo"); + acc.AddMapping("foo", "bar").ShouldBeNull(); + acc.RemoveMapping("foo"); + + acc.HasMappings().ShouldBeFalse(); + } + + // Test 13 + [Fact] + public void SelectMappedSubject_NoMapping_ReturnsFalse() + { + var acc = Account.NewAccount("foo"); + + var (dest, mapped) = acc.SelectMappedSubject("foo"); + + mapped.ShouldBeFalse(); + dest.ShouldBe("foo"); + } + + // Test 14 + [Fact] + public void SelectMappedSubject_SimpleMapping_ReturnsMappedDest() + { + var acc = Account.NewAccount("foo"); + acc.AddMapping("foo", "bar").ShouldBeNull(); + + var (dest, mapped) = acc.SelectMappedSubject("foo"); + + mapped.ShouldBeTrue(); + dest.ShouldBe("bar"); + } + + // Test 15 + [Fact] + public void AddWeightedMappings_DuplicateDest_ReturnsError() + { + var acc = Account.NewAccount("foo"); + + var err = acc.AddWeightedMappings("src", + MapDest.New("dest1", 50), + MapDest.New("dest1", 50)); // duplicate subject + + err.ShouldNotBeNull(); + } + + // Test 16 + [Fact] + public void AddWeightedMappings_WeightOver100_ReturnsError() + { + var acc = Account.NewAccount("foo"); + + var err = acc.AddWeightedMappings("src", + MapDest.New("dest1", 101)); // weight exceeds 100 + + err.ShouldNotBeNull(); + } + + // Test 17 + [Fact] + public void AddWeightedMappings_TotalWeightOver100_ReturnsError() + { + var acc = Account.NewAccount("foo"); + + var err = acc.AddWeightedMappings("src", + MapDest.New("dest1", 80), + MapDest.New("dest2", 80)); // total = 160 + + err.ShouldNotBeNull(); + } + + // ========================================================================= + // Connection Counting Tests + // ========================================================================= + + // Test 18 + [Fact] + public void NumLeafNodes_Initial_IsZero() + { + var acc = Account.NewAccount("foo"); + + acc.NumLeafNodes().ShouldBe(0); + } + + // Test 19 + [Fact] + public void MaxTotalConnectionsReached_UnlimitedAccount_ReturnsFalse() + { + var acc = Account.NewAccount("foo"); + // MaxConnections is -1 (unlimited) by default + + acc.MaxTotalConnectionsReached().ShouldBeFalse(); + } + + // Test 20 + [Fact] + public void MaxTotalLeafNodesReached_UnlimitedAccount_ReturnsFalse() + { + var acc = Account.NewAccount("foo"); + // MaxLeafNodes is -1 (unlimited) by default + + acc.MaxTotalLeafNodesReached().ShouldBeFalse(); + } + + // ========================================================================= + // Export Service Tests + // ========================================================================= + + // Test 21 + [Fact] + public void IsExportService_NoExports_ReturnsFalse() + { + var acc = Account.NewAccount("foo"); + + acc.IsExportService("my.service").ShouldBeFalse(); + } + + // Test 22 + [Fact] + public void IsExportServiceTracking_NoExports_ReturnsFalse() + { + var acc = Account.NewAccount("foo"); + + acc.IsExportServiceTracking("my.service").ShouldBeFalse(); + } +} + +// ========================================================================= +// DirJwtStore Tests +// ========================================================================= + +[Collection("AccountTests")] +public sealed class DirJwtStoreTests : IDisposable +{ + private readonly List _tempDirs = []; + + private string MakeTempDir() + { + var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); + Directory.CreateDirectory(dir); + _tempDirs.Add(dir); + return dir; + } + + public void Dispose() + { + foreach (var dir in _tempDirs) + { + try { Directory.Delete(dir, true); } catch { /* best effort */ } + } + } + + // Test 23 + [Fact] + public void DirJwtStore_WriteAndRead_Succeeds() + { + var dir = MakeTempDir(); + using var store = DirJwtStore.NewDirJwtStore(dir, shard: false, create: false); + + const string key = "AAAAAAAAAA"; // minimum 2-char key + const string jwt = "header.payload.signature"; + + store.SaveAcc(key, jwt); + var loaded = store.LoadAcc(key); + + loaded.ShouldBe(jwt); + } + + // Test 24 + [Fact] + public void DirJwtStore_ShardedWriteAndRead_Succeeds() + { + var dir = MakeTempDir(); + using var store = DirJwtStore.NewDirJwtStore(dir, shard: true, create: false); + + var keys = new[] { "ACCTKEY001", "ACCTKEY002", "ACCTKEY003" }; + foreach (var k in keys) + { + store.SaveAcc(k, $"jwt.for.{k}"); + } + + foreach (var k in keys) + { + store.LoadAcc(k).ShouldBe($"jwt.for.{k}"); + } + } + + // Test 25 + [Fact] + public void DirJwtStore_EmptyKey_ReturnsError() + { + var dir = MakeTempDir(); + using var store = DirJwtStore.NewDirJwtStore(dir, shard: false, create: false); + + // LoadAcc with key shorter than 2 chars should throw + Should.Throw(() => store.LoadAcc("")); + + // SaveAcc with key shorter than 2 chars should throw + Should.Throw(() => store.SaveAcc("", "some.jwt")); + } + + // Test 26 + [Fact] + public void DirJwtStore_MissingKey_ReturnsError() + { + var dir = MakeTempDir(); + using var store = DirJwtStore.NewDirJwtStore(dir, shard: false, create: false); + + Should.Throw(() => store.LoadAcc("NONEXISTENT_KEY")); + } + + // Test 27 + [Fact] + public void DirJwtStore_Pack_ContainsSavedJwts() + { + var dir = MakeTempDir(); + using var store = DirJwtStore.NewDirJwtStore(dir, shard: false, create: false); + + store.SaveAcc("ACCTKEYAAA", "jwt1.data.sig"); + store.SaveAcc("ACCTKEYBBB", "jwt2.data.sig"); + + var packed = store.Pack(-1); + + packed.ShouldContain("ACCTKEYAAA|jwt1.data.sig"); + packed.ShouldContain("ACCTKEYBBB|jwt2.data.sig"); + } + + // Test 28 + [Fact] + public void DirJwtStore_Merge_AddsNewEntries() + { + var dir = MakeTempDir(); + using var store = DirJwtStore.NewDirJwtStore(dir, shard: false, create: false); + + // Pack format: key|jwt lines separated by newline + var packData = "ACCTKEYMERGE|merged.jwt.value"; + store.Merge(packData); + + var loaded = store.LoadAcc("ACCTKEYMERGE"); + loaded.ShouldBe("merged.jwt.value"); + } + + // Test 29 + [Fact] + public void DirJwtStore_ReadOnly_Prevents_Write() + { + var dir = MakeTempDir(); + // Write a file first so the dir is valid + var writeable = DirJwtStore.NewDirJwtStore(dir, shard: false, create: false); + writeable.SaveAcc("ACCTKEYRO", "original.jwt"); + writeable.Dispose(); + + // Open as immutable + using var readOnly = DirJwtStore.NewImmutableDirJwtStore(dir, shard: false); + + readOnly.IsReadOnly().ShouldBeTrue(); + Should.Throw(() => readOnly.SaveAcc("ACCTKEYRO", "new.jwt")); + } +} + +// ========================================================================= +// MemoryAccountResolver Tests +// ========================================================================= + +[Collection("AccountTests")] +public sealed class MemoryAccountResolverTests +{ + // Test 30 + [Fact] + public async Task MemoryAccountResolver_StoreAndFetch_Roundtrip() + { + var resolver = new MemoryAccountResolver(); + const string key = "MYACCOUNTKEY"; + const string jwt = "header.payload.sig"; + + await resolver.StoreAsync(key, jwt); + var fetched = await resolver.FetchAsync(key); + + fetched.ShouldBe(jwt); + } + + // Test 31 + [Fact] + public async Task MemoryAccountResolver_Fetch_MissingKey_Throws() + { + var resolver = new MemoryAccountResolver(); + + await Should.ThrowAsync( + () => resolver.FetchAsync("DOESNOTEXIST")); + } + + // Test 32 + [Fact] + public void MemoryAccountResolver_IsReadOnly_ReturnsFalse() + { + var resolver = new MemoryAccountResolver(); + + resolver.IsReadOnly().ShouldBeFalse(); + } +} + +// ========================================================================= +// UrlAccountResolver Tests +// ========================================================================= + +[Collection("AccountTests")] +public sealed class UrlAccountResolverTests +{ + // Test 33 + [Fact] + public void UrlAccountResolver_NormalizesTrailingSlash() + { + // Two constructors: one with slash, one without. + // We verify construction doesn't throw and the resolver is usable. + // (We cannot inspect _url directly since it's private, but we can + // infer correctness via IsReadOnly and lack of constructor exception.) + var resolverNoSlash = new UrlAccountResolver("http://localhost:9090"); + var resolverWithSlash = new UrlAccountResolver("http://localhost:9090/"); + + // Both should construct without error and have the same observable behaviour. + resolverNoSlash.IsReadOnly().ShouldBeTrue(); + resolverWithSlash.IsReadOnly().ShouldBeTrue(); + } + + // Test 34 + [Fact] + public void UrlAccountResolver_IsReadOnly_ReturnsTrue() + { + var resolver = new UrlAccountResolver("http://localhost:9090"); + + resolver.IsReadOnly().ShouldBeTrue(); + } +} diff --git a/porting.db b/porting.db index c19893c..685e0db 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 96d6544..b456a35 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-26 20:08:24 UTC +Generated: 2026-02-26 20:37:09 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-26 20:08:24 UTC | Status | Count | |--------|-------| -| complete | 841 | +| complete | 1075 | | n_a | 82 | -| not_started | 2657 | +| not_started | 2423 | | stub | 93 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| complete | 278 | +| complete | 319 | | n_a | 181 | -| not_started | 2574 | +| not_started | 2533 | | stub | 224 | ## Library Mappings (36 total) @@ -36,4 +36,4 @@ Generated: 2026-02-26 20:08:24 UTC ## Overall Progress -**1393/6942 items complete (20.1%)** +**1668/6942 items complete (24.0%)** diff --git a/reports/report_06779a1.md b/reports/report_06779a1.md new file mode 100644 index 0000000..b456a35 --- /dev/null +++ b/reports/report_06779a1.md @@ -0,0 +1,39 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-26 20:37:09 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| complete | 11 | +| not_started | 1 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| complete | 1075 | +| n_a | 82 | +| not_started | 2423 | +| stub | 93 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| complete | 319 | +| n_a | 181 | +| not_started | 2533 | +| stub | 224 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**1668/6942 items complete (24.0%)**