Files
natsnet/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs
2026-02-28 19:24:25 -05:00

758 lines
27 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/server.go (account methods) in the NATS server Go source.
// Session 09: account management — configure, register, lookup, fetch.
using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
// =========================================================================
// Account-mode helpers (features 30043007)
// =========================================================================
/// <summary>
/// Returns true when only the global ($G) account is defined (pre-NATS 2.0 mode).
/// Mirrors Go <c>Server.globalAccountOnly</c>.
/// </summary>
public bool GlobalAccountOnly()
{
if (_trustedKeys is not null) return false;
bool hasOthers = false;
_mu.EnterReadLock();
try
{
foreach (var kvp in _accounts)
{
var acc = kvp.Value;
// Ignore global and system accounts.
if (acc == _gacc) continue;
var sysAcc = _sysAccAtomic;
if (sysAcc != null && acc == sysAcc) continue;
hasOthers = true;
break;
}
}
finally { _mu.ExitReadLock(); }
return !hasOthers;
}
/// <summary>
/// Returns true when this server has no routes or gateways configured.
/// Mirrors Go <c>Server.standAloneMode</c>.
/// </summary>
public bool StandAloneMode()
{
var opts = GetOpts();
return opts.Cluster.Port == 0 && opts.Gateway.Port == 0;
}
/// <summary>
/// Returns the number of configured routes.
/// Mirrors Go <c>Server.configuredRoutes</c>.
/// </summary>
public int ConfiguredRoutes() => GetOpts().Routes.Count;
/// <summary>
/// Returns online JetStream peer node names from the node-info map.
/// Mirrors Go <c>Server.ActivePeers</c>.
/// </summary>
public List<string> ActivePeers()
{
var peers = new List<string>();
foreach (var kvp in _nodeToInfo)
{
if (kvp.Value is NodeInfo ni && !ni.Offline)
peers.Add(kvp.Key);
}
return peers;
}
// =========================================================================
// ConfigureAccounts (feature 3001)
// =========================================================================
/// <summary>
/// Reads accounts from options and registers/updates them.
/// Returns a set of account names whose stream imports changed (for reload)
/// and any error.
/// Mirrors Go <c>Server.configureAccounts</c>.
/// Server lock must be held on entry.
/// </summary>
public (HashSet<string> ChangedStreamImports, Exception? Error) ConfigureAccounts(bool reloading)
{
var awcsti = new HashSet<string>(StringComparer.Ordinal);
// Create the global ($G) account if not yet present.
if (_gacc == null)
{
_gacc = new Account { Name = ServerConstants.DefaultGlobalAccount };
RegisterAccountNoLock(_gacc);
}
var opts = GetOpts();
// Walk accounts from options.
foreach (var optAcc in opts.Accounts)
{
Account a;
bool create = true;
if (reloading && optAcc.Name != ServerConstants.DefaultGlobalAccount)
{
if (_accounts.TryGetValue(optAcc.Name, out var existing))
{
a = existing;
// Full import/export diffing deferred to session 11 (accounts.go).
create = false;
}
else
{
a = new Account { Name = optAcc.Name };
}
}
else
{
a = optAcc.Name == ServerConstants.DefaultGlobalAccount ? _gacc! : new Account { Name = optAcc.Name };
}
if (create)
{
// Will be a no-op for the global account (already registered).
RegisterAccountNoLock(a);
}
// If an account named $SYS is found, make it the system account.
if (optAcc.Name == ServerConstants.DefaultSystemAccount &&
string.IsNullOrEmpty(opts.SystemAccount))
{
opts.SystemAccount = ServerConstants.DefaultSystemAccount;
}
}
// Resolve system account if configured.
if (!string.IsNullOrEmpty(opts.SystemAccount))
{
// Release server lock for lookupAccount (lock ordering: account → server).
_mu.ExitWriteLock();
var (acc, err) = LookupAccountInternal(opts.SystemAccount);
_mu.EnterWriteLock();
if (err != null)
return (awcsti, new InvalidOperationException($"error resolving system account: {err.Message}", err));
if (acc != null && _sys != null && acc != _sys.Account)
_sys.Account = acc;
if (acc != null)
_sysAccAtomic = acc;
}
return (awcsti, null);
}
// =========================================================================
// Account counts (features 30223023)
// =========================================================================
/// <summary>
/// Returns the total number of registered accounts (slow, test only).
/// Mirrors Go <c>Server.numAccounts</c>.
/// </summary>
public int NumAccounts()
{
_mu.EnterReadLock();
try { return _accounts.Count; }
finally { _mu.ExitReadLock(); }
}
/// <summary>
/// Returns the number of loaded accounts.
/// Mirrors Go <c>Server.NumLoadedAccounts</c>.
/// </summary>
public int NumLoadedAccounts() => NumAccounts();
// =========================================================================
// Account registration (features 30243025)
// =========================================================================
/// <summary>
/// Returns the named account if known, or creates and registers a new one.
/// Mirrors Go <c>Server.LookupOrRegisterAccount</c>.
/// </summary>
public (Account Account, bool IsNew) LookupOrRegisterAccount(string name)
{
_mu.EnterWriteLock();
try
{
if (_accounts.TryGetValue(name, out var existing))
return (existing, false);
var acc = new Account { Name = name };
RegisterAccountNoLock(acc);
return (acc, true);
}
finally { _mu.ExitWriteLock(); }
}
/// <summary>
/// Registers a new account. Returns error if the account already exists.
/// Mirrors Go <c>Server.RegisterAccount</c>.
/// </summary>
public (Account? Account, Exception? Error) RegisterAccount(string name)
{
_mu.EnterWriteLock();
try
{
if (_accounts.ContainsKey(name))
return (null, ServerErrors.ErrAccountExists);
var acc = new Account { Name = name };
RegisterAccountNoLock(acc);
return (acc, null);
}
finally { _mu.ExitWriteLock(); }
}
// =========================================================================
// System account (features 30263030)
// =========================================================================
/// <summary>
/// Sets the named account as the server's system account.
/// Mirrors Go <c>Server.SetSystemAccount</c>.
/// </summary>
public Exception? SetSystemAccount(string accName)
{
if (_accounts.TryGetValue(accName, out var acc))
return SetSystemAccountInternal(acc);
// Not locally known — try resolver.
var (ac, _, fetchErr) = FetchAccountClaims(accName);
if (fetchErr != null) return fetchErr;
var newAcc = BuildInternalAccount(ac);
// Due to race, registerAccount returns the existing one if already registered.
var racc = RegisterAccountInternal(newAcc);
return SetSystemAccountInternal(racc ?? newAcc);
}
/// <summary>
/// Returns the system account, or null if none is set.
/// Mirrors Go <c>Server.SystemAccount</c>.
/// </summary>
public Account? SystemAccount() => _sysAccAtomic;
/// <summary>
/// Returns the global ($G) account.
/// Mirrors Go <c>Server.GlobalAccount</c>.
/// </summary>
public Account? GlobalAccount()
{
_mu.EnterReadLock();
try { return _gacc; }
finally { _mu.ExitReadLock(); }
}
/// <summary>
/// Creates a default system account ($SYS) if one does not already exist.
/// Mirrors Go <c>Server.SetDefaultSystemAccount</c>.
/// </summary>
public Exception? SetDefaultSystemAccount()
{
var (_, isNew) = LookupOrRegisterAccount(ServerConstants.DefaultSystemAccount);
if (!isNew) return null;
Debugf("Created system account: \"{0}\"", ServerConstants.DefaultSystemAccount);
return SetSystemAccount(ServerConstants.DefaultSystemAccount);
}
/// <summary>
/// Assigns <paramref name="acc"/> as the system account and starts internal
/// messaging goroutines.
/// Mirrors Go <c>Server.setSystemAccount</c>.
/// Server lock must NOT be held on entry; this method acquires/releases it.
/// </summary>
public Exception? SetSystemAccountInternal(Account acc)
{
if (acc == null)
return ServerErrors.ErrMissingAccount;
if (acc.IsExpired())
return ServerErrors.ErrAccountExpired;
if (!IsTrustedIssuer(acc.Issuer))
return ServerErrors.ErrAccountValidation;
_mu.EnterWriteLock();
try
{
if (_sys != null)
return ServerErrors.ErrAccountExists;
_sys = new InternalState { Account = acc };
}
finally { _mu.ExitWriteLock(); }
// Store atomically for fast lookup on hot paths.
_sysAccAtomic = acc;
// Full internal-messaging bootstrap (initEventTracking, sendLoop, etc.)
// is deferred to session 12 (events.go).
AddSystemAccountExports(acc);
return null;
}
// =========================================================================
// Internal client factories (features 30313034)
// =========================================================================
/// <summary>Creates an internal system client.</summary>
public ClientConnection CreateInternalSystemClient() =>
CreateInternalClient(ClientKind.System);
/// <summary>Creates an internal JetStream client.</summary>
public ClientConnection CreateInternalJetStreamClient() =>
CreateInternalClient(ClientKind.JetStream);
/// <summary>Creates an internal account client.</summary>
public ClientConnection CreateInternalAccountClient() =>
CreateInternalClient(ClientKind.Account);
/// <summary>
/// Creates and attaches a per-account send queue.
/// Mirrors Go <c>Server.newSendQ</c> call sites.
/// </summary>
internal SendQueue NewSendQueue(Account account)
{
ArgumentNullException.ThrowIfNull(account);
var existing = account.GetSendQueue();
if (existing is not null)
return existing;
var sendQueue = SendQueue.NewSendQ(this, account);
account.SetSendQueue(sendQueue);
return sendQueue;
}
/// <summary>
/// Creates an internal client of the given <paramref name="kind"/>.
/// Mirrors Go <c>Server.createInternalClient</c>.
/// </summary>
public ClientConnection CreateInternalClient(ClientKind kind)
{
if (kind != ClientKind.System && kind != ClientKind.JetStream && kind != ClientKind.Account)
throw new InvalidOperationException($"createInternalClient: unsupported kind {kind}");
var c = new ClientConnection(kind, this);
// Mirrors: c.echo = false; c.headers = true; flags.set(noReconnect)
// Full client initialisation deferred to session 10 (client.go).
return c;
}
// =========================================================================
// Subscription tracking / account sublist (features 30353038)
// =========================================================================
/// <summary>
/// Returns true if accounts should track subscriptions for route/gateway propagation.
/// Server lock must be held on entry.
/// Mirrors Go <c>Server.shouldTrackSubscriptions</c>.
/// </summary>
public bool ShouldTrackSubscriptions()
{
var opts = GetOpts();
return opts.Cluster.Port != 0 || opts.Gateway.Port != 0;
}
/// <summary>
/// Invokes <see cref="RegisterAccountNoLock"/> under the server write lock.
/// Returns the already-registered account if a duplicate is detected, or null.
/// Mirrors Go <c>Server.registerAccount</c>.
/// </summary>
public Account? RegisterAccountInternal(Account acc)
{
_mu.EnterWriteLock();
try { return RegisterAccountNoLock(acc); }
finally { _mu.ExitWriteLock(); }
}
/// <summary>
/// Sets the account's subscription index based on the NoSublistCache option.
/// Mirrors Go <c>Server.setAccountSublist</c>.
/// Server lock must be held on entry.
/// </summary>
public void SetAccountSublist(Account acc)
{
if (acc?.Sublist != null) return;
if (acc == null) return;
var opts = GetOpts();
acc.Sublist = opts?.NoSublistCache == true
? SubscriptionIndex.NewSublist(false)
: SubscriptionIndex.NewSublistWithCache();
}
/// <summary>
/// Registers an account in the server's account map.
/// If the account is already registered (race), returns the existing one.
/// Server lock must be held on entry.
/// Mirrors Go <c>Server.registerAccountNoLock</c>.
/// </summary>
public Account? RegisterAccountNoLock(Account acc)
{
// If already registered, return existing.
if (_accounts.TryGetValue(acc.Name, out var existing))
{
_tmpAccounts.TryRemove(acc.Name, out _);
return existing;
}
SetAccountSublist(acc);
SetRouteInfo(acc);
acc.Server = this;
acc.Updated = DateTime.UtcNow;
_accounts[acc.Name] = acc;
_tmpAccounts.TryRemove(acc.Name, out _);
// enableAccountTracking and registerSystemImports deferred to session 12.
EnableAccountTracking(acc);
return null;
}
// =========================================================================
// Route info for accounts (feature 3039)
// =========================================================================
/// <summary>
/// Sets the account's route-pool index based on cluster configuration.
/// Mirrors Go <c>Server.setRouteInfo</c>.
/// Both server and account locks must be held on entry.
/// </summary>
public void SetRouteInfo(Account acc)
{
const int accDedicatedRoute = -1;
if (_accRoutes != null && _accRoutes.ContainsKey(acc.Name))
{
// Dedicated route: store name in hash map; use index -1.
_accRouteByHash.TryAdd(acc.Name, null);
acc.RoutePoolIdx = accDedicatedRoute;
}
else
{
acc.RoutePoolIdx = ComputeRoutePoolIdx(_routesPoolSize, acc.Name);
if (_routesPoolSize > 1)
_accRouteByHash.TryAdd(acc.Name, acc.RoutePoolIdx);
}
}
// =========================================================================
// Account lookup (features 30403042)
// =========================================================================
/// <summary>
/// Returns the account for <paramref name="name"/> if locally known, without
/// fetching from the resolver.
/// Mirrors Go <c>Server.lookupAccountInternal</c> (private helper).
/// Lock must NOT be held on entry.
/// </summary>
public (Account? Account, Exception? Error) LookupAccountInternal(string name)
=> LookupOrFetchAccount(name, fetch: false);
/// <summary>
/// Returns the account for <paramref name="name"/>, optionally fetching from
/// the resolver if not locally known or if expired.
/// Mirrors Go <c>Server.lookupOrFetchAccount</c>.
/// Lock must NOT be held on entry.
/// </summary>
public (Account? Account, Exception? Error) LookupOrFetchAccount(string name, bool fetch)
{
_accounts.TryGetValue(name, out var acc);
if (acc != null)
{
if (acc.IsExpired())
{
Debugf("Requested account [{0}] has expired", name);
if (_accResolver != null && fetch)
{
var updateErr = UpdateAccount(acc);
if (updateErr != null)
return (null, ServerErrors.ErrAccountExpired);
}
else
{
return (null, ServerErrors.ErrAccountExpired);
}
}
return (acc, null);
}
if (_accResolver == null || !fetch)
return (null, ServerErrors.ErrMissingAccount);
return FetchAccountFromResolver(name);
}
/// <summary>
/// Public account lookup — always fetches from resolver if needed.
/// Mirrors Go <c>Server.LookupAccount</c>.
/// </summary>
public (Account? Account, Exception? Error) LookupAccount(string name)
=> LookupOrFetchAccount(name, fetch: true);
// =========================================================================
// Account update (features 30433044)
// =========================================================================
/// <summary>
/// Fetches fresh claims and updates the account if the claims have changed.
/// Mirrors Go <c>Server.updateAccount</c>.
/// Lock must NOT be held on entry.
/// </summary>
public Exception? UpdateAccount(Account acc)
{
// Don't update more than once per second unless the account is incomplete.
if (!acc.Incomplete && (DateTime.UtcNow - acc.Updated) < TimeSpan.FromSeconds(1))
{
Debugf("Requested account update for [{0}] ignored, too soon", acc.Name);
return ServerErrors.ErrAccountResolverUpdateTooSoon;
}
var (claimJwt, err) = FetchRawAccountClaims(acc.Name);
if (err != null) return err;
return UpdateAccountWithClaimJwt(acc, claimJwt);
}
/// <summary>
/// Applies updated JWT claims to the account if they differ from what is stored.
/// Mirrors Go <c>Server.updateAccountWithClaimJWT</c>.
/// Lock must NOT be held on entry.
/// </summary>
public Exception? UpdateAccountWithClaimJwt(Account acc, string claimJwt)
{
if (acc == null) return ServerErrors.ErrMissingAccount;
// If JWT hasn't changed and account is not incomplete, skip.
if (!string.IsNullOrEmpty(acc.ClaimJwt) && acc.ClaimJwt == claimJwt && !acc.Incomplete)
{
Debugf("Requested account update for [{0}], same claims detected", acc.Name);
return null;
}
var (accClaims, _, verifyErr) = VerifyAccountClaims(claimJwt);
if (verifyErr != null) return verifyErr;
if (accClaims == null) return null;
if (acc.Name != accClaims.Subject)
return ServerErrors.ErrAccountValidation;
acc.Issuer = accClaims.Issuer;
// Full UpdateAccountClaims() deferred to session 11.
acc.ClaimJwt = claimJwt;
return null;
}
// =========================================================================
// Account claims fetch / verify (features 30453048)
// =========================================================================
/// <summary>
/// Fetches the raw JWT string for an account from the resolver.
/// Mirrors Go <c>Server.fetchRawAccountClaims</c>.
/// Lock must NOT be held on entry.
/// </summary>
public (string Jwt, Exception? Error) FetchRawAccountClaims(string name)
{
if (_accResolver == null)
return (string.Empty, ServerErrors.ErrNoAccountResolver);
var start = DateTime.UtcNow;
var (jwt, err) = FetchAccountFromResolverRaw(name);
var elapsed = DateTime.UtcNow - start;
if (elapsed > TimeSpan.FromSeconds(1))
Warnf("Account [{0}] fetch took {1}", name, elapsed);
else
Debugf("Account [{0}] fetch took {1}", name, elapsed);
if (err != null)
{
Warnf("Account fetch failed: {0}", err);
return (string.Empty, err);
}
return (jwt, null);
}
/// <summary>
/// Fetches and decodes account JWT claims from the resolver.
/// Mirrors Go <c>Server.fetchAccountClaims</c>.
/// Lock must NOT be held on entry.
/// </summary>
public (AccountClaims? Claims, string Jwt, Exception? Error) FetchAccountClaims(string name)
{
var (claimJwt, err) = FetchRawAccountClaims(name);
if (err != null) return (null, string.Empty, err);
var (claims, verifiedJwt, verifyErr) = VerifyAccountClaims(claimJwt);
if (claims != null && claims.Subject != name)
return (null, string.Empty, ServerErrors.ErrAccountValidation);
return (claims, verifiedJwt, verifyErr);
}
/// <summary>
/// Decodes and validates an account JWT string.
/// Mirrors Go <c>Server.verifyAccountClaims</c>.
/// </summary>
public (AccountClaims? Claims, string Jwt, Exception? Error) VerifyAccountClaims(string claimJwt)
{
// Full JWT decoding deferred to session 06 JWT integration.
// Stub: create a minimal claims object from the raw JWT.
var claims = AccountClaims.TryDecode(claimJwt);
if (claims == null)
return (null, string.Empty, ServerErrors.ErrAccountValidation);
if (!IsTrustedIssuer(claims.Issuer))
return (null, string.Empty, ServerErrors.ErrAccountValidation);
return (claims, claimJwt, null);
}
/// <summary>
/// Fetches an account from the resolver, registers it, and returns it.
/// Mirrors Go <c>Server.fetchAccount</c>.
/// Lock must NOT be held on entry.
/// </summary>
public (Account? Account, Exception? Error) FetchAccount(string name) =>
FetchAccountFromResolver(name);
/// <summary>
/// Fetches an account from the resolver, registers it, and returns it.
/// Mirrors Go <c>Server.fetchAccount</c>.
/// Lock must NOT be held on entry.
/// </summary>
public (Account? Account, Exception? Error) FetchAccountFromResolver(string name)
{
var (accClaims, claimJwt, err) = FetchAccountClaims(name);
if (accClaims == null) return (null, err);
var acc = BuildInternalAccount(accClaims);
// Due to possible race, registerAccount may return an already-registered account.
var racc = RegisterAccountInternal(acc);
if (racc != null)
{
// Update with new claims if changed.
var updateErr = UpdateAccountWithClaimJwt(racc, claimJwt);
return updateErr != null ? (null, updateErr) : (racc, null);
}
acc.ClaimJwt = claimJwt;
return (acc, null);
}
// =========================================================================
// Account helpers
// =========================================================================
/// <summary>
/// Builds an Account from decoded claims.
/// Mirrors Go <c>Server.buildInternalAccount</c>.
/// Full JetStream limits / import / export wiring deferred to session 11.
/// </summary>
internal Account BuildInternalAccount(AccountClaims? claims)
{
var acc = new Account
{
Name = claims?.Subject ?? string.Empty,
Issuer = claims?.Issuer ?? string.Empty,
};
return acc;
}
/// <summary>
/// Fetches the raw JWT directly from the resolver without timing/logging.
/// </summary>
private (string Jwt, Exception? Error) FetchAccountFromResolverRaw(string name)
{
if (_accResolver == null)
return (string.Empty, ServerErrors.ErrNoAccountResolver);
try
{
var jwt = _accResolver.FetchAsync(name).GetAwaiter().GetResult();
return (jwt, null);
}
catch (Exception ex)
{
return (string.Empty, ex);
}
}
/// <summary>
/// Computes the route-pool index for an account name using FNV-32a.
/// Mirrors Go <c>computeRoutePoolIdx</c> (route.go).
/// </summary>
internal static int ComputeRoutePoolIdx(int poolSize, string accountName)
{
if (poolSize <= 1) return 0;
// FNV-32a hash (Go uses fnv.New32a)
uint hash = 2166136261u;
foreach (var b in System.Text.Encoding.UTF8.GetBytes(accountName))
{
hash ^= b;
hash *= 16777619u;
}
return (int)(hash % (uint)poolSize);
}
// =========================================================================
// Stubs for subsystems implemented in later sessions
// =========================================================================
/// <summary>
/// Stub: enables account tracking (session 12 — events.go).
/// </summary>
internal void EnableAccountTracking(Account acc)
{
ArgumentNullException.ThrowIfNull(acc);
Debugf("Enabled account tracking for {0}", acc.Name);
}
/// <summary>
/// Stub: registers system imports on an account (session 12).
/// </summary>
internal void RegisterSystemImports(Account acc)
{
ArgumentNullException.ThrowIfNull(acc);
acc.Imports.Services ??= new Dictionary<string, List<ServiceImportEntry>>(StringComparer.Ordinal);
}
/// <summary>
/// Stub: adds system-account exports (session 12).
/// </summary>
internal void AddSystemAccountExports(Account acc)
{
ArgumentNullException.ThrowIfNull(acc);
acc.Exports.Services ??= new Dictionary<string, ServiceExportEntry>(StringComparer.Ordinal);
}
}