diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs
index d43cc55..74a97fd 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs
@@ -13,6 +13,8 @@
//
// Adapted from server/auth.go (type definitions) in the NATS server Go source.
+using ZB.MOM.NatsNet.Server;
+
namespace ZB.MOM.NatsNet.Server.Auth;
///
@@ -170,7 +172,7 @@ public class RoutePermissions
/// Stub for Account type. Full implementation in session 11.
/// Mirrors Go Account struct.
///
-public class Account
+public class Account : INatsAccount
{
public string Name { get; set; } = string.Empty;
@@ -184,6 +186,27 @@ public class Account
internal ZB.MOM.NatsNet.Server.Internal.DataStructures.SubscriptionIndex? Sublist { get; set; }
internal object? Server { get; set; } // INatsServer — avoids circular reference
+ // INatsAccount — stub implementations until session 11 (accounts.go).
+ bool INatsAccount.IsValid => true;
+ bool INatsAccount.MaxTotalConnectionsReached() => false;
+ bool INatsAccount.MaxTotalLeafNodesReached() => false;
+ int INatsAccount.AddClient(ClientConnection c) => 0;
+ int INatsAccount.RemoveClient(ClientConnection c) => 0;
+
/// Returns true if this account's JWT has expired. Stub — full impl in session 11.
public bool IsExpired() => false;
+
+ ///
+ /// Returns the total number of subscriptions across all clients in this account.
+ /// Stub — full implementation in session 11.
+ /// Mirrors Go Account.TotalSubs().
+ ///
+ public int TotalSubs() => 0;
+
+ ///
+ /// Notifies leaf nodes of a subscription change.
+ /// Stub — full implementation in session 15.
+ /// Mirrors Go Account.updateLeafNodes().
+ ///
+ internal void UpdateLeafNodes(object sub, int delta) { }
}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs
index dfa891b..0d54c95 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs
@@ -1077,6 +1077,21 @@ public sealed partial class ClientConnection
// features 425-427: writeLoop / flushClients / readLoop
internal void WriteLoop() { /* TODO session 09 */ }
internal void FlushClients(long budget) { /* TODO session 09 */ }
+ internal void ReadLoop(byte[]? pre) { /* TODO session 09 */ }
+
+ ///
+ /// Generates the INFO JSON bytes sent to the client on connect.
+ /// Stub — full implementation in session 09.
+ /// Mirrors Go client.generateClientInfoJSON().
+ ///
+ internal ReadOnlyMemory GenerateClientInfoJSON(ServerInfo info, bool includeClientIp)
+ => ReadOnlyMemory.Empty;
+
+ ///
+ /// Sets the auth-timeout timer to the specified duration.
+ /// Mirrors Go client.setAuthTimer(d).
+ ///
+ internal void SetAuthTimer(TimeSpan d) { /* TODO session 09 */ }
// features 428-432: closedStateForErr, collapsePtoNB, flushOutbound, handleWriteTimeout, markConnAsClosed
internal static ClosedState ClosedStateForErr(Exception err) =>
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/WaitGroup.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/WaitGroup.cs
new file mode 100644
index 0000000..e5f8a08
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/WaitGroup.cs
@@ -0,0 +1,64 @@
+// Copyright 2012-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+namespace ZB.MOM.NatsNet.Server.Internal;
+
+///
+/// A Go-like WaitGroup: tracks a set of in-flight operations and lets callers
+/// block until all of them complete.
+///
+internal sealed class WaitGroup
+{
+ private int _count;
+ private volatile TaskCompletionSource _tcs;
+
+ public WaitGroup()
+ {
+ _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ _tcs.SetResult(true); // starts at zero, so "done" immediately
+ }
+
+ ///
+ /// Increment the counter by (usually 1).
+ /// Must be called before starting the goroutine it tracks.
+ ///
+ public void Add(int delta = 1)
+ {
+ var newCount = Interlocked.Add(ref _count, delta);
+ if (newCount < 0)
+ throw new InvalidOperationException("WaitGroup counter went negative");
+
+ if (newCount == 0)
+ {
+ // All goroutines done — signal any waiters.
+ Volatile.Read(ref _tcs).TrySetResult(true);
+ }
+ else if (delta > 0 && newCount == delta)
+ {
+ // Transitioning from 0 to positive — replace the completed TCS
+ // with a fresh unsignaled one so Wait() will block correctly.
+ Volatile.Write(ref _tcs,
+ new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously));
+ }
+ }
+
+ /// Decrement the counter by 1. Called when a goroutine finishes.
+ public void Done() => Add(-1);
+
+ /// Block synchronously until the counter reaches 0.
+ public void Wait()
+ {
+ if (Volatile.Read(ref _count) == 0) return;
+ Volatile.Read(ref _tcs).Task.GetAwaiter().GetResult();
+ }
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs
index f13ec5e..c12a926 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs
@@ -882,15 +882,26 @@ public sealed partial class NatsServer
// =========================================================================
///
- /// Sets the INFO host/port from the current listener endpoint.
- /// Full implementation in session 10 (listener startup).
- /// Stub sets to opts values.
+ /// Sets the INFO host/port from either ClientAdvertise or the bind options.
+ /// Mirrors Go Server.setInfoHostPort().
+ /// Returns non-null on parse error.
///
- internal void SetInfoHostPort()
+ internal Exception? SetInfoHostPort()
{
var opts = GetOpts();
- _info.Host = opts.Host;
- _info.Port = opts.Port;
+ if (!string.IsNullOrEmpty(opts.ClientAdvertise))
+ {
+ var (h, p, err) = Internal.ServerUtilities.ParseHostPort(opts.ClientAdvertise, opts.Port);
+ if (err != null) return err;
+ _info.Host = h;
+ _info.Port = p;
+ }
+ else
+ {
+ _info.Host = opts.Host;
+ _info.Port = opts.Port;
+ }
+ return null;
}
///
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs
new file mode 100644
index 0000000..093d773
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs
@@ -0,0 +1,847 @@
+// Copyright 2012-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/server.go (lines 2577–4782) in the NATS server Go source.
+// Session 10: shutdown, goroutine tracking, query helpers, lame duck mode.
+
+using System.Net;
+using System.Net.Sockets;
+using ZB.MOM.NatsNet.Server.Auth;
+using ZB.MOM.NatsNet.Server.Internal;
+using ZB.MOM.NatsNet.Server.Internal.DataStructures;
+
+namespace ZB.MOM.NatsNet.Server;
+
+public sealed partial class NatsServer
+{
+ // =========================================================================
+ // Shutdown / WaitForShutdown (features 3051, 3053)
+ // =========================================================================
+
+ ///
+ /// Shuts the server down gracefully: closes all listeners, kicks all
+ /// connections, waits for goroutines, then signals .
+ /// Mirrors Go Server.Shutdown().
+ ///
+ public void Shutdown()
+ {
+ // Stubs for JetStream / Raft / eventing — implemented in later sessions.
+ SignalPullConsumers();
+ StepdownRaftNodes();
+ ShutdownEventing();
+
+ if (IsShuttingDown()) return;
+
+ _mu.EnterWriteLock();
+ Noticef("Initiating Shutdown...");
+
+ var accRes = _accResolver;
+ GetOpts(); // snapshot opts (not used below but mirrors Go pattern)
+
+ Interlocked.Exchange(ref _shutdown, 1);
+ Interlocked.Exchange(ref _running, 0);
+ lock (_grMu) { _grRunning = false; }
+
+ _mu.ExitWriteLock();
+
+ accRes?.Close();
+
+ ShutdownJetStream();
+ ShutdownRaftNodes();
+
+ // ---- Collect all connections ----
+ var conns = new Dictionary();
+
+ _mu.EnterWriteLock();
+ foreach (var kvp in _clients) conns[kvp.Key] = kvp.Value;
+ lock (_grMu) { foreach (var kvp in _grTmpClients) conns[kvp.Key] = kvp.Value; }
+ ForEachRoute(r => { conns[r.Cid] = r; });
+ GetAllGatewayConnections(conns);
+ foreach (var kvp in _leafs) conns[kvp.Key] = kvp.Value;
+
+ // ---- Count & close listeners ----
+ int doneExpected = 0;
+
+ if (_listener != null)
+ {
+ doneExpected++;
+ _listener.Stop();
+ _listener = null;
+ }
+ doneExpected += CloseWebsocketServer();
+ if (_gateway.Enabled)
+ {
+ // mqtt listener managed by session 22
+ }
+ if (_leafNodeListener != null)
+ {
+ doneExpected++;
+ _leafNodeListener.Stop();
+ _leafNodeListener = null;
+ }
+ if (_routeListener != null)
+ {
+ doneExpected++;
+ _routeListener.Stop();
+ _routeListener = null;
+ }
+ if (_gatewayListener != null)
+ {
+ doneExpected++;
+ _gatewayListener.Stop();
+ _gatewayListener = null;
+ }
+ if (_http != null)
+ {
+ doneExpected++;
+ _http.Stop();
+ _http = null;
+ }
+ if (_profiler != null)
+ {
+ doneExpected++;
+ _profiler.Stop();
+ // profiler is not nulled — see Go code: it keeps _profiler ref for ProfilerAddr()
+ }
+
+ _mu.ExitWriteLock();
+
+ // Release all goroutines waiting on quitCts.
+ _quitCts.Cancel();
+
+ // Close all client / route / gateway / leaf connections.
+ foreach (var c in conns.Values)
+ {
+ c.SetNoReconnect();
+ c.CloseConnection(ClosedState.ServerShutdown);
+ }
+
+ // Wait for accept loops to exit.
+ for (int i = 0; i < doneExpected; i++)
+ _done.Reader.ReadAsync().GetAwaiter().GetResult();
+
+ // Wait for all goroutines.
+ _grWg.Wait();
+
+ var opts = GetOpts();
+ if (!string.IsNullOrEmpty(opts.PortsFileDir))
+ DeletePortsFile(opts.PortsFileDir);
+
+ Noticef("Server Exiting..");
+
+ if (_ocsprc != null) { /* stub — stop OCSP cache in session 23 */ }
+
+ _shutdownComplete.TrySetResult();
+ }
+
+ ///
+ /// Blocks until has fully completed.
+ /// Mirrors Go Server.WaitForShutdown().
+ ///
+ public void WaitForShutdown() =>
+ _shutdownComplete.Task.GetAwaiter().GetResult();
+
+ // =========================================================================
+ // Goroutine tracking (features 3119–3120)
+ // =========================================================================
+
+ ///
+ /// Starts a background Task that counts toward the server wait group.
+ /// Returns true if the goroutine was started, false if the server is already stopped.
+ /// Mirrors Go Server.startGoRoutine(f).
+ ///
+ internal bool StartGoRoutine(Action f)
+ {
+ lock (_grMu)
+ {
+ if (!_grRunning) return false;
+ _grWg.Add(1);
+ Task.Run(() =>
+ {
+ try { f(); }
+ finally { _grWg.Done(); }
+ });
+ return true;
+ }
+ }
+
+ // =========================================================================
+ // Client / connection management (features 3081–3084)
+ // =========================================================================
+
+ ///
+ /// Removes a client, route, gateway, or leaf from server accounting.
+ /// Mirrors Go Server.removeClient().
+ ///
+ internal void RemoveClient(ClientConnection c)
+ {
+ switch (c.Kind)
+ {
+ case ClientKind.Client:
+ {
+ bool updateProto;
+ string proxyKey;
+ lock (c)
+ {
+ updateProto = c.Kind == ClientKind.Client &&
+ c.Opts.Protocol >= ClientProtocol.Info;
+ proxyKey = c.ProxyKey;
+ }
+ _mu.EnterWriteLock();
+ try
+ {
+ _clients.Remove(c.Cid);
+ if (updateProto) _cproto--;
+ if (!string.IsNullOrEmpty(proxyKey))
+ RemoveProxiedConn(proxyKey, c.Cid);
+ }
+ finally { _mu.ExitWriteLock(); }
+ break;
+ }
+ case ClientKind.Router:
+ RemoveRoute(c);
+ break;
+ case ClientKind.Gateway:
+ RemoveRemoteGatewayConnection(c);
+ break;
+ case ClientKind.Leaf:
+ RemoveLeafNodeConnection(c);
+ break;
+ }
+ }
+
+ ///
+ /// Removes a proxied connection entry.
+ /// Server write lock must be held on entry.
+ /// Mirrors Go Server.removeProxiedConn().
+ ///
+ private void RemoveProxiedConn(string key, ulong cid)
+ {
+ if (!_proxiedConns.TryGetValue(key, out var conns)) return;
+ conns.Remove(cid);
+ if (conns.Count == 0) _proxiedConns.Remove(key);
+ }
+
+ ///
+ /// Removes a client from the temporary goroutine client map.
+ /// Mirrors Go Server.removeFromTempClients().
+ ///
+ internal void RemoveFromTempClients(ulong cid)
+ {
+ lock (_grMu) { _grTmpClients.Remove(cid); }
+ }
+
+ ///
+ /// Adds a client to the temporary goroutine client map.
+ /// Returns false if the server is no longer running goroutines.
+ /// Mirrors Go Server.addToTempClients().
+ ///
+ internal bool AddToTempClients(ulong cid, ClientConnection c)
+ {
+ lock (_grMu)
+ {
+ if (!_grRunning) return false;
+ _grTmpClients[cid] = c;
+ return true;
+ }
+ }
+
+ // =========================================================================
+ // Query helpers (features 3085–3118, 3121–3123)
+ // =========================================================================
+
+ /// Returns the number of registered routes. Mirrors Go Server.NumRoutes().
+ public int NumRoutes()
+ {
+ _mu.EnterReadLock();
+ try { return NumRoutesInternal(); }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ private int NumRoutesInternal()
+ {
+ int nr = 0;
+ ForEachRoute(_ => nr++);
+ return nr;
+ }
+
+ /// Returns the number of registered remotes. Mirrors Go Server.NumRemotes().
+ public int NumRemotes()
+ {
+ _mu.EnterReadLock();
+ try { return _routes.Count; }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ /// Returns the number of leaf-node connections. Mirrors Go Server.NumLeafNodes().
+ public int NumLeafNodes()
+ {
+ _mu.EnterReadLock();
+ try { return _leafs.Count; }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ /// Returns the number of registered clients. Mirrors Go Server.NumClients().
+ public int NumClients()
+ {
+ _mu.EnterReadLock();
+ try { return _clients.Count; }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ /// Returns the client with the given connection ID. Mirrors Go Server.GetClient().
+ public ClientConnection? GetClient(ulong cid) => GetClientInternal(cid);
+
+ private ClientConnection? GetClientInternal(ulong cid)
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ _clients.TryGetValue(cid, out var c);
+ return c;
+ }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ /// Returns the leaf node with the given connection ID. Mirrors Go Server.GetLeafNode().
+ public ClientConnection? GetLeafNode(ulong cid)
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ _leafs.TryGetValue(cid, out var c);
+ return c;
+ }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ ///
+ /// Returns total subscriptions across all accounts.
+ /// Mirrors Go Server.NumSubscriptions().
+ ///
+ public uint NumSubscriptions()
+ {
+ _mu.EnterReadLock();
+ try { return NumSubscriptionsInternal(); }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ private uint NumSubscriptionsInternal()
+ {
+ int subs = 0;
+ foreach (var kvp in _accounts)
+ subs += kvp.Value.TotalSubs();
+ return (uint)subs;
+ }
+
+ /// Returns the number of slow consumers. Mirrors Go Server.NumSlowConsumers().
+ public long NumSlowConsumers() => Interlocked.Read(ref _stats.SlowConsumers);
+
+ /// Returns the number of times clients were stalled. Mirrors Go Server.NumStalledClients().
+ public long NumStalledClients() => Interlocked.Read(ref _stats.Stalls);
+
+ /// Mirrors Go Server.NumSlowConsumersClients().
+ public long NumSlowConsumersClients() => Interlocked.Read(ref _scStats.Clients);
+
+ /// Mirrors Go Server.NumSlowConsumersRoutes().
+ public long NumSlowConsumersRoutes() => Interlocked.Read(ref _scStats.Routes);
+
+ /// Mirrors Go Server.NumSlowConsumersGateways().
+ public long NumSlowConsumersGateways() => Interlocked.Read(ref _scStats.Gateways);
+
+ /// Mirrors Go Server.NumSlowConsumersLeafs().
+ public long NumSlowConsumersLeafs() => Interlocked.Read(ref _scStats.Leafs);
+
+ /// Mirrors Go Server.NumStaleConnections().
+ public long NumStaleConnections() => Interlocked.Read(ref _stats.StaleConnections);
+
+ /// Mirrors Go Server.NumStaleConnectionsClients().
+ public long NumStaleConnectionsClients() => Interlocked.Read(ref _staleStats.Clients);
+
+ /// Mirrors Go Server.NumStaleConnectionsRoutes().
+ public long NumStaleConnectionsRoutes() => Interlocked.Read(ref _staleStats.Routes);
+
+ /// Mirrors Go Server.NumStaleConnectionsGateways().
+ public long NumStaleConnectionsGateways() => Interlocked.Read(ref _staleStats.Gateways);
+
+ /// Mirrors Go Server.NumStaleConnectionsLeafs().
+ public long NumStaleConnectionsLeafs() => Interlocked.Read(ref _staleStats.Leafs);
+
+ /// Returns the time the current configuration was loaded. Mirrors Go Server.ConfigTime().
+ public DateTime ConfigTime()
+ {
+ _mu.EnterReadLock();
+ try { return _configTime; }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ /// Returns the client listener address. Mirrors Go Server.Addr().
+ public EndPoint? Addr()
+ {
+ _mu.EnterReadLock();
+ try { return _listener?.LocalEndpoint; }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ /// Returns the monitoring listener address. Mirrors Go Server.MonitorAddr().
+ public IPEndPoint? MonitorAddr()
+ {
+ _mu.EnterReadLock();
+ try { return _http?.LocalEndpoint as IPEndPoint; }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ /// Returns the cluster (route) listener address. Mirrors Go Server.ClusterAddr().
+ public IPEndPoint? ClusterAddr()
+ {
+ _mu.EnterReadLock();
+ try { return _routeListener?.LocalEndpoint as IPEndPoint; }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ /// Returns the profiler listener address. Mirrors Go Server.ProfilerAddr().
+ public IPEndPoint? ProfilerAddr()
+ {
+ _mu.EnterReadLock();
+ try { return _profiler?.LocalEndpoint as IPEndPoint; }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ ///
+ /// Polls until all expected listeners are up or the deadline expires.
+ /// Returns an error description if not ready within .
+ /// Mirrors Go Server.readyForConnections().
+ ///
+ public Exception? ReadyForConnectionsError(TimeSpan d)
+ {
+ var opts = GetOpts();
+ var end = DateTime.UtcNow.Add(d);
+
+ while (DateTime.UtcNow < end)
+ {
+ bool serverOk, routeOk, gatewayOk, leafOk, wsOk;
+ _mu.EnterReadLock();
+ serverOk = _listener != null || opts.DontListen;
+ routeOk = opts.Cluster.Port == 0 || _routeListener != null;
+ gatewayOk = string.IsNullOrEmpty(opts.Gateway.Name) || _gatewayListener != null;
+ leafOk = opts.LeafNode.Port == 0 || _leafNodeListener != null;
+ wsOk = opts.Websocket.Port == 0; // stub — websocket listener not tracked until session 23
+ _mu.ExitReadLock();
+
+ if (serverOk && routeOk && gatewayOk && leafOk && wsOk)
+ {
+ if (opts.DontListen)
+ {
+ try { _startupComplete.Task.Wait((int)d.TotalMilliseconds); }
+ catch { /* timeout */ }
+ }
+ return null;
+ }
+
+ if (d > TimeSpan.FromMilliseconds(25))
+ Thread.Sleep(25);
+ }
+
+ return new InvalidOperationException(
+ $"failed to be ready for connections after {d}");
+ }
+
+ ///
+ /// Returns true if the server is ready to accept connections.
+ /// Mirrors Go Server.ReadyForConnections().
+ ///
+ public bool ReadyForConnections(TimeSpan dur) =>
+ ReadyForConnectionsError(dur) == null;
+
+ /// Returns true if the server supports headers. Mirrors Go Server.supportsHeaders().
+ internal bool SupportsHeaders() => !(GetOpts().NoHeaderSupport);
+
+ /// Returns the server ID. Mirrors Go Server.ID().
+ public string ID() => _info.Id;
+
+ /// Returns the JetStream node name (hash of server name). Mirrors Go Server.NodeName().
+ public string NodeName() => GetHash(_info.Name);
+
+ /// Returns the server name. Mirrors Go Server.Name().
+ public string Name() => _info.Name;
+
+ /// Returns the server name as a string. Mirrors Go Server.String().
+ public override string ToString() => _info.Name;
+
+ /// Returns the number of currently-stored closed connections. Mirrors Go Server.numClosedConns().
+ internal int NumClosedConns()
+ {
+ _mu.EnterReadLock();
+ try { return _closed.Len(); }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ /// Returns total closed connections ever recorded. Mirrors Go Server.totalClosedConns().
+ internal ulong TotalClosedConns()
+ {
+ _mu.EnterReadLock();
+ try { return _closed.TotalConns(); }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ /// Returns a snapshot of recently closed connections. Mirrors Go Server.closedClients().
+ internal Internal.ClosedClient?[] ClosedClients()
+ {
+ _mu.EnterReadLock();
+ try { return _closed.ClosedClients(); }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ // =========================================================================
+ // Lame duck mode (features 3135–3139)
+ // =========================================================================
+
+ /// Returns true if the server is in lame duck mode. Mirrors Go Server.isLameDuckMode().
+ public bool IsLameDuckMode()
+ {
+ _mu.EnterReadLock();
+ try { return _ldm; }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ ///
+ /// Performs a lame-duck shutdown: stops accepting new clients, notifies
+ /// existing clients to reconnect elsewhere, then shuts down.
+ /// Mirrors Go Server.LameDuckShutdown().
+ ///
+ public void LameDuckShutdown() => LameDuckMode();
+
+ ///
+ /// Core lame-duck implementation.
+ /// Mirrors Go Server.lameDuckMode().
+ ///
+ internal void LameDuckMode()
+ {
+ _mu.EnterWriteLock();
+ if (IsShuttingDown() || _ldm || _listener == null)
+ {
+ _mu.ExitWriteLock();
+ return;
+ }
+ Noticef("Entering lame duck mode, stop accepting new clients");
+ _ldm = true;
+ SendLDMShutdownEventLocked();
+
+ int expected = 1;
+ _listener.Stop();
+ _listener = null;
+ expected += CloseWebsocketServer();
+ _ldmCh = System.Threading.Channels.Channel.CreateBounded(
+ new System.Threading.Channels.BoundedChannelOptions(expected)
+ { FullMode = System.Threading.Channels.BoundedChannelFullMode.Wait });
+
+ var opts = GetOpts();
+ var gp = opts.LameDuckGracePeriod;
+ if (gp < TimeSpan.Zero) gp = gp.Negate();
+ _mu.ExitWriteLock();
+
+ // Transfer Raft leaders (stub returns false).
+ if (TransferRaftLeaders())
+ Thread.Sleep(1000);
+
+ ShutdownJetStream();
+ ShutdownRaftNodes();
+
+ // Wait for accept loops.
+ for (int i = 0; i < expected; i++)
+ _ldmCh.Reader.ReadAsync().GetAwaiter().GetResult();
+
+ _mu.EnterWriteLock();
+ var clients = new List(_clients.Values);
+
+ if (IsShuttingDown() || clients.Count == 0)
+ {
+ _mu.ExitWriteLock();
+ Shutdown();
+ return;
+ }
+
+ var dur = opts.LameDuckDuration - gp;
+ if (dur <= TimeSpan.Zero) dur = TimeSpan.FromSeconds(1);
+
+ long numClients = clients.Count;
+ var si = dur / numClients;
+ int batch = 1;
+
+ if (si < TimeSpan.FromTicks(1))
+ {
+ si = TimeSpan.FromTicks(1);
+ batch = (int)(numClients / dur.Ticks);
+ }
+ else if (si > TimeSpan.FromSeconds(1))
+ {
+ si = TimeSpan.FromSeconds(1);
+ }
+
+ SendLDMToRoutes();
+ SendLDMToClients();
+ _mu.ExitWriteLock();
+
+ // Grace-period delay.
+ var token = _quitCts.Token;
+ try { Task.Delay(gp, token).GetAwaiter().GetResult(); }
+ catch (OperationCanceledException) { return; }
+
+ Noticef("Closing existing clients");
+ for (int i = 0; i < clients.Count; i++)
+ {
+ clients[i].CloseConnection(ClosedState.ServerShutdown);
+ if (i == clients.Count - 1) break;
+ if (batch == 1 || i % batch == 0)
+ {
+ var jitter = (long)(Random.Shared.NextDouble() * si.Ticks);
+ if (jitter < si.Ticks / 2) jitter = si.Ticks / 2;
+ try { Task.Delay(TimeSpan.FromTicks(jitter), token).GetAwaiter().GetResult(); }
+ catch (OperationCanceledException) { return; }
+ }
+ }
+
+ Shutdown();
+ WaitForShutdown();
+ }
+
+ ///
+ /// Sends an LDM INFO to all routes.
+ /// Server lock must be held on entry.
+ /// Mirrors Go Server.sendLDMToRoutes().
+ ///
+ private void SendLDMToRoutes()
+ {
+ _routeInfo.LameDuckMode = true;
+ var infoJson = GenerateInfoJson(_routeInfo);
+ ForEachRemote(r =>
+ {
+ lock (r) { r.EnqueueProto(infoJson); }
+ });
+ _routeInfo.LameDuckMode = false;
+ }
+
+ ///
+ /// Sends an LDM INFO to all connected clients.
+ /// Server lock must be held on entry.
+ /// Mirrors Go Server.sendLDMToClients().
+ ///
+ private void SendLDMToClients()
+ {
+ _info.LameDuckMode = true;
+ _clientConnectUrls.Clear();
+
+ _info.ClientConnectUrls = null;
+ _info.WsConnectUrls = null;
+
+ if (!GetOpts().Cluster.NoAdvertise)
+ {
+ var cUrls = _clientConnectUrlsMap.GetAsStringSlice();
+ _info.ClientConnectUrls = cUrls.Length > 0 ? cUrls : null;
+ }
+
+ SendAsyncInfoToClients(true, true);
+ _info.LameDuckMode = false;
+ }
+
+ // =========================================================================
+ // Rate-limit logging (features 3144–3145)
+ // =========================================================================
+
+ ///
+ /// Starts the background goroutine that expires rate-limit log entries.
+ /// Mirrors Go Server.startRateLimitLogExpiration().
+ ///
+ internal void StartRateLimitLogExpiration()
+ {
+ StartGoRoutine(() =>
+ {
+ var interval = TimeSpan.FromSeconds(1);
+ var token = _quitCts.Token;
+
+ while (!token.IsCancellationRequested)
+ {
+ try { Task.Delay(interval, token).GetAwaiter().GetResult(); }
+ catch (OperationCanceledException) { return; }
+
+ var now = DateTime.UtcNow;
+ foreach (var key in _rateLimitLogging.Keys)
+ {
+ if (_rateLimitLogging.TryGetValue(key, out var val) &&
+ val is DateTime ts && now - ts >= interval)
+ {
+ _rateLimitLogging.TryRemove(key, out _);
+ }
+ }
+
+ // Check for a new interval value.
+ if (_rateLimitLoggingCh.Reader.TryRead(out var newInterval))
+ interval = newInterval;
+ }
+ });
+ }
+
+ ///
+ /// Updates the rate-limit logging interval.
+ /// Mirrors Go Server.changeRateLimitLogInterval().
+ ///
+ internal void ChangeRateLimitLogInterval(TimeSpan d)
+ {
+ if (d <= TimeSpan.Zero) return;
+ _rateLimitLoggingCh.Writer.TryWrite(d);
+ }
+
+ // =========================================================================
+ // DisconnectClientByID / LDMClientByID (features 3146–3147)
+ // =========================================================================
+
+ ///
+ /// Forcibly disconnects the client or leaf node with the given ID.
+ /// Mirrors Go Server.DisconnectClientByID().
+ ///
+ public Exception? DisconnectClientByID(ulong id)
+ {
+ var c = GetClientInternal(id);
+ if (c != null) { c.CloseConnection(ClosedState.Kicked); return null; }
+ c = GetLeafNode(id);
+ if (c != null) { c.CloseConnection(ClosedState.Kicked); return null; }
+ return new InvalidOperationException("no such client or leafnode id");
+ }
+
+ ///
+ /// Sends a Lame Duck Mode INFO message to the specified client.
+ /// Mirrors Go Server.LDMClientByID().
+ ///
+ public Exception? LDMClientByID(ulong id)
+ {
+ ClientConnection? c;
+ ServerInfo info;
+ _mu.EnterReadLock();
+ _clients.TryGetValue(id, out c);
+ if (c == null)
+ {
+ _mu.ExitReadLock();
+ return new InvalidOperationException("no such client id");
+ }
+ info = CopyInfo();
+ info.LameDuckMode = true;
+ _mu.ExitReadLock();
+
+ lock (c)
+ {
+ if (c.Opts.Protocol >= ClientProtocol.Info &&
+ (c.Flags & ClientFlags.FirstPongSent) != 0)
+ {
+ c.Debugf("Sending Lame Duck Mode info to client");
+ c.EnqueueProto(c.GenerateClientInfoJSON(info, true).Span);
+ return null;
+ }
+ }
+ return new InvalidOperationException(
+ "client does not support Lame Duck Mode or is not ready to receive the notification");
+ }
+
+ // =========================================================================
+ // updateRemoteSubscription / shouldReportConnectErr (features 3142–3143)
+ // =========================================================================
+
+ ///
+ /// Notifies routes, gateways, and leaf nodes about a subscription change.
+ /// Mirrors Go Server.updateRemoteSubscription().
+ ///
+ internal void UpdateRemoteSubscription(Account acc, Subscription sub, int delta)
+ {
+ UpdateRouteSubscriptionMap(acc, sub, delta);
+ if (_gateway.Enabled)
+ GatewayUpdateSubInterest(acc.Name, sub, delta);
+ acc.UpdateLeafNodes(sub, delta);
+ }
+
+ ///
+ /// Returns true if a connect error at this attempt count should be reported.
+ /// Mirrors Go Server.shouldReportConnectErr().
+ ///
+ internal bool ShouldReportConnectErr(bool firstConnect, int attempts)
+ {
+ var opts = GetOpts();
+ int threshold = firstConnect ? opts.ConnectErrorReports : opts.ReconnectErrorReports;
+ return attempts == 1 || attempts % threshold == 0;
+ }
+
+ // =========================================================================
+ // Session 10 stubs for cross-session calls
+ // =========================================================================
+
+ /// Stub — JetStream pull-consumer signalling (session 19).
+ private void SignalPullConsumers() { }
+
+ /// Stub — Raft step-down (session 20).
+ private void StepdownRaftNodes() { }
+
+ /// Stub — eventing shutdown (session 12).
+ private void ShutdownEventing() { }
+
+ /// Stub — JetStream shutdown (session 19).
+ private void ShutdownJetStream() { }
+
+ /// Stub — Raft nodes shutdown (session 20).
+ private void ShutdownRaftNodes() { }
+
+ /// Stub — Raft leader transfer (session 20). Returns false (no leaders to transfer).
+ private bool TransferRaftLeaders() => false;
+
+ /// Stub — LDM shutdown event (session 12).
+ private void SendLDMShutdownEventLocked() { }
+
+ ///
+ /// Stub — closes WebSocket server if running (session 23).
+ /// Returns the number of done-channel signals to expect.
+ ///
+ private int CloseWebsocketServer() => 0;
+
+ ///
+ /// Iterates over all route connections. Stub — session 14.
+ /// Server lock must be held on entry.
+ ///
+ internal void ForEachRoute(Action fn) { }
+
+ ///
+ /// Iterates over all remote (outbound route) connections. Stub — session 14.
+ /// Server lock must be held on entry.
+ ///
+ private void ForEachRemote(Action fn) { }
+
+ /// Stub — collects all gateway connections (session 16).
+ private void GetAllGatewayConnections(Dictionary conns) { }
+
+ /// Stub — removes a route connection (session 14).
+ private void RemoveRoute(ClientConnection c) { }
+
+ /// Stub — removes a remote gateway connection (session 16).
+ private void RemoveRemoteGatewayConnection(ClientConnection c) { }
+
+ /// Stub — removes a leaf-node connection (session 15).
+ private void RemoveLeafNodeConnection(ClientConnection c) { }
+
+ /// Stub — sends async INFO to clients (session 10/11). No-op until clients are running.
+ private void SendAsyncInfoToClients(bool cliUpdated, bool wsUpdated) { }
+
+ /// Stub — updates route subscription map (session 14).
+ private void UpdateRouteSubscriptionMap(Account acc, Subscription sub, int delta) { }
+
+ /// Stub — updates gateway sub interest (session 16).
+ private void GatewayUpdateSubInterest(string accName, Subscription sub, int delta) { }
+
+ /// Stub — account disconnect event (session 12).
+ private void AccountDisconnectEvent(ClientConnection c, DateTime now, string reason) { }
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs
new file mode 100644
index 0000000..5d8d50d
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs
@@ -0,0 +1,1164 @@
+// Copyright 2012-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/server.go (lines 2779–4407) in the NATS server Go source.
+// Session 10: accept loops, client creation, connect URL management, port helpers.
+
+using System.Net;
+using System.Net.Sockets;
+using System.Text.Json;
+using ZB.MOM.NatsNet.Server.Internal;
+
+namespace ZB.MOM.NatsNet.Server;
+
+public sealed partial class NatsServer
+{
+ // =========================================================================
+ // Accept min/max sleep constants (mirrors const.go ACCEPT_MIN_SLEEP / ACCEPT_MAX_SLEEP)
+ // =========================================================================
+
+ private static readonly TimeSpan AcceptMinSleep = TimeSpan.FromMilliseconds(10);
+ private static readonly TimeSpan AcceptMaxSleep = TimeSpan.FromSeconds(1);
+
+ // =========================================================================
+ // HTTP monitoring path constants (mirrors RootPath / VarzPath / ... in server.go)
+ // =========================================================================
+
+ /// HTTP monitoring path constants. Mirrors the Go const block in server.go.
+ public static class MonitorPaths
+ {
+ public const string Root = "/";
+ public const string Varz = "/varz";
+ public const string Connz = "/connz";
+ public const string Routez = "/routez";
+ public const string Gatewayz = "/gatewayz";
+ public const string Leafz = "/leafz";
+ public const string Subsz = "/subsz";
+ public const string Stacksz = "/stacksz";
+ public const string Accountz = "/accountz";
+ public const string AccountStatz = "/accstatz";
+ public const string Jsz = "/jsz";
+ public const string Healthz = "/healthz";
+ public const string IPQueuesz = "/ipqueuesz";
+ public const string Raftz = "/raftz";
+ public const string Expvarz = "/debug/vars";
+ }
+
+ // =========================================================================
+ // Nonce (session 10 stubs — full implementation in session 11 auth.go)
+ // =========================================================================
+
+ ///
+ /// Returns true if this server requires clients to send a nonce for auth.
+ /// Stub — full implementation in session 11.
+ /// Mirrors Go Server.nonceRequired().
+ ///
+ private bool NonceRequired() => false;
+
+ ///
+ /// Fills with random bytes.
+ /// Stub — full implementation in session 11.
+ /// Mirrors Go Server.generateNonce().
+ ///
+ private void GenerateNonce(byte[] nonce) { }
+
+ // =========================================================================
+ // INFO JSON serialisation (feature 3124)
+ // =========================================================================
+
+ ///
+ /// Serialises to a JSON INFO line (INFO {...}\r\n).
+ /// Mirrors Go generateInfoJSON().
+ ///
+ internal static byte[] GenerateInfoJson(ServerInfo info)
+ {
+ var json = JsonSerializer.Serialize(info);
+ var line = $"INFO {json}\r\n";
+ return System.Text.Encoding.UTF8.GetBytes(line);
+ }
+
+ // =========================================================================
+ // copyInfo — deep copy INFO struct (feature 3071)
+ // =========================================================================
+
+ ///
+ /// Deep-copies the server's INFO struct.
+ /// Server lock must be held on entry.
+ /// Mirrors Go Server.copyInfo().
+ ///
+ internal ServerInfo CopyInfo()
+ {
+ // Shallow clone (copies all value-type and reference fields).
+ // ServerInfo is a class, so we must clone the object rather than copy the reference.
+ var info = _info.ShallowClone();
+
+ // Deep-copy the slice fields that may be modified independently.
+ if (info.ClientConnectUrls is { Length: > 0 })
+ info.ClientConnectUrls = [.. info.ClientConnectUrls];
+
+ if (info.WsConnectUrls is { Length: > 0 })
+ info.WsConnectUrls = [.. info.WsConnectUrls];
+
+ return info;
+ }
+
+ // =========================================================================
+ // AcceptLoop — exported for easier testing (feature 3052)
+ // =========================================================================
+
+ ///
+ /// Starts the main client accept loop. The optional
+ /// TCS is completed once the listener is bound (mirrors Go chan struct{}).
+ /// Mirrors Go Server.AcceptLoop().
+ ///
+ public void AcceptLoop(TaskCompletionSource? ready = null)
+ {
+ // If we were to exit before the listener is set up, signal the TCS.
+ try
+ {
+ if (IsShuttingDown()) return;
+
+ var opts = GetOpts();
+
+ _mu.EnterWriteLock();
+ var hp = $"{opts.Host}:{opts.Port}";
+ var result = GetServerListener(hp);
+ _listenerErr = result.err;
+
+ if (result.err != null)
+ {
+ _mu.ExitWriteLock();
+ Fatalf("Error listening on port: {0}, {1}", hp, result.err.Message);
+ return;
+ }
+
+ var l = result.listener!;
+ Noticef("Listening for client connections on {0}", l.LocalEndpoint);
+
+ if (opts.TlsConfig != null)
+ {
+ Noticef("TLS required for client connections");
+ }
+
+ // If opts.Port was 0, write resolved port back.
+ if (opts.Port == 0)
+ opts.Port = ((IPEndPoint)l.LocalEndpoint!).Port;
+
+ // Set the server's info Host/Port.
+ var infoErr = SetInfoHostPort();
+ if (infoErr != null)
+ {
+ Fatalf("Error setting server INFO with ClientAdvertise value of {0}, err={1}",
+ opts.ClientAdvertise, infoErr.Message);
+ l.Stop();
+ _mu.ExitWriteLock();
+ return;
+ }
+
+ _clientConnectUrls.Clear();
+ _clientConnectUrls.AddRange(GetClientConnectURLs());
+ _listener = l;
+
+ // Start the accept goroutine.
+ _ = Task.Run(() =>
+ {
+ _reloadMu.EnterReadLock();
+ AcceptConnections(l, "Client",
+ tc => CreateClient(tc),
+ _ =>
+ {
+ if (IsLameDuckMode())
+ {
+ // Signal that we are not accepting new clients.
+ _ldmCh?.Writer.TryWrite(default);
+ // Wait for quit signal.
+ _quitCts.Token.WaitHandle.WaitOne();
+ return true;
+ }
+ return false;
+ });
+ _reloadMu.ExitReadLock();
+ });
+
+ _mu.ExitWriteLock();
+ }
+ finally
+ {
+ // Signal that we are ready (or exiting early).
+ ready?.TrySetResult();
+ }
+ }
+
+ // =========================================================================
+ // GetServerListener (feature 3054)
+ // =========================================================================
+
+ ///
+ /// Returns the existing client listener (if any) or creates a new TCP listener on .
+ /// Server lock must be held on entry.
+ /// Mirrors Go Server.getServerListener().
+ ///
+ private (TcpListener? listener, Exception? err) GetServerListener(string hp)
+ {
+ if (_listener != null)
+ return (_listener, _listenerErr);
+
+ try
+ {
+ var parts = hp.Split(':', 2);
+ var host = parts[0];
+ var port = parts.Length > 1 ? int.Parse(parts[1]) : 0;
+ var addr = string.IsNullOrEmpty(host) || host == "0.0.0.0"
+ ? IPAddress.Any
+ : (host == "::" ? IPAddress.IPv6Any : IPAddress.Parse(host));
+
+ var l = new TcpListener(addr, port);
+ l.Start();
+ return (l, null);
+ }
+ catch (Exception ex)
+ {
+ return (null, ex);
+ }
+ }
+
+ // =========================================================================
+ // InProcessConn (feature 3055)
+ // =========================================================================
+
+ ///
+ /// Returns an in-process connection to the server, bypassing the need for
+ /// a TCP listener for local connectivity within the same process.
+ /// Mirrors Go Server.InProcessConn().
+ ///
+ public (Stream? conn, Exception? err) InProcessConn()
+ {
+ // Create a loopback TCP socket pair to simulate net.Pipe().
+ TcpListener? loopback = null;
+ TcpClient? clientTcp = null;
+ TcpClient? serverTcp = null;
+
+ try
+ {
+ loopback = new TcpListener(IPAddress.Loopback, 0);
+ loopback.Start();
+ var ep = (IPEndPoint)loopback.LocalEndpoint!;
+
+ clientTcp = new TcpClient();
+ clientTcp.Connect(ep);
+ serverTcp = loopback.AcceptTcpClient();
+ }
+ catch (Exception ex)
+ {
+ clientTcp?.Dispose();
+ serverTcp?.Dispose();
+ return (null, ex);
+ }
+ finally
+ {
+ loopback?.Stop();
+ }
+
+ var serverStream = serverTcp.GetStream();
+ var clientStream = clientTcp.GetStream();
+
+ if (!StartGoRoutine(() => CreateClientInProcess(serverStream)))
+ {
+ serverStream.Dispose();
+ clientStream.Dispose();
+ return (null, new InvalidOperationException("failed to create connection"));
+ }
+
+ return (clientStream, null);
+ }
+
+ // =========================================================================
+ // AcceptConnections — inner accept loop (feature 3056)
+ // =========================================================================
+
+ ///
+ /// Runs the TCP accept loop for , calling
+ /// for each accepted connection. On error, calls and backs off.
+ /// Signals _done when the loop exits.
+ /// Mirrors Go Server.acceptConnections().
+ ///
+ private void AcceptConnections(
+ TcpListener l,
+ string acceptName,
+ Action createFunc,
+ Func? errFunc = null)
+ {
+ var tmpDelay = AcceptMinSleep;
+
+ while (true)
+ {
+ TcpClient tc;
+ try
+ {
+ tc = l.AcceptTcpClient();
+ }
+ catch (Exception err)
+ {
+ if (errFunc != null && errFunc(err))
+ break;
+
+ var newDelay = AcceptError(acceptName, err, tmpDelay);
+ if (newDelay < TimeSpan.Zero)
+ break;
+
+ tmpDelay = newDelay;
+ continue;
+ }
+
+ tmpDelay = AcceptMinSleep;
+
+ var captured = tc;
+ if (!StartGoRoutine(() =>
+ {
+ try
+ {
+ _reloadMu.EnterReadLock();
+ createFunc(captured);
+ _reloadMu.ExitReadLock();
+ }
+ catch
+ {
+ _reloadMu.ExitReadLock();
+ captured.Dispose();
+ }
+ }))
+ {
+ tc.Dispose();
+ }
+ }
+
+ Debugf("{0} accept loop exiting..", acceptName);
+ _done.Writer.TryWrite(default);
+ }
+
+ // =========================================================================
+ // Profiler (feature 3057)
+ // =========================================================================
+
+ ///
+ /// Starts the HTTP profiling server.
+ /// Stub — full pprof integration deferred to session 17.
+ /// Mirrors Go Server.StartProfiler().
+ ///
+ public void StartProfiler()
+ {
+ if (IsShuttingDown()) return;
+
+ var opts = GetOpts();
+ var port = opts.ProfPort == -1 ? 0 : opts.ProfPort;
+
+ _mu.EnterWriteLock();
+ try
+ {
+ var hp = $"{opts.Host}:{port}";
+ var parts = hp.Split(':', 2);
+ var addr = string.IsNullOrEmpty(parts[0]) ? IPAddress.Any : IPAddress.Parse(parts[0]);
+ var l = new TcpListener(addr, port);
+ l.Start();
+ Noticef("profiling port: {0}", ((IPEndPoint)l.LocalEndpoint!).Port);
+ _profiler = l;
+ }
+ catch (Exception ex)
+ {
+ Fatalf("error starting profiler: {0}", ex);
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ }
+
+ ///
+ /// Sets the block profiling rate. Stub — mirrors Go Server.setBlockProfileRate().
+ ///
+ private void SetBlockProfileRate(int rate)
+ {
+ if (rate > 0)
+ Warnf("Block profiling is enabled (rate {0}), this may have a performance impact", rate);
+ // runtime.SetBlockProfileRate — not applicable in .NET; no-op.
+ }
+
+ // =========================================================================
+ // HTTP Monitoring (features 3058–3061)
+ // =========================================================================
+
+ ///
+ /// Enables the HTTP monitoring port.
+ /// Deprecated: use instead.
+ /// Mirrors Go Server.StartHTTPMonitoring().
+ ///
+ public void StartHTTPMonitoring() => StartMonitoringInternal(false);
+
+ ///
+ /// Enables the HTTPS monitoring port.
+ /// Deprecated: use instead.
+ /// Mirrors Go Server.StartHTTPSMonitoring().
+ ///
+ public void StartHTTPSMonitoring() => StartMonitoringInternal(true);
+
+ ///
+ /// Starts the HTTP or HTTPS monitoring server as configured.
+ /// Mirrors Go Server.StartMonitoring().
+ ///
+ public Exception? StartMonitoring()
+ {
+ var opts = GetOpts();
+ if (opts.HttpPort != 0 && opts.HttpsPort != 0)
+ return new InvalidOperationException(
+ $"can't specify both HTTP ({opts.HttpPort}) and HTTPs ({opts.HttpsPort}) ports");
+
+ Exception? err = null;
+ if (opts.HttpPort != 0)
+ err = StartMonitoringInternal(false);
+ else if (opts.HttpsPort != 0)
+ {
+ if (opts.TlsConfig == null)
+ return new InvalidOperationException("TLS cert and key required for HTTPS");
+ err = StartMonitoringInternal(true);
+ }
+ return err;
+ }
+
+ ///
+ /// Returns the base URL path for the monitoring server.
+ /// Mirrors Go Server.basePath().
+ ///
+ private string BasePath(string p) =>
+ string.IsNullOrEmpty(_httpBasePath) ? p : System.IO.Path.Combine(_httpBasePath, p.TrimStart('/'));
+
+ ///
+ /// Starts the HTTP/HTTPS monitoring listener.
+ /// Stub — full monitoring handler registration deferred to session 17.
+ /// Mirrors Go Server.startMonitoring().
+ ///
+ private Exception? StartMonitoringInternal(bool secure)
+ {
+ if (IsShuttingDown()) return null;
+
+ var opts = GetOpts();
+ int port;
+ string proto;
+
+ if (secure)
+ {
+ proto = "https";
+ port = opts.HttpsPort == -1 ? 0 : opts.HttpsPort;
+ }
+ else
+ {
+ proto = "http";
+ port = opts.HttpPort == -1 ? 0 : opts.HttpPort;
+ }
+
+ var hp = $"{opts.HttpHost}:{port}";
+ TcpListener httpListener;
+ try
+ {
+ var parts = hp.Split(':', 2);
+ var bindAddr = string.IsNullOrEmpty(parts[0]) ? IPAddress.Any : IPAddress.Parse(parts[0]);
+ httpListener = new TcpListener(bindAddr, port);
+ httpListener.Start();
+ }
+ catch (Exception ex)
+ {
+ return new InvalidOperationException($"can't listen to the monitor port: {ex.Message}", ex);
+ }
+
+ var rport = ((IPEndPoint)httpListener.LocalEndpoint!).Port;
+ Noticef("Starting {0} monitor on {1}", proto, $"{opts.HttpHost}:{rport}");
+
+ _mu.EnterWriteLock();
+ _http = httpListener;
+ _mu.ExitWriteLock();
+
+ // Full HTTP handler registration in session 17; for now just drain the listener.
+ _ = Task.Run(() =>
+ {
+ // Accept and immediately close connections until shutdown.
+ while (!IsShuttingDown())
+ {
+ try
+ {
+ var tc = httpListener.AcceptTcpClient();
+ tc.Dispose();
+ }
+ catch
+ {
+ break;
+ }
+ }
+
+ _mu.EnterWriteLock();
+ // Don't null _http — ProfilerAddr etc. still read it.
+ _mu.ExitWriteLock();
+
+ _done.Writer.TryWrite(default);
+ });
+
+ return null;
+ }
+
+ // =========================================================================
+ // CreateClient / CreateClientInProcess / CreateClientEx (features 3063–3065)
+ // =========================================================================
+
+ ///
+ /// Creates and registers a new client connection from the given TCP client.
+ /// Mirrors Go Server.createClient().
+ ///
+ private ClientConnection? CreateClient(TcpClient tc) =>
+ CreateClientEx(tc.GetStream(), false);
+
+ ///
+ /// Creates and registers an in-process client connection.
+ /// Mirrors Go Server.createClientInProcess().
+ ///
+ private ClientConnection? CreateClientInProcess(Stream nc) =>
+ CreateClientEx(nc, true);
+
+ ///
+ /// Full client-creation factory: registers with account, sends INFO, starts read/write loops.
+ /// Mirrors Go Server.createClientEx().
+ ///
+ private ClientConnection? CreateClientEx(Stream nc, bool inProcess)
+ {
+ var opts = GetOpts();
+ var now = DateTime.UtcNow;
+
+ var c = new ClientConnection(ClientKind.Client, this, nc)
+ {
+ InProc = inProcess,
+ Start = now,
+ Last = now,
+ Opts = ClientOptions.Default,
+ };
+
+ var globalAcc = GlobalAccount();
+ if (globalAcc != null)
+ {
+ try { c.RegisterWithAccount(globalAcc); }
+ catch (Exception ex)
+ {
+ c.ReportErrRegisterAccount(globalAcc, ex);
+ return null;
+ }
+ }
+
+ ServerInfo info;
+ bool authRequired;
+
+ _mu.EnterWriteLock();
+ info = CopyInfo();
+
+ if (NonceRequired())
+ {
+ var raw = new byte[24]; // nonceLen
+ GenerateNonce(raw);
+ info.Nonce = System.Text.Encoding.ASCII.GetString(raw);
+ }
+
+ c.Nonce = info.Nonce != null ? System.Text.Encoding.ASCII.GetBytes(info.Nonce) : null;
+ authRequired = info.AuthRequired;
+
+ // If auth is required but there is a no_auth_user, don't require it.
+ if (info.AuthRequired
+ && !string.IsNullOrEmpty(opts.NoAuthUser)
+ && opts.NoAuthUser != _sysAccOnlyNoAuthUser)
+ {
+ info.AuthRequired = false;
+ }
+
+ // For in-process connections with TLS required, mark as available but not required.
+ if (inProcess && info.TlsRequired)
+ {
+ info.TlsRequired = false;
+ info.TlsAvailable = true;
+ }
+
+ _totalClients++;
+ _mu.ExitWriteLock();
+
+ lock (c)
+ {
+ if (authRequired)
+ c.Flags |= ClientFlags.ExpectConnect;
+
+ c.InitClient();
+ c.Debugf("Client connection created");
+
+ // Send INFO to the client immediately (TLS-first handling omitted — stub).
+ var infoBytes = c.GenerateClientInfoJSON(info, !opts.ProxyProtocol);
+ c.SendProtoNow(infoBytes.Span);
+ }
+
+ // Register with the server.
+ _mu.EnterWriteLock();
+ if (!IsRunning() || _ldm)
+ {
+ if (IsShuttingDown())
+ nc.Close();
+ _mu.ExitWriteLock();
+ return c;
+ }
+
+ if (opts.MaxConn > 0 && _clients.Count >= opts.MaxConn)
+ {
+ _mu.ExitWriteLock();
+ c.MaxConnExceeded();
+ return null;
+ }
+
+ _clients[c.Cid] = c;
+ _mu.ExitWriteLock();
+
+ lock (c)
+ {
+ if (c.IsClosed())
+ {
+ c.CloseConnection(ClosedState.WriteError);
+ return null;
+ }
+
+ if (authRequired)
+ c.SetAuthTimer(TimeSpan.FromSeconds(opts.AuthTimeout));
+
+ c.SetPingTimer();
+
+ // Start read/write loops as goroutines.
+ byte[]? pre = null;
+ StartGoRoutine(() => c.ReadLoop(pre));
+ StartGoRoutine(() => c.WriteLoop());
+ }
+
+ return c;
+ }
+
+ // =========================================================================
+ // SaveClosedClient (feature 3066)
+ // =========================================================================
+
+ ///
+ /// Records a closed client in the ring buffer for the /connz monitoring endpoint.
+ /// Mirrors Go Server.saveClosedClient().
+ ///
+ internal void SaveClosedClient(ClientConnection c, Stream? nc, ClosedState reason)
+ {
+ var now = DateTime.UtcNow;
+
+ // Stub: full implementation requires connInfo.Fill() from session 17.
+ // For now create a minimal ClosedClient record.
+ AccountDisconnectEvent(c, now, reason.ToString());
+
+ string user = string.Empty;
+ string acc = string.Empty;
+
+ lock (c)
+ {
+ // acc name if not the global account.
+ if (c.Account?.Name != null && c.Account.Name != ServerConstants.DefaultGlobalAccount)
+ acc = c.Account.Name;
+ }
+
+ var cc = new ClosedClient
+ {
+ User = user,
+ Account = acc,
+ };
+
+ _mu.EnterWriteLock();
+ _closed.Append(cc);
+ _mu.ExitWriteLock();
+ }
+
+ // =========================================================================
+ // Connect URL management (features 3067–3070)
+ // =========================================================================
+
+ ///
+ /// Adds URLs to the connect-URL maps and sends async INFO to clients.
+ /// Server lock must be held on entry.
+ /// Mirrors Go Server.addConnectURLsAndSendINFOToClients().
+ ///
+ internal void AddConnectURLsAndSendINFOToClients(string[] curls, string[] wsurls) =>
+ UpdateServerINFOAndSendINFOToClients(curls, wsurls, add: true);
+
+ ///
+ /// Removes URLs from the connect-URL maps and sends async INFO to clients.
+ /// Server lock must be held on entry.
+ /// Mirrors Go Server.removeConnectURLsAndSendINFOToClients().
+ ///
+ internal void RemoveConnectURLsAndSendINFOToClients(string[] curls, string[] wsurls) =>
+ UpdateServerINFOAndSendINFOToClients(curls, wsurls, add: false);
+
+ ///
+ /// Updates the connect-URL maps and, if changed, sends async INFO to clients.
+ /// Server lock must be held on entry.
+ /// Mirrors Go Server.updateServerINFOAndSendINFOToClients().
+ ///
+ private void UpdateServerINFOAndSendINFOToClients(string[] curls, string[] wsurls, bool add)
+ {
+ bool UpdateMap(string[] urls, RefCountedUrlSet m)
+ {
+ bool wasUpdated = false;
+ foreach (var url in urls)
+ {
+ if (add && m.AddUrl(url)) wasUpdated = true;
+ if (!add && m.RemoveUrl(url)) wasUpdated = true;
+ }
+ return wasUpdated;
+ }
+
+ bool cliUpdated = UpdateMap(curls, _clientConnectUrlsMap);
+ bool wsUpdated = UpdateMap(wsurls, _websocket.ConnectUrlsMap);
+
+ void UpdateInfo(ref string[]? infoUrls, List localUrls, RefCountedUrlSet m)
+ {
+ var list = new List(localUrls);
+ list.AddRange(m.GetAsStringSlice());
+ infoUrls = list.Count > 0 ? list.ToArray() : null;
+ }
+
+ if (cliUpdated)
+ {
+ var infoUrls = _info.ClientConnectUrls;
+ UpdateInfo(ref infoUrls, _clientConnectUrls, _clientConnectUrlsMap);
+ _info.ClientConnectUrls = infoUrls;
+ }
+
+ if (wsUpdated)
+ {
+ // WebSocket connect URLs stub — session 23.
+ }
+
+ if (cliUpdated || wsUpdated)
+ SendAsyncInfoToClients(cliUpdated, wsUpdated);
+ }
+
+ // =========================================================================
+ // TLS version helpers (features 3072–3073)
+ // =========================================================================
+
+ ///
+ /// Returns the TLS version string for the given TLS version number.
+ /// Mirrors Go tlsVersion().
+ ///
+ internal static string TlsVersion(ushort ver) => ver switch
+ {
+ 0x0301 => "1.0",
+ 0x0302 => "1.1",
+ 0x0303 => "1.2",
+ 0x0304 => "1.3",
+ _ => $"Unknown [0x{ver:x}]",
+ };
+
+ ///
+ /// Parses a TLS version string to a version number.
+ /// Mirrors Go tlsVersionFromString().
+ ///
+ internal static (ushort ver, Exception? err) TlsVersionFromString(string ver) => ver switch
+ {
+ "1.0" => (0x0301, null),
+ "1.1" => (0x0302, null),
+ "1.2" => (0x0303, null),
+ "1.3" => (0x0304, null),
+ _ => (0, new ArgumentException($"unknown version: {ver}")),
+ };
+
+ // =========================================================================
+ // Connect URL helpers (features 3074–3076)
+ // =========================================================================
+
+ ///
+ /// Returns client connect URLs based on server options.
+ /// Server lock must be held on entry.
+ /// Mirrors Go Server.getClientConnectURLs().
+ ///
+ private List GetClientConnectURLs()
+ {
+ var opts = GetOpts();
+ var (urls, _) = GetConnectURLs(opts.ClientAdvertise, opts.Host, opts.Port);
+ return urls;
+ }
+
+ ///
+ /// Returns connect URLs for the given advertise/host/port.
+ /// Mirrors Go Server.getConnectURLs().
+ ///
+ private (List urls, Exception? err) GetConnectURLs(string advertise, string host, int port)
+ {
+ var urls = new List();
+
+ if (!string.IsNullOrEmpty(advertise))
+ {
+ var (h, p, err) = ServerUtilities.ParseHostPort(advertise, port);
+ if (err != null) return (urls, err);
+ urls.Add($"{h}:{p}");
+ }
+ else
+ {
+ var sPort = port.ToString();
+ var (isAny, ips, _) = GetNonLocalIPsIfHostIsIPAny(host, all: true);
+ foreach (var ip in ips)
+ urls.Add($"{ip}:{sPort}");
+
+ if (!isAny || urls.Count == 0)
+ {
+ if (host != "0.0.0.0" && host != "::")
+ urls.Add($"{host}:{sPort}");
+ else
+ Errorf("Address {0} can not be resolved properly", host);
+ }
+ }
+
+ return (urls, null);
+ }
+
+ ///
+ /// Returns the non-local IPs if is 0.0.0.0 or ::.
+ /// Returns (isAny, ips, err).
+ /// Mirrors Go Server.getNonLocalIPsIfHostIsIPAny().
+ ///
+ private (bool isAny, List ips, Exception? err) GetNonLocalIPsIfHostIsIPAny(string host, bool all)
+ {
+ if (!IPAddress.TryParse(host, out var ip))
+ return (false, [], null);
+
+ if (!ip.Equals(IPAddress.Any) && !ip.Equals(IPAddress.IPv6Any))
+ return (false, [], null);
+
+ Debugf("Get non local IPs for \"{0}\"", host);
+
+ var ips = new List();
+ foreach (var iface in System.Net.NetworkInformation.NetworkInterface.GetAllNetworkInterfaces())
+ {
+ foreach (var addr in iface.GetIPProperties().UnicastAddresses)
+ {
+ var a = addr.Address;
+ if (!a.IsIPv6LinkLocal && !a.Equals(IPAddress.Loopback) &&
+ !a.Equals(IPAddress.IPv6Loopback) &&
+ a.AddressFamily is System.Net.Sockets.AddressFamily.InterNetwork
+ or System.Net.Sockets.AddressFamily.InterNetworkV6)
+ {
+ Debugf(" ip={0}", a);
+ ips.Add(a.ToString());
+ if (!all) return (true, ips, null);
+ }
+ }
+ }
+
+ return (true, ips, null);
+ }
+
+ // =========================================================================
+ // Port helpers (features 3077–3080)
+ // =========================================================================
+
+ ///
+ /// Resolves host-port strings from a TcpListener, expanding unspecified addresses.
+ /// Mirrors Go resolveHostPorts().
+ ///
+ private static List ResolveHostPorts(TcpListener l)
+ {
+ var result = new List();
+ if (l.LocalEndpoint is not IPEndPoint ep) return result;
+
+ var port = ep.Port.ToString();
+
+ if (ep.Address.Equals(IPAddress.Any) || ep.Address.Equals(IPAddress.IPv6Any))
+ {
+ foreach (var iface in System.Net.NetworkInformation.NetworkInterface.GetAllNetworkInterfaces())
+ foreach (var addr in iface.GetIPProperties().UnicastAddresses)
+ result.Add($"{addr.Address}:{port}");
+ }
+ else
+ {
+ result.Add($"{ep.Address}:{port}");
+ }
+
+ return result;
+ }
+
+ ///
+ /// Formats a listener's addresses as protocol URLs.
+ /// Mirrors Go formatURL().
+ ///
+ private static string[] FormatUrl(string protocol, TcpListener l)
+ {
+ var hostPorts = ResolveHostPorts(l);
+ for (int i = 0; i < hostPorts.Count; i++)
+ hostPorts[i] = $"{protocol}://{hostPorts[i]}";
+ return hostPorts.ToArray();
+ }
+
+ ///
+ /// Returns port info once all expected listeners are up, or null if timeout.
+ /// Mirrors Go Server.PortsInfo().
+ ///
+ public Ports? PortsInfo(TimeSpan maxWait)
+ {
+ if (!ReadyForListeners(maxWait)) return null;
+
+ var opts = GetOpts();
+
+ _mu.EnterReadLock();
+ bool tlsRequired = _info.TlsRequired;
+ var listener = _listener;
+ var httpListener = _http;
+ var clusterListener = _routeListener;
+ var profileListener = _profiler;
+ _mu.ExitReadLock();
+
+ var ports = new Ports();
+
+ if (listener != null)
+ {
+ var natsProto = tlsRequired ? "tls" : "nats";
+ ports.Nats = FormatUrl(natsProto, listener);
+ }
+
+ if (httpListener != null)
+ {
+ var monProto = opts.HttpsPort != 0 ? "https" : "http";
+ ports.Monitoring = FormatUrl(monProto, httpListener);
+ }
+
+ if (clusterListener != null)
+ {
+ var clusterProto = opts.Cluster.TlsConfig != null ? "tls" : "nats";
+ ports.Cluster = FormatUrl(clusterProto, clusterListener);
+ }
+
+ if (profileListener != null)
+ ports.Profile = FormatUrl("http", profileListener);
+
+ return ports;
+ }
+
+ ///
+ /// Returns the path of the ports file.
+ /// Mirrors Go Server.portFile().
+ ///
+ private string PortFile(string dirHint)
+ {
+ var dirname = GetOpts().PortsFileDir;
+ if (!string.IsNullOrEmpty(dirHint)) dirname = dirHint;
+ if (string.IsNullOrEmpty(dirname)) return string.Empty;
+
+ var exe = System.Diagnostics.Process.GetCurrentProcess().ProcessName;
+ var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
+ return System.IO.Path.Combine(dirname, $"{exe}_{pid}.ports");
+ }
+
+ ///
+ /// Deletes the ports file.
+ /// Mirrors Go Server.deletePortsFile().
+ ///
+ private void DeletePortsFile(string hintDir)
+ {
+ var file = PortFile(hintDir);
+ if (string.IsNullOrEmpty(file)) return;
+ try { System.IO.File.Delete(file); }
+ catch (Exception ex) { Errorf("Error cleaning up ports file {0}: {1}", file, ex.Message); }
+ }
+
+ ///
+ /// Writes a Ports JSON file if a ports-file directory is configured.
+ /// Mirrors Go Server.logPorts().
+ ///
+ private void LogPorts()
+ {
+ var opts = GetOpts();
+ var portsFile = PortFile(opts.PortsFileDir);
+ if (string.IsNullOrEmpty(portsFile)) return;
+
+ _ = Task.Run(() =>
+ {
+ var info = PortsInfo(TimeSpan.FromSeconds(5));
+ if (info == null)
+ {
+ Errorf("Unable to resolve the ports in the specified time");
+ return;
+ }
+
+ try
+ {
+ var data = JsonSerializer.SerializeToUtf8Bytes(info);
+ System.IO.File.WriteAllBytes(portsFile, data);
+ }
+ catch (Exception ex)
+ {
+ Errorf("Error writing ports file ({0}): {1}", portsFile, ex.Message);
+ }
+ });
+ }
+
+ // =========================================================================
+ // readyForListeners / serviceListeners (features 3127–3128)
+ // =========================================================================
+
+ ///
+ /// Waits until all expected listeners are resolved or expires.
+ /// Mirrors Go Server.readyForListeners().
+ ///
+ private bool ReadyForListeners(TimeSpan dur)
+ {
+ var end = DateTime.UtcNow.Add(dur);
+ while (DateTime.UtcNow < end)
+ {
+ _mu.EnterReadLock();
+ var listeners = ServiceListeners();
+ _mu.ExitReadLock();
+
+ if (listeners.Length == 0) return false;
+
+ bool ok = true;
+ foreach (var l in listeners)
+ {
+ if (l == null) { ok = false; break; }
+ }
+
+ if (ok) return true;
+
+ try
+ {
+ Task.Delay(TimeSpan.FromMilliseconds(25), _quitCts.Token)
+ .GetAwaiter().GetResult();
+ }
+ catch (OperationCanceledException)
+ {
+ return false;
+ }
+ }
+
+ return false;
+ }
+
+ ///
+ /// Returns the list of expected listeners (may include nulls for not-yet-bound listeners).
+ /// Server lock must be held on entry.
+ /// Mirrors Go Server.serviceListeners().
+ ///
+ private TcpListener?[] ServiceListeners()
+ {
+ var opts = GetOpts();
+ var list = new List { _listener };
+
+ if (opts.Cluster.Port != 0) list.Add(_routeListener);
+ if (opts.HttpPort != 0 || opts.HttpsPort != 0) list.Add(_http);
+ if (opts.ProfPort != 0) list.Add(_profiler);
+ // WebSocket listener — session 23.
+
+ return list.ToArray();
+ }
+
+ // =========================================================================
+ // acceptError (feature 3129)
+ // =========================================================================
+
+ ///
+ /// Handles a transient accept error by sleeping and doubling the delay.
+ /// Returns a negative TimeSpan if the server has stopped.
+ /// Mirrors Go Server.acceptError().
+ ///
+ private TimeSpan AcceptError(string acceptName, Exception err, TimeSpan tmpDelay)
+ {
+ if (!IsRunning()) return TimeSpan.FromSeconds(-1);
+
+ if (err is SocketException se && se.SocketErrorCode == SocketError.WouldBlock)
+ {
+ Errorf("Temporary {0} Accept Error({1}), sleeping {2}ms",
+ acceptName, err.Message, (int)tmpDelay.TotalMilliseconds);
+
+ try
+ {
+ Task.Delay(tmpDelay, _quitCts.Token).GetAwaiter().GetResult();
+ }
+ catch (OperationCanceledException)
+ {
+ return TimeSpan.FromSeconds(-1);
+ }
+
+ tmpDelay *= 2;
+ if (tmpDelay > AcceptMaxSleep) tmpDelay = AcceptMaxSleep;
+ }
+ else
+ {
+ Errorf("{0} Accept error: {1}", acceptName, err.Message);
+ }
+
+ return tmpDelay;
+ }
+
+ // =========================================================================
+ // GetRandomIP (feature 3130)
+ // =========================================================================
+
+ private static readonly Exception ErrNoIPAvail = new InvalidOperationException("no IP available");
+
+ ///
+ /// Resolves a hostname to a random IP address, excluding addresses in .
+ /// Mirrors Go Server.getRandomIP().
+ ///
+ internal async Task<(string address, Exception? err)> GetRandomIP(
+ INetResolver resolver,
+ string url,
+ HashSet? excluded = null)
+ {
+ var colonIdx = url.LastIndexOf(':');
+ if (colonIdx < 0) return (url, new FormatException($"invalid host:port: {url}"));
+
+ var host = url[..colonIdx];
+ var port = url[(colonIdx + 1)..];
+
+ // If already an IP address, return as-is.
+ if (IPAddress.TryParse(host, out _)) return (url, null);
+
+ string[] ips;
+ try
+ {
+ ips = await resolver.LookupHostAsync(host, _quitCts.Token);
+ }
+ catch (Exception ex)
+ {
+ return (string.Empty, new InvalidOperationException($"lookup for host \"{host}\": {ex.Message}", ex));
+ }
+
+ if (excluded != null && excluded.Count > 0)
+ {
+ var filtered = new List(ips.Length);
+ foreach (var ip in ips)
+ {
+ var addr = $"{ip}:{port}";
+ if (!excluded.Contains(addr))
+ filtered.Add(ip);
+ }
+
+ ips = filtered.ToArray();
+ if (ips.Length == 0) return (string.Empty, ErrNoIPAvail);
+ }
+
+ if (ips.Length == 0)
+ {
+ Warnf("Unable to get IP for {0}, will try with {1}", host, url);
+ return (url, null);
+ }
+
+ var chosen = ips.Length == 1 ? ips[0] : ips[Random.Shared.Next(ips.Length)];
+ return ($"{chosen}:{port}", null);
+ }
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs
index b3995ef..bcef42a 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs
@@ -76,7 +76,7 @@ public sealed partial class NatsServer : INatsServer
private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion);
private readonly ReaderWriterLockSlim _reloadMu = new(LockRecursionPolicy.SupportsRecursion);
- private ServerInfo _info = new();
+ internal ServerInfo _info = new();
private string _configFile = string.Empty;
// =========================================================================
@@ -104,6 +104,34 @@ public sealed partial class NatsServer : INatsServer
private System.Net.Sockets.TcpListener? _listener;
private Exception? _listenerErr;
+ // HTTP monitoring listener
+ private System.Net.Sockets.TcpListener? _http;
+
+ // Route listener
+ private System.Net.Sockets.TcpListener? _routeListener;
+ private Exception? _routeListenerErr;
+
+ // Gateway listener
+ private System.Net.Sockets.TcpListener? _gatewayListener;
+ private Exception? _gatewayListenerErr;
+
+ // Leaf-node listener
+ private System.Net.Sockets.TcpListener? _leafNodeListener;
+ private Exception? _leafNodeListenerErr;
+
+ // Profiling listener
+ private System.Net.Sockets.TcpListener? _profiler;
+
+ // Accept-loop done channel — each accept loop sends true when it exits.
+ private readonly System.Threading.Channels.Channel _done =
+ System.Threading.Channels.Channel.CreateUnbounded();
+
+ // Lame-duck channel — created in lameDuckMode, receives one signal per accept loop.
+ private System.Threading.Channels.Channel? _ldmCh;
+
+ // The no-auth user that only the system account can use (auth session).
+ private string _sysAccOnlyNoAuthUser = string.Empty;
+
// =========================================================================
// Accounts
// =========================================================================
@@ -153,7 +181,7 @@ public sealed partial class NatsServer : INatsServer
private readonly object _grMu = new();
private bool _grRunning;
private readonly Dictionary _grTmpClients = [];
- private readonly SemaphoreSlim _grWg = new(1, 1); // simplified wg
+ private readonly WaitGroup _grWg = new();
// =========================================================================
// Cluster name (separate lock)
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs
index 55b8c0f..5f73d89 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs
@@ -15,6 +15,7 @@
using System.Text.Json.Serialization;
using ZB.MOM.NatsNet.Server.Auth;
+using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
@@ -84,6 +85,9 @@ public sealed class ServerInfo
// LeafNode-specific
[JsonPropertyName("leafnode_urls")] public string[]? LeafNodeUrls { get; set; }
+
+ /// Returns a shallow clone of this .
+ internal ServerInfo ShallowClone() => (ServerInfo)MemberwiseClone();
}
// ============================================================================
@@ -200,9 +204,6 @@ public static class CompressionMode
// These stubs will be replaced with full implementations in later sessions.
// They are declared here to allow the NatsServer class to compile.
-/// Stub for reference-counted URL set (session 09/12).
-internal sealed class RefCountedUrlSet : Dictionary { }
-
/// Stub for the system/internal messaging state (session 12).
internal sealed class InternalState
{
@@ -254,8 +255,14 @@ internal interface IOcspResponseCache { }
/// Stub for leaf node config (session 15).
internal sealed class LeafNodeCfg { }
-/// Stub for network resolver (session 09).
-internal interface INetResolver { }
+///
+/// Network resolver used by .
+/// Mirrors Go netResolver interface in server.go.
+///
+internal interface INetResolver
+{
+ Task LookupHostAsync(string host, CancellationToken ct = default);
+}
/// Factory for rate counters.
internal static class RateCounterFactory
@@ -267,6 +274,108 @@ internal static class RateCounterFactory
/// Stub for RaftNode (session 20).
public interface IRaftNode { }
+// ============================================================================
+// Session 10: Ports, TlsMixConn, CaptureHttpServerLog
+// ============================================================================
+
+///
+/// Describes the URLs at which this server can be contacted.
+/// Mirrors Go Ports struct in server.go.
+///
+public sealed class Ports
+{
+ public string[]? Nats { get; set; }
+ public string[]? Monitoring { get; set; }
+ public string[]? Cluster { get; set; }
+ public string[]? Profile { get; set; }
+ public string[]? WebSocket { get; set; }
+}
+
+///
+/// Wraps a with an optional "pre-read" buffer that is
+/// drained first, then falls through to the underlying stream.
+/// Used when we peek at the first bytes of a connection to detect TLS.
+/// Mirrors Go tlsMixConn.
+///
+internal sealed class TlsMixConn : Stream
+{
+ private readonly Stream _inner;
+ private System.IO.MemoryStream? _pre;
+
+ public TlsMixConn(Stream inner, byte[] preRead)
+ {
+ _inner = inner;
+ if (preRead.Length > 0)
+ _pre = new System.IO.MemoryStream(preRead, writable: false);
+ }
+
+ public override bool CanRead => true;
+ public override bool CanSeek => false;
+ public override bool CanWrite => _inner.CanWrite;
+ public override long Length => throw new NotSupportedException();
+ public override long Position
+ {
+ get => throw new NotSupportedException();
+ set => throw new NotSupportedException();
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ if (_pre is { } pre)
+ {
+ var n = pre.Read(buffer, offset, count);
+ if (pre.Position >= pre.Length)
+ _pre = null;
+ return n;
+ }
+ return _inner.Read(buffer, offset, count);
+ }
+
+ public override void Write(byte[] buffer, int offset, int count) =>
+ _inner.Write(buffer, offset, count);
+
+ public override void Flush() => _inner.Flush();
+
+ public override long Seek(long offset, SeekOrigin origin) =>
+ throw new NotSupportedException();
+
+ public override void SetLength(long value) =>
+ throw new NotSupportedException();
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing) { _pre?.Dispose(); _inner.Dispose(); }
+ base.Dispose(disposing);
+ }
+}
+
+///
+/// Captures HTTP server log lines and routes them through the server's
+/// error logger.
+/// Mirrors Go captureHTTPServerLog in server.go.
+///
+internal sealed class CaptureHttpServerLog : System.IO.TextWriter
+{
+ private readonly NatsServer _server;
+ private readonly string _prefix;
+
+ public CaptureHttpServerLog(NatsServer server, string prefix)
+ {
+ _server = server;
+ _prefix = prefix;
+ }
+
+ public override System.Text.Encoding Encoding => System.Text.Encoding.UTF8;
+
+ public override void Write(string? value)
+ {
+ if (value is null) return;
+ // Strip leading "http:" prefix that .NET's HttpListener sometimes emits.
+ var msg = value.StartsWith("http:", StringComparison.Ordinal) ? value[6..] : value;
+ _server.Errorf("{0}{1}", _prefix, msg.TrimEnd('\r', '\n'));
+ }
+}
+
///
/// Stub for JWT account claims (session 06/11).
/// Mirrors Go jwt.AccountClaims from nats.io/jwt/v2.
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Properties/AssemblyInfo.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Properties/AssemblyInfo.cs
index f2b3639..69622f0 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/Properties/AssemblyInfo.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Properties/AssemblyInfo.cs
@@ -2,3 +2,4 @@ using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("ZB.MOM.NatsNet.Server.Tests")]
[assembly: InternalsVisibleTo("ZB.MOM.NatsNet.Server.IntegrationTests")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] // required for NSubstitute to proxy internal interfaces
diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs
index ddabf16..5f36584 100644
--- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs
@@ -12,11 +12,16 @@
// limitations under the License.
//
// Adapted from server/server_test.go in the NATS server Go source.
-// Session 09: standalone unit tests for NatsServer helpers.
+// Session 09–10: standalone unit tests for NatsServer helpers.
+using System.Text;
+using System.Text.Json;
using System.Text.RegularExpressions;
+using NSubstitute;
+using NSubstitute.ExceptionExtensions;
using Shouldly;
using Xunit;
+using ZB.MOM.NatsNet.Server.Auth;
namespace ZB.MOM.NatsNet.Server.Tests;
@@ -249,3 +254,215 @@ public sealed class ServerTests
public void NeedsCompression_S2Fast_ReturnsTrue()
=> NatsServer.NeedsCompression(CompressionMode.S2Fast).ShouldBeTrue();
}
+
+// =============================================================================
+// Session 10: Listeners, INFO JSON, TLS helpers, GetRandomIP
+// =============================================================================
+
+///
+/// Tests for session 10 features: GenerateInfoJson, TlsVersion helpers,
+/// CopyInfo, and GetRandomIP.
+///
+public sealed class ServerListenersTests
+{
+ // =========================================================================
+ // GenerateInfoJson (feature 3069) — Test ID 2906
+ // Mirrors Go TestServerJsonMarshalNestedStructsPanic (guards against
+ // marshaller panics with nested/nullable structs).
+ // =========================================================================
+
+ [Fact]
+ public void GenerateInfoJson_MinimalInfo_ProducesInfoFrame()
+ {
+ var info = new ServerInfo { Id = "TEST", Version = "1.0.0", Host = "0.0.0.0", Port = 4222 };
+ var bytes = NatsServer.GenerateInfoJson(info);
+ var text = Encoding.UTF8.GetString(bytes);
+ text.ShouldStartWith("INFO {");
+ text.ShouldEndWith("}\r\n");
+ }
+
+ [Fact]
+ public void GenerateInfoJson_WithConnectUrls_IncludesUrls()
+ {
+ var info = new ServerInfo
+ {
+ Id = "TEST",
+ Version = "1.0.0",
+ ClientConnectUrls = ["nats://127.0.0.1:4222", "nats://127.0.0.1:4223"],
+ };
+ var text = Encoding.UTF8.GetString(NatsServer.GenerateInfoJson(info));
+ text.ShouldContain("connect_urls");
+ text.ShouldContain("4222");
+ }
+
+ [Fact]
+ public void GenerateInfoJson_WithSubjectPermissions_DoesNotThrow()
+ {
+ // Mirrors Go TestServerJsonMarshalNestedStructsPanic — guards against
+ // JSON marshaller failures with nested nullable structs.
+ var info = new ServerInfo
+ {
+ Id = "TEST",
+ Version = "1.0.0",
+ Import = new SubjectPermission { Allow = ["foo.>"], Deny = ["bar"] },
+ Export = new SubjectPermission { Allow = ["pub.>"] },
+ };
+ var bytes = NatsServer.GenerateInfoJson(info);
+ bytes.ShouldNotBeEmpty();
+ // Strip the "INFO " prefix (5 bytes) and the trailing "\r\n" (2 bytes) to get pure JSON.
+ var json = Encoding.UTF8.GetString(bytes, 5, bytes.Length - 7);
+ var doc = JsonDocument.Parse(json);
+ doc.RootElement.GetProperty("import").ValueKind.ShouldBe(JsonValueKind.Object);
+ }
+
+ // =========================================================================
+ // TlsVersion helpers (features 3079–3080)
+ // =========================================================================
+
+ [Theory]
+ [InlineData(0x0301u, "1.0")]
+ [InlineData(0x0302u, "1.1")]
+ [InlineData(0x0303u, "1.2")]
+ [InlineData(0x0304u, "1.3")]
+ public void TlsVersion_KnownCodes_ReturnsVersionString(uint ver, string expected)
+ => NatsServer.TlsVersion((ushort)ver).ShouldBe(expected);
+
+ [Fact]
+ public void TlsVersion_UnknownCode_ReturnsUnknownLabel()
+ => NatsServer.TlsVersion(0xFFFF).ShouldStartWith("Unknown");
+
+ [Theory]
+ [InlineData("1.0", (ushort)0x0301)]
+ [InlineData("1.1", (ushort)0x0302)]
+ [InlineData("1.2", (ushort)0x0303)]
+ [InlineData("1.3", (ushort)0x0304)]
+ public void TlsVersionFromString_KnownStrings_ReturnsCode(string input, ushort expected)
+ {
+ var (ver, err) = NatsServer.TlsVersionFromString(input);
+ err.ShouldBeNull();
+ ver.ShouldBe(expected);
+ }
+
+ [Fact]
+ public void TlsVersionFromString_UnknownString_ReturnsError()
+ {
+ var (_, err) = NatsServer.TlsVersionFromString("9.9");
+ err.ShouldNotBeNull();
+ }
+
+ // =========================================================================
+ // CopyInfo (feature 3069)
+ // =========================================================================
+
+ [Fact]
+ public void CopyInfo_DeepCopiesSlices()
+ {
+ var (s, err) = NatsServer.NewServer(new ServerOptions());
+ err.ShouldBeNull();
+ s.ShouldNotBeNull();
+ s!._info.ClientConnectUrls = ["nats://127.0.0.1:4222"];
+
+ var copy = s.CopyInfo();
+ copy.ClientConnectUrls.ShouldNotBeNull();
+ copy.ClientConnectUrls!.ShouldBe(["nats://127.0.0.1:4222"]);
+ // Mutating original slice shouldn't affect the copy.
+ s._info.ClientConnectUrls = ["nats://10.0.0.1:4222"];
+ copy.ClientConnectUrls[0].ShouldBe("nats://127.0.0.1:4222");
+ }
+
+ // =========================================================================
+ // GetRandomIP (feature 3141) — Test ID 2895
+ // Mirrors Go TestGetRandomIP.
+ // =========================================================================
+
+ private static NatsServer MakeServer()
+ {
+ var (s, err) = NatsServer.NewServer(new ServerOptions());
+ err.ShouldBeNull();
+ return s!;
+ }
+
+ [Fact]
+ public async Task GetRandomIP_NoPort_ReturnsFormatError()
+ {
+ var s = MakeServer();
+ var resolver = Substitute.For();
+ var (_, err) = await s.GetRandomIP(resolver, "noport");
+ err.ShouldNotBeNull();
+ err!.Message.ShouldContain("port");
+ }
+
+ [Fact]
+ public async Task GetRandomIP_ResolverThrows_PropagatesError()
+ {
+ var s = MakeServer();
+ var resolver = Substitute.For();
+ resolver.LookupHostAsync(Arg.Any(), Arg.Any())
+ .ThrowsAsync(new InvalidOperationException("on purpose"));
+ var (_, err) = await s.GetRandomIP(resolver, "localhost:4222");
+ err.ShouldNotBeNull();
+ err!.Message.ShouldContain("on purpose");
+ }
+
+ [Fact]
+ public async Task GetRandomIP_EmptyIps_ReturnsFallbackUrl()
+ {
+ var s = MakeServer();
+ var resolver = Substitute.For();
+ resolver.LookupHostAsync("localhost", Arg.Any())
+ .Returns(Task.FromResult(Array.Empty()));
+ var (addr, err) = await s.GetRandomIP(resolver, "localhost:4222");
+ err.ShouldBeNull();
+ addr.ShouldBe("localhost:4222");
+ }
+
+ [Fact]
+ public async Task GetRandomIP_SingleIp_ReturnsMappedAddress()
+ {
+ var s = MakeServer();
+ var resolver = Substitute.For();
+ resolver.LookupHostAsync("localhost", Arg.Any())
+ .Returns(Task.FromResult(new[] { "1.2.3.4" }));
+ var (addr, err) = await s.GetRandomIP(resolver, "localhost:4222");
+ err.ShouldBeNull();
+ addr.ShouldBe("1.2.3.4:4222");
+ }
+
+ [Fact]
+ public async Task GetRandomIP_MultipleIps_AllSelectedWithinRange()
+ {
+ var s = MakeServer();
+ var resolver = Substitute.For();
+ resolver.LookupHostAsync("localhost", Arg.Any())
+ .Returns(Task.FromResult(new[] { "1.2.3.4", "2.2.3.4", "3.2.3.4" }));
+
+ var dist = new int[3];
+ for (int i = 0; i < 100; i++)
+ {
+ var (addr, err) = await s.GetRandomIP(resolver, "localhost:4222");
+ err.ShouldBeNull();
+ dist[int.Parse(addr[..1]) - 1]++;
+ }
+
+ // Each IP should appear at least once and no single IP should dominate.
+ foreach (var d in dist)
+ d.ShouldBeGreaterThan(0);
+ }
+
+ [Fact]
+ public async Task GetRandomIP_ExcludedIp_NeverReturned()
+ {
+ var s = MakeServer();
+ var resolver = Substitute.For();
+ resolver.LookupHostAsync("localhost", Arg.Any())
+ .Returns(Task.FromResult(new[] { "1.2.3.4", "2.2.3.4", "3.2.3.4" }));
+
+ var excluded = new HashSet { "1.2.3.4:4222" };
+ for (int i = 0; i < 100; i++)
+ {
+ var (addr, err) = await s.GetRandomIP(resolver, "localhost:4222", excluded);
+ err.ShouldBeNull();
+ addr.ShouldNotBe("1.2.3.4:4222");
+ }
+ }
+}
diff --git a/porting.db b/porting.db
index aac4c6e..c19893c 100644
Binary files a/porting.db and b/porting.db differ
diff --git a/reports/current.md b/reports/current.md
index 1894b68..96d6544 100644
--- a/reports/current.md
+++ b/reports/current.md
@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
-Generated: 2026-02-26 19:18:19 UTC
+Generated: 2026-02-26 20:08:24 UTC
## Modules (12 total)
@@ -13,18 +13,18 @@ Generated: 2026-02-26 19:18:19 UTC
| Status | Count |
|--------|-------|
-| complete | 744 |
+| complete | 841 |
| n_a | 82 |
-| not_started | 2754 |
+| not_started | 2657 |
| stub | 93 |
## Unit Tests (3257 total)
| Status | Count |
|--------|-------|
-| complete | 276 |
+| complete | 278 |
| n_a | 181 |
-| not_started | 2576 |
+| not_started | 2574 |
| stub | 224 |
## Library Mappings (36 total)
@@ -36,4 +36,4 @@ Generated: 2026-02-26 19:18:19 UTC
## Overall Progress
-**1294/6942 items complete (18.6%)**
+**1393/6942 items complete (20.1%)**
diff --git a/reports/report_0df93c2.md b/reports/report_0df93c2.md
new file mode 100644
index 0000000..96d6544
--- /dev/null
+++ b/reports/report_0df93c2.md
@@ -0,0 +1,39 @@
+# NATS .NET Porting Status Report
+
+Generated: 2026-02-26 20:08:24 UTC
+
+## Modules (12 total)
+
+| Status | Count |
+|--------|-------|
+| complete | 11 |
+| not_started | 1 |
+
+## Features (3673 total)
+
+| Status | Count |
+|--------|-------|
+| complete | 841 |
+| n_a | 82 |
+| not_started | 2657 |
+| stub | 93 |
+
+## Unit Tests (3257 total)
+
+| Status | Count |
+|--------|-------|
+| complete | 278 |
+| n_a | 181 |
+| not_started | 2574 |
+| stub | 224 |
+
+## Library Mappings (36 total)
+
+| Status | Count |
+|--------|-------|
+| mapped | 36 |
+
+
+## Overall Progress
+
+**1393/6942 items complete (20.1%)**