diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs index baeda41..6680967 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs @@ -19,6 +19,7 @@ using System.Text.RegularExpressions; using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; +using ZB.MOM.NatsNet.Server.Mqtt; namespace ZB.MOM.NatsNet.Server; @@ -968,10 +969,10 @@ public sealed partial class NatsServer // ========================================================================= /// - /// Starts the server (non-blocking). Writes startup log lines and begins accept loops. - /// Full implementation requires sessions 10-23 (gateway, websocket, leafnode, routes, etc.). - /// This stub handles the bootstrap sequence up to the subsystems not yet ported. - /// Mirrors Go Server.Start. + /// Starts the server (non-blocking). Writes startup log lines, starts all + /// subsystems (monitoring, JetStream, gateways, websocket, leafnodes, routes, + /// MQTT), then begins the client accept loop. + /// Mirrors Go Server.Start in server.go:2263–2575. /// public void Start() { @@ -991,8 +992,13 @@ public sealed partial class NatsServer if (!string.IsNullOrEmpty(clusterName)) Noticef(" Cluster: {0}", clusterName); Noticef(" Name: {0}", _info.Name); + if (opts.JetStream) + Noticef(" Node: {0}", GetHash(_info.Name)); Noticef(" ID: {0}", _info.Id); + // Check for insecure configurations. + CheckAuthForWarnings(); + // Avoid RACE between Start() and Shutdown(). Interlocked.Exchange(ref _running, 1); @@ -1002,6 +1008,34 @@ public sealed partial class NatsServer lock (_grMu) { _grRunning = true; } + StartRateLimitLogExpiration(); + + // Pprof http endpoint for the profiler. + if (opts.ProfPort != 0) + { + StartProfiler(); + } + + if (!string.IsNullOrEmpty(opts.ConfigFile)) + { + Noticef("Using configuration file: {0}", opts.ConfigFile); + } + + var hasOperators = opts.TrustedOperators.Count > 0; + if (hasOperators) + { + Noticef("Trusted Operators"); + } + if (hasOperators && string.IsNullOrEmpty(opts.SystemAccount)) + { + Warnf("Trusted Operators should utilize a System Account"); + } + if (opts.MaxPayload > ServerConstants.MaxPayloadMaxSize) + { + Warnf("Maximum payloads over {0} are generally discouraged and could lead to poor performance", + ServerConstants.MaxPayloadMaxSize); + } + // Log PID. if (!string.IsNullOrEmpty(opts.PidFile)) { @@ -1013,7 +1047,7 @@ public sealed partial class NatsServer } } - // System account setup. + // Setup system account which will start the eventing stack. if (!string.IsNullOrEmpty(opts.SystemAccount)) { var saErr = SetSystemAccount(opts.SystemAccount); @@ -1028,11 +1062,143 @@ public sealed partial class NatsServer SetDefaultSystemAccount(); } + // Start monitoring before enabling other subsystems. + var monErr = StartMonitoring(); + if (monErr != null) + { + Fatalf("Can't start monitoring: {0}", monErr); + return; + } + + // Start up resolver machinery. + var ar = AccountResolver(); + if (ar != null) + { + try + { + ar.Start(this); + } + catch (Exception arEx) + { + Fatalf("Could not start resolver: {0}", arEx); + return; + } + } + + // Start expiration of mapped GW replies. + StartGWReplyMapExpiration(); + + // Check if JetStream has been enabled. + if (opts.JetStream) + { + // Make sure someone is not trying to enable on the system account. + var sa = SystemAccount(); + if (sa != null && (sa.JetStreamLimits?.Count ?? 0) > 0) + { + Fatalf("Not allowed to enable JetStream on the system account"); + return; + } + + var cfg = new JetStreamConfig + { + StoreDir = opts.StoreDir, + SyncInterval = opts.SyncInterval, + SyncAlways = opts.SyncAlways, + Strict = !opts.NoJetStreamStrict, + MaxMemory = opts.JetStreamMaxMemory, + MaxStore = opts.JetStreamMaxStore, + Domain = opts.JetStreamDomain, + CompressOK = true, + UniqueTag = opts.JetStreamUniqueTag, + }; + + var jsErr = EnableJetStream(cfg); + if (jsErr != null) + { + Fatalf("Can't start JetStream: {0}", jsErr); + return; + } + } + + // Delayed API response handling — start regardless of JetStream config. + StartDelayedApiResponder(); + + // Start OCSP Stapling monitoring. + StartOCSPMonitoring(); + + // Configure OCSP Response Cache. StartOCSPResponseCache(); - // Signal startup complete. + // Start up gateway if needed. + if (opts.Gateway.Port != 0) + { + var gwErr = StartGateways(); + if (gwErr != null) + { + Fatalf("Can't start gateways: {0}", gwErr); + return; + } + } + + // Start websocket server if needed. + if (opts.Websocket.Port != 0) + { + StartWebsocketServer(); + } + + // Start up listen if we want to accept leaf node connections. + if (opts.LeafNode.Port != 0) + { + var lnErr = StartLeafNodeAcceptLoop(); + if (lnErr != null) + { + Fatalf("Can't start leaf node listener: {0}", lnErr); + return; + } + } + + // Solicit remote servers for leaf node connections. + if (opts.LeafNode.Remotes.Count > 0) + { + SolicitLeafNodeRemotes(opts.LeafNode.Remotes); + } + + // The Routing routine needs to wait for the client listen + // port to be opened and potential ephemeral port selected. + var clientListenReady = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + // MQTT + if (opts.Mqtt.Port != 0) + { + this.StartMqtt(); + } + + // Start up routing as well if needed. + if (opts.Cluster.Port != 0) + { + StartGoRoutine(() => + { + StartRouting(); + }); + } + + if (!string.IsNullOrEmpty(opts.PortsFileDir)) + { + LogPorts(); + } + + // We've finished starting up. _startupComplete.TrySetResult(); + // Wait for clients. + if (!opts.DontListen) + { + AcceptLoop(clientListenReady); + } + + // Bring OCSP Response cache online after accept loop started. + StartOCSPResponseCache(); + Noticef("Server is ready"); } diff --git a/reports/current.md b/reports/current.md index 85d94e5..300d21f 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 20:01:24 UTC +Generated: 2026-03-01 20:04:26 UTC ## Modules (12 total)