feat: port session 09 — Server Core Init & Config
Port server/server.go account management and initialization (~1950 LOC): - NatsServer.cs: full server struct fields (atomic counters, locks, maps, stubs for gateway/websocket/mqtt/ocsp/leafnode) - NatsServer.Init.cs: factory methods (New/NewServer/NewServerFromConfig), compression helpers (ValidateAndNormalizeCompressionOption, SelectCompressionMode, SelectS2AutoModeBasedOnRtt, CompressOptsEqual), cluster-name management, validation (ValidateCluster, ValidatePinnedCerts, ValidateOptions), trusted-key processing, CLI helpers, running-state checks, and Start() stub - NatsServer.Accounts.cs: account management (ConfigureAccounts, LookupOrRegisterAccount, RegisterAccount, SetSystemAccount, SetDefaultSystemAccount, SetSystemAccountInternal, CreateInternalClient*, ShouldTrackSubscriptions, RegisterAccountNoLock, SetAccountSublist, SetRouteInfo, LookupAccount, LookupOrFetchAccount, UpdateAccount, UpdateAccountWithClaimJwt, FetchRawAccountClaims, FetchAccountClaims, VerifyAccountClaims, FetchAccountFromResolver, GlobalAccountOnly, StandAloneMode, ConfiguredRoutes, ActivePeers, ComputeRoutePoolIdx) - NatsServerTypes.cs: ServerInfo, ServerStats, NodeInfo, ServerProtocol, CompressionMode constants, AccountClaims stub, InternalState stub, and cross-session stubs for JetStream/gateway/websocket/mqtt/ocsp - AuthTypes.cs: extend Account stub with Issuer, ClaimJwt, RoutePoolIdx, Incomplete, Updated, Sublist, Server fields, and IsExpired() - ServerOptions.cs: add Accounts property (List<Account>) - ServerTests.cs: 38 standalone tests (IDs 2866, 2882, plus compression and validation helpers); server-dependent tests marked n/a Features: 77 complete (IDs 2974–3050) Tests: 2 complete (2866, 2882); 18 n/a (server-dependent) All tests: 545 unit + 1 integration pass
This commit is contained in:
305
dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs
Normal file
305
dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs
Normal file
@@ -0,0 +1,305 @@
|
||||
// Copyright 2012-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/server.go in the NATS server Go source.
|
||||
|
||||
using System.Collections.Concurrent;
|
||||
using System.Threading.Channels;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ZB.MOM.NatsNet.Server.Auth;
|
||||
using ZB.MOM.NatsNet.Server.Internal;
|
||||
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
/// <summary>
|
||||
/// The core NATS server class.
|
||||
/// Mirrors Go <c>Server</c> struct in server/server.go.
|
||||
/// Session 09: initialization, configuration, and account management.
|
||||
/// Sessions 10-23 add further capabilities as partial class files.
|
||||
/// </summary>
|
||||
public sealed partial class NatsServer : INatsServer
|
||||
{
|
||||
// =========================================================================
|
||||
// Build-time stamps (mirrors package-level vars in server.go)
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Binary-stamped trusted operator keys (space-separated NKey public keys).
|
||||
/// In Go this is a package-level var that can be overridden at build time.
|
||||
/// In .NET it can be set before constructing any server instance.
|
||||
/// Mirrors Go package-level <c>trustedKeys</c> var.
|
||||
/// </summary>
|
||||
public static string StampedTrustedKeys { get; set; } = string.Empty;
|
||||
|
||||
// =========================================================================
|
||||
// Atomic counters (mirrors fields accessed with atomic operations)
|
||||
// =========================================================================
|
||||
|
||||
private ulong _gcid; // global client id counter
|
||||
private long _pinnedAccFail; // pinned-account auth failures
|
||||
private int _activeAccounts; // number of active accounts
|
||||
|
||||
// =========================================================================
|
||||
// Stats (embedded Go structs: stats, scStats, staleStats)
|
||||
// =========================================================================
|
||||
|
||||
private readonly ServerStats _stats = new();
|
||||
private readonly SlowConsumerStats _scStats = new();
|
||||
private readonly StaleConnectionStats _staleStats = new();
|
||||
|
||||
// =========================================================================
|
||||
// Core identity
|
||||
// =========================================================================
|
||||
|
||||
// kp / xkp are NKey keypairs — represented as byte arrays here.
|
||||
// Full crypto operations deferred to auth session.
|
||||
private byte[]? _kpSeed; // server NKey seed
|
||||
private string _pub = string.Empty; // server public key (server ID)
|
||||
private byte[]? _xkpSeed; // x25519 key seed
|
||||
private string _xpub = string.Empty; // x25519 public key
|
||||
|
||||
// =========================================================================
|
||||
// Server info (wire protocol)
|
||||
// =========================================================================
|
||||
|
||||
private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion);
|
||||
private readonly ReaderWriterLockSlim _reloadMu = new(LockRecursionPolicy.SupportsRecursion);
|
||||
private ServerInfo _info = new();
|
||||
private string _configFile = string.Empty;
|
||||
|
||||
// =========================================================================
|
||||
// Options (protected by _optsMu)
|
||||
// =========================================================================
|
||||
|
||||
private readonly ReaderWriterLockSlim _optsMu = new(LockRecursionPolicy.NoRecursion);
|
||||
private ServerOptions _opts;
|
||||
|
||||
// =========================================================================
|
||||
// Running / shutdown state
|
||||
// =========================================================================
|
||||
|
||||
private int _running; // 1 = running, 0 = not (Interlocked)
|
||||
private int _shutdown; // 1 = shutting down
|
||||
private readonly CancellationTokenSource _quitCts = new();
|
||||
private readonly TaskCompletionSource _startupComplete = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private readonly TaskCompletionSource _shutdownComplete = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private Task? _quitTask;
|
||||
|
||||
// =========================================================================
|
||||
// Listeners (forward-declared stubs — fully wired in session 10)
|
||||
// =========================================================================
|
||||
|
||||
private System.Net.Sockets.TcpListener? _listener;
|
||||
private Exception? _listenerErr;
|
||||
|
||||
// =========================================================================
|
||||
// Accounts
|
||||
// =========================================================================
|
||||
|
||||
private Account? _gacc; // global account
|
||||
private Account? _sysAccAtomic; // system account (atomic)
|
||||
private readonly ConcurrentDictionary<string, Account> _accounts = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, Account> _tmpAccounts = new(StringComparer.Ordinal);
|
||||
private IAccountResolver? _accResolver;
|
||||
private InternalState? _sys; // system messaging state
|
||||
|
||||
// =========================================================================
|
||||
// Client/route/leaf tracking
|
||||
// =========================================================================
|
||||
|
||||
private readonly Dictionary<ulong, ClientConnection> _clients = [];
|
||||
private readonly Dictionary<ulong, ClientConnection> _leafs = [];
|
||||
private Dictionary<string, List<ClientConnection>> _routes = [];
|
||||
private int _routesPoolSize = 1;
|
||||
private bool _routesReject;
|
||||
private int _routesNoPool;
|
||||
private Dictionary<string, Dictionary<string, ClientConnection>>? _accRoutes;
|
||||
private readonly ConcurrentDictionary<string, object?> _accRouteByHash = new(StringComparer.Ordinal);
|
||||
private Channel<struct_>? _accAddedCh; // stub
|
||||
private string _accAddedReqId = string.Empty;
|
||||
|
||||
// =========================================================================
|
||||
// User / nkey maps
|
||||
// =========================================================================
|
||||
|
||||
private Dictionary<string, Auth.User>? _users;
|
||||
private Dictionary<string, Auth.NkeyUser>? _nkeys;
|
||||
|
||||
// =========================================================================
|
||||
// Connection tracking
|
||||
// =========================================================================
|
||||
|
||||
private ulong _totalClients;
|
||||
private ClosedRingBuffer _closed = new(0);
|
||||
private DateTime _start;
|
||||
private DateTime _configTime;
|
||||
|
||||
// =========================================================================
|
||||
// Goroutine / WaitGroup tracking
|
||||
// =========================================================================
|
||||
|
||||
private readonly object _grMu = new();
|
||||
private bool _grRunning;
|
||||
private readonly Dictionary<ulong, ClientConnection> _grTmpClients = [];
|
||||
private readonly SemaphoreSlim _grWg = new(1, 1); // simplified wg
|
||||
|
||||
// =========================================================================
|
||||
// Cluster name (separate lock)
|
||||
// =========================================================================
|
||||
|
||||
private readonly ReaderWriterLockSlim _cnMu = new(LockRecursionPolicy.NoRecursion);
|
||||
private string _cn = string.Empty;
|
||||
private ServerInfo _routeInfo = new();
|
||||
private bool _leafNoCluster;
|
||||
private bool _leafNodeEnabled;
|
||||
private bool _leafDisableConnect;
|
||||
private bool _ldm;
|
||||
|
||||
// =========================================================================
|
||||
// Trusted keys
|
||||
// =========================================================================
|
||||
|
||||
private List<string>? _trustedKeys;
|
||||
private HashSet<string> _strictSigningKeyUsage = [];
|
||||
|
||||
// =========================================================================
|
||||
// Monitoring / stats endpoint
|
||||
// =========================================================================
|
||||
|
||||
private string _httpBasePath = string.Empty;
|
||||
private readonly Dictionary<string, ulong> _httpReqStats = [];
|
||||
|
||||
// =========================================================================
|
||||
// Client connect URLs
|
||||
// =========================================================================
|
||||
|
||||
private readonly List<string> _clientConnectUrls = [];
|
||||
private readonly RefCountedUrlSet _clientConnectUrlsMap = new();
|
||||
|
||||
// =========================================================================
|
||||
// Gateway / Websocket / MQTT / OCSP stubs
|
||||
// =========================================================================
|
||||
|
||||
private readonly SrvGateway _gateway = new();
|
||||
private readonly SrvWebsocket _websocket = new();
|
||||
private readonly SrvMqtt _mqtt = new();
|
||||
private OcspMonitor[]? _ocsps;
|
||||
private bool _ocspPeerVerify;
|
||||
private IOcspResponseCache? _ocsprc;
|
||||
|
||||
// =========================================================================
|
||||
// Gateway reply map (stub — session 16)
|
||||
// =========================================================================
|
||||
|
||||
private readonly SubscriptionIndex _gwLeafSubs;
|
||||
|
||||
// =========================================================================
|
||||
// NUID event ID generator
|
||||
// =========================================================================
|
||||
|
||||
// Replaced by actual NUID in session 10. Use Guid for now.
|
||||
private string NextEventId() => Guid.NewGuid().ToString("N");
|
||||
|
||||
// =========================================================================
|
||||
// Various stubs
|
||||
// =========================================================================
|
||||
|
||||
private readonly List<string> _leafRemoteCfgs = []; // stub — session 15
|
||||
private readonly List<object> _proxiesKeyPairs = []; // stub — session 09 (proxies)
|
||||
private readonly Dictionary<string, Dictionary<ulong, ClientConnection>> _proxiedConns = [];
|
||||
private long _cproto; // count of INFO-capable clients
|
||||
private readonly ConcurrentDictionary<string, object?> _nodeToInfo = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, object?> _raftNodes = new(StringComparer.Ordinal);
|
||||
private readonly Dictionary<string, string> _routesToSelf = [];
|
||||
private INetResolver? _routeResolver;
|
||||
private readonly ConcurrentDictionary<string, object?> _rateLimitLogging = new();
|
||||
private readonly Channel<TimeSpan> _rateLimitLoggingCh;
|
||||
private RateCounter? _connRateCounter;
|
||||
|
||||
// GW reply map expiration
|
||||
private readonly ConcurrentDictionary<string, object?> _gwrm = new();
|
||||
|
||||
// Catchup bytes
|
||||
private readonly ReaderWriterLockSlim _gcbMu = new(LockRecursionPolicy.NoRecursion);
|
||||
private long _gcbOut;
|
||||
private long _gcbOutMax;
|
||||
private readonly Channel<struct_>? _gcbKick; // stub
|
||||
|
||||
// Sync-out semaphore
|
||||
private readonly SemaphoreSlim _syncOutSem;
|
||||
private const int MaxConcurrentSyncRequests = 16;
|
||||
|
||||
// =========================================================================
|
||||
// Logging
|
||||
// =========================================================================
|
||||
|
||||
private ILogger _logger = NullLogger.Instance;
|
||||
private int _traceEnabled;
|
||||
private int _debugEnabled;
|
||||
private int _traceSysAcc;
|
||||
|
||||
// =========================================================================
|
||||
// INatsServer implementation
|
||||
// =========================================================================
|
||||
|
||||
/// <inheritdoc/>
|
||||
public ulong NextClientId() => Interlocked.Increment(ref _gcid);
|
||||
|
||||
/// <inheritdoc/>
|
||||
public ServerOptions Options => GetOpts();
|
||||
|
||||
/// <inheritdoc/>
|
||||
public bool TraceEnabled => Interlocked.CompareExchange(ref _traceEnabled, 0, 0) != 0;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public bool TraceSysAcc => Interlocked.CompareExchange(ref _traceSysAcc, 0, 0) != 0;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public ILogger Logger => _logger;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void DecActiveAccounts() => Interlocked.Decrement(ref _activeAccounts);
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void IncActiveAccounts() => Interlocked.Increment(ref _activeAccounts);
|
||||
|
||||
// =========================================================================
|
||||
// Logging helpers (mirrors Go s.Debugf / s.Noticef / s.Warnf / s.Errorf)
|
||||
// =========================================================================
|
||||
|
||||
internal void Debugf(string fmt, params object?[] args) => _logger.LogDebug(fmt, args);
|
||||
internal void Noticef(string fmt, params object?[] args) => _logger.LogInformation(fmt, args);
|
||||
internal void Warnf(string fmt, params object?[] args) => _logger.LogWarning(fmt, args);
|
||||
internal void Errorf(string fmt, params object?[] args) => _logger.LogError(fmt, args);
|
||||
internal void Fatalf(string fmt, params object?[] args) => _logger.LogCritical(fmt, args);
|
||||
|
||||
// =========================================================================
|
||||
// Constructor
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Direct constructor — do not call directly; use <see cref="NewServer(ServerOptions)"/>.
|
||||
/// </summary>
|
||||
private NatsServer(ServerOptions opts)
|
||||
{
|
||||
_opts = opts;
|
||||
_gwLeafSubs = SubscriptionIndex.NewSublistWithCache();
|
||||
_rateLimitLoggingCh = Channel.CreateBounded<TimeSpan>(1);
|
||||
_syncOutSem = new SemaphoreSlim(MaxConcurrentSyncRequests, MaxConcurrentSyncRequests);
|
||||
}
|
||||
}
|
||||
|
||||
// Placeholder struct for stub channel types
|
||||
internal readonly struct struct_ { }
|
||||
Reference in New Issue
Block a user