// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/server.go (account methods) in the NATS server Go source.
// Session 09: account management — configure, register, lookup, fetch.

using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;

namespace ZB.MOM.NatsNet.Server;

public sealed partial class NatsServer
{
    // =========================================================================
    // Account-mode helpers (features 3004–3007)
    // =========================================================================

    /// <summary>
    /// Returns true when no account other than the global account and the
    /// system account is registered. Always returns false when trusted keys
    /// are configured.
    /// Mirrors Go <c>Server.globalAccountOnly</c>.
    /// </summary>
    /// <returns>
    /// <c>true</c> if every registered account is either the global or the
    /// system account; otherwise <c>false</c>.
    /// </returns>
    public bool GlobalAccountOnly()
    {
        if (_trustedKeys is not null) return false;

        bool hasOthers = false;
        _mu.EnterReadLock();
        try
        {
            foreach (var kvp in _accounts)
            {
                var acc = kvp.Value;

                // Ignore global and system accounts.
                if (acc == _gacc) continue;
                var sysAcc = _sysAccAtomic;
                if (sysAcc != null && acc == sysAcc) continue;

                hasOthers = true;
                break;
            }
        }
        finally
        {
            _mu.ExitReadLock();
        }

        return !hasOthers;
    }

    /// <summary>
    /// Returns true when both the cluster port and the gateway port options are zero.
    /// Mirrors Go <c>Server.standAloneMode</c>.
    /// </summary>
    public bool StandAloneMode()
    {
        var opts = GetOpts();
        return opts.Cluster.Port == 0 && opts.Gateway.Port == 0;
    }

    /// <summary>
    /// Returns the number of configured routes.
    /// Mirrors Go <c>Server.configuredRoutes</c>.
    /// </summary>
    public int ConfiguredRoutes() => GetOpts().Routes.Count;

    /// <summary>
    /// Returns the keys of the node-info map whose entry is a
    /// <see cref="NodeInfo"/> that is not marked offline.
    /// Mirrors Go <c>Server.ActivePeers</c>.
    /// </summary>
    /// <returns>A new list of peer node names; empty when none are online.</returns>
    public List<string> ActivePeers()
    {
        var peers = new List<string>();
        foreach (var kvp in _nodeToInfo)
        {
            if (kvp.Value is NodeInfo ni && !ni.Offline)
                peers.Add(kvp.Key);
        }

        return peers;
    }

    // =========================================================================
    // ConfigureAccounts (feature 3001)
    // =========================================================================

    /// <summary>
    /// Reads accounts from options and registers them, then resolves the
    /// system account if one is configured.
    /// Mirrors Go <c>Server.configureAccounts</c>.
    /// The server write lock must be held on entry; it is released temporarily
    /// while the system account is looked up and is always re-acquired before
    /// this method returns or throws.
    /// </summary>
    /// <param name="reloading">
    /// When true, accounts that are already registered (other than the global
    /// account) are reused instead of being re-created.
    /// </param>
    /// <returns>
    /// The set of account names whose stream imports changed (currently never
    /// populated here; import/export diffing is deferred) and an error, or
    /// <c>null</c> on success.
    /// </returns>
    public (HashSet<string> ChangedStreamImports, Exception? Error) ConfigureAccounts(bool reloading)
    {
        var awcsti = new HashSet<string>(StringComparer.Ordinal);

        // Create the global ($G) account if not yet present.
        if (_gacc == null)
        {
            _gacc = new Account { Name = ServerConstants.DefaultGlobalAccount };
            RegisterAccountNoLock(_gacc);
        }

        var opts = GetOpts();

        // Walk accounts from options.
        foreach (var optAcc in opts.Accounts)
        {
            Account a;
            bool create = true;

            if (reloading && optAcc.Name != ServerConstants.DefaultGlobalAccount)
            {
                if (_accounts.TryGetValue(optAcc.Name, out var existing))
                {
                    a = existing;
                    // Full import/export diffing deferred to session 11 (accounts.go).
                    create = false;
                }
                else
                {
                    a = new Account { Name = optAcc.Name };
                }
            }
            else
            {
                a = optAcc.Name == ServerConstants.DefaultGlobalAccount
                    ? _gacc!
                    : new Account { Name = optAcc.Name };
            }

            if (create)
            {
                // Will be a no-op for the global account (already registered).
                RegisterAccountNoLock(a);
            }

            // If an account named $SYS is found, make it the system account.
            if (optAcc.Name == ServerConstants.DefaultSystemAccount &&
                string.IsNullOrEmpty(opts.SystemAccount))
            {
                opts.SystemAccount = ServerConstants.DefaultSystemAccount;
            }
        }

        // Resolve system account if configured.
        if (!string.IsNullOrEmpty(opts.SystemAccount))
        {
            Account? acc;
            Exception? err;

            // Release server lock for lookupAccount (lock ordering: account → server).
            // The try/finally guarantees the lock is held again even if the
            // lookup throws, so the caller's own unlock stays balanced.
            _mu.ExitWriteLock();
            try
            {
                (acc, err) = LookupAccountInternal(opts.SystemAccount);
            }
            finally
            {
                _mu.EnterWriteLock();
            }

            if (err != null)
                return (awcsti, new InvalidOperationException($"error resolving system account: {err.Message}", err));

            if (acc != null && _sys != null && acc != _sys.Account)
                _sys.Account = acc;

            if (acc != null)
                _sysAccAtomic = acc;
        }

        return (awcsti, null);
    }

    // =========================================================================
    // Account counts (features 3022–3023)
    // =========================================================================

    /// <summary>
    /// Returns the total number of registered accounts (slow, test only).
    /// Mirrors Go <c>Server.numAccounts</c>.
    /// </summary>
    public int NumAccounts()
    {
        _mu.EnterReadLock();
        try
        {
            return _accounts.Count;
        }
        finally
        {
            _mu.ExitReadLock();
        }
    }

    /// <summary>
    /// Returns the number of loaded accounts; delegates to <see cref="NumAccounts"/>.
    /// Mirrors Go <c>Server.NumLoadedAccounts</c>.
    /// </summary>
    public int NumLoadedAccounts() => NumAccounts();

    // =========================================================================
    // Account registration (features 3024–3025)
    // =========================================================================

    /// <summary>
    /// Returns the named account if known, or creates and registers a new one.
    /// Mirrors Go <c>Server.LookupOrRegisterAccount</c>.
    /// </summary>
    /// <param name="name">Account name to look up or create.</param>
    /// <returns>The account and a flag that is true when it was newly created.</returns>
    public (Account Account, bool IsNew) LookupOrRegisterAccount(string name)
    {
        _mu.EnterWriteLock();
        try
        {
            if (_accounts.TryGetValue(name, out var existing))
                return (existing, false);

            var acc = new Account { Name = name };
            RegisterAccountNoLock(acc);
            return (acc, true);
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    /// <summary>
    /// Registers a new account. Returns an error if the account already exists.
    /// Mirrors Go <c>Server.RegisterAccount</c>.
    /// </summary>
    /// <param name="name">Name of the account to register.</param>
    /// <returns>
    /// The new account and a null error, or a null account and
    /// <c>ServerErrors.ErrAccountExists</c> when the name is already registered.
    /// </returns>
    public (Account? Account, Exception? Error) RegisterAccount(string name)
    {
        _mu.EnterWriteLock();
        try
        {
            if (_accounts.ContainsKey(name))
                return (null, ServerErrors.ErrAccountExists);

            var acc = new Account { Name = name };
            RegisterAccountNoLock(acc);
            return (acc, null);
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    // =========================================================================
    // System account (features 3026–3030)
    // =========================================================================

    /// <summary>
    /// Sets the named account as the server's system account. If the account is
    /// not locally known, its claims are fetched and a new account is built and
    /// registered first.
    /// Mirrors Go <c>Server.SetSystemAccount</c>.
    /// </summary>
    /// <param name="accName">Name of the account to promote.</param>
    /// <returns><c>null</c> on success, otherwise the error that occurred.</returns>
    public Exception? SetSystemAccount(string accName)
    {
        if (_accounts.TryGetValue(accName, out var acc))
            return SetSystemAccountInternal(acc);

        // Not locally known — try resolver.
        var (ac, _, fetchErr) = FetchAccountClaims(accName);
        if (fetchErr != null)
            return fetchErr;

        var newAcc = BuildInternalAccount(ac);

        // Due to race, registerAccount returns the existing one if already registered.
        var racc = RegisterAccountInternal(newAcc);
        return SetSystemAccountInternal(racc ?? newAcc);
    }

    /// <summary>
    /// Returns the system account, or null if none is set.
    /// Mirrors Go <c>Server.SystemAccount</c>.
    /// </summary>
    public Account? SystemAccount() => _sysAccAtomic;

    /// <summary>
    /// Returns the global ($G) account, read under the server read lock.
    /// Mirrors Go <c>Server.GlobalAccount</c>.
    /// </summary>
    public Account? GlobalAccount()
    {
        _mu.EnterReadLock();
        try
        {
            return _gacc;
        }
        finally
        {
            _mu.ExitReadLock();
        }
    }

    /// <summary>
    /// Creates a default system account ($SYS) if one does not already exist
    /// and makes it the system account.
    /// Mirrors Go <c>Server.SetDefaultSystemAccount</c>.
    /// </summary>
    /// <returns>
    /// <c>null</c> when the account already existed or was set successfully;
    /// otherwise the error from <see cref="SetSystemAccount"/>.
    /// </returns>
    public Exception? SetDefaultSystemAccount()
    {
        var (_, isNew) = LookupOrRegisterAccount(ServerConstants.DefaultSystemAccount);
        if (!isNew)
            return null;

        Debugf("Created system account: \"{0}\"", ServerConstants.DefaultSystemAccount);
        return SetSystemAccount(ServerConstants.DefaultSystemAccount);
    }

    /// <summary>
    /// Assigns <paramref name="acc"/> as the system account.
    /// Mirrors Go <c>Server.setSystemAccount</c>.
    /// Server lock must NOT be held on entry; this method acquires/releases it.
    /// </summary>
    /// <param name="acc">Account to install as the system account.</param>
    /// <returns>
    /// <c>null</c> on success; <c>ErrMissingAccount</c> when <paramref name="acc"/>
    /// is null, <c>ErrAccountExpired</c> when it is expired,
    /// <c>ErrAccountValidation</c> when its issuer is not trusted, or
    /// <c>ErrAccountExists</c> when a system account is already set.
    /// </returns>
    public Exception? SetSystemAccountInternal(Account acc)
    {
        if (acc == null)
            return ServerErrors.ErrMissingAccount;
        if (acc.IsExpired())
            return ServerErrors.ErrAccountExpired;
        if (!IsTrustedIssuer(acc.Issuer))
            return ServerErrors.ErrAccountValidation;

        _mu.EnterWriteLock();
        try
        {
            if (_sys != null)
                return ServerErrors.ErrAccountExists;

            _sys = new InternalState { Account = acc };
        }
        finally
        {
            _mu.ExitWriteLock();
        }

        // Store atomically for fast lookup on hot paths.
        _sysAccAtomic = acc;

        // Full internal-messaging bootstrap (initEventTracking, sendLoop, etc.)
        // is deferred to session 12 (events.go).
        AddSystemAccountExports(acc);

        return null;
    }

    // =========================================================================
    // Internal client factories (features 3031–3034)
    // =========================================================================

    /// <summary>Creates an internal system client.</summary>
    public ClientConnection CreateInternalSystemClient() =>
        CreateInternalClient(ClientKind.System);

    /// <summary>Creates an internal JetStream client.</summary>
    public ClientConnection CreateInternalJetStreamClient() =>
        CreateInternalClient(ClientKind.JetStream);

    /// <summary>Creates an internal account client.</summary>
    public ClientConnection CreateInternalAccountClient() =>
        CreateInternalClient(ClientKind.Account);

    /// <summary>
    /// Creates an internal client of the given <paramref name="kind"/>.
    /// Mirrors Go <c>Server.createInternalClient</c>.
    /// </summary>
    /// <param name="kind">Must be System, JetStream or Account.</param>
    /// <exception cref="InvalidOperationException">
    /// Thrown when <paramref name="kind"/> is not one of the supported kinds.
    /// </exception>
    public ClientConnection CreateInternalClient(ClientKind kind)
    {
        if (kind != ClientKind.System &&
            kind != ClientKind.JetStream &&
            kind != ClientKind.Account)
        {
            throw new InvalidOperationException($"createInternalClient: unsupported kind {kind}");
        }

        var c = new ClientConnection(kind, this);
        // Mirrors: c.echo = false; c.headers = true; flags.set(noReconnect)
        // Full client initialisation deferred to session 10 (client.go).
        return c;
    }

    // =========================================================================
    // Subscription tracking / account sublist (features 3035–3038)
    // =========================================================================

    /// <summary>
    /// Returns true if accounts should track subscriptions for route/gateway
    /// propagation, i.e. when the cluster port or the gateway port is non-zero.
    /// Server lock must be held on entry.
    /// Mirrors Go <c>Server.shouldTrackSubscriptions</c>.
    /// </summary>
    public bool ShouldTrackSubscriptions()
    {
        var opts = GetOpts();
        return opts.Cluster.Port != 0 || opts.Gateway.Port != 0;
    }

    /// <summary>
    /// Invokes <see cref="RegisterAccountNoLock"/> under the server write lock.
    /// Mirrors Go <c>Server.registerAccount</c>.
    /// </summary>
    /// <param name="acc">Account to register.</param>
    /// <returns>
    /// The already-registered account if a duplicate is detected, or null when
    /// <paramref name="acc"/> was registered.
    /// </returns>
    public Account? RegisterAccountInternal(Account acc)
    {
        _mu.EnterWriteLock();
        try
        {
            return RegisterAccountNoLock(acc);
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    /// <summary>
    /// Assigns a subscription index to the account when it does not have one
    /// yet, choosing the cached or uncached variant from the NoSublistCache
    /// option. Does nothing for a null account.
    /// Mirrors Go <c>Server.setAccountSublist</c>.
    /// Server lock must be held on entry.
    /// </summary>
    /// <param name="acc">Account to initialise; may be null.</param>
    public void SetAccountSublist(Account acc)
    {
        // Nothing to do for a missing account or one that already has a sublist.
        if (acc is null || acc.Sublist != null)
            return;

        var opts = GetOpts();
        acc.Sublist = opts?.NoSublistCache == true
            ? SubscriptionIndex.NewSublist(false)
            : SubscriptionIndex.NewSublistWithCache();
    }

    /// <summary>
    /// Registers an account in the server's account map.
    /// Server lock must be held on entry.
    /// Mirrors Go <c>Server.registerAccountNoLock</c>.
    /// </summary>
    /// <param name="acc">Account to register.</param>
    /// <returns>
    /// The existing account when one with the same name is already registered
    /// (race); otherwise null after <paramref name="acc"/> has been stored.
    /// </returns>
    public Account? RegisterAccountNoLock(Account acc)
    {
        // If already registered, return existing.
        if (_accounts.TryGetValue(acc.Name, out var existing))
        {
            _tmpAccounts.TryRemove(acc.Name, out _);
            return existing;
        }

        SetAccountSublist(acc);
        SetRouteInfo(acc);
        acc.Server = this;
        acc.Updated = DateTime.UtcNow;

        _accounts[acc.Name] = acc;
        _tmpAccounts.TryRemove(acc.Name, out _);

        // EnableAccountTracking is currently a stub; the real implementation and
        // the registerSystemImports call are deferred to session 12.
        EnableAccountTracking(acc);

        return null;
    }

    // =========================================================================
    // Route info for accounts (feature 3039)
    // =========================================================================

    /// <summary>
    /// Sets the account's route-pool index based on cluster configuration.
    /// Mirrors Go <c>Server.setRouteInfo</c>.
    /// Both server and account locks must be held on entry.
    /// </summary>
    /// <param name="acc">Account whose <c>RoutePoolIdx</c> is assigned.</param>
    public void SetRouteInfo(Account acc)
    {
        const int accDedicatedRoute = -1;

        if (_accRoutes != null && _accRoutes.ContainsKey(acc.Name))
        {
            // Dedicated route: store name in hash map; use index -1.
            _accRouteByHash.TryAdd(acc.Name, null);
            acc.RoutePoolIdx = accDedicatedRoute;
        }
        else
        {
            acc.RoutePoolIdx = ComputeRoutePoolIdx(_routesPoolSize, acc.Name);
            // The by-hash map is only populated when pooling is actually in use.
            if (_routesPoolSize > 1)
                _accRouteByHash.TryAdd(acc.Name, acc.RoutePoolIdx);
        }
    }

    // =========================================================================
    // Account lookup (features 3040–3042)
    // =========================================================================

    /// <summary>
    /// Returns the account for <paramref name="name"/> if locally known, without
    /// fetching from the resolver.
    /// Mirrors Go <c>Server.lookupAccountInternal</c> (private helper).
    /// Lock must NOT be held on entry.
    /// </summary>
    public (Account? Account, Exception? Error) LookupAccountInternal(string name) =>
        LookupOrFetchAccount(name, fetch: false);

    /// <summary>
    /// Returns the account for <paramref name="name"/>, optionally fetching from
    /// the resolver if not locally known or if expired.
    /// Mirrors Go <c>Server.lookupOrFetchAccount</c>.
    /// Lock must NOT be held on entry.
    /// </summary>
    /// <param name="name">Account name.</param>
    /// <param name="fetch">When true, the resolver (if any) may be consulted.</param>
    /// <returns>
    /// The account and a null error; or null with <c>ErrAccountExpired</c> when
    /// a known account is expired and could not be refreshed; or null with
    /// <c>ErrMissingAccount</c> when unknown and no fetch is possible; or the
    /// result of <see cref="FetchAccountFromResolver"/>.
    /// </returns>
    public (Account? Account, Exception? Error) LookupOrFetchAccount(string name, bool fetch)
    {
        _accounts.TryGetValue(name, out var acc);

        if (acc != null)
        {
            if (acc.IsExpired())
            {
                Debugf("Requested account [{0}] has expired", name);
                if (_accResolver != null && fetch)
                {
                    // Any refresh failure is reported to the caller as "expired".
                    var updateErr = UpdateAccount(acc);
                    if (updateErr != null)
                        return (null, ServerErrors.ErrAccountExpired);
                }
                else
                {
                    return (null, ServerErrors.ErrAccountExpired);
                }
            }

            return (acc, null);
        }

        if (_accResolver == null || !fetch)
            return (null, ServerErrors.ErrMissingAccount);

        return FetchAccountFromResolver(name);
    }

    /// <summary>
    /// Public account lookup — always fetches from resolver if needed.
    /// Mirrors Go <c>Server.LookupAccount</c>.
    /// </summary>
    public (Account? Account, Exception? Error) LookupAccount(string name) =>
        LookupOrFetchAccount(name, fetch: true);

    // =========================================================================
    // Account update (features 3043–3044)
    // =========================================================================

    /// <summary>
    /// Fetches fresh claims and updates the account if the claims have changed.
    /// Mirrors Go <c>Server.updateAccount</c>.
    /// Lock must NOT be held on entry.
    /// </summary>
    /// <param name="acc">Account to refresh.</param>
    /// <returns>
    /// <c>null</c> on success; <c>ErrAccountResolverUpdateTooSoon</c> when a
    /// complete account was updated less than one second ago; otherwise the
    /// fetch or update error.
    /// </returns>
    public Exception? UpdateAccount(Account acc)
    {
        // Don't update more than once per second unless the account is incomplete.
        if (!acc.Incomplete && (DateTime.UtcNow - acc.Updated) < TimeSpan.FromSeconds(1))
        {
            Debugf("Requested account update for [{0}] ignored, too soon", acc.Name);
            return ServerErrors.ErrAccountResolverUpdateTooSoon;
        }

        var (claimJwt, err) = FetchRawAccountClaims(acc.Name);
        if (err != null)
            return err;

        return UpdateAccountWithClaimJwt(acc, claimJwt);
    }

    /// <summary>
    /// Applies updated JWT claims to the account if they differ from what is stored.
    /// Mirrors Go <c>Server.updateAccountWithClaimJWT</c>.
    /// Lock must NOT be held on entry.
    /// </summary>
    /// <param name="acc">Account to update.</param>
    /// <param name="claimJwt">Raw JWT carrying the new claims.</param>
    /// <returns>
    /// <c>null</c> when nothing needed updating or the update was applied;
    /// <c>ErrMissingAccount</c> for a null account; <c>ErrAccountValidation</c>
    /// when the claims subject does not match the account name; or the
    /// verification error.
    /// </returns>
    public Exception? UpdateAccountWithClaimJwt(Account acc, string claimJwt)
    {
        if (acc == null)
            return ServerErrors.ErrMissingAccount;

        // If JWT hasn't changed and account is not incomplete, skip.
        if (!string.IsNullOrEmpty(acc.ClaimJwt) && acc.ClaimJwt == claimJwt && !acc.Incomplete)
        {
            Debugf("Requested account update for [{0}], same claims detected", acc.Name);
            return null;
        }

        var (accClaims, _, verifyErr) = VerifyAccountClaims(claimJwt);
        if (verifyErr != null)
            return verifyErr;
        if (accClaims == null)
            return null;

        if (acc.Name != accClaims.Subject)
            return ServerErrors.ErrAccountValidation;

        acc.Issuer = accClaims.Issuer;
        // Full UpdateAccountClaims() deferred to session 11.
        acc.ClaimJwt = claimJwt;
        return null;
    }

    // =========================================================================
    // Account claims fetch / verify (features 3045–3048)
    // =========================================================================

    /// <summary>
    /// Fetches the raw JWT string for an account from the resolver and logs how
    /// long the fetch took (as a warning when it exceeds one second).
    /// Mirrors Go <c>Server.fetchRawAccountClaims</c>.
    /// Lock must NOT be held on entry.
    /// </summary>
    /// <param name="name">Account name.</param>
    /// <returns>
    /// The JWT and a null error, or an empty string and the error
    /// (<c>ErrNoAccountResolver</c> when no resolver is configured).
    /// </returns>
    public (string Jwt, Exception? Error) FetchRawAccountClaims(string name)
    {
        if (_accResolver == null)
            return (string.Empty, ServerErrors.ErrNoAccountResolver);

        var start = DateTime.UtcNow;
        var (jwt, err) = FetchAccountFromResolverRaw(name);
        var elapsed = DateTime.UtcNow - start;

        if (elapsed > TimeSpan.FromSeconds(1))
            Warnf("Account [{0}] fetch took {1}", name, elapsed);
        else
            Debugf("Account [{0}] fetch took {1}", name, elapsed);

        if (err != null)
        {
            Warnf("Account fetch failed: {0}", err);
            return (string.Empty, err);
        }

        return (jwt, null);
    }

    /// <summary>
    /// Fetches and decodes account JWT claims from the resolver.
    /// Mirrors Go <c>Server.fetchAccountClaims</c>.
    /// Lock must NOT be held on entry.
    /// </summary>
    /// <param name="name">Account name; must equal the claims subject.</param>
    /// <returns>
    /// The decoded claims with the verified JWT, or null claims with an error
    /// (<c>ErrAccountValidation</c> when the subject does not match
    /// <paramref name="name"/>).
    /// </returns>
    public (AccountClaims? Claims, string Jwt, Exception? Error) FetchAccountClaims(string name)
    {
        var (claimJwt, err) = FetchRawAccountClaims(name);
        if (err != null)
            return (null, string.Empty, err);

        var (claims, verifiedJwt, verifyErr) = VerifyAccountClaims(claimJwt);
        if (claims != null && claims.Subject != name)
            return (null, string.Empty, ServerErrors.ErrAccountValidation);

        return (claims, verifiedJwt, verifyErr);
    }

    /// <summary>
    /// Decodes and validates an account JWT string.
    /// Mirrors Go <c>Server.verifyAccountClaims</c>.
    /// </summary>
    /// <param name="claimJwt">Raw JWT to decode.</param>
    /// <returns>
    /// The claims and the input JWT on success; null claims and
    /// <c>ErrAccountValidation</c> when decoding fails or the issuer is not trusted.
    /// </returns>
    public (AccountClaims? Claims, string Jwt, Exception? Error) VerifyAccountClaims(string claimJwt)
    {
        // Full JWT decoding deferred to session 06 JWT integration.
        // Stub: create a minimal claims object from the raw JWT.
        var claims = AccountClaims.TryDecode(claimJwt);
        if (claims == null)
            return (null, string.Empty, ServerErrors.ErrAccountValidation);

        if (!IsTrustedIssuer(claims.Issuer))
            return (null, string.Empty, ServerErrors.ErrAccountValidation);

        return (claims, claimJwt, null);
    }

    /// <summary>
    /// Fetches an account from the resolver, registers it, and returns it.
    /// Mirrors Go <c>Server.fetchAccount</c>.
    /// Lock must NOT be held on entry.
    /// </summary>
    /// <param name="name">Account name.</param>
    /// <returns>
    /// The registered account (either the newly built one or the one that was
    /// registered concurrently) and a null error, or null and the error.
    /// </returns>
    public (Account? Account, Exception? Error) FetchAccountFromResolver(string name)
    {
        var (accClaims, claimJwt, err) = FetchAccountClaims(name);
        if (accClaims == null)
            return (null, err);

        var acc = BuildInternalAccount(accClaims);

        // Due to possible race, registerAccount may return an already-registered account.
        var racc = RegisterAccountInternal(acc);
        if (racc != null)
        {
            // Update with new claims if changed.
            var updateErr = UpdateAccountWithClaimJwt(racc, claimJwt);
            return updateErr != null ? (null, updateErr) : (racc, null);
        }

        acc.ClaimJwt = claimJwt;
        return (acc, null);
    }

    // =========================================================================
    // Account helpers
    // =========================================================================

    /// <summary>
    /// Builds an <see cref="Account"/> from decoded claims, copying the subject
    /// into <c>Name</c> and the issuer into <c>Issuer</c> (empty strings when
    /// <paramref name="claims"/> is null).
    /// Mirrors Go <c>Server.buildInternalAccount</c>.
    /// Full JetStream limits / import / export wiring deferred to session 11.
    /// </summary>
    internal Account BuildInternalAccount(AccountClaims? claims)
    {
        var acc = new Account
        {
            Name = claims?.Subject ?? string.Empty,
            Issuer = claims?.Issuer ?? string.Empty,
        };
        return acc;
    }

    /// <summary>
    /// Fetches the raw JWT directly from the resolver without timing/logging.
    /// Any exception thrown by the resolver is returned as the error.
    /// </summary>
    private (string Jwt, Exception? Error) FetchAccountFromResolverRaw(string name)
    {
        if (_accResolver == null)
            return (string.Empty, ServerErrors.ErrNoAccountResolver);

        try
        {
            // NOTE(review): blocks the calling thread on an async resolver call;
            // kept because every caller in this file is synchronous.
            var jwt = _accResolver.FetchAsync(name).GetAwaiter().GetResult();
            return (jwt, null);
        }
        catch (Exception ex)
        {
            return (string.Empty, ex);
        }
    }

    /// <summary>
    /// Computes the route-pool index for an account name using FNV-32a over the
    /// UTF-8 bytes of the name, modulo <paramref name="poolSize"/>.
    /// Mirrors Go <c>computeRoutePoolIdx</c> (route.go).
    /// </summary>
    /// <param name="poolSize">Number of pooled routes.</param>
    /// <param name="accountName">Account name to hash.</param>
    /// <returns>0 when <paramref name="poolSize"/> is 1 or less; otherwise an index in [0, poolSize).</returns>
    internal static int ComputeRoutePoolIdx(int poolSize, string accountName)
    {
        if (poolSize <= 1)
            return 0;

        // FNV-32a hash (Go uses fnv.New32a)
        uint hash = 2166136261u;
        foreach (var b in System.Text.Encoding.UTF8.GetBytes(accountName))
        {
            hash ^= b;
            hash *= 16777619u;
        }

        return (int)(hash % (uint)poolSize);
    }

    // =========================================================================
    // Stubs for subsystems implemented in later sessions
    // =========================================================================

    /// <summary>
    /// Stub: enables account tracking (session 12 — events.go).
    /// Currently only validates the argument and writes a debug log entry.
    /// </summary>
    internal void EnableAccountTracking(Account acc)
    {
        ArgumentNullException.ThrowIfNull(acc);
        Debugf("Enabled account tracking for {0}", acc.Name);
    }

    /// <summary>
    /// Stub: registers system imports on an account (session 12).
    /// Currently only ensures the service-import map exists.
    /// </summary>
    internal void RegisterSystemImports(Account acc)
    {
        ArgumentNullException.ThrowIfNull(acc);
        // Target-typed new: the map type is taken from the property itself.
        acc.Imports.Services ??= new(StringComparer.Ordinal);
    }

    /// <summary>
    /// Stub: adds system-account exports (session 12).
    /// Currently only ensures the service-export map exists.
    /// </summary>
    internal void AddSystemAccountExports(Account acc)
    {
        ArgumentNullException.ThrowIfNull(acc);
        // Target-typed new: the map type is taken from the property itself.
        acc.Exports.Services ??= new(StringComparer.Ordinal);
    }
}