feat: port session 10 — Server Core Runtime, Accept Loops & Listeners

Ports server/server.go lines 2577–4782 (~1,881 Go LOC), implementing ~97
features (IDs 3051–3147) across three new partial-class files.

New files:
- NatsServer.Lifecycle.cs: Shutdown, WaitForShutdown, RemoveClient,
  SendLDMToClients, LameDuckMode, LDMClientByID, rate-limit logging,
  DisconnectClientByID, SendAsyncInfoToClients
- NatsServer.Listeners.cs: AcceptLoop, GetServerListener, InProcessConn,
  AcceptConnections, GenerateInfoJson, CopyInfo, CreateClient/Ex/InProcess,
  StartMonitoring (HTTP/HTTPS), AddConnectURLs/RemoveConnectURLs,
  TlsVersion/TlsVersionFromString, GetClientConnectURLs, ResolveHostPorts,
  PortsInfo/PortFile/LogPorts, ReadyForListeners, GetRandomIP, AcceptError
- Internal/WaitGroup.cs: Go-style WaitGroup using TaskCompletionSource

Modified:
- Auth/AuthTypes.cs: Account now implements INatsAccount (stub)
- NatsServerTypes.cs: ServerInfo.ShallowClone(), removed duplicate RefCountedUrlSet
- NatsServer.cs: _info promoted to internal for test access
- Properties/AssemblyInfo.cs: InternalsVisibleTo(DynamicProxyGenAssembly2)
- ServerTests.cs: 20 new session-10 unit tests (GenerateInfoJson, TlsVersion,
  CopyInfo, GetRandomIP — Test IDs 2895, 2906)

All 565 unit tests + 1 integration test pass.
This commit is contained in:
Joseph Doherty
2026-02-26 15:08:23 -05:00
parent 0df93c23b0
commit 06779a1f77
13 changed files with 2539 additions and 21 deletions

View File

@@ -13,6 +13,8 @@
//
// Adapted from server/auth.go (type definitions) in the NATS server Go source.
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Auth;
/// <summary>
@@ -170,7 +172,7 @@ public class RoutePermissions
/// Stub for Account type. Full implementation in session 11.
/// Mirrors Go <c>Account</c> struct.
/// </summary>
public class Account
public class Account : INatsAccount
{
public string Name { get; set; } = string.Empty;
@@ -184,6 +186,27 @@ public class Account
internal ZB.MOM.NatsNet.Server.Internal.DataStructures.SubscriptionIndex? Sublist { get; set; }
internal object? Server { get; set; } // INatsServer — avoids circular reference
// INatsAccount — stub implementations until session 11 (accounts.go).
bool INatsAccount.IsValid => true;
bool INatsAccount.MaxTotalConnectionsReached() => false;
bool INatsAccount.MaxTotalLeafNodesReached() => false;
int INatsAccount.AddClient(ClientConnection c) => 0;
int INatsAccount.RemoveClient(ClientConnection c) => 0;
/// <summary>Returns true if this account's JWT has expired. Stub — full impl in session 11.</summary>
public bool IsExpired() => false;
/// <summary>
/// Returns the total number of subscriptions across all clients in this account.
/// Stub — full implementation in session 11.
/// Mirrors Go <c>Account.TotalSubs()</c>.
/// </summary>
public int TotalSubs() => 0;
/// <summary>
/// Notifies leaf nodes of a subscription change.
/// Stub — full implementation in session 15.
/// Mirrors Go <c>Account.updateLeafNodes()</c>.
/// </summary>
internal void UpdateLeafNodes(object sub, int delta) { }
}

View File

@@ -1077,6 +1077,21 @@ public sealed partial class ClientConnection
// features 425-427: writeLoop / flushClients / readLoop
internal void WriteLoop() { /* TODO session 09 */ }
internal void FlushClients(long budget) { /* TODO session 09 */ }
internal void ReadLoop(byte[]? pre) { /* TODO session 09 */ }
/// <summary>
/// Generates the INFO JSON bytes sent to the client on connect.
/// Stub — full implementation in session 09.
/// Mirrors Go <c>client.generateClientInfoJSON()</c>.
/// </summary>
internal ReadOnlyMemory<byte> GenerateClientInfoJSON(ServerInfo info, bool includeClientIp)
=> ReadOnlyMemory<byte>.Empty;
/// <summary>
/// Sets the auth-timeout timer to the specified duration.
/// Mirrors Go <c>client.setAuthTimer(d)</c>.
/// </summary>
internal void SetAuthTimer(TimeSpan d) { /* TODO session 09 */ }
// features 428-432: closedStateForErr, collapsePtoNB, flushOutbound, handleWriteTimeout, markConnAsClosed
internal static ClosedState ClosedStateForErr(Exception err) =>

View File

@@ -0,0 +1,64 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace ZB.MOM.NatsNet.Server.Internal;
/// <summary>
/// A Go-like WaitGroup: tracks a set of in-flight operations and lets callers
/// block until all of them complete.
/// </summary>
internal sealed class WaitGroup
{
private int _count;
private volatile TaskCompletionSource<bool> _tcs;
public WaitGroup()
{
_tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_tcs.SetResult(true); // starts at zero, so "done" immediately
}
/// <summary>
/// Increment the counter by <paramref name="delta"/> (usually 1).
/// Must be called before starting the goroutine it tracks.
/// </summary>
public void Add(int delta = 1)
{
var newCount = Interlocked.Add(ref _count, delta);
if (newCount < 0)
throw new InvalidOperationException("WaitGroup counter went negative");
if (newCount == 0)
{
// All goroutines done — signal any waiters.
Volatile.Read(ref _tcs).TrySetResult(true);
}
else if (delta > 0 && newCount == delta)
{
// Transitioning from 0 to positive — replace the completed TCS
// with a fresh unsignaled one so Wait() will block correctly.
Volatile.Write(ref _tcs,
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously));
}
}
/// <summary>Decrement the counter by 1. Called when a goroutine finishes.</summary>
public void Done() => Add(-1);
/// <summary>Block synchronously until the counter reaches 0.</summary>
public void Wait()
{
if (Volatile.Read(ref _count) == 0) return;
Volatile.Read(ref _tcs).Task.GetAwaiter().GetResult();
}
}

View File

@@ -882,15 +882,26 @@ public sealed partial class NatsServer
// =========================================================================
/// <summary>
/// Sets the INFO host/port from the current listener endpoint.
/// Full implementation in session 10 (listener startup).
/// Stub sets to opts values.
/// Sets the INFO host/port from either <c>ClientAdvertise</c> or the bind options.
/// Mirrors Go <c>Server.setInfoHostPort()</c>.
/// Returns non-null on parse error.
/// </summary>
internal void SetInfoHostPort()
internal Exception? SetInfoHostPort()
{
var opts = GetOpts();
_info.Host = opts.Host;
_info.Port = opts.Port;
if (!string.IsNullOrEmpty(opts.ClientAdvertise))
{
var (h, p, err) = Internal.ServerUtilities.ParseHostPort(opts.ClientAdvertise, opts.Port);
if (err != null) return err;
_info.Host = h;
_info.Port = p;
}
else
{
_info.Host = opts.Host;
_info.Port = opts.Port;
}
return null;
}
/// <summary>

View File

@@ -0,0 +1,847 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/server.go (lines 25774782) in the NATS server Go source.
// Session 10: shutdown, goroutine tracking, query helpers, lame duck mode.
using System.Net;
using System.Net.Sockets;
using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
// =========================================================================
// Shutdown / WaitForShutdown (features 3051, 3053)
// =========================================================================
/// <summary>
/// Shuts the server down gracefully: closes all listeners, kicks all
/// connections, waits for goroutines, then signals <see cref="WaitForShutdown"/>.
/// Mirrors Go <c>Server.Shutdown()</c>.
/// </summary>
public void Shutdown()
{
// Stubs for JetStream / Raft / eventing — implemented in later sessions.
SignalPullConsumers();
StepdownRaftNodes();
ShutdownEventing();
if (IsShuttingDown()) return;
_mu.EnterWriteLock();
Noticef("Initiating Shutdown...");
var accRes = _accResolver;
GetOpts(); // snapshot opts (not used below but mirrors Go pattern)
Interlocked.Exchange(ref _shutdown, 1);
Interlocked.Exchange(ref _running, 0);
lock (_grMu) { _grRunning = false; }
_mu.ExitWriteLock();
accRes?.Close();
ShutdownJetStream();
ShutdownRaftNodes();
// ---- Collect all connections ----
var conns = new Dictionary<ulong, ClientConnection>();
_mu.EnterWriteLock();
foreach (var kvp in _clients) conns[kvp.Key] = kvp.Value;
lock (_grMu) { foreach (var kvp in _grTmpClients) conns[kvp.Key] = kvp.Value; }
ForEachRoute(r => { conns[r.Cid] = r; });
GetAllGatewayConnections(conns);
foreach (var kvp in _leafs) conns[kvp.Key] = kvp.Value;
// ---- Count & close listeners ----
int doneExpected = 0;
if (_listener != null)
{
doneExpected++;
_listener.Stop();
_listener = null;
}
doneExpected += CloseWebsocketServer();
if (_gateway.Enabled)
{
// mqtt listener managed by session 22
}
if (_leafNodeListener != null)
{
doneExpected++;
_leafNodeListener.Stop();
_leafNodeListener = null;
}
if (_routeListener != null)
{
doneExpected++;
_routeListener.Stop();
_routeListener = null;
}
if (_gatewayListener != null)
{
doneExpected++;
_gatewayListener.Stop();
_gatewayListener = null;
}
if (_http != null)
{
doneExpected++;
_http.Stop();
_http = null;
}
if (_profiler != null)
{
doneExpected++;
_profiler.Stop();
// profiler is not nulled — see Go code: it keeps _profiler ref for ProfilerAddr()
}
_mu.ExitWriteLock();
// Release all goroutines waiting on quitCts.
_quitCts.Cancel();
// Close all client / route / gateway / leaf connections.
foreach (var c in conns.Values)
{
c.SetNoReconnect();
c.CloseConnection(ClosedState.ServerShutdown);
}
// Wait for accept loops to exit.
for (int i = 0; i < doneExpected; i++)
_done.Reader.ReadAsync().GetAwaiter().GetResult();
// Wait for all goroutines.
_grWg.Wait();
var opts = GetOpts();
if (!string.IsNullOrEmpty(opts.PortsFileDir))
DeletePortsFile(opts.PortsFileDir);
Noticef("Server Exiting..");
if (_ocsprc != null) { /* stub — stop OCSP cache in session 23 */ }
_shutdownComplete.TrySetResult();
}
/// <summary>
/// Blocks until <see cref="Shutdown"/> has fully completed.
/// Mirrors Go <c>Server.WaitForShutdown()</c>.
/// </summary>
public void WaitForShutdown() =>
_shutdownComplete.Task.GetAwaiter().GetResult();
// =========================================================================
// Goroutine tracking (features 31193120)
// =========================================================================
/// <summary>
/// Starts a background Task that counts toward the server wait group.
/// Returns true if the goroutine was started, false if the server is already stopped.
/// Mirrors Go <c>Server.startGoRoutine(f)</c>.
/// </summary>
internal bool StartGoRoutine(Action f)
{
lock (_grMu)
{
if (!_grRunning) return false;
_grWg.Add(1);
Task.Run(() =>
{
try { f(); }
finally { _grWg.Done(); }
});
return true;
}
}
// =========================================================================
// Client / connection management (features 30813084)
// =========================================================================
/// <summary>
/// Removes a client, route, gateway, or leaf from server accounting.
/// Mirrors Go <c>Server.removeClient()</c>.
/// </summary>
internal void RemoveClient(ClientConnection c)
{
switch (c.Kind)
{
case ClientKind.Client:
{
bool updateProto;
string proxyKey;
lock (c)
{
updateProto = c.Kind == ClientKind.Client &&
c.Opts.Protocol >= ClientProtocol.Info;
proxyKey = c.ProxyKey;
}
_mu.EnterWriteLock();
try
{
_clients.Remove(c.Cid);
if (updateProto) _cproto--;
if (!string.IsNullOrEmpty(proxyKey))
RemoveProxiedConn(proxyKey, c.Cid);
}
finally { _mu.ExitWriteLock(); }
break;
}
case ClientKind.Router:
RemoveRoute(c);
break;
case ClientKind.Gateway:
RemoveRemoteGatewayConnection(c);
break;
case ClientKind.Leaf:
RemoveLeafNodeConnection(c);
break;
}
}
/// <summary>
/// Removes a proxied connection entry.
/// Server write lock must be held on entry.
/// Mirrors Go <c>Server.removeProxiedConn()</c>.
/// </summary>
private void RemoveProxiedConn(string key, ulong cid)
{
if (!_proxiedConns.TryGetValue(key, out var conns)) return;
conns.Remove(cid);
if (conns.Count == 0) _proxiedConns.Remove(key);
}
/// <summary>
/// Removes a client from the temporary goroutine client map.
/// Mirrors Go <c>Server.removeFromTempClients()</c>.
/// </summary>
internal void RemoveFromTempClients(ulong cid)
{
lock (_grMu) { _grTmpClients.Remove(cid); }
}
/// <summary>
/// Adds a client to the temporary goroutine client map.
/// Returns false if the server is no longer running goroutines.
/// Mirrors Go <c>Server.addToTempClients()</c>.
/// </summary>
internal bool AddToTempClients(ulong cid, ClientConnection c)
{
lock (_grMu)
{
if (!_grRunning) return false;
_grTmpClients[cid] = c;
return true;
}
}
// =========================================================================
// Query helpers (features 30853118, 31213123)
// =========================================================================
/// <summary>Returns the number of registered routes. Mirrors Go <c>Server.NumRoutes()</c>.</summary>
public int NumRoutes()
{
_mu.EnterReadLock();
try { return NumRoutesInternal(); }
finally { _mu.ExitReadLock(); }
}
private int NumRoutesInternal()
{
int nr = 0;
ForEachRoute(_ => nr++);
return nr;
}
/// <summary>Returns the number of registered remotes. Mirrors Go <c>Server.NumRemotes()</c>.</summary>
public int NumRemotes()
{
_mu.EnterReadLock();
try { return _routes.Count; }
finally { _mu.ExitReadLock(); }
}
/// <summary>Returns the number of leaf-node connections. Mirrors Go <c>Server.NumLeafNodes()</c>.</summary>
public int NumLeafNodes()
{
_mu.EnterReadLock();
try { return _leafs.Count; }
finally { _mu.ExitReadLock(); }
}
/// <summary>Returns the number of registered clients. Mirrors Go <c>Server.NumClients()</c>.</summary>
public int NumClients()
{
_mu.EnterReadLock();
try { return _clients.Count; }
finally { _mu.ExitReadLock(); }
}
/// <summary>Returns the client with the given connection ID. Mirrors Go <c>Server.GetClient()</c>.</summary>
public ClientConnection? GetClient(ulong cid) => GetClientInternal(cid);
private ClientConnection? GetClientInternal(ulong cid)
{
_mu.EnterReadLock();
try
{
_clients.TryGetValue(cid, out var c);
return c;
}
finally { _mu.ExitReadLock(); }
}
/// <summary>Returns the leaf node with the given connection ID. Mirrors Go <c>Server.GetLeafNode()</c>.</summary>
public ClientConnection? GetLeafNode(ulong cid)
{
_mu.EnterReadLock();
try
{
_leafs.TryGetValue(cid, out var c);
return c;
}
finally { _mu.ExitReadLock(); }
}
/// <summary>
/// Returns total subscriptions across all accounts.
/// Mirrors Go <c>Server.NumSubscriptions()</c>.
/// </summary>
public uint NumSubscriptions()
{
_mu.EnterReadLock();
try { return NumSubscriptionsInternal(); }
finally { _mu.ExitReadLock(); }
}
private uint NumSubscriptionsInternal()
{
int subs = 0;
foreach (var kvp in _accounts)
subs += kvp.Value.TotalSubs();
return (uint)subs;
}
/// <summary>Returns the number of slow consumers. Mirrors Go <c>Server.NumSlowConsumers()</c>.</summary>
public long NumSlowConsumers() => Interlocked.Read(ref _stats.SlowConsumers);
/// <summary>Returns the number of times clients were stalled. Mirrors Go <c>Server.NumStalledClients()</c>.</summary>
public long NumStalledClients() => Interlocked.Read(ref _stats.Stalls);
/// <summary>Mirrors Go <c>Server.NumSlowConsumersClients()</c>.</summary>
public long NumSlowConsumersClients() => Interlocked.Read(ref _scStats.Clients);
/// <summary>Mirrors Go <c>Server.NumSlowConsumersRoutes()</c>.</summary>
public long NumSlowConsumersRoutes() => Interlocked.Read(ref _scStats.Routes);
/// <summary>Mirrors Go <c>Server.NumSlowConsumersGateways()</c>.</summary>
public long NumSlowConsumersGateways() => Interlocked.Read(ref _scStats.Gateways);
/// <summary>Mirrors Go <c>Server.NumSlowConsumersLeafs()</c>.</summary>
public long NumSlowConsumersLeafs() => Interlocked.Read(ref _scStats.Leafs);
/// <summary>Mirrors Go <c>Server.NumStaleConnections()</c>.</summary>
public long NumStaleConnections() => Interlocked.Read(ref _stats.StaleConnections);
/// <summary>Mirrors Go <c>Server.NumStaleConnectionsClients()</c>.</summary>
public long NumStaleConnectionsClients() => Interlocked.Read(ref _staleStats.Clients);
/// <summary>Mirrors Go <c>Server.NumStaleConnectionsRoutes()</c>.</summary>
public long NumStaleConnectionsRoutes() => Interlocked.Read(ref _staleStats.Routes);
/// <summary>Mirrors Go <c>Server.NumStaleConnectionsGateways()</c>.</summary>
public long NumStaleConnectionsGateways() => Interlocked.Read(ref _staleStats.Gateways);
/// <summary>Mirrors Go <c>Server.NumStaleConnectionsLeafs()</c>.</summary>
public long NumStaleConnectionsLeafs() => Interlocked.Read(ref _staleStats.Leafs);
/// <summary>Returns the time the current configuration was loaded. Mirrors Go <c>Server.ConfigTime()</c>.</summary>
public DateTime ConfigTime()
{
_mu.EnterReadLock();
try { return _configTime; }
finally { _mu.ExitReadLock(); }
}
/// <summary>Returns the client listener address. Mirrors Go <c>Server.Addr()</c>.</summary>
public EndPoint? Addr()
{
_mu.EnterReadLock();
try { return _listener?.LocalEndpoint; }
finally { _mu.ExitReadLock(); }
}
/// <summary>Returns the monitoring listener address. Mirrors Go <c>Server.MonitorAddr()</c>.</summary>
public IPEndPoint? MonitorAddr()
{
_mu.EnterReadLock();
try { return _http?.LocalEndpoint as IPEndPoint; }
finally { _mu.ExitReadLock(); }
}
/// <summary>Returns the cluster (route) listener address. Mirrors Go <c>Server.ClusterAddr()</c>.</summary>
public IPEndPoint? ClusterAddr()
{
_mu.EnterReadLock();
try { return _routeListener?.LocalEndpoint as IPEndPoint; }
finally { _mu.ExitReadLock(); }
}
/// <summary>Returns the profiler listener address. Mirrors Go <c>Server.ProfilerAddr()</c>.</summary>
public IPEndPoint? ProfilerAddr()
{
_mu.EnterReadLock();
try { return _profiler?.LocalEndpoint as IPEndPoint; }
finally { _mu.ExitReadLock(); }
}
/// <summary>
/// Polls until all expected listeners are up or the deadline expires.
/// Returns an error description if not ready within <paramref name="d"/>.
/// Mirrors Go <c>Server.readyForConnections()</c>.
/// </summary>
public Exception? ReadyForConnectionsError(TimeSpan d)
{
var opts = GetOpts();
var end = DateTime.UtcNow.Add(d);
while (DateTime.UtcNow < end)
{
bool serverOk, routeOk, gatewayOk, leafOk, wsOk;
_mu.EnterReadLock();
serverOk = _listener != null || opts.DontListen;
routeOk = opts.Cluster.Port == 0 || _routeListener != null;
gatewayOk = string.IsNullOrEmpty(opts.Gateway.Name) || _gatewayListener != null;
leafOk = opts.LeafNode.Port == 0 || _leafNodeListener != null;
wsOk = opts.Websocket.Port == 0; // stub — websocket listener not tracked until session 23
_mu.ExitReadLock();
if (serverOk && routeOk && gatewayOk && leafOk && wsOk)
{
if (opts.DontListen)
{
try { _startupComplete.Task.Wait((int)d.TotalMilliseconds); }
catch { /* timeout */ }
}
return null;
}
if (d > TimeSpan.FromMilliseconds(25))
Thread.Sleep(25);
}
return new InvalidOperationException(
$"failed to be ready for connections after {d}");
}
/// <summary>
/// Returns true if the server is ready to accept connections.
/// Mirrors Go <c>Server.ReadyForConnections()</c>.
/// </summary>
public bool ReadyForConnections(TimeSpan dur) =>
ReadyForConnectionsError(dur) == null;
/// <summary>Returns true if the server supports headers. Mirrors Go <c>Server.supportsHeaders()</c>.</summary>
internal bool SupportsHeaders() => !(GetOpts().NoHeaderSupport);
/// <summary>Returns the server ID. Mirrors Go <c>Server.ID()</c>.</summary>
public string ID() => _info.Id;
/// <summary>Returns the JetStream node name (hash of server name). Mirrors Go <c>Server.NodeName()</c>.</summary>
public string NodeName() => GetHash(_info.Name);
/// <summary>Returns the server name. Mirrors Go <c>Server.Name()</c>.</summary>
public string Name() => _info.Name;
/// <summary>Returns the server name as a string. Mirrors Go <c>Server.String()</c>.</summary>
public override string ToString() => _info.Name;
/// <summary>Returns the number of currently-stored closed connections. Mirrors Go <c>Server.numClosedConns()</c>.</summary>
internal int NumClosedConns()
{
_mu.EnterReadLock();
try { return _closed.Len(); }
finally { _mu.ExitReadLock(); }
}
/// <summary>Returns total closed connections ever recorded. Mirrors Go <c>Server.totalClosedConns()</c>.</summary>
internal ulong TotalClosedConns()
{
_mu.EnterReadLock();
try { return _closed.TotalConns(); }
finally { _mu.ExitReadLock(); }
}
/// <summary>Returns a snapshot of recently closed connections. Mirrors Go <c>Server.closedClients()</c>.</summary>
internal Internal.ClosedClient?[] ClosedClients()
{
_mu.EnterReadLock();
try { return _closed.ClosedClients(); }
finally { _mu.ExitReadLock(); }
}
// =========================================================================
// Lame duck mode (features 31353139)
// =========================================================================
/// <summary>Returns true if the server is in lame duck mode. Mirrors Go <c>Server.isLameDuckMode()</c>.</summary>
public bool IsLameDuckMode()
{
_mu.EnterReadLock();
try { return _ldm; }
finally { _mu.ExitReadLock(); }
}
/// <summary>
/// Performs a lame-duck shutdown: stops accepting new clients, notifies
/// existing clients to reconnect elsewhere, then shuts down.
/// Mirrors Go <c>Server.LameDuckShutdown()</c>.
/// </summary>
public void LameDuckShutdown() => LameDuckMode();
/// <summary>
/// Core lame-duck implementation.
/// Mirrors Go <c>Server.lameDuckMode()</c>.
/// </summary>
internal void LameDuckMode()
{
_mu.EnterWriteLock();
if (IsShuttingDown() || _ldm || _listener == null)
{
_mu.ExitWriteLock();
return;
}
Noticef("Entering lame duck mode, stop accepting new clients");
_ldm = true;
SendLDMShutdownEventLocked();
int expected = 1;
_listener.Stop();
_listener = null;
expected += CloseWebsocketServer();
_ldmCh = System.Threading.Channels.Channel.CreateBounded<bool>(
new System.Threading.Channels.BoundedChannelOptions(expected)
{ FullMode = System.Threading.Channels.BoundedChannelFullMode.Wait });
var opts = GetOpts();
var gp = opts.LameDuckGracePeriod;
if (gp < TimeSpan.Zero) gp = gp.Negate();
_mu.ExitWriteLock();
// Transfer Raft leaders (stub returns false).
if (TransferRaftLeaders())
Thread.Sleep(1000);
ShutdownJetStream();
ShutdownRaftNodes();
// Wait for accept loops.
for (int i = 0; i < expected; i++)
_ldmCh.Reader.ReadAsync().GetAwaiter().GetResult();
_mu.EnterWriteLock();
var clients = new List<ClientConnection>(_clients.Values);
if (IsShuttingDown() || clients.Count == 0)
{
_mu.ExitWriteLock();
Shutdown();
return;
}
var dur = opts.LameDuckDuration - gp;
if (dur <= TimeSpan.Zero) dur = TimeSpan.FromSeconds(1);
long numClients = clients.Count;
var si = dur / numClients;
int batch = 1;
if (si < TimeSpan.FromTicks(1))
{
si = TimeSpan.FromTicks(1);
batch = (int)(numClients / dur.Ticks);
}
else if (si > TimeSpan.FromSeconds(1))
{
si = TimeSpan.FromSeconds(1);
}
SendLDMToRoutes();
SendLDMToClients();
_mu.ExitWriteLock();
// Grace-period delay.
var token = _quitCts.Token;
try { Task.Delay(gp, token).GetAwaiter().GetResult(); }
catch (OperationCanceledException) { return; }
Noticef("Closing existing clients");
for (int i = 0; i < clients.Count; i++)
{
clients[i].CloseConnection(ClosedState.ServerShutdown);
if (i == clients.Count - 1) break;
if (batch == 1 || i % batch == 0)
{
var jitter = (long)(Random.Shared.NextDouble() * si.Ticks);
if (jitter < si.Ticks / 2) jitter = si.Ticks / 2;
try { Task.Delay(TimeSpan.FromTicks(jitter), token).GetAwaiter().GetResult(); }
catch (OperationCanceledException) { return; }
}
}
Shutdown();
WaitForShutdown();
}
/// <summary>
/// Sends an LDM INFO to all routes.
/// Server lock must be held on entry.
/// Mirrors Go <c>Server.sendLDMToRoutes()</c>.
/// </summary>
private void SendLDMToRoutes()
{
_routeInfo.LameDuckMode = true;
var infoJson = GenerateInfoJson(_routeInfo);
ForEachRemote(r =>
{
lock (r) { r.EnqueueProto(infoJson); }
});
_routeInfo.LameDuckMode = false;
}
/// <summary>
/// Sends an LDM INFO to all connected clients.
/// Server lock must be held on entry.
/// Mirrors Go <c>Server.sendLDMToClients()</c>.
/// </summary>
private void SendLDMToClients()
{
_info.LameDuckMode = true;
_clientConnectUrls.Clear();
_info.ClientConnectUrls = null;
_info.WsConnectUrls = null;
if (!GetOpts().Cluster.NoAdvertise)
{
var cUrls = _clientConnectUrlsMap.GetAsStringSlice();
_info.ClientConnectUrls = cUrls.Length > 0 ? cUrls : null;
}
SendAsyncInfoToClients(true, true);
_info.LameDuckMode = false;
}
// =========================================================================
// Rate-limit logging (features 31443145)
// =========================================================================
/// <summary>
/// Starts the background goroutine that expires rate-limit log entries.
/// Mirrors Go <c>Server.startRateLimitLogExpiration()</c>.
/// </summary>
internal void StartRateLimitLogExpiration()
{
StartGoRoutine(() =>
{
var interval = TimeSpan.FromSeconds(1);
var token = _quitCts.Token;
while (!token.IsCancellationRequested)
{
try { Task.Delay(interval, token).GetAwaiter().GetResult(); }
catch (OperationCanceledException) { return; }
var now = DateTime.UtcNow;
foreach (var key in _rateLimitLogging.Keys)
{
if (_rateLimitLogging.TryGetValue(key, out var val) &&
val is DateTime ts && now - ts >= interval)
{
_rateLimitLogging.TryRemove(key, out _);
}
}
// Check for a new interval value.
if (_rateLimitLoggingCh.Reader.TryRead(out var newInterval))
interval = newInterval;
}
});
}
/// <summary>
/// Updates the rate-limit logging interval.
/// Mirrors Go <c>Server.changeRateLimitLogInterval()</c>.
/// </summary>
internal void ChangeRateLimitLogInterval(TimeSpan d)
{
if (d <= TimeSpan.Zero) return;
_rateLimitLoggingCh.Writer.TryWrite(d);
}
// =========================================================================
// DisconnectClientByID / LDMClientByID (features 31463147)
// =========================================================================
/// <summary>
/// Forcibly disconnects the client or leaf node with the given ID.
/// Mirrors Go <c>Server.DisconnectClientByID()</c>.
/// </summary>
public Exception? DisconnectClientByID(ulong id)
{
var c = GetClientInternal(id);
if (c != null) { c.CloseConnection(ClosedState.Kicked); return null; }
c = GetLeafNode(id);
if (c != null) { c.CloseConnection(ClosedState.Kicked); return null; }
return new InvalidOperationException("no such client or leafnode id");
}
/// <summary>
/// Sends a Lame Duck Mode INFO message to the specified client.
/// Mirrors Go <c>Server.LDMClientByID()</c>.
/// </summary>
public Exception? LDMClientByID(ulong id)
{
ClientConnection? c;
ServerInfo info;
_mu.EnterReadLock();
_clients.TryGetValue(id, out c);
if (c == null)
{
_mu.ExitReadLock();
return new InvalidOperationException("no such client id");
}
info = CopyInfo();
info.LameDuckMode = true;
_mu.ExitReadLock();
lock (c)
{
if (c.Opts.Protocol >= ClientProtocol.Info &&
(c.Flags & ClientFlags.FirstPongSent) != 0)
{
c.Debugf("Sending Lame Duck Mode info to client");
c.EnqueueProto(c.GenerateClientInfoJSON(info, true).Span);
return null;
}
}
return new InvalidOperationException(
"client does not support Lame Duck Mode or is not ready to receive the notification");
}
// =========================================================================
// updateRemoteSubscription / shouldReportConnectErr (features 31423143)
// =========================================================================
/// <summary>
/// Notifies routes, gateways, and leaf nodes about a subscription change.
/// Mirrors Go <c>Server.updateRemoteSubscription()</c>.
/// </summary>
internal void UpdateRemoteSubscription(Account acc, Subscription sub, int delta)
{
UpdateRouteSubscriptionMap(acc, sub, delta);
if (_gateway.Enabled)
GatewayUpdateSubInterest(acc.Name, sub, delta);
acc.UpdateLeafNodes(sub, delta);
}
/// <summary>
/// Returns true if a connect error at this attempt count should be reported.
/// Mirrors Go <c>Server.shouldReportConnectErr()</c>.
/// </summary>
internal bool ShouldReportConnectErr(bool firstConnect, int attempts)
{
var opts = GetOpts();
int threshold = firstConnect ? opts.ConnectErrorReports : opts.ReconnectErrorReports;
return attempts == 1 || attempts % threshold == 0;
}
// =========================================================================
// Session 10 stubs for cross-session calls
// =========================================================================
/// <summary>Stub — JetStream pull-consumer signalling (session 19).</summary>
private void SignalPullConsumers() { }
/// <summary>Stub — Raft step-down (session 20).</summary>
private void StepdownRaftNodes() { }
/// <summary>Stub — eventing shutdown (session 12).</summary>
private void ShutdownEventing() { }
/// <summary>Stub — JetStream shutdown (session 19).</summary>
private void ShutdownJetStream() { }
/// <summary>Stub — Raft nodes shutdown (session 20).</summary>
private void ShutdownRaftNodes() { }
/// <summary>Stub — Raft leader transfer (session 20). Returns false (no leaders to transfer).</summary>
private bool TransferRaftLeaders() => false;
/// <summary>Stub — LDM shutdown event (session 12).</summary>
private void SendLDMShutdownEventLocked() { }
/// <summary>
/// Stub — closes WebSocket server if running (session 23).
/// Returns the number of done-channel signals to expect.
/// </summary>
private int CloseWebsocketServer() => 0;
/// <summary>
/// Iterates over all route connections. Stub — session 14.
/// Server lock must be held on entry.
/// </summary>
internal void ForEachRoute(Action<ClientConnection> fn) { }
/// <summary>
/// Iterates over all remote (outbound route) connections. Stub — session 14.
/// Server lock must be held on entry.
/// </summary>
private void ForEachRemote(Action<ClientConnection> fn) { }
/// <summary>Stub — collects all gateway connections (session 16).</summary>
private void GetAllGatewayConnections(Dictionary<ulong, ClientConnection> conns) { }
/// <summary>Stub — removes a route connection (session 14).</summary>
private void RemoveRoute(ClientConnection c) { }
/// <summary>Stub — removes a remote gateway connection (session 16).</summary>
private void RemoveRemoteGatewayConnection(ClientConnection c) { }
/// <summary>Stub — removes a leaf-node connection (session 15).</summary>
private void RemoveLeafNodeConnection(ClientConnection c) { }
/// <summary>Stub — sends async INFO to clients (session 10/11). No-op until clients are running.</summary>
private void SendAsyncInfoToClients(bool cliUpdated, bool wsUpdated) { }
/// <summary>Stub — updates route subscription map (session 14).</summary>
private void UpdateRouteSubscriptionMap(Account acc, Subscription sub, int delta) { }
/// <summary>Stub — updates gateway sub interest (session 16).</summary>
private void GatewayUpdateSubInterest(string accName, Subscription sub, int delta) { }
/// <summary>Stub — account disconnect event (session 12).</summary>
private void AccountDisconnectEvent(ClientConnection c, DateTime now, string reason) { }
}

File diff suppressed because it is too large Load Diff

View File

@@ -76,7 +76,7 @@ public sealed partial class NatsServer : INatsServer
private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion);
private readonly ReaderWriterLockSlim _reloadMu = new(LockRecursionPolicy.SupportsRecursion);
private ServerInfo _info = new();
internal ServerInfo _info = new();
private string _configFile = string.Empty;
// =========================================================================
@@ -104,6 +104,34 @@ public sealed partial class NatsServer : INatsServer
private System.Net.Sockets.TcpListener? _listener;
private Exception? _listenerErr;
// HTTP monitoring listener
private System.Net.Sockets.TcpListener? _http;
// Route listener
private System.Net.Sockets.TcpListener? _routeListener;
private Exception? _routeListenerErr;
// Gateway listener
private System.Net.Sockets.TcpListener? _gatewayListener;
private Exception? _gatewayListenerErr;
// Leaf-node listener
private System.Net.Sockets.TcpListener? _leafNodeListener;
private Exception? _leafNodeListenerErr;
// Profiling listener
private System.Net.Sockets.TcpListener? _profiler;
// Accept-loop done channel — each accept loop sends true when it exits.
private readonly System.Threading.Channels.Channel<bool> _done =
System.Threading.Channels.Channel.CreateUnbounded<bool>();
// Lame-duck channel — created in lameDuckMode, receives one signal per accept loop.
private System.Threading.Channels.Channel<bool>? _ldmCh;
// The no-auth user that only the system account can use (auth session).
private string _sysAccOnlyNoAuthUser = string.Empty;
// =========================================================================
// Accounts
// =========================================================================
@@ -153,7 +181,7 @@ public sealed partial class NatsServer : INatsServer
private readonly object _grMu = new();
private bool _grRunning;
private readonly Dictionary<ulong, ClientConnection> _grTmpClients = [];
private readonly SemaphoreSlim _grWg = new(1, 1); // simplified wg
private readonly WaitGroup _grWg = new();
// =========================================================================
// Cluster name (separate lock)

View File

@@ -15,6 +15,7 @@
using System.Text.Json.Serialization;
using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
@@ -84,6 +85,9 @@ public sealed class ServerInfo
// LeafNode-specific
[JsonPropertyName("leafnode_urls")] public string[]? LeafNodeUrls { get; set; }
/// <summary>Returns a shallow clone of this <see cref="ServerInfo"/>.</summary>
internal ServerInfo ShallowClone() => (ServerInfo)MemberwiseClone();
}
// ============================================================================
@@ -200,9 +204,6 @@ public static class CompressionMode
// These stubs will be replaced with full implementations in later sessions.
// They are declared here to allow the NatsServer class to compile.
/// <summary>Stub for reference-counted URL set (session 09/12).</summary>
internal sealed class RefCountedUrlSet : Dictionary<string, int> { }
/// <summary>Stub for the system/internal messaging state (session 12).</summary>
internal sealed class InternalState
{
@@ -254,8 +255,14 @@ internal interface IOcspResponseCache { }
/// <summary>Stub for leaf node config (session 15).</summary>
internal sealed class LeafNodeCfg { }
/// <summary>Stub for network resolver (session 09).</summary>
internal interface INetResolver { }
/// <summary>
/// Network resolver used by <see cref="NatsServer.GetRandomIP"/>.
/// Mirrors Go <c>netResolver</c> interface in server.go.
/// </summary>
internal interface INetResolver
{
Task<string[]> LookupHostAsync(string host, CancellationToken ct = default);
}
/// <summary>Factory for rate counters.</summary>
internal static class RateCounterFactory
@@ -267,6 +274,108 @@ internal static class RateCounterFactory
/// <summary>Stub for RaftNode (session 20).</summary>
public interface IRaftNode { }
// ============================================================================
// Session 10: Ports, TlsMixConn, CaptureHttpServerLog
// ============================================================================
/// <summary>
/// Describes the URLs at which this server can be contacted.
/// Mirrors Go <c>Ports</c> struct in server.go.
/// </summary>
public sealed class Ports
{
public string[]? Nats { get; set; }
public string[]? Monitoring { get; set; }
public string[]? Cluster { get; set; }
public string[]? Profile { get; set; }
public string[]? WebSocket { get; set; }
}
/// <summary>
/// Wraps a <see cref="Stream"/> with an optional "pre-read" buffer that is
/// drained first, then falls through to the underlying stream.
/// Used when we peek at the first bytes of a connection to detect TLS.
/// Mirrors Go <c>tlsMixConn</c>.
/// </summary>
internal sealed class TlsMixConn : Stream
{
private readonly Stream _inner;
private System.IO.MemoryStream? _pre;
public TlsMixConn(Stream inner, byte[] preRead)
{
_inner = inner;
if (preRead.Length > 0)
_pre = new System.IO.MemoryStream(preRead, writable: false);
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => _inner.CanWrite;
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
if (_pre is { } pre)
{
var n = pre.Read(buffer, offset, count);
if (pre.Position >= pre.Length)
_pre = null;
return n;
}
return _inner.Read(buffer, offset, count);
}
public override void Write(byte[] buffer, int offset, int count) =>
_inner.Write(buffer, offset, count);
public override void Flush() => _inner.Flush();
public override long Seek(long offset, SeekOrigin origin) =>
throw new NotSupportedException();
public override void SetLength(long value) =>
throw new NotSupportedException();
protected override void Dispose(bool disposing)
{
if (disposing) { _pre?.Dispose(); _inner.Dispose(); }
base.Dispose(disposing);
}
}
/// <summary>
/// Captures HTTP server log lines and routes them through the server's
/// error logger.
/// Mirrors Go <c>captureHTTPServerLog</c> in server.go.
/// </summary>
internal sealed class CaptureHttpServerLog : System.IO.TextWriter
{
private readonly NatsServer _server;
private readonly string _prefix;
public CaptureHttpServerLog(NatsServer server, string prefix)
{
_server = server;
_prefix = prefix;
}
public override System.Text.Encoding Encoding => System.Text.Encoding.UTF8;
public override void Write(string? value)
{
if (value is null) return;
// Strip leading "http:" prefix that .NET's HttpListener sometimes emits.
var msg = value.StartsWith("http:", StringComparison.Ordinal) ? value[6..] : value;
_server.Errorf("{0}{1}", _prefix, msg.TrimEnd('\r', '\n'));
}
}
/// <summary>
/// Stub for JWT account claims (session 06/11).
/// Mirrors Go <c>jwt.AccountClaims</c> from nats.io/jwt/v2.

View File

@@ -2,3 +2,4 @@ using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("ZB.MOM.NatsNet.Server.Tests")]
[assembly: InternalsVisibleTo("ZB.MOM.NatsNet.Server.IntegrationTests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] // required for NSubstitute to proxy internal interfaces