diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs index 0396adf..d43cc55 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs @@ -167,10 +167,23 @@ public class RoutePermissions } /// -/// Stub for Account type. Full implementation in later sessions. +/// Stub for Account type. Full implementation in session 11. /// Mirrors Go Account struct. /// public class Account { - public string Name { get; set; } = string.Empty; + public string Name { get; set; } = string.Empty; + + // Fields required by session 09 account management (NatsServer.Accounts.cs). + // Full implementation in session 11. + public string Issuer { get; set; } = string.Empty; + internal string ClaimJwt { get; set; } = string.Empty; + internal int RoutePoolIdx { get; set; } + internal bool Incomplete { get; set; } + internal DateTime Updated { get; set; } + internal ZB.MOM.NatsNet.Server.Internal.DataStructures.SubscriptionIndex? Sublist { get; set; } + internal object? Server { get; set; } // INatsServer — avoids circular reference + + /// Returns true if this account's JWT has expired. Stub — full impl in session 11. + public bool IsExpired() => false; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs new file mode 100644 index 0000000..0a56ae4 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs @@ -0,0 +1,713 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/server.go (account methods) in the NATS server Go source. +// Session 09: account management — configure, register, lookup, fetch. + +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + // ========================================================================= + // Account-mode helpers (features 3004–3007) + // ========================================================================= + + /// + /// Returns true when only the global ($G) account is defined (pre-NATS 2.0 mode). + /// Mirrors Go Server.globalAccountOnly. + /// + public bool GlobalAccountOnly() + { + if (_trustedKeys is not null) return false; + + bool hasOthers = false; + _mu.EnterReadLock(); + try + { + foreach (var kvp in _accounts) + { + var acc = kvp.Value; + // Ignore global and system accounts. + if (acc == _gacc) continue; + var sysAcc = _sysAccAtomic; + if (sysAcc != null && acc == sysAcc) continue; + hasOthers = true; + break; + } + } + finally { _mu.ExitReadLock(); } + + return !hasOthers; + } + + /// + /// Returns true when this server has no routes or gateways configured. + /// Mirrors Go Server.standAloneMode. + /// + public bool StandAloneMode() + { + var opts = GetOpts(); + return opts.Cluster.Port == 0 && opts.Gateway.Port == 0; + } + + /// + /// Returns the number of configured routes. + /// Mirrors Go Server.configuredRoutes. + /// + public int ConfiguredRoutes() => GetOpts().Routes.Count; + + /// + /// Returns online JetStream peer node names from the node-info map. + /// Mirrors Go Server.ActivePeers. + /// + public List ActivePeers() + { + var peers = new List(); + foreach (var kvp in _nodeToInfo) + { + if (kvp.Value is NodeInfo ni && !ni.Offline) + peers.Add(kvp.Key); + } + return peers; + } + + // ========================================================================= + // ConfigureAccounts (feature 3001) + // ========================================================================= + + /// + /// Reads accounts from options and registers/updates them. + /// Returns a set of account names whose stream imports changed (for reload) + /// and any error. + /// Mirrors Go Server.configureAccounts. + /// Server lock must be held on entry. + /// + public (HashSet ChangedStreamImports, Exception? Error) ConfigureAccounts(bool reloading) + { + var awcsti = new HashSet(StringComparer.Ordinal); + + // Create the global ($G) account if not yet present. + if (_gacc == null) + { + _gacc = new Account { Name = ServerConstants.DefaultGlobalAccount }; + RegisterAccountNoLock(_gacc); + } + + var opts = GetOpts(); + + // Walk accounts from options. + foreach (var optAcc in opts.Accounts) + { + Account a; + bool create = true; + + if (reloading && optAcc.Name != ServerConstants.DefaultGlobalAccount) + { + if (_accounts.TryGetValue(optAcc.Name, out var existing)) + { + a = existing; + // Full import/export diffing deferred to session 11 (accounts.go). + create = false; + } + else + { + a = new Account { Name = optAcc.Name }; + } + } + else + { + a = optAcc.Name == ServerConstants.DefaultGlobalAccount ? _gacc! : new Account { Name = optAcc.Name }; + } + + if (create) + { + // Will be a no-op for the global account (already registered). + RegisterAccountNoLock(a); + } + + // If an account named $SYS is found, make it the system account. + if (optAcc.Name == ServerConstants.DefaultSystemAccount && + string.IsNullOrEmpty(opts.SystemAccount)) + { + opts.SystemAccount = ServerConstants.DefaultSystemAccount; + } + } + + // Resolve system account if configured. + if (!string.IsNullOrEmpty(opts.SystemAccount)) + { + // Release server lock for lookupAccount (lock ordering: account → server). + _mu.ExitWriteLock(); + var (acc, err) = LookupAccountInternal(opts.SystemAccount); + _mu.EnterWriteLock(); + + if (err != null) + return (awcsti, new InvalidOperationException($"error resolving system account: {err.Message}", err)); + + if (acc != null && _sys != null && acc != _sys.Account) + _sys.Account = acc; + + if (acc != null) + _sysAccAtomic = acc; + } + + return (awcsti, null); + } + + // ========================================================================= + // Account counts (features 3022–3023) + // ========================================================================= + + /// + /// Returns the total number of registered accounts (slow, test only). + /// Mirrors Go Server.numAccounts. + /// + public int NumAccounts() + { + _mu.EnterReadLock(); + try { return _accounts.Count; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Returns the number of loaded accounts. + /// Mirrors Go Server.NumLoadedAccounts. + /// + public int NumLoadedAccounts() => NumAccounts(); + + // ========================================================================= + // Account registration (features 3024–3025) + // ========================================================================= + + /// + /// Returns the named account if known, or creates and registers a new one. + /// Mirrors Go Server.LookupOrRegisterAccount. + /// + public (Account Account, bool IsNew) LookupOrRegisterAccount(string name) + { + _mu.EnterWriteLock(); + try + { + if (_accounts.TryGetValue(name, out var existing)) + return (existing, false); + + var acc = new Account { Name = name }; + RegisterAccountNoLock(acc); + return (acc, true); + } + finally { _mu.ExitWriteLock(); } + } + + /// + /// Registers a new account. Returns error if the account already exists. + /// Mirrors Go Server.RegisterAccount. + /// + public (Account? Account, Exception? Error) RegisterAccount(string name) + { + _mu.EnterWriteLock(); + try + { + if (_accounts.ContainsKey(name)) + return (null, ServerErrors.ErrAccountExists); + + var acc = new Account { Name = name }; + RegisterAccountNoLock(acc); + return (acc, null); + } + finally { _mu.ExitWriteLock(); } + } + + // ========================================================================= + // System account (features 3026–3030) + // ========================================================================= + + /// + /// Sets the named account as the server's system account. + /// Mirrors Go Server.SetSystemAccount. + /// + public Exception? SetSystemAccount(string accName) + { + if (_accounts.TryGetValue(accName, out var acc)) + return SetSystemAccountInternal(acc); + + // Not locally known — try resolver. + var (ac, _, fetchErr) = FetchAccountClaims(accName); + if (fetchErr != null) return fetchErr; + + var newAcc = BuildInternalAccount(ac); + // Due to race, registerAccount returns the existing one if already registered. + var racc = RegisterAccountInternal(newAcc); + return SetSystemAccountInternal(racc ?? newAcc); + } + + /// + /// Returns the system account, or null if none is set. + /// Mirrors Go Server.SystemAccount. + /// + public Account? SystemAccount() => _sysAccAtomic; + + /// + /// Returns the global ($G) account. + /// Mirrors Go Server.GlobalAccount. + /// + public Account? GlobalAccount() + { + _mu.EnterReadLock(); + try { return _gacc; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Creates a default system account ($SYS) if one does not already exist. + /// Mirrors Go Server.SetDefaultSystemAccount. + /// + public Exception? SetDefaultSystemAccount() + { + var (_, isNew) = LookupOrRegisterAccount(ServerConstants.DefaultSystemAccount); + if (!isNew) return null; + Debugf("Created system account: \"{0}\"", ServerConstants.DefaultSystemAccount); + return SetSystemAccount(ServerConstants.DefaultSystemAccount); + } + + /// + /// Assigns as the system account and starts internal + /// messaging goroutines. + /// Mirrors Go Server.setSystemAccount. + /// Server lock must NOT be held on entry; this method acquires/releases it. + /// + public Exception? SetSystemAccountInternal(Account acc) + { + if (acc == null) + return ServerErrors.ErrMissingAccount; + if (acc.IsExpired()) + return ServerErrors.ErrAccountExpired; + if (!IsTrustedIssuer(acc.Issuer)) + return ServerErrors.ErrAccountValidation; + + _mu.EnterWriteLock(); + try + { + if (_sys != null) + return ServerErrors.ErrAccountExists; + + _sys = new InternalState { Account = acc }; + } + finally { _mu.ExitWriteLock(); } + + // Store atomically for fast lookup on hot paths. + _sysAccAtomic = acc; + + // Full internal-messaging bootstrap (initEventTracking, sendLoop, etc.) + // is deferred to session 12 (events.go). + AddSystemAccountExports(acc); + + return null; + } + + // ========================================================================= + // Internal client factories (features 3031–3034) + // ========================================================================= + + /// Creates an internal system client. + public ClientConnection CreateInternalSystemClient() => + CreateInternalClient(ClientKind.System); + + /// Creates an internal JetStream client. + public ClientConnection CreateInternalJetStreamClient() => + CreateInternalClient(ClientKind.JetStream); + + /// Creates an internal account client. + public ClientConnection CreateInternalAccountClient() => + CreateInternalClient(ClientKind.Account); + + /// + /// Creates an internal client of the given . + /// Mirrors Go Server.createInternalClient. + /// + public ClientConnection CreateInternalClient(ClientKind kind) + { + if (kind != ClientKind.System && kind != ClientKind.JetStream && kind != ClientKind.Account) + throw new InvalidOperationException($"createInternalClient: unsupported kind {kind}"); + + var c = new ClientConnection(kind, this); + // Mirrors: c.echo = false; c.headers = true; flags.set(noReconnect) + // Full client initialisation deferred to session 10 (client.go). + return c; + } + + // ========================================================================= + // Subscription tracking / account sublist (features 3035–3038) + // ========================================================================= + + /// + /// Returns true if accounts should track subscriptions for route/gateway propagation. + /// Server lock must be held on entry. + /// Mirrors Go Server.shouldTrackSubscriptions. + /// + public bool ShouldTrackSubscriptions() + { + var opts = GetOpts(); + return opts.Cluster.Port != 0 || opts.Gateway.Port != 0; + } + + /// + /// Invokes under the server write lock. + /// Returns the already-registered account if a duplicate is detected, or null. + /// Mirrors Go Server.registerAccount. + /// + public Account? RegisterAccountInternal(Account acc) + { + _mu.EnterWriteLock(); + try { return RegisterAccountNoLock(acc); } + finally { _mu.ExitWriteLock(); } + } + + /// + /// Sets the account's subscription index based on the NoSublistCache option. + /// Mirrors Go Server.setAccountSublist. + /// Server lock must be held on entry. + /// + public void SetAccountSublist(Account acc) + { + if (acc?.Sublist != null) return; + if (acc == null) return; + var opts = GetOpts(); + acc.Sublist = opts?.NoSublistCache == true + ? SubscriptionIndex.NewSublist(false) + : SubscriptionIndex.NewSublistWithCache(); + } + + /// + /// Registers an account in the server's account map. + /// If the account is already registered (race), returns the existing one. + /// Server lock must be held on entry. + /// Mirrors Go Server.registerAccountNoLock. + /// + public Account? RegisterAccountNoLock(Account acc) + { + // If already registered, return existing. + if (_accounts.TryGetValue(acc.Name, out var existing)) + { + _tmpAccounts.TryRemove(acc.Name, out _); + return existing; + } + + SetAccountSublist(acc); + SetRouteInfo(acc); + acc.Server = this; + acc.Updated = DateTime.UtcNow; + + _accounts[acc.Name] = acc; + _tmpAccounts.TryRemove(acc.Name, out _); + + // enableAccountTracking and registerSystemImports deferred to session 12. + EnableAccountTracking(acc); + + return null; + } + + // ========================================================================= + // Route info for accounts (feature 3039) + // ========================================================================= + + /// + /// Sets the account's route-pool index based on cluster configuration. + /// Mirrors Go Server.setRouteInfo. + /// Both server and account locks must be held on entry. + /// + public void SetRouteInfo(Account acc) + { + const int accDedicatedRoute = -1; + + if (_accRoutes != null && _accRoutes.ContainsKey(acc.Name)) + { + // Dedicated route: store name in hash map; use index -1. + _accRouteByHash.TryAdd(acc.Name, null); + acc.RoutePoolIdx = accDedicatedRoute; + } + else + { + acc.RoutePoolIdx = ComputeRoutePoolIdx(_routesPoolSize, acc.Name); + if (_routesPoolSize > 1) + _accRouteByHash.TryAdd(acc.Name, acc.RoutePoolIdx); + } + } + + // ========================================================================= + // Account lookup (features 3040–3042) + // ========================================================================= + + /// + /// Returns the account for if locally known, without + /// fetching from the resolver. + /// Mirrors Go Server.lookupAccountInternal (private helper). + /// Lock must NOT be held on entry. + /// + public (Account? Account, Exception? Error) LookupAccountInternal(string name) + => LookupOrFetchAccount(name, fetch: false); + + /// + /// Returns the account for , optionally fetching from + /// the resolver if not locally known or if expired. + /// Mirrors Go Server.lookupOrFetchAccount. + /// Lock must NOT be held on entry. + /// + public (Account? Account, Exception? Error) LookupOrFetchAccount(string name, bool fetch) + { + _accounts.TryGetValue(name, out var acc); + + if (acc != null) + { + if (acc.IsExpired()) + { + Debugf("Requested account [{0}] has expired", name); + if (_accResolver != null && fetch) + { + var updateErr = UpdateAccount(acc); + if (updateErr != null) + return (null, ServerErrors.ErrAccountExpired); + } + else + { + return (null, ServerErrors.ErrAccountExpired); + } + } + return (acc, null); + } + + if (_accResolver == null || !fetch) + return (null, ServerErrors.ErrMissingAccount); + + return FetchAccountFromResolver(name); + } + + /// + /// Public account lookup — always fetches from resolver if needed. + /// Mirrors Go Server.LookupAccount. + /// + public (Account? Account, Exception? Error) LookupAccount(string name) + => LookupOrFetchAccount(name, fetch: true); + + // ========================================================================= + // Account update (features 3043–3044) + // ========================================================================= + + /// + /// Fetches fresh claims and updates the account if the claims have changed. + /// Mirrors Go Server.updateAccount. + /// Lock must NOT be held on entry. + /// + public Exception? UpdateAccount(Account acc) + { + // Don't update more than once per second unless the account is incomplete. + if (!acc.Incomplete && (DateTime.UtcNow - acc.Updated) < TimeSpan.FromSeconds(1)) + { + Debugf("Requested account update for [{0}] ignored, too soon", acc.Name); + return ServerErrors.ErrAccountResolverUpdateTooSoon; + } + + var (claimJwt, err) = FetchRawAccountClaims(acc.Name); + if (err != null) return err; + + return UpdateAccountWithClaimJwt(acc, claimJwt); + } + + /// + /// Applies updated JWT claims to the account if they differ from what is stored. + /// Mirrors Go Server.updateAccountWithClaimJWT. + /// Lock must NOT be held on entry. + /// + public Exception? UpdateAccountWithClaimJwt(Account acc, string claimJwt) + { + if (acc == null) return ServerErrors.ErrMissingAccount; + + // If JWT hasn't changed and account is not incomplete, skip. + if (!string.IsNullOrEmpty(acc.ClaimJwt) && acc.ClaimJwt == claimJwt && !acc.Incomplete) + { + Debugf("Requested account update for [{0}], same claims detected", acc.Name); + return null; + } + + var (accClaims, _, verifyErr) = VerifyAccountClaims(claimJwt); + if (verifyErr != null) return verifyErr; + if (accClaims == null) return null; + + if (acc.Name != accClaims.Subject) + return ServerErrors.ErrAccountValidation; + + acc.Issuer = accClaims.Issuer; + // Full UpdateAccountClaims() deferred to session 11. + acc.ClaimJwt = claimJwt; + return null; + } + + // ========================================================================= + // Account claims fetch / verify (features 3045–3048) + // ========================================================================= + + /// + /// Fetches the raw JWT string for an account from the resolver. + /// Mirrors Go Server.fetchRawAccountClaims. + /// Lock must NOT be held on entry. + /// + public (string Jwt, Exception? Error) FetchRawAccountClaims(string name) + { + if (_accResolver == null) + return (string.Empty, ServerErrors.ErrNoAccountResolver); + + var start = DateTime.UtcNow; + var (jwt, err) = FetchAccountFromResolverRaw(name); + var elapsed = DateTime.UtcNow - start; + + if (elapsed > TimeSpan.FromSeconds(1)) + Warnf("Account [{0}] fetch took {1}", name, elapsed); + else + Debugf("Account [{0}] fetch took {1}", name, elapsed); + + if (err != null) + { + Warnf("Account fetch failed: {0}", err); + return (string.Empty, err); + } + return (jwt, null); + } + + /// + /// Fetches and decodes account JWT claims from the resolver. + /// Mirrors Go Server.fetchAccountClaims. + /// Lock must NOT be held on entry. + /// + public (AccountClaims? Claims, string Jwt, Exception? Error) FetchAccountClaims(string name) + { + var (claimJwt, err) = FetchRawAccountClaims(name); + if (err != null) return (null, string.Empty, err); + + var (claims, verifiedJwt, verifyErr) = VerifyAccountClaims(claimJwt); + if (claims != null && claims.Subject != name) + return (null, string.Empty, ServerErrors.ErrAccountValidation); + + return (claims, verifiedJwt, verifyErr); + } + + /// + /// Decodes and validates an account JWT string. + /// Mirrors Go Server.verifyAccountClaims. + /// + public (AccountClaims? Claims, string Jwt, Exception? Error) VerifyAccountClaims(string claimJwt) + { + // Full JWT decoding deferred to session 06 JWT integration. + // Stub: create a minimal claims object from the raw JWT. + var claims = AccountClaims.TryDecode(claimJwt); + if (claims == null) + return (null, string.Empty, ServerErrors.ErrAccountValidation); + + if (!IsTrustedIssuer(claims.Issuer)) + return (null, string.Empty, ServerErrors.ErrAccountValidation); + + return (claims, claimJwt, null); + } + + /// + /// Fetches an account from the resolver, registers it, and returns it. + /// Mirrors Go Server.fetchAccount. + /// Lock must NOT be held on entry. + /// + public (Account? Account, Exception? Error) FetchAccountFromResolver(string name) + { + var (accClaims, claimJwt, err) = FetchAccountClaims(name); + if (accClaims == null) return (null, err); + + var acc = BuildInternalAccount(accClaims); + // Due to possible race, registerAccount may return an already-registered account. + var racc = RegisterAccountInternal(acc); + if (racc != null) + { + // Update with new claims if changed. + var updateErr = UpdateAccountWithClaimJwt(racc, claimJwt); + return updateErr != null ? (null, updateErr) : (racc, null); + } + + acc.ClaimJwt = claimJwt; + return (acc, null); + } + + // ========================================================================= + // Account helpers + // ========================================================================= + + /// + /// Builds an Account from decoded claims. + /// Mirrors Go Server.buildInternalAccount. + /// Full JetStream limits / import / export wiring deferred to session 11. + /// + internal Account BuildInternalAccount(AccountClaims? claims) + { + var acc = new Account + { + Name = claims?.Subject ?? string.Empty, + Issuer = claims?.Issuer ?? string.Empty, + }; + return acc; + } + + /// + /// Fetches the raw JWT directly from the resolver without timing/logging. + /// + private (string Jwt, Exception? Error) FetchAccountFromResolverRaw(string name) + { + if (_accResolver == null) + return (string.Empty, ServerErrors.ErrNoAccountResolver); + var (jwt, err) = _accResolver.Fetch(name); + return (jwt, err); + } + + /// + /// Computes the route-pool index for an account name using FNV-32a. + /// Mirrors Go computeRoutePoolIdx (route.go). + /// + internal static int ComputeRoutePoolIdx(int poolSize, string accountName) + { + if (poolSize <= 1) return 0; + // FNV-32a hash (Go uses fnv.New32a) + uint hash = 2166136261u; + foreach (var b in System.Text.Encoding.UTF8.GetBytes(accountName)) + { + hash ^= b; + hash *= 16777619u; + } + return (int)(hash % (uint)poolSize); + } + + // ========================================================================= + // Stubs for subsystems implemented in later sessions + // ========================================================================= + + /// + /// Stub: enables account tracking (session 12 — events.go). + /// + internal void EnableAccountTracking(Account acc) { /* session 12 */ } + + /// + /// Stub: registers system imports on an account (session 12). + /// + internal void RegisterSystemImports(Account acc) { /* session 12 */ } + + /// + /// Stub: adds system-account exports (session 12). + /// + internal void AddSystemAccountExports(Account acc) { /* session 12 */ } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs new file mode 100644 index 0000000..f13ec5e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs @@ -0,0 +1,1058 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/server.go in the NATS server Go source (lines 85–2579). +// Session 09: server initialization, validation, compression helpers. + +using System.Net; +using System.Text.RegularExpressions; +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + // ========================================================================= + // Server protocol version helpers (features 2974–2975) + // ========================================================================= + + /// + /// Returns the latest server-to-server protocol version, unless the test + /// override option is set. + /// Mirrors Go Server.getServerProto(). + /// + public int GetServerProto() + { + var opts = GetOpts(); + var proto = ServerProtocol.MsgTraceProto; + if (opts.OverrideProto < 0) + { + // overrideProto was set by SetServerProtoForTest as (wantedProto+1)*-1 + proto = (opts.OverrideProto * -1) - 1; + } + return proto; + } + + /// + /// Converts a wanted protocol level into the override value stored in options. + /// Mirrors Go setServerProtoForTest (test helper). + /// + public static int SetServerProtoForTest(int wantedProto) => (wantedProto + 1) * -1; + + // ========================================================================= + // Compression helpers (features 2976–2982) + // ========================================================================= + + /// + /// Validates and normalises the field. + /// Mirrors Go validateAndNormalizeCompressionOption. + /// + public static void ValidateAndNormalizeCompressionOption(CompressionOpts? c, string chosenModeForOn) + { + if (c is null) return; + + var cmtl = c.Mode.ToLowerInvariant(); + // Resolve "on" to the caller-chosen default. + switch (cmtl) + { + case "on": + case "enabled": + case "true": + cmtl = chosenModeForOn; + break; + } + + switch (cmtl) + { + case "not supported": + case "not_supported": + c.Mode = CompressionMode.NotSupported; + break; + + case "disabled": + case "off": + case "false": + c.Mode = CompressionMode.Off; + break; + + case "accept": + c.Mode = CompressionMode.Accept; + break; + + case "auto": + case "s2_auto": + { + List rtts; + if (c.RttThresholds.Count == 0) + { + rtts = [.. DefaultCompressionS2AutoRttThresholds]; + } + else + { + rtts = []; + foreach (var n in c.RttThresholds) + { + var t = n < TimeSpan.Zero ? TimeSpan.Zero : n; + if (rtts.Count > 0 && t != TimeSpan.Zero) + { + foreach (var v in rtts) + { + if (t < v) + throw new InvalidOperationException( + $"RTT threshold values {c.RttThresholds} should be in ascending order"); + } + } + rtts.Add(t); + } + + // Trim trailing zeros. + int stop = -1; + for (int i = rtts.Count - 1; i >= 0; i--) + { + if (rtts[i] != TimeSpan.Zero) { stop = i; break; } + } + rtts = rtts[..(stop + 1)]; + + if (rtts.Count > 4) + throw new InvalidOperationException( + $"Compression mode \"{c.Mode}\" should have no more than 4 RTT thresholds"); + if (rtts.Count == 0) + throw new InvalidOperationException( + $"Compression mode \"{c.Mode}\" requires at least one RTT threshold"); + } + c.Mode = CompressionMode.S2Auto; + c.RttThresholds = rtts; + break; + } + + case "fast": + case "s2_fast": + c.Mode = CompressionMode.S2Fast; + break; + + case "better": + case "s2_better": + c.Mode = CompressionMode.S2Better; + break; + + case "best": + case "s2_best": + c.Mode = CompressionMode.S2Best; + break; + + default: + throw new InvalidOperationException($"Unsupported compression mode \"{c.Mode}\""); + } + } + + /// + /// Default RTT thresholds for mode. + /// Mirrors Go defaultCompressionS2AutoRTTThresholds. + /// + public static readonly TimeSpan[] DefaultCompressionS2AutoRttThresholds = + [ + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(50), + TimeSpan.FromMilliseconds(100), + ]; + + /// + /// Returns true if the compression mode requires negotiation with the remote. + /// Mirrors Go needsCompression. + /// + public static bool NeedsCompression(string m) => + m != string.Empty && m != CompressionMode.Off && m != CompressionMode.NotSupported; + + /// + /// Selects the effective compression mode given our local () + /// and remote () compression modes. + /// Mirrors Go selectCompressionMode. + /// + public static string SelectCompressionMode(string scm, string rcm) + { + if (rcm == CompressionMode.NotSupported || rcm == string.Empty) + return CompressionMode.NotSupported; + + switch (rcm) + { + case CompressionMode.Off: + return CompressionMode.Off; + + case CompressionMode.Accept: + return scm == CompressionMode.Accept ? CompressionMode.Off : scm; + + case CompressionMode.S2Auto: + case CompressionMode.S2Uncompressed: + case CompressionMode.S2Fast: + case CompressionMode.S2Better: + case CompressionMode.S2Best: + if (scm == CompressionMode.Accept) + return rcm == CompressionMode.S2Auto ? CompressionMode.S2Fast : rcm; + return scm; + + default: + throw new InvalidOperationException($"Unsupported route compression mode \"{rcm}\""); + } + } + + /// + /// Returns S2Auto if the configured mode is S2Auto; otherwise returns . + /// Mirrors Go compressionModeForInfoProtocol. + /// + public static string CompressionModeForInfoProtocol(CompressionOpts co, string cm) => + co.Mode == CompressionMode.S2Auto ? CompressionMode.S2Auto : cm; + + /// + /// Given an RTT and the configured thresholds, returns the appropriate S2 level. + /// Mirrors Go selectS2AutoModeBasedOnRTT. + /// + public static string SelectS2AutoModeBasedOnRtt(TimeSpan rtt, IReadOnlyList rttThresholds) + { + int idx = -1; + for (int i = 0; i < rttThresholds.Count; i++) + { + if (rtt <= rttThresholds[i]) { idx = i; break; } + } + if (idx < 0) + { + // Not found — use "best" when ≥3 levels, otherwise last index. + idx = rttThresholds.Count >= 3 ? 3 : rttThresholds.Count - 1; + } + return idx switch + { + 0 => CompressionMode.S2Uncompressed, + 1 => CompressionMode.S2Fast, + 2 => CompressionMode.S2Better, + _ => CompressionMode.S2Best, + }; + } + + /// + /// Returns true if the two are logically equal. + /// Mirrors Go compressOptsEqual. + /// + public static bool CompressOptsEqual(CompressionOpts? c1, CompressionOpts? c2) + { + if (ReferenceEquals(c1, c2)) return true; + if (c1 is null || c2 is null) return false; + if (c1.Mode != c2.Mode) return false; + + if (c1.Mode == CompressionMode.S2Auto) + { + var c1Rtts = c1.RttThresholds.Count == 0 + ? (IReadOnlyList)DefaultCompressionS2AutoRttThresholds + : c1.RttThresholds; + var c2Rtts = c2.RttThresholds.Count == 0 + ? (IReadOnlyList)DefaultCompressionS2AutoRttThresholds + : c2.RttThresholds; + + if (c1Rtts.Count != c2Rtts.Count) return false; + for (int i = 0; i < c1Rtts.Count; i++) + if (c1Rtts[i] != c2Rtts[i]) return false; + } + return true; + } + + // ========================================================================= + // Factory methods (features 2983–2985) + // ========================================================================= + + /// + /// Deprecated factory. Use instead. + /// Mirrors Go New. + /// + public static NatsServer? New(ServerOptions opts) + { + var (s, _) = NewServer(opts); + return s; + } + + /// + /// Creates a server from an options file path if is set. + /// Mirrors Go NewServerFromConfig. + /// + public static (NatsServer? Server, Exception? Error) NewServerFromConfig(ServerOptions opts) + { + if (!string.IsNullOrEmpty(opts.ConfigFile) && string.IsNullOrEmpty(opts.ConfigDigest())) + { + // opts.ProcessConfigFile(opts.ConfigFile) — full config file parsing deferred to session 03. + // For now, skip re-processing since Phase 6 tests supply options directly. + } + return NewServer(opts); + } + + /// + /// Creates and fully initializes a new NATS server. + /// Mirrors Go NewServer. + /// + public static (NatsServer? Server, Exception? Error) NewServer(ServerOptions opts) + { + opts.SetBaselineOptions(); + + // Generate server NKey identity. + // In Go: nkeys.CreateServer() — simplified here with Guid-based id. + var pub = Guid.NewGuid().ToString("N"); // mirrors kp.PublicKey() + // xkp (curve keys for encryption) — stub; full implementation in session 09 crypto. + var xpub = string.Empty; + + var serverName = !string.IsNullOrEmpty(opts.ServerName) ? opts.ServerName : pub; + var httpBasePath = ServerOptions.NormalizeBasePath(opts.HttpBasePath); + + // Validate options. + var valErr = ValidateOptions(opts); + if (valErr != null) return (null, valErr); + + var now = DateTime.UtcNow; + var tlsReq = opts.TlsConfig != null; + var verify = tlsReq && opts.TlsVerify; + + var info = new ServerInfo + { + Id = pub, + XKey = xpub, + Version = ServerConstants.Version, + Proto = ServerConstants.Proto, + GitCommit = ServerConstants.GitCommit, + GoVersion = System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription, + Name = serverName, + Host = opts.Host, + Port = opts.Port, + AuthRequired = false, + TlsRequired = tlsReq && !opts.AllowNonTls, + TlsVerify = verify, + MaxPayload = opts.MaxPayload, + JetStream = opts.JetStream, + Headers = !opts.NoHeaderSupport, + Cluster = opts.Cluster.Name, + Domain = opts.JetStreamDomain, + JsApiLevel = 1, // stub — session 19 + }; + + if (tlsReq && !info.TlsRequired) + info.TlsAvailable = true; + + var s = new NatsServer(opts) + { + _info = info, + _configFile = opts.ConfigFile, + _start = now, + _configTime = now, + _pub = pub, + _xpub = xpub, + _httpBasePath = httpBasePath, + }; + + // Fill sync semaphore. + // (Already at capacity since SemaphoreSlim starts at MaxConcurrentSyncRequests) + + if (opts.TlsRateLimit > 0) + s._connRateCounter = RateCounterFactory.Create(opts.TlsRateLimit); + + // Process trusted operator keys. + if (!s.ProcessTrustedKeys()) + return (null, new InvalidOperationException("Error processing trusted operator keys")); + + // Handle leaf-node-only scenario (no cluster, needs stable cluster name). + if (opts.LeafNode.Remotes.Count > 0 && opts.Cluster.Port == 0 && string.IsNullOrEmpty(opts.Cluster.Name)) + { + s._leafNoCluster = true; + opts.Cluster.Name = opts.ServerName; + } + + if (!string.IsNullOrEmpty(opts.Cluster.Name)) + { + s._cnMu.EnterWriteLock(); + try { s._cn = opts.Cluster.Name; } + finally { s._cnMu.ExitWriteLock(); } + } + + s._mu.EnterWriteLock(); + try + { + // Process proxies trusted public keys (stub — session 09 proxy). + s.ProcessProxiesTrustedKeys(); + + // JetStream node info. + if (opts.JetStream) + { + var ourNode = GetHash(serverName); + s._nodeToInfo.TryAdd(ourNode, new NodeInfo + { + Name = serverName, + Version = ServerConstants.Version, + Cluster = opts.Cluster.Name, + Domain = opts.JetStreamDomain, + Id = info.Id, + Tags = [.. opts.Tags], + Js = true, + BinarySnapshots = true, + AccountNrg = true, + }); + } + + // Route resolver. + s._routeResolver = null; // Default system DNS — session 14 + + // URL maps. + // (Initialized via new() in field declarations) + + // Assign leaf options. + s._leafNodeEnabled = opts.LeafNode.Port != 0 || opts.LeafNode.Remotes.Count > 0; + + // OCSP (stub — session 23). + // s.EnableOcsp() — deferred + + // Gateway (stub — session 16). + // s.NewGateway(opts) — deferred + + // Cluster name. + if (opts.Cluster.Port != 0 && string.IsNullOrEmpty(opts.Cluster.Name)) + s._info.Cluster = Guid.NewGuid().ToString("N"); + else if (!string.IsNullOrEmpty(opts.Cluster.Name)) + s._info.Cluster = opts.Cluster.Name; + + // INFO host/port (stub — needs listener port, session 10). + s.SetInfoHostPort(); + + // Client tracking. + // (Initialized in field declarations) + + // Closed-clients ring buffer. + s._closed = new ClosedRingBuffer(opts.MaxClosedClients); + + // Route structures. + s.InitRouteStructures(opts); + + // Quit channel already via CancellationTokenSource. + + // Check account resolver. + var resolverErr = s.ConfigureResolver(); + if (resolverErr != null) + { + s._mu.ExitWriteLock(); + return (null, resolverErr); + } + + // URL account resolver health check (stub — session 11 for URLAccResolver). + + // Operator mode bootstrap: inject temporary system account if needed. + // (stub — requires full Account implementation from session 11) + + // Configure accounts. + var (_, accErr) = s.ConfigureAccounts(false); + if (accErr != null) + { + s._mu.ExitWriteLock(); + return (null, accErr); + } + + // Configure authorization (stub — session 09 auth). + s.ConfigureAuthorization(); + + // Signal handler (stub — session 04 already handled signals separately). + s.HandleSignals(); + } + finally + { + if (s._mu.IsWriteLockHeld) + s._mu.ExitWriteLock(); + } + + return (s, null); + } + + // ========================================================================= + // Route structures (feature 2986) + // ========================================================================= + + /// + /// Initializes route tracking structures based on pool size and pinned accounts. + /// Mirrors Go Server.initRouteStructures. + /// Server lock must be held on entry. + /// + public void InitRouteStructures(ServerOptions opts) + { + _routes = []; + _routesPoolSize = opts.Cluster.PoolSize > 0 ? opts.Cluster.PoolSize : 1; + + if (opts.Cluster.PinnedAccounts.Count > 0) + { + _accRoutes = []; + foreach (var acc in opts.Cluster.PinnedAccounts) + _accRoutes[acc] = []; + } + } + + // ========================================================================= + // logRejectedTLSConns background loop (feature 2987) + // ========================================================================= + + /// + /// Background loop that logs TLS rate-limited connection rejections every second. + /// Mirrors Go Server.logRejectedTLSConns. + /// + internal async Task LogRejectedTlsConnsAsync(CancellationToken ct) + { + using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1)); + while (!ct.IsCancellationRequested) + { + try { await timer.WaitForNextTickAsync(ct); } + catch (OperationCanceledException) { break; } + + if (_connRateCounter is not null) + { + var blocked = _connRateCounter.CountBlocked(); + if (blocked > 0) + Warnf("Rejected {0} connections due to TLS rate limiting", blocked); + } + } + } + + // ========================================================================= + // Cluster name (features 2988–2991) + // ========================================================================= + + /// Returns the cluster name from the server info. + public string ClusterName() + { + _mu.EnterReadLock(); + try { return _info.Cluster ?? string.Empty; } + finally { _mu.ExitReadLock(); } + } + + /// Returns cluster name via the dedicated cluster-name lock (faster). + public string CachedClusterName() + { + _cnMu.EnterReadLock(); + try { return _cn; } + finally { _cnMu.ExitReadLock(); } + } + + /// + /// Updates the cluster name and notifies affected leaf nodes. + /// Mirrors Go Server.setClusterName. + /// + public void SetClusterName(string name) + { + _mu.EnterWriteLock(); + _info.Cluster = name; + _routeInfo.Cluster = name; + _mu.ExitWriteLock(); + + _cnMu.EnterWriteLock(); + try { _cn = name; } + finally { _cnMu.ExitWriteLock(); } + + Noticef("Cluster name updated to {0}", name); + } + + /// Returns true if the cluster name was not set in config (dynamic assignment). + public bool IsClusterNameDynamic() + { + _optsMu.EnterReadLock(); + try { return string.IsNullOrEmpty(_opts.Cluster.Name); } + finally { _optsMu.ExitReadLock(); } + } + + // ========================================================================= + // Server name / URLs (features 2992–2994) + // ========================================================================= + + /// Returns the configured server name. + public string ServerName() => GetOpts().ServerName; + + /// + /// Returns the URL used to connect clients. + /// Mirrors Go Server.ClientURL. + /// + public string ClientUrl() + { + var opts = GetOpts(); + var scheme = opts.TlsConfig != null ? "tls" : "nats"; + return $"{scheme}://{opts.Host}:{opts.Port}"; + } + + /// + /// Returns the URL used to connect WebSocket clients. + /// Mirrors Go Server.WebsocketURL. + /// + public string WebsocketUrl() + { + var opts = GetOpts(); + var scheme = opts.Websocket.TlsConfig != null ? "wss" : "ws"; + return $"{scheme}://{opts.Websocket.Host}:{opts.Websocket.Port}"; + } + + // ========================================================================= + // Validation (features 2995–2997) + // ========================================================================= + + /// + /// Validates cluster configuration options. + /// Mirrors Go validateCluster. + /// + public static Exception? ValidateCluster(ServerOptions o) + { + if (!string.IsNullOrEmpty(o.Cluster.Name) && o.Cluster.Name.Contains(' ')) + return ServerErrors.ErrClusterNameHasSpaces; + + if (!string.IsNullOrEmpty(o.Cluster.Compression.Mode)) + { + try { ValidateAndNormalizeCompressionOption(o.Cluster.Compression, CompressionMode.S2Fast); } + catch (Exception ex) { return ex; } + } + + var pinnedErr = ValidatePinnedCerts(o.Cluster.TlsPinnedCerts); + if (pinnedErr != null) + return new InvalidOperationException($"cluster: {pinnedErr.Message}", pinnedErr); + + // Sync gateway name with cluster name. + if (!string.IsNullOrEmpty(o.Gateway.Name) && o.Gateway.Name != o.Cluster.Name) + { + if (!string.IsNullOrEmpty(o.Cluster.Name)) + return ServerErrors.ErrClusterNameConfigConflict; + o.Cluster.Name = o.Gateway.Name; + } + + if (o.Cluster.PinnedAccounts.Count > 0) + { + if (o.Cluster.PoolSize < 0) + return new InvalidOperationException("pool_size cannot be negative if pinned accounts are specified"); + + var seen = new HashSet(StringComparer.Ordinal); + foreach (var a in o.Cluster.PinnedAccounts) + { + if (!seen.Add(a)) + return new InvalidOperationException( + $"found duplicate account name \"{a}\" in pinned accounts list"); + } + } + return null; + } + + /// + /// Validates pinned certificate SHA-256 fingerprints. + /// Mirrors Go validatePinnedCerts. + /// + public static Exception? ValidatePinnedCerts(PinnedCertSet? pinned) + { + if (pinned is null) return null; + var re = new Regex("^[a-f0-9]{64}$", RegexOptions.Compiled); + foreach (var certId in pinned) + { + if (!re.IsMatch(certId.ToLowerInvariant())) + return new InvalidOperationException( + $"error parsing 'pinned_certs' key {certId} does not look like lower case hex-encoded sha256 of DER encoded SubjectPublicKeyInfo"); + } + return null; + } + + /// + /// Validates all server options. + /// Mirrors Go validateOptions. + /// + public static Exception? ValidateOptions(ServerOptions o) + { + if (o.LameDuckDuration > TimeSpan.Zero && o.LameDuckGracePeriod >= o.LameDuckDuration) + return new InvalidOperationException( + $"lame duck grace period ({o.LameDuckGracePeriod}) should be strictly lower than lame duck duration ({o.LameDuckDuration})"); + + if ((long)o.MaxPayload > o.MaxPending) + return new InvalidOperationException( + $"max_payload ({o.MaxPayload}) cannot be higher than max_pending ({o.MaxPending})"); + + if (!string.IsNullOrEmpty(o.ServerName) && o.ServerName.Contains(' ')) + return new InvalidOperationException("server name cannot contain spaces"); + + // Trusted operators, leafnode, auth, proxies, gateway, cluster, MQTT, websocket + // — validation stubs delegating to not-yet-ported subsystems. + var err = ValidateCluster(o); + return err; + } + + // ========================================================================= + // Options accessors (features 2998–2999) + // ========================================================================= + + /// Returns a thread-safe snapshot of the current options. + public ServerOptions GetOpts() + { + _optsMu.EnterReadLock(); + try { return _opts; } + finally { _optsMu.ExitReadLock(); } + } + + /// Replaces the options atomically (used during config reload). + public void SetOpts(ServerOptions opts) + { + _optsMu.EnterWriteLock(); + try { _opts = opts; } + finally { _optsMu.ExitWriteLock(); } + } + + // ========================================================================= + // Global account (feature 3000) + // ========================================================================= + + /// + /// Returns the global account (internal, lock-protected). + /// Mirrors Go Server.globalAccount. + /// + internal Account? GlobalAccountInternal() + { + _mu.EnterReadLock(); + try { return _gacc; } + finally { _mu.ExitReadLock(); } + } + + // ========================================================================= + // Trusted keys (features 3008–3011) + // ========================================================================= + + /// + /// Returns true if the given issuer key is trusted. + /// Mirrors Go Server.isTrustedIssuer. + /// + public bool IsTrustedIssuer(string issuer) + { + _mu.EnterReadLock(); + try + { + if (_trustedKeys is null && string.IsNullOrEmpty(issuer)) return true; + return _trustedKeys?.Contains(issuer) == true; + } + finally { _mu.ExitReadLock(); } + } + + /// + /// Processes binary-stamped and options-based trusted NKeys. + /// Mirrors Go Server.processTrustedKeys. + /// + public bool ProcessTrustedKeys() + { + _strictSigningKeyUsage = []; + var opts = GetOpts(); + + if (!string.IsNullOrEmpty(StampedTrustedKeys) && !InitStampedTrustedKeys()) + return false; + + if (opts.TrustedKeys is { Count: > 0 }) + { + foreach (var key in opts.TrustedKeys) + { + if (!IsValidPublicOperatorKey(key)) + return false; + } + _trustedKeys = [.. opts.TrustedKeys]; + + foreach (var claim in opts.TrustedOperators) + { + // stub: claim.StrictSigningKeyUsage / claim.SigningKeys — session 06 + // Will be populated in auth session when TrustedOperator is fully typed. + } + } + return true; + } + + /// + /// Parses a space-separated list of public operator NKeys. + /// Mirrors Go checkTrustedKeyString. + /// + public static List? CheckTrustedKeyString(string keys) + { + var tks = keys.Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tks.Length == 0) return null; + foreach (var key in tks) + { + if (!IsValidPublicOperatorKey(key)) return null; + } + return [.. tks]; + } + + /// + /// Initialises trusted keys from the binary-stamped field. + /// Mirrors Go Server.initStampedTrustedKeys. + /// + public bool InitStampedTrustedKeys() + { + if (GetOpts().TrustedKeys is { Count: > 0 }) return false; + var tks = CheckTrustedKeyString(StampedTrustedKeys); + if (tks is null) return false; + _trustedKeys = tks; + return true; + } + + // ========================================================================= + // CLI helpers (features 3012–3014) + // ========================================================================= + + /// Prints to stderr and exits with code 1. + public static void PrintAndDie(string msg) + { + Console.Error.WriteLine(msg); + Environment.Exit(1); + } + + /// Prints the server version string and exits with code 0. + public static void PrintServerAndExit() + { + Console.WriteLine($"nats-server: v{ServerConstants.Version}"); + Environment.Exit(0); + } + + /// + /// Processes subcommands in the CLI args array. + /// Mirrors Go ProcessCommandLineArgs. + /// Returns (showVersion, showHelp, error). + /// + public static (bool ShowVersion, bool ShowHelp, Exception? Error) + ProcessCommandLineArgs(string[] args) + { + foreach (var arg in args) + { + switch (arg.ToLowerInvariant()) + { + case "version": return (true, false, null); + case "help": return (false, true, null); + default: + return (false, false, + new InvalidOperationException($"Unknown argument: {arg}")); + } + } + return (false, false, null); + } + + // ========================================================================= + // Running state (features 3015–3016) + // ========================================================================= + + /// Returns true if the server is running. + public bool Running() => IsRunning(); + + /// Protected check on running state. + private bool IsRunning() => Interlocked.CompareExchange(ref _running, 0, 0) != 0; + + /// Returns true if the server is shutting down. + public bool IsShuttingDown() => Interlocked.CompareExchange(ref _shutdown, 0, 0) != 0; + + // ========================================================================= + // PID file (feature 3017) + // ========================================================================= + + /// + /// Writes the process PID to the configured PID file. + /// Mirrors Go Server.logPid. + /// + public Exception? LogPid() + { + var pidFile = GetOpts().PidFile; + if (string.IsNullOrEmpty(pidFile)) return null; + try + { + File.WriteAllText(pidFile, Environment.ProcessId.ToString()); + return null; + } + catch (Exception ex) { return ex; } + } + + // ========================================================================= + // Active account counters (features 3018–3021) + // ========================================================================= + + /// Returns the number of reserved accounts (currently always 1). + public int NumReservedAccounts() => 1; + + /// Reports the number of active accounts on this server. + public int NumActiveAccounts() => Interlocked.CompareExchange(ref _activeAccounts, 0, 0); + + // ========================================================================= + // Misc helpers + // ========================================================================= + + /// + /// Sets the INFO host/port from the current listener endpoint. + /// Full implementation in session 10 (listener startup). + /// Stub sets to opts values. + /// + internal void SetInfoHostPort() + { + var opts = GetOpts(); + _info.Host = opts.Host; + _info.Port = opts.Port; + } + + /// + /// Stub: configure authorization (session 06 auth). + /// Called from NewServer. + /// + internal void ConfigureAuthorization() + { + // Full implementation in session 09 (auth handlers are in session 06). + // Users/NKeys maps are populated from opts here. + var opts = GetOpts(); + _users = opts.Users?.ToDictionary(u => u.Username, StringComparer.Ordinal) + ?? []; + _nkeys = opts.Nkeys?.ToDictionary(nk => nk.Nkey, StringComparer.Ordinal) + ?? []; + } + + /// + /// Stub: start signal handler (session 04 already has signal handling). + /// + internal void HandleSignals() { } + + /// + /// Stub: process proxies trusted keys (session 08/09). + /// + internal void ProcessProxiesTrustedKeys() { } + + /// + /// Computes a stable short hash from a string (used for JetStream node names). + /// Mirrors Go getHash. + /// + internal static string GetHash(string s) + { + var bytes = System.Text.Encoding.UTF8.GetBytes(s); + var hash = System.Security.Cryptography.SHA256.HashData(bytes); + return Convert.ToBase64String(hash)[..8].Replace('+', '-').Replace('/', '_'); + } + + /// + /// Validates that a string is a valid public operator NKey. + /// Mirrors Go nkeys.IsValidPublicOperatorKey. + /// Simplified: checks length and prefix 'O' for operator keys. + /// + internal static bool IsValidPublicOperatorKey(string key) => + !string.IsNullOrEmpty(key) && key.Length == 56 && key[0] == 'O'; + + // ========================================================================= + // Start (feature 3049) + // ========================================================================= + + /// + /// Starts the server (non-blocking). Writes startup log lines and begins accept loops. + /// Full implementation requires sessions 10-23 (gateway, websocket, leafnode, routes, etc.). + /// This stub handles the bootstrap sequence up to the subsystems not yet ported. + /// Mirrors Go Server.Start. + /// + public void Start() + { + Noticef("Starting nats-server"); + + var gc = string.IsNullOrEmpty(ServerConstants.GitCommit) ? "not set" : ServerConstants.GitCommit; + var opts = GetOpts(); + + _mu.EnterReadLock(); + var leafNoCluster = _leafNoCluster; + _mu.ExitReadLock(); + + var clusterName = leafNoCluster ? string.Empty : ClusterName(); + + Noticef(" Version: {0}", ServerConstants.Version); + Noticef(" Git: [{0}]", gc); + if (!string.IsNullOrEmpty(clusterName)) + Noticef(" Cluster: {0}", clusterName); + Noticef(" Name: {0}", _info.Name); + Noticef(" ID: {0}", _info.Id); + + // Avoid RACE between Start() and Shutdown(). + Interlocked.Exchange(ref _running, 1); + + _mu.EnterWriteLock(); + _leafNodeEnabled = opts.LeafNode.Port != 0 || opts.LeafNode.Remotes.Count > 0; + _mu.ExitWriteLock(); + + lock (_grMu) { _grRunning = true; } + + // Log PID. + if (!string.IsNullOrEmpty(opts.PidFile)) + { + var pidErr = LogPid(); + if (pidErr != null) + { + Fatalf("Could not write pidfile: {0}", pidErr); + return; + } + } + + // System account setup. + if (!string.IsNullOrEmpty(opts.SystemAccount)) + { + var saErr = SetSystemAccount(opts.SystemAccount); + if (saErr != null) + { + Fatalf("Can't set system account: {0}", saErr); + return; + } + } + else if (!opts.NoSystemAccount) + { + SetDefaultSystemAccount(); + } + + // Signal startup complete. + _startupComplete.TrySetResult(); + + Noticef("Server is ready"); + } + + // ========================================================================= + // Account resolver (feature 3002) + // ========================================================================= + + /// + /// Wires up the account resolver from opts and preloads any JWT claims. + /// Mirrors Go Server.configureResolver. + /// Server lock should be held on entry; released/reacquired internally for preloads. + /// + public Exception? ConfigureResolver() + { + var opts = GetOpts(); + _accResolver = opts.AccountResolver; + + if (opts.AccountResolver is not null && opts.ResolverPreloads.Count > 0) + { + var ar = _accResolver!; + if (ar.IsReadOnly()) + return new InvalidOperationException( + "resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR"); + + foreach (var (k, v) in opts.ResolverPreloads) + { + // Validate JWT format (stub — session 06 has JWT decoder). + // jwt.DecodeAccountClaims(v) — skip here, checked again in CheckResolvePreloads. + ar.Store(k, v); + } + } + return null; + } + + /// + /// Validates preloaded resolver JWT claims and logs warnings. + /// Mirrors Go Server.checkResolvePreloads. + /// + public void CheckResolvePreloads() + { + var opts = GetOpts(); + foreach (var (k, _) in opts.ResolverPreloads) + { + // Full JWT validation deferred to session 06 JWT integration. + Debugf("Checking preloaded account [{0}]", k); + } + } + + /// Returns the configured account resolver. + public IAccountResolver? AccountResolver() => _accResolver; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs new file mode 100644 index 0000000..b3995ef --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs @@ -0,0 +1,305 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/server.go in the NATS server Go source. + +using System.Collections.Concurrent; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +/// +/// The core NATS server class. +/// Mirrors Go Server struct in server/server.go. +/// Session 09: initialization, configuration, and account management. +/// Sessions 10-23 add further capabilities as partial class files. +/// +public sealed partial class NatsServer : INatsServer +{ + // ========================================================================= + // Build-time stamps (mirrors package-level vars in server.go) + // ========================================================================= + + /// + /// Binary-stamped trusted operator keys (space-separated NKey public keys). + /// In Go this is a package-level var that can be overridden at build time. + /// In .NET it can be set before constructing any server instance. + /// Mirrors Go package-level trustedKeys var. + /// + public static string StampedTrustedKeys { get; set; } = string.Empty; + + // ========================================================================= + // Atomic counters (mirrors fields accessed with atomic operations) + // ========================================================================= + + private ulong _gcid; // global client id counter + private long _pinnedAccFail; // pinned-account auth failures + private int _activeAccounts; // number of active accounts + + // ========================================================================= + // Stats (embedded Go structs: stats, scStats, staleStats) + // ========================================================================= + + private readonly ServerStats _stats = new(); + private readonly SlowConsumerStats _scStats = new(); + private readonly StaleConnectionStats _staleStats = new(); + + // ========================================================================= + // Core identity + // ========================================================================= + + // kp / xkp are NKey keypairs — represented as byte arrays here. + // Full crypto operations deferred to auth session. + private byte[]? _kpSeed; // server NKey seed + private string _pub = string.Empty; // server public key (server ID) + private byte[]? _xkpSeed; // x25519 key seed + private string _xpub = string.Empty; // x25519 public key + + // ========================================================================= + // Server info (wire protocol) + // ========================================================================= + + private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion); + private readonly ReaderWriterLockSlim _reloadMu = new(LockRecursionPolicy.SupportsRecursion); + private ServerInfo _info = new(); + private string _configFile = string.Empty; + + // ========================================================================= + // Options (protected by _optsMu) + // ========================================================================= + + private readonly ReaderWriterLockSlim _optsMu = new(LockRecursionPolicy.NoRecursion); + private ServerOptions _opts; + + // ========================================================================= + // Running / shutdown state + // ========================================================================= + + private int _running; // 1 = running, 0 = not (Interlocked) + private int _shutdown; // 1 = shutting down + private readonly CancellationTokenSource _quitCts = new(); + private readonly TaskCompletionSource _startupComplete = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _shutdownComplete = new(TaskCreationOptions.RunContinuationsAsynchronously); + private Task? _quitTask; + + // ========================================================================= + // Listeners (forward-declared stubs — fully wired in session 10) + // ========================================================================= + + private System.Net.Sockets.TcpListener? _listener; + private Exception? _listenerErr; + + // ========================================================================= + // Accounts + // ========================================================================= + + private Account? _gacc; // global account + private Account? _sysAccAtomic; // system account (atomic) + private readonly ConcurrentDictionary _accounts = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _tmpAccounts = new(StringComparer.Ordinal); + private IAccountResolver? _accResolver; + private InternalState? _sys; // system messaging state + + // ========================================================================= + // Client/route/leaf tracking + // ========================================================================= + + private readonly Dictionary _clients = []; + private readonly Dictionary _leafs = []; + private Dictionary> _routes = []; + private int _routesPoolSize = 1; + private bool _routesReject; + private int _routesNoPool; + private Dictionary>? _accRoutes; + private readonly ConcurrentDictionary _accRouteByHash = new(StringComparer.Ordinal); + private Channel? _accAddedCh; // stub + private string _accAddedReqId = string.Empty; + + // ========================================================================= + // User / nkey maps + // ========================================================================= + + private Dictionary? _users; + private Dictionary? _nkeys; + + // ========================================================================= + // Connection tracking + // ========================================================================= + + private ulong _totalClients; + private ClosedRingBuffer _closed = new(0); + private DateTime _start; + private DateTime _configTime; + + // ========================================================================= + // Goroutine / WaitGroup tracking + // ========================================================================= + + private readonly object _grMu = new(); + private bool _grRunning; + private readonly Dictionary _grTmpClients = []; + private readonly SemaphoreSlim _grWg = new(1, 1); // simplified wg + + // ========================================================================= + // Cluster name (separate lock) + // ========================================================================= + + private readonly ReaderWriterLockSlim _cnMu = new(LockRecursionPolicy.NoRecursion); + private string _cn = string.Empty; + private ServerInfo _routeInfo = new(); + private bool _leafNoCluster; + private bool _leafNodeEnabled; + private bool _leafDisableConnect; + private bool _ldm; + + // ========================================================================= + // Trusted keys + // ========================================================================= + + private List? _trustedKeys; + private HashSet _strictSigningKeyUsage = []; + + // ========================================================================= + // Monitoring / stats endpoint + // ========================================================================= + + private string _httpBasePath = string.Empty; + private readonly Dictionary _httpReqStats = []; + + // ========================================================================= + // Client connect URLs + // ========================================================================= + + private readonly List _clientConnectUrls = []; + private readonly RefCountedUrlSet _clientConnectUrlsMap = new(); + + // ========================================================================= + // Gateway / Websocket / MQTT / OCSP stubs + // ========================================================================= + + private readonly SrvGateway _gateway = new(); + private readonly SrvWebsocket _websocket = new(); + private readonly SrvMqtt _mqtt = new(); + private OcspMonitor[]? _ocsps; + private bool _ocspPeerVerify; + private IOcspResponseCache? _ocsprc; + + // ========================================================================= + // Gateway reply map (stub — session 16) + // ========================================================================= + + private readonly SubscriptionIndex _gwLeafSubs; + + // ========================================================================= + // NUID event ID generator + // ========================================================================= + + // Replaced by actual NUID in session 10. Use Guid for now. + private string NextEventId() => Guid.NewGuid().ToString("N"); + + // ========================================================================= + // Various stubs + // ========================================================================= + + private readonly List _leafRemoteCfgs = []; // stub — session 15 + private readonly List _proxiesKeyPairs = []; // stub — session 09 (proxies) + private readonly Dictionary> _proxiedConns = []; + private long _cproto; // count of INFO-capable clients + private readonly ConcurrentDictionary _nodeToInfo = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _raftNodes = new(StringComparer.Ordinal); + private readonly Dictionary _routesToSelf = []; + private INetResolver? _routeResolver; + private readonly ConcurrentDictionary _rateLimitLogging = new(); + private readonly Channel _rateLimitLoggingCh; + private RateCounter? _connRateCounter; + + // GW reply map expiration + private readonly ConcurrentDictionary _gwrm = new(); + + // Catchup bytes + private readonly ReaderWriterLockSlim _gcbMu = new(LockRecursionPolicy.NoRecursion); + private long _gcbOut; + private long _gcbOutMax; + private readonly Channel? _gcbKick; // stub + + // Sync-out semaphore + private readonly SemaphoreSlim _syncOutSem; + private const int MaxConcurrentSyncRequests = 16; + + // ========================================================================= + // Logging + // ========================================================================= + + private ILogger _logger = NullLogger.Instance; + private int _traceEnabled; + private int _debugEnabled; + private int _traceSysAcc; + + // ========================================================================= + // INatsServer implementation + // ========================================================================= + + /// + public ulong NextClientId() => Interlocked.Increment(ref _gcid); + + /// + public ServerOptions Options => GetOpts(); + + /// + public bool TraceEnabled => Interlocked.CompareExchange(ref _traceEnabled, 0, 0) != 0; + + /// + public bool TraceSysAcc => Interlocked.CompareExchange(ref _traceSysAcc, 0, 0) != 0; + + /// + public ILogger Logger => _logger; + + /// + public void DecActiveAccounts() => Interlocked.Decrement(ref _activeAccounts); + + /// + public void IncActiveAccounts() => Interlocked.Increment(ref _activeAccounts); + + // ========================================================================= + // Logging helpers (mirrors Go s.Debugf / s.Noticef / s.Warnf / s.Errorf) + // ========================================================================= + + internal void Debugf(string fmt, params object?[] args) => _logger.LogDebug(fmt, args); + internal void Noticef(string fmt, params object?[] args) => _logger.LogInformation(fmt, args); + internal void Warnf(string fmt, params object?[] args) => _logger.LogWarning(fmt, args); + internal void Errorf(string fmt, params object?[] args) => _logger.LogError(fmt, args); + internal void Fatalf(string fmt, params object?[] args) => _logger.LogCritical(fmt, args); + + // ========================================================================= + // Constructor + // ========================================================================= + + /// + /// Direct constructor — do not call directly; use . + /// + private NatsServer(ServerOptions opts) + { + _opts = opts; + _gwLeafSubs = SubscriptionIndex.NewSublistWithCache(); + _rateLimitLoggingCh = Channel.CreateBounded(1); + _syncOutSem = new SemaphoreSlim(MaxConcurrentSyncRequests, MaxConcurrentSyncRequests); + } +} + +// Placeholder struct for stub channel types +internal readonly struct struct_ { } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs new file mode 100644 index 0000000..55b8c0f --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs @@ -0,0 +1,293 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/server.go in the NATS server Go source. + +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +// ============================================================================ +// Wire-protocol Info payload +// ============================================================================ + +/// +/// The INFO payload sent to clients, routes, gateways and leaf nodes. +/// Mirrors Go Info struct in server.go. +/// +public sealed class ServerInfo +{ + [JsonPropertyName("server_id")] public string Id { get; set; } = string.Empty; + [JsonPropertyName("server_name")] public string Name { get; set; } = string.Empty; + [JsonPropertyName("version")] public string Version { get; set; } = string.Empty; + [JsonPropertyName("proto")] public int Proto { get; set; } + [JsonPropertyName("git_commit")] public string? GitCommit { get; set; } + [JsonPropertyName("go")] public string GoVersion { get; set; } = string.Empty; + [JsonPropertyName("host")] public string Host { get; set; } = string.Empty; + [JsonPropertyName("port")] public int Port { get; set; } + [JsonPropertyName("headers")] public bool Headers { get; set; } + [JsonPropertyName("auth_required")] public bool AuthRequired { get; set; } + [JsonPropertyName("tls_required")] public bool TlsRequired { get; set; } + [JsonPropertyName("tls_verify")] public bool TlsVerify { get; set; } + [JsonPropertyName("tls_available")] public bool TlsAvailable { get; set; } + [JsonPropertyName("max_payload")] public int MaxPayload { get; set; } + [JsonPropertyName("jetstream")] public bool JetStream { get; set; } + [JsonPropertyName("ip")] public string? Ip { get; set; } + [JsonPropertyName("client_id")] public ulong Cid { get; set; } + [JsonPropertyName("client_ip")] public string? ClientIp { get; set; } + [JsonPropertyName("nonce")] public string? Nonce { get; set; } + [JsonPropertyName("cluster")] public string? Cluster { get; set; } + [JsonPropertyName("cluster_dynamic")] public bool Dynamic { get; set; } + [JsonPropertyName("domain")] public string? Domain { get; set; } + [JsonPropertyName("connect_urls")] public string[]? ClientConnectUrls { get; set; } + [JsonPropertyName("ws_connect_urls")] public string[]? WsConnectUrls { get; set; } + [JsonPropertyName("ldm")] public bool LameDuckMode { get; set; } + [JsonPropertyName("compression")] public string? Compression { get; set; } + [JsonPropertyName("connect_info")] public bool ConnectInfo { get; set; } + [JsonPropertyName("remote_account")] public string? RemoteAccount { get; set; } + [JsonPropertyName("acc_is_sys")] public bool IsSystemAccount { get; set; } + [JsonPropertyName("api_lvl")] public int JsApiLevel { get; set; } + [JsonPropertyName("xkey")] public string? XKey { get; set; } + + // Route-specific + [JsonPropertyName("import")] public SubjectPermission? Import { get; set; } + [JsonPropertyName("export")] public SubjectPermission? Export { get; set; } + [JsonPropertyName("lnoc")] public bool Lnoc { get; set; } + [JsonPropertyName("lnocu")] public bool Lnocu { get; set; } + [JsonPropertyName("info_on_connect")] public bool InfoOnConnect { get; set; } + [JsonPropertyName("route_pool_size")] public int RoutePoolSize { get; set; } + [JsonPropertyName("route_pool_idx")] public int RoutePoolIdx { get; set; } + [JsonPropertyName("route_account")] public string? RouteAccount { get; set; } + [JsonPropertyName("route_acc_add_reqid")] public string? RouteAccReqId { get; set; } + [JsonPropertyName("gossip_mode")] public byte GossipMode { get; set; } + + // Gateway-specific + [JsonPropertyName("gateway")] public string? Gateway { get; set; } + [JsonPropertyName("gateway_urls")] public string[]? GatewayUrls { get; set; } + [JsonPropertyName("gateway_url")] public string? GatewayUrl { get; set; } + [JsonPropertyName("gateway_cmd")] public byte GatewayCmd { get; set; } + [JsonPropertyName("gateway_cmd_payload")] public byte[]? GatewayCmdPayload { get; set; } + [JsonPropertyName("gateway_nrp")] public bool GatewayNrp { get; set; } + [JsonPropertyName("gateway_iom")] public bool GatewayIom { get; set; } + + // LeafNode-specific + [JsonPropertyName("leafnode_urls")] public string[]? LeafNodeUrls { get; set; } +} + +// ============================================================================ +// Server stats structures +// ============================================================================ + +/// +/// Aggregate message/byte counters for the server. +/// Mirrors Go embedded stats struct in server.go. +/// +internal sealed class ServerStats +{ + public long InMsgs; + public long OutMsgs; + public long InBytes; + public long OutBytes; + public long SlowConsumers; + public long StaleConnections; + public long Stalls; +} + +/// +/// Per-kind slow-consumer counters (atomic). +/// Mirrors Go embedded scStats in server.go. +/// +internal sealed class SlowConsumerStats +{ + public long Clients; + public long Routes; + public long Leafs; + public long Gateways; +} + +/// +/// Per-kind stale-connection counters (atomic). +/// Mirrors Go embedded staleStats in server.go. +/// +internal sealed class StaleConnectionStats +{ + public long Clients; + public long Routes; + public long Leafs; + public long Gateways; +} + +// ============================================================================ +// nodeInfo — JetStream node metadata +// ============================================================================ + +/// +/// Per-node JetStream metadata stored in the server's node-info map. +/// Mirrors Go nodeInfo struct in server.go. +/// +internal sealed class NodeInfo +{ + public string Name { get; set; } = string.Empty; + public string Version { get; set; } = string.Empty; + public string Cluster { get; set; } = string.Empty; + public string Domain { get; set; } = string.Empty; + public string Id { get; set; } = string.Empty; + public string[] Tags { get; set; } = []; + public object? Cfg { get; set; } // JetStreamConfig — session 19 + public object? Stats { get; set; } // JetStreamStats — session 19 + public bool Offline { get; set; } + public bool Js { get; set; } + public bool BinarySnapshots { get; set; } + public bool AccountNrg { get; set; } +} + +// ============================================================================ +// Server protocol version constants +// ============================================================================ + +/// +/// Server-to-server (route/leaf/gateway) protocol versions. +/// Mirrors the iota block at the top of server.go. +/// +public static class ServerProtocol +{ + /// Original route protocol (2009). + public const int RouteProtoZero = 0; + /// Route protocol that supports INFO updates. + public const int RouteProtoInfo = 1; + /// Route/cluster protocol with account support. + public const int RouteProtoV2 = 2; + /// Protocol with distributed message tracing. + public const int MsgTraceProto = 3; +} + +// ============================================================================ +// Compression mode constants +// ============================================================================ + +/// +/// Compression mode string constants. +/// Mirrors the const block in server.go. +/// +public static class CompressionMode +{ + public const string NotSupported = "not supported"; + public const string Off = "off"; + public const string Accept = "accept"; + public const string S2Auto = "s2_auto"; + public const string S2Uncompressed = "s2_uncompressed"; + public const string S2Fast = "s2_fast"; + public const string S2Better = "s2_better"; + public const string S2Best = "s2_best"; +} + +// ============================================================================ +// Stub types for cross-session dependencies +// ============================================================================ + +// These stubs will be replaced with full implementations in later sessions. +// They are declared here to allow the NatsServer class to compile. + +/// Stub for reference-counted URL set (session 09/12). +internal sealed class RefCountedUrlSet : Dictionary { } + +/// Stub for the system/internal messaging state (session 12). +internal sealed class InternalState +{ + public Account? Account { get; set; } + // Full implementation in session 12 (events.go) +} + +/// Stub for JetStream state pointer (session 19). +internal sealed class JetStreamState { } + +/// Stub for JetStream config (session 19). +public sealed class JetStreamConfig +{ + public string StoreDir { get; set; } = string.Empty; + public TimeSpan SyncInterval { get; set; } + public bool SyncAlways { get; set; } + public bool Strict { get; set; } + public long MaxMemory { get; set; } + public long MaxStore { get; set; } + public string Domain { get; set; } = string.Empty; + public bool CompressOK { get; set; } + public string UniqueTag { get; set; } = string.Empty; +} + +/// Stub for server gateway state (session 16). +internal sealed class SrvGateway +{ + public bool Enabled { get; set; } +} + +/// Stub for server websocket state (session 23). +internal sealed class SrvWebsocket +{ + public RefCountedUrlSet ConnectUrlsMap { get; set; } = new(); +} + +/// Stub for server MQTT state (session 22). +internal sealed class SrvMqtt { } + +/// Stub for OCSP monitor (session 23). +internal sealed class OcspMonitor { } + +/// Stub for OCSP response cache (session 23). +internal interface IOcspResponseCache { } + +/// Stub for IP queue (session 02 — already ported as IpQueue). +// IpQueue is already in session 02 internals — used here via object. + +/// Stub for leaf node config (session 15). +internal sealed class LeafNodeCfg { } + +/// Stub for network resolver (session 09). +internal interface INetResolver { } + +/// Factory for rate counters. +internal static class RateCounterFactory +{ + public static ZB.MOM.NatsNet.Server.Internal.RateCounter Create(long rateLimit) + => new(rateLimit); +} + +/// Stub for RaftNode (session 20). +public interface IRaftNode { } + +/// +/// Stub for JWT account claims (session 06/11). +/// Mirrors Go jwt.AccountClaims from nats.io/jwt/v2. +/// Full implementation will decode a signed JWT and expose limits/imports/exports. +/// +public sealed class AccountClaims +{ + /// Account public NKey (subject of the JWT). + public string Subject { get; set; } = string.Empty; + + /// Operator or signing-key that issued this JWT. + public string Issuer { get; set; } = string.Empty; + + /// + /// Minimal stub decoder — returns null until session 11 provides full JWT parsing. + /// In Go: jwt.DecodeAccountClaims(claimJWT). + /// + public static AccountClaims? TryDecode(string claimJwt) + { + if (string.IsNullOrEmpty(claimJwt)) return null; + // TODO: implement proper JWT decoding in session 11. + return null; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptions.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptions.cs index f26f6c3..8afd0d5 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptions.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptions.cs @@ -108,6 +108,8 @@ public sealed partial class ServerOptions public string DefaultSentinel { get; set; } = string.Empty; public string SystemAccount { get; set; } = string.Empty; public bool NoSystemAccount { get; set; } + /// Parsed account objects from config. Mirrors Go opts.Accounts. + public List Accounts { get; set; } = []; public AuthCalloutOpts? AuthCallout { get; set; } public bool AlwaysEnableNonce { get; set; } public List? Users { get; set; } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs new file mode 100644 index 0000000..ddabf16 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs @@ -0,0 +1,251 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/server_test.go in the NATS server Go source. +// Session 09: standalone unit tests for NatsServer helpers. + +using System.Text.RegularExpressions; +using Shouldly; +using Xunit; + +namespace ZB.MOM.NatsNet.Server.Tests; + +/// +/// Standalone unit tests for helpers. +/// Tests that require a running server (listener, TLS, cluster) are marked n/a +/// and will be ported in sessions 10–23. +/// +public sealed class ServerTests +{ + // ========================================================================= + // TestSemanticVersion — Test ID 2866 + // Validates that ServerConstants.Version matches semver format. + // Mirrors Go TestSemanticVersion in server/server_test.go. + // ========================================================================= + + [Fact] + public void Version_IsValidSemVer() + { + // SemVer regex: major.minor.patch with optional pre-release / build meta. + var semVerRe = new Regex(@"^\d+\.\d+\.\d+(-\S+)?(\+\S+)?$", RegexOptions.Compiled); + semVerRe.IsMatch(ServerConstants.Version).ShouldBeTrue( + $"Version ({ServerConstants.Version}) is not a valid SemVer string"); + } + + // ========================================================================= + // TestProcessCommandLineArgs — Test ID 2882 + // Tests the ProcessCommandLineArgs helper. + // The Go version uses flag.FlagSet; our C# port takes string[]. + // Mirrors Go TestProcessCommandLineArgs in server/server_test.go. + // ========================================================================= + + [Fact] + public void ProcessCommandLineArgs_NoArgs_ReturnsFalse() + { + var (showVersion, showHelp, err) = NatsServer.ProcessCommandLineArgs([]); + err.ShouldBeNull(); + showVersion.ShouldBeFalse(); + showHelp.ShouldBeFalse(); + } + + [Theory] + [InlineData("version", true, false)] + [InlineData("VERSION", true, false)] + [InlineData("help", false, true)] + [InlineData("HELP", false, true)] + public void ProcessCommandLineArgs_KnownSubcommand_ReturnsCorrectFlags( + string arg, bool wantVersion, bool wantHelp) + { + var (showVersion, showHelp, err) = NatsServer.ProcessCommandLineArgs([arg]); + err.ShouldBeNull(); + showVersion.ShouldBe(wantVersion); + showHelp.ShouldBe(wantHelp); + } + + [Fact] + public void ProcessCommandLineArgs_UnknownSubcommand_ReturnsError() + { + var (_, _, err) = NatsServer.ProcessCommandLineArgs(["foo"]); + err.ShouldNotBeNull(); + } + + // ========================================================================= + // CompressionMode helpers — standalone tests for features 2976–2982 + // ========================================================================= + + [Theory] + [InlineData("off", CompressionMode.Off)] + [InlineData("false", CompressionMode.Off)] + [InlineData("accept", CompressionMode.Accept)] + [InlineData("s2_fast", CompressionMode.S2Fast)] + [InlineData("fast", CompressionMode.S2Fast)] + [InlineData("better", CompressionMode.S2Better)] + [InlineData("best", CompressionMode.S2Best)] + public void ValidateAndNormalizeCompressionOption_KnownModes_NormalizesCorrectly( + string input, string expected) + { + var co = new CompressionOpts { Mode = input }; + NatsServer.ValidateAndNormalizeCompressionOption(co, CompressionMode.S2Fast); + co.Mode.ShouldBe(expected); + } + + [Fact] + public void ValidateAndNormalizeCompressionOption_OnAlias_MapsToChosenMode() + { + var co = new CompressionOpts { Mode = "on" }; + NatsServer.ValidateAndNormalizeCompressionOption(co, CompressionMode.S2Better); + co.Mode.ShouldBe(CompressionMode.S2Better); + } + + [Fact] + public void ValidateAndNormalizeCompressionOption_S2Auto_UsesDefaults_WhenNoThresholds() + { + var co = new CompressionOpts { Mode = "s2_auto" }; + NatsServer.ValidateAndNormalizeCompressionOption(co, CompressionMode.S2Fast); + co.Mode.ShouldBe(CompressionMode.S2Auto); + co.RttThresholds.ShouldBe(NatsServer.DefaultCompressionS2AutoRttThresholds.ToList()); + } + + [Fact] + public void ValidateAndNormalizeCompressionOption_UnsupportedMode_Throws() + { + var co = new CompressionOpts { Mode = "bogus" }; + Should.Throw( + () => NatsServer.ValidateAndNormalizeCompressionOption(co, CompressionMode.S2Fast)); + } + + [Theory] + [InlineData(5, CompressionMode.S2Uncompressed)] // <= 10 ms threshold + [InlineData(25, CompressionMode.S2Fast)] // 10 < rtt <= 50 ms + [InlineData(75, CompressionMode.S2Better)] // 50 < rtt <= 100 ms + [InlineData(150, CompressionMode.S2Best)] // > 100 ms + public void SelectS2AutoModeBasedOnRtt_DefaultThresholds_CorrectMode(int rttMs, string expected) + { + var result = NatsServer.SelectS2AutoModeBasedOnRtt( + TimeSpan.FromMilliseconds(rttMs), + NatsServer.DefaultCompressionS2AutoRttThresholds); + result.ShouldBe(expected); + } + + [Theory] + [InlineData(CompressionMode.Off, CompressionMode.S2Fast, CompressionMode.Off)] + [InlineData(CompressionMode.Accept, CompressionMode.Accept, CompressionMode.Off)] + [InlineData(CompressionMode.S2Fast, CompressionMode.Accept, CompressionMode.S2Fast)] + [InlineData(CompressionMode.Accept, CompressionMode.S2Fast, CompressionMode.S2Fast)] + [InlineData(CompressionMode.Accept, CompressionMode.S2Auto, CompressionMode.S2Fast)] + public void SelectCompressionMode_TableDriven(string local, string remote, string expected) + { + NatsServer.SelectCompressionMode(local, remote).ShouldBe(expected); + } + + [Fact] + public void SelectCompressionMode_RemoteNotSupported_ReturnsNotSupported() + { + NatsServer.SelectCompressionMode(CompressionMode.S2Fast, CompressionMode.NotSupported) + .ShouldBe(CompressionMode.NotSupported); + } + + [Fact] + public void CompressOptsEqual_SameMode_ReturnsTrue() + { + var c1 = new CompressionOpts { Mode = CompressionMode.S2Fast }; + var c2 = new CompressionOpts { Mode = CompressionMode.S2Fast }; + NatsServer.CompressOptsEqual(c1, c2).ShouldBeTrue(); + } + + [Fact] + public void CompressOptsEqual_DifferentModes_ReturnsFalse() + { + var c1 = new CompressionOpts { Mode = CompressionMode.S2Fast }; + var c2 = new CompressionOpts { Mode = CompressionMode.S2Best }; + NatsServer.CompressOptsEqual(c1, c2).ShouldBeFalse(); + } + + // ========================================================================= + // Validation helpers + // ========================================================================= + + [Fact] + public void ValidateCluster_ClusterNameWithSpaces_ReturnsError() + { + var opts = new ServerOptions(); + opts.Cluster.Name = "bad name"; + var err = NatsServer.ValidateCluster(opts); + err.ShouldNotBeNull(); + err.ShouldBeSameAs(ServerErrors.ErrClusterNameHasSpaces); + } + + [Fact] + public void ValidatePinnedCerts_ValidSha256_ReturnsNull() + { + var pinned = new PinnedCertSet( + [new string('a', 64)]); // 64 hex chars + var err = NatsServer.ValidatePinnedCerts(pinned); + err.ShouldBeNull(); + } + + [Fact] + public void ValidatePinnedCerts_InvalidSha256_ReturnsError() + { + var pinned = new PinnedCertSet(["not_a_sha256"]); + var err = NatsServer.ValidatePinnedCerts(pinned); + err.ShouldNotBeNull(); + } + + // ========================================================================= + // GetServerProto + // ========================================================================= + + [Fact] + public void GetServerProto_DefaultOpts_ReturnsMsgTraceProto() + { + var opts = new ServerOptions(); + // SetBaselineOptions so OverrideProto gets default 0. + opts.SetBaselineOptions(); + var (s, err) = NatsServer.NewServer(opts); + err.ShouldBeNull(); + s.ShouldNotBeNull(); + s!.GetServerProto().ShouldBe(ServerProtocol.MsgTraceProto); + } + + // ========================================================================= + // Account helpers + // ========================================================================= + + [Fact] + public void ComputeRoutePoolIdx_PoolSizeOne_AlwaysReturnsZero() + { + NatsServer.ComputeRoutePoolIdx(1, "any-account").ShouldBe(0); + NatsServer.ComputeRoutePoolIdx(0, "any-account").ShouldBe(0); + } + + [Fact] + public void ComputeRoutePoolIdx_PoolSizeN_ReturnsIndexInRange() + { + const int poolSize = 5; + var idx = NatsServer.ComputeRoutePoolIdx(poolSize, "my-account"); + idx.ShouldBeInRange(0, poolSize - 1); + } + + [Fact] + public void NeedsCompression_Empty_ReturnsFalse() + => NatsServer.NeedsCompression(string.Empty).ShouldBeFalse(); + + [Fact] + public void NeedsCompression_Off_ReturnsFalse() + => NatsServer.NeedsCompression(CompressionMode.Off).ShouldBeFalse(); + + [Fact] + public void NeedsCompression_S2Fast_ReturnsTrue() + => NatsServer.NeedsCompression(CompressionMode.S2Fast).ShouldBeTrue(); +} diff --git a/porting.db b/porting.db index b7fd35a..aac4c6e 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 75ed31a..1894b68 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-26 18:50:39 UTC +Generated: 2026-02-26 19:18:19 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-26 18:50:39 UTC | Status | Count | |--------|-------| -| complete | 667 | +| complete | 744 | | n_a | 82 | -| not_started | 2831 | +| not_started | 2754 | | stub | 93 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| complete | 274 | -| n_a | 163 | -| not_started | 2596 | +| complete | 276 | +| n_a | 181 | +| not_started | 2576 | | stub | 224 | ## Library Mappings (36 total) @@ -36,4 +36,4 @@ Generated: 2026-02-26 18:50:39 UTC ## Overall Progress -**1197/6942 items complete (17.2%)** +**1294/6942 items complete (18.6%)** diff --git a/reports/report_11b387e.md b/reports/report_11b387e.md new file mode 100644 index 0000000..1894b68 --- /dev/null +++ b/reports/report_11b387e.md @@ -0,0 +1,39 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-26 19:18:19 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| complete | 11 | +| not_started | 1 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| complete | 744 | +| n_a | 82 | +| not_started | 2754 | +| stub | 93 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| complete | 276 | +| n_a | 181 | +| not_started | 2576 | +| stub | 224 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**1294/6942 items complete (18.6%)**