987 lines
33 KiB
C#
987 lines
33 KiB
C#
// Copyright 2012-2026 The NATS Authors
|
||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
// you may not use this file except in compliance with the License.
|
||
// You may obtain a copy of the License at
|
||
//
|
||
// http://www.apache.org/licenses/LICENSE-2.0
|
||
//
|
||
// Unless required by applicable law or agreed to in writing, software
|
||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
// See the License for the specific language governing permissions and
|
||
// limitations under the License.
|
||
//
|
||
// Adapted from server/server.go (lines 2577–4782) in the NATS server Go source.
|
||
// Session 10: shutdown, goroutine tracking, query helpers, lame duck mode.
|
||
|
||
using System.Net;
|
||
using System.Net.Sockets;
|
||
using ZB.MOM.NatsNet.Server.Auth;
|
||
using ZB.MOM.NatsNet.Server.Internal;
|
||
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
|
||
|
||
namespace ZB.MOM.NatsNet.Server;
|
||
|
||
public sealed partial class NatsServer
|
||
{
|
||
// =========================================================================
|
||
// Shutdown / WaitForShutdown (features 3051, 3053)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Shuts the server down gracefully: closes all listeners, kicks all
/// connections, waits for goroutines, then signals <see cref="WaitForShutdown"/>.
/// Mirrors Go <c>Server.Shutdown()</c>.
/// </summary>
/// <remarks>
/// The pre-shutdown hooks run on every call; a repeated call then returns at the
/// <c>IsShuttingDown()</c> check. Both sections that hold the server write lock
/// release it in a <c>finally</c>, so a failure while stopping a listener does not
/// leave the lock held.
/// </remarks>
public void Shutdown()
{
    // Stubs for JetStream / Raft / eventing — implemented in later sessions.
    SignalPullConsumers();
    StepdownRaftNodes();
    ShutdownEventing();

    // Prevent issues with multiple calls.
    if (IsShuttingDown()) return;

    _mu.EnterWriteLock();
    // Plain field read; captured under the lock, closed after the lock is released.
    var accRes = _accResolver;
    try
    {
        Noticef("Initiating Shutdown...");

        Interlocked.Exchange(ref _shutdown, 1);
        Interlocked.Exchange(ref _running, 0);
        lock (_grMu) { _grRunning = false; }
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    accRes?.Close();

    ShutdownJetStream();
    ShutdownRaftNodes();

    // ---- Collect all connections, then count & close listeners ----
    var conns = new Dictionary<ulong, ClientConnection>();
    int doneExpected = 0;

    _mu.EnterWriteLock();
    try
    {
        foreach (var kvp in _clients) conns[kvp.Key] = kvp.Value;
        lock (_grMu) { foreach (var kvp in _grTmpClients) conns[kvp.Key] = kvp.Value; }
        ForEachRoute(r => { conns[r.Cid] = r; });
        GetAllGatewayConnections(conns);
        foreach (var kvp in _leafs) conns[kvp.Key] = kvp.Value;

        // Each listener closed here is expected to produce one item on _done.
        if (_listener != null)
        {
            doneExpected++;
            _listener.Stop();
            _listener = null;
        }
        doneExpected += CloseWebsocketServer();
        // mqtt listener managed by session 22
        if (_leafNodeListener != null)
        {
            doneExpected++;
            _leafNodeListener.Stop();
            _leafNodeListener = null;
        }
        if (_routeListener != null)
        {
            doneExpected++;
            _routeListener.Stop();
            _routeListener = null;
        }
        if (_gatewayListener != null)
        {
            doneExpected++;
            _gatewayListener.Stop();
            _gatewayListener = null;
        }
        if (_http != null)
        {
            doneExpected++;
            _http.Stop();
            _http = null;
        }
        if (_profiler != null)
        {
            doneExpected++;
            _profiler.Stop();
            // profiler is not nulled — see Go code: it keeps _profiler ref for ProfilerAddr()
        }
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    // Release all goroutines waiting on quitCts.
    _quitCts.Cancel();

    // Close all client / route / gateway / leaf connections.
    foreach (var c in conns.Values)
    {
        c.SetNoReconnect();
        c.CloseConnection(ClosedState.ServerShutdown);
    }

    // Wait for accept loops to exit. ReadAsync() returns a ValueTask, which must not be
    // blocked on directly before it completes; convert to a Task before waiting.
    for (int i = 0; i < doneExpected; i++)
        _done.Reader.ReadAsync().AsTask().GetAwaiter().GetResult();

    // Wait for all goroutines.
    _grWg.Wait();

    var opts = GetOpts();
    if (!string.IsNullOrEmpty(opts.PortsFileDir))
        DeletePortsFile(opts.PortsFileDir);

    Noticef("Server Exiting..");

    if (_ocsprc != null) { /* stub — stop OCSP cache in session 23 */ }

    DisposeSignalHandlers();

    _shutdownComplete.TrySetResult();
}
|
||
|
||
/// <summary>
/// Blocks the calling thread until the shutdown-complete task has finished,
/// i.e. until <see cref="Shutdown"/> has run to its end.
/// Mirrors Go <c>Server.WaitForShutdown()</c>.
/// </summary>
public void WaitForShutdown()
{
    var completion = _shutdownComplete.Task;
    completion.GetAwaiter().GetResult();
}
|
||
|
||
// =========================================================================
|
||
// Goroutine tracking (features 3119–3120)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Runs <paramref name="f"/> on a background Task that is counted in the server wait group.
/// Mirrors Go <c>Server.startGoRoutine(f)</c>.
/// </summary>
/// <param name="f">Work to execute; the wait group is decremented when it returns or throws.</param>
/// <returns>True if the task was scheduled, false if goroutines are no longer allowed to start.</returns>
internal bool StartGoRoutine(Action f)
{
    // Runs the work and always balances the wait-group increment made below.
    void RunTracked()
    {
        try
        {
            f();
        }
        finally
        {
            _grWg.Done();
        }
    }

    lock (_grMu)
    {
        if (_grRunning)
        {
            _grWg.Add(1);
            Task.Run(() => RunTracked());
            return true;
        }

        return false;
    }
}
|
||
|
||
// =========================================================================
|
||
// Client / connection management (features 3081–3084)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Removes a client, route, gateway, or leaf from server accounting, dispatching on the
/// connection kind. Other kinds are ignored.
/// Mirrors Go <c>Server.removeClient()</c>.
/// </summary>
/// <param name="c">Connection to remove.</param>
internal void RemoveClient(ClientConnection c)
{
    var kind = c.Kind;

    if (kind == ClientKind.Router)
    {
        RemoveRoute(c);
        return;
    }
    if (kind == ClientKind.Gateway)
    {
        RemoveRemoteGatewayConnection(c);
        return;
    }
    if (kind == ClientKind.Leaf)
    {
        RemoveLeafNodeConnection(c);
        return;
    }
    if (kind != ClientKind.Client)
    {
        return;
    }

    // Regular client: read its state under the client lock first, then update the
    // server maps under the server write lock.
    bool updateProto;
    string proxyKey;
    lock (c)
    {
        updateProto = c.Kind == ClientKind.Client &&
                      c.Opts.Protocol >= ClientProtocol.Info;
        proxyKey = c.ProxyKey;
    }

    _mu.EnterWriteLock();
    try
    {
        _clients.Remove(c.Cid);
        if (updateProto)
        {
            _cproto--;
        }
        if (!string.IsNullOrEmpty(proxyKey))
        {
            RemoveProxiedConn(proxyKey, c.Cid);
        }
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
|
||
|
||
/// <summary>
/// Drops <paramref name="cid"/> from the proxied-connection set stored under
/// <paramref name="key"/>, and drops the set itself once it is empty.
/// Server write lock must be held on entry.
/// Mirrors Go <c>Server.removeProxiedConn()</c>.
/// </summary>
private void RemoveProxiedConn(string key, ulong cid)
{
    if (_proxiedConns.TryGetValue(key, out var entries))
    {
        entries.Remove(cid);
        if (entries.Count == 0)
        {
            _proxiedConns.Remove(key);
        }
    }
}
|
||
|
||
/// <summary>
/// Removes the entry for <paramref name="cid"/> from the temporary client map,
/// under the goroutine lock.
/// Mirrors Go <c>Server.removeFromTempClients()</c>.
/// </summary>
internal void RemoveFromTempClients(ulong cid)
{
    lock (_grMu)
    {
        _grTmpClients.Remove(cid);
    }
}
|
||
|
||
/// <summary>
/// Registers <paramref name="c"/> in the temporary client map under <paramref name="cid"/>.
/// Mirrors Go <c>Server.addToTempClients()</c>.
/// </summary>
/// <returns>True if the client was stored, false if the server is no longer running goroutines.</returns>
internal bool AddToTempClients(ulong cid, ClientConnection c)
{
    lock (_grMu)
    {
        if (_grRunning)
        {
            _grTmpClients[cid] = c;
            return true;
        }

        return false;
    }
}
|
||
|
||
// =========================================================================
|
||
// Query helpers (features 3085–3118, 3121–3123)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Returns the number of registered routes, counted under the server read lock.
/// Mirrors Go <c>Server.NumRoutes()</c>.
/// </summary>
public int NumRoutes()
{
    int total;
    _mu.EnterReadLock();
    try
    {
        total = NumRoutesInternal();
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return total;
}
|
||
|
||
/// <summary>
/// Counts the connections visited by <c>ForEachRoute</c>. Takes no lock itself;
/// <see cref="NumRoutes"/> calls it while holding the server read lock.
/// </summary>
private int NumRoutesInternal()
{
    var count = 0;
    ForEachRoute(_ => { count++; });
    return count;
}
|
||
|
||
/// <summary>
/// Returns the number of registered remotes (the entry count of the routes map),
/// read under the server read lock.
/// Mirrors Go <c>Server.NumRemotes()</c>.
/// </summary>
public int NumRemotes()
{
    int total;
    _mu.EnterReadLock();
    try
    {
        total = _routes.Count;
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return total;
}
|
||
|
||
/// <summary>
/// Returns the number of leaf-node connections, read under the server read lock.
/// Mirrors Go <c>Server.NumLeafNodes()</c>.
/// </summary>
public int NumLeafNodes()
{
    int total;
    _mu.EnterReadLock();
    try
    {
        total = _leafs.Count;
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return total;
}
|
||
|
||
/// <summary>
/// Returns the number of registered clients, read under the server read lock.
/// Mirrors Go <c>Server.NumClients()</c>.
/// </summary>
public int NumClients()
{
    int total;
    _mu.EnterReadLock();
    try
    {
        total = _clients.Count;
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return total;
}
|
||
|
||
/// <summary>
/// Returns the client with the given connection ID, or null when none is registered.
/// Mirrors Go <c>Server.GetClient()</c>.
/// </summary>
public ClientConnection? GetClient(ulong cid)
{
    return GetClientInternal(cid);
}
|
||
|
||
/// <summary>
/// Looks up <paramref name="cid"/> in the client map under the server read lock.
/// </summary>
/// <returns>The registered client, or null when the ID is unknown.</returns>
private ClientConnection? GetClientInternal(ulong cid)
{
    _mu.EnterReadLock();
    try
    {
        return _clients.TryGetValue(cid, out var found) ? found : null;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
|
||
|
||
/// <summary>
/// Returns the leaf node with the given connection ID, looked up under the server
/// read lock, or null when none is registered.
/// Mirrors Go <c>Server.GetLeafNode()</c>.
/// </summary>
public ClientConnection? GetLeafNode(ulong cid)
{
    _mu.EnterReadLock();
    try
    {
        return _leafs.TryGetValue(cid, out var leaf) ? leaf : null;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
|
||
|
||
/// <summary>
/// Returns total subscriptions across all accounts, summed under the server read lock.
/// Mirrors Go <c>Server.NumSubscriptions()</c>.
/// </summary>
public uint NumSubscriptions()
{
    uint total;
    _mu.EnterReadLock();
    try
    {
        total = NumSubscriptionsInternal();
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return total;
}
|
||
|
||
/// <summary>
/// Adds up <c>TotalSubs()</c> over every entry in the accounts collection. Takes no lock
/// itself; <see cref="NumSubscriptions"/> calls it while holding the server read lock.
/// </summary>
private uint NumSubscriptionsInternal()
{
    int running = 0;
    foreach (var entry in _accounts)
    {
        running += entry.Value.TotalSubs();
    }
    return (uint)running;
}
|
||
|
||
/// <summary>
/// Returns the slow-consumer counter, read atomically.
/// Mirrors Go <c>Server.NumSlowConsumers()</c>.
/// </summary>
public long NumSlowConsumers()
{
    return Interlocked.Read(ref _stats.SlowConsumers);
}
|
||
|
||
/// <summary>
/// Returns the number of times clients were stalled, read atomically.
/// Mirrors Go <c>Server.NumStalledClients()</c>.
/// </summary>
public long NumStalledClients()
{
    return Interlocked.Read(ref _stats.Stalls);
}
|
||
|
||
/// <summary>
/// Returns the client slow-consumer counter, read atomically.
/// Mirrors Go <c>Server.NumSlowConsumersClients()</c>.
/// </summary>
public long NumSlowConsumersClients()
{
    return Interlocked.Read(ref _scStats.Clients);
}
|
||
|
||
/// <summary>
/// Returns the route slow-consumer counter, read atomically.
/// Mirrors Go <c>Server.NumSlowConsumersRoutes()</c>.
/// </summary>
public long NumSlowConsumersRoutes()
{
    return Interlocked.Read(ref _scStats.Routes);
}
|
||
|
||
/// <summary>
/// Returns the gateway slow-consumer counter, read atomically.
/// Mirrors Go <c>Server.NumSlowConsumersGateways()</c>.
/// </summary>
public long NumSlowConsumersGateways()
{
    return Interlocked.Read(ref _scStats.Gateways);
}
|
||
|
||
/// <summary>
/// Returns the leaf-node slow-consumer counter, read atomically.
/// Mirrors Go <c>Server.NumSlowConsumersLeafs()</c>.
/// </summary>
public long NumSlowConsumersLeafs()
{
    return Interlocked.Read(ref _scStats.Leafs);
}
|
||
|
||
/// <summary>
/// Returns the stale-connection counter, read atomically.
/// Mirrors Go <c>Server.NumStaleConnections()</c>.
/// </summary>
public long NumStaleConnections()
{
    return Interlocked.Read(ref _stats.StaleConnections);
}
|
||
|
||
/// <summary>
/// Returns the client stale-connection counter, read atomically.
/// Mirrors Go <c>Server.NumStaleConnectionsClients()</c>.
/// </summary>
public long NumStaleConnectionsClients()
{
    return Interlocked.Read(ref _staleStats.Clients);
}
|
||
|
||
/// <summary>
/// Returns the route stale-connection counter, read atomically.
/// Mirrors Go <c>Server.NumStaleConnectionsRoutes()</c>.
/// </summary>
public long NumStaleConnectionsRoutes()
{
    return Interlocked.Read(ref _staleStats.Routes);
}
|
||
|
||
/// <summary>
/// Returns the gateway stale-connection counter, read atomically.
/// Mirrors Go <c>Server.NumStaleConnectionsGateways()</c>.
/// </summary>
public long NumStaleConnectionsGateways()
{
    return Interlocked.Read(ref _staleStats.Gateways);
}
|
||
|
||
/// <summary>
/// Returns the leaf-node stale-connection counter, read atomically.
/// Mirrors Go <c>Server.NumStaleConnectionsLeafs()</c>.
/// </summary>
public long NumStaleConnectionsLeafs()
{
    return Interlocked.Read(ref _staleStats.Leafs);
}
|
||
|
||
/// <summary>
/// Returns the time the current configuration was loaded, read under the server read lock.
/// Mirrors Go <c>Server.ConfigTime()</c>.
/// </summary>
public DateTime ConfigTime()
{
    DateTime loadedAt;
    _mu.EnterReadLock();
    try
    {
        loadedAt = _configTime;
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return loadedAt;
}
|
||
|
||
/// <summary>
/// Returns the client listener address, or null when there is no client listener.
/// Mirrors Go <c>Server.Addr()</c>.
/// </summary>
public EndPoint? Addr()
{
    EndPoint? address;
    _mu.EnterReadLock();
    try
    {
        address = _listener?.LocalEndpoint;
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return address;
}
|
||
|
||
/// <summary>
/// Returns the monitoring listener address, or null when there is no monitoring
/// listener or its endpoint is not an <see cref="IPEndPoint"/>.
/// Mirrors Go <c>Server.MonitorAddr()</c>.
/// </summary>
public IPEndPoint? MonitorAddr()
{
    IPEndPoint? address;
    _mu.EnterReadLock();
    try
    {
        address = _http?.LocalEndpoint as IPEndPoint;
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return address;
}
|
||
|
||
/// <summary>
/// Returns the cluster (route) listener address, or null when there is no route
/// listener or its endpoint is not an <see cref="IPEndPoint"/>.
/// Mirrors Go <c>Server.ClusterAddr()</c>.
/// </summary>
public IPEndPoint? ClusterAddr()
{
    IPEndPoint? address;
    _mu.EnterReadLock();
    try
    {
        address = _routeListener?.LocalEndpoint as IPEndPoint;
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return address;
}
|
||
|
||
/// <summary>
/// Returns the profiler listener address, or null when there is no profiler
/// listener or its endpoint is not an <see cref="IPEndPoint"/>.
/// Mirrors Go <c>Server.ProfilerAddr()</c>.
/// </summary>
public IPEndPoint? ProfilerAddr()
{
    IPEndPoint? address;
    _mu.EnterReadLock();
    try
    {
        address = _profiler?.LocalEndpoint as IPEndPoint;
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return address;
}
|
||
|
||
/// <summary>
/// Polls until all expected listeners are up or the deadline expires.
/// Mirrors Go <c>Server.readyForConnections()</c>.
/// </summary>
/// <param name="d">Maximum time to wait. A zero or negative value skips polling and yields an error.</param>
/// <returns>
/// Null when the server is ready; otherwise an <see cref="InvalidOperationException"/> describing
/// why it was not ready within <paramref name="d"/>. In <c>DontListen</c> mode the startup-complete
/// task must also finish within <paramref name="d"/>, otherwise an error is returned.
/// </returns>
public Exception? ReadyForConnectionsError(TimeSpan d)
{
    var opts = GetOpts();
    var end = DateTime.UtcNow.Add(d);

    while (DateTime.UtcNow < end)
    {
        bool ready;
        _mu.EnterReadLock();
        try
        {
            bool serverOk = _listener != null || opts.DontListen;
            bool routeOk = opts.Cluster.Port == 0 || _routeListener != null;
            bool gatewayOk = string.IsNullOrEmpty(opts.Gateway.Name) || _gatewayListener != null;
            bool leafOk = opts.LeafNode.Port == 0 || _leafNodeListener != null;
            bool wsOk = opts.Websocket.Port == 0; // stub — websocket listener not tracked until session 23
            ready = serverOk && routeOk && gatewayOk && leafOk && wsOk;
        }
        finally
        {
            _mu.ExitReadLock();
        }

        if (ready)
        {
            if (opts.DontListen)
            {
                // Task.Wait accepts at most int.MaxValue milliseconds.
                var wait = d.TotalMilliseconds > int.MaxValue
                    ? TimeSpan.FromMilliseconds(int.MaxValue)
                    : d;
                try
                {
                    // Wait returns false on timeout (it does not throw), so the result must be checked.
                    if (!_startupComplete.Task.Wait(wait))
                    {
                        return new InvalidOperationException(
                            $"failed to be ready for connections after {d}: startup did not complete");
                    }
                }
                catch (AggregateException ex)
                {
                    // The startup task faulted or was cancelled: the server is not ready.
                    return new InvalidOperationException(
                        $"failed to be ready for connections after {d}: startup did not complete", ex);
                }
            }
            return null;
        }

        // For very short deadlines the loop polls without sleeping, as in the Go code.
        if (d > TimeSpan.FromMilliseconds(25))
            Thread.Sleep(25);
    }

    return new InvalidOperationException(
        $"failed to be ready for connections after {d}");
}
|
||
|
||
/// <summary>
/// Returns true if <see cref="ReadyForConnectionsError"/> reports no error within
/// <paramref name="dur"/>.
/// Mirrors Go <c>Server.ReadyForConnections()</c>.
/// </summary>
public bool ReadyForConnections(TimeSpan dur)
{
    var failure = ReadyForConnectionsError(dur);
    return failure == null;
}
|
||
|
||
/// <summary>
/// Returns true unless the current options set <c>NoHeaderSupport</c>.
/// Mirrors Go <c>Server.supportsHeaders()</c>.
/// </summary>
internal bool SupportsHeaders()
{
    var opts = GetOpts();
    return !opts.NoHeaderSupport;
}
|
||
|
||
/// <summary>
/// Returns the server ID taken from the server info.
/// Mirrors Go <c>Server.ID()</c>.
/// </summary>
public string ID()
{
    return _info.Id;
}
|
||
|
||
/// <summary>
/// Returns the JetStream node name (hash of server name).
/// Mirrors Go <c>Server.NodeName()</c>.
/// </summary>
public string NodeName()
{
    var serverName = _info.Name;
    return GetHash(serverName);
}
|
||
|
||
/// <summary>
/// Returns the server name taken from the server info.
/// Mirrors Go <c>Server.Name()</c>.
/// </summary>
public string Name()
{
    return _info.Name;
}
|
||
|
||
/// <summary>
/// Returns the server name as the string form of this instance.
/// Mirrors Go <c>Server.String()</c>.
/// </summary>
public override string ToString()
{
    return _info.Name;
}
|
||
|
||
/// <summary>
/// Returns the number of currently-stored closed connections, read under the server read lock.
/// Mirrors Go <c>Server.numClosedConns()</c>.
/// </summary>
internal int NumClosedConns()
{
    int stored;
    _mu.EnterReadLock();
    try
    {
        stored = _closed.Len();
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return stored;
}
|
||
|
||
/// <summary>
/// Returns total closed connections ever recorded, read under the server read lock.
/// Mirrors Go <c>Server.totalClosedConns()</c>.
/// </summary>
internal ulong TotalClosedConns()
{
    ulong total;
    _mu.EnterReadLock();
    try
    {
        total = _closed.TotalConns();
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return total;
}
|
||
|
||
/// <summary>
/// Returns a snapshot of recently closed connections, taken under the server read lock.
/// Mirrors Go <c>Server.closedClients()</c>.
/// </summary>
internal Internal.ClosedClient?[] ClosedClients()
{
    Internal.ClosedClient?[] snapshot;
    _mu.EnterReadLock();
    try
    {
        snapshot = _closed.ClosedClients();
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return snapshot;
}
|
||
|
||
// =========================================================================
|
||
// Lame duck mode (features 3135–3139)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Returns true if the server is in lame duck mode, read under the server read lock.
/// Mirrors Go <c>Server.isLameDuckMode()</c>.
/// </summary>
public bool IsLameDuckMode()
{
    bool inLameDuck;
    _mu.EnterReadLock();
    try
    {
        inLameDuck = _ldm;
    }
    finally
    {
        _mu.ExitReadLock();
    }
    return inLameDuck;
}
|
||
|
||
/// <summary>
/// Public entry point for a lame-duck shutdown; delegates to <see cref="LameDuckMode"/>,
/// which stops accepting new clients, notifies existing clients to reconnect
/// elsewhere, then shuts down.
/// Mirrors Go <c>Server.LameDuckShutdown()</c>.
/// </summary>
public void LameDuckShutdown()
{
    LameDuckMode();
}
|
||
|
||
/// <summary>
/// Core lame-duck implementation.
/// Mirrors Go <c>Server.lameDuckMode()</c>.
/// </summary>
/// <remarks>
/// Steps: (1) under the write lock, stop the client listener and record how many accept
/// loops must report on the LDM channel; (2) shut down JetStream/Raft stubs and wait for
/// those reports; (3) under the write lock, snapshot clients, compute the close pacing and
/// send LDM INFO to routes and clients; (4) after the grace period, close clients with a
/// jittered delay between them; (5) call <see cref="Shutdown"/> and wait for it.
/// Returns early, doing nothing, when the server is already shutting down, already in
/// lame duck mode, or has no client listener. Returns without calling <see cref="Shutdown"/>
/// when the quit token is cancelled during one of the delays.
/// </remarks>
internal void LameDuckMode()
{
    int expected;
    TimeSpan gp;
    TimeSpan lameDuckDuration;
    System.Threading.Channels.Channel<bool> ldmCh;

    _mu.EnterWriteLock();
    try
    {
        if (IsShuttingDown() || _ldm || _listener == null)
            return;

        Noticef("Entering lame duck mode, stop accepting new clients");
        _ldm = true;
        SendLDMShutdownEventLocked();

        expected = 1;
        _listener.Stop();
        _listener = null;
        expected += CloseWebsocketServer();
        ldmCh = System.Threading.Channels.Channel.CreateBounded<bool>(
            new System.Threading.Channels.BoundedChannelOptions(expected)
            { FullMode = System.Threading.Channels.BoundedChannelFullMode.Wait });
        _ldmCh = ldmCh;

        var opts = GetOpts();
        gp = opts.LameDuckGracePeriod;
        if (gp < TimeSpan.Zero) gp = gp.Negate();
        lameDuckDuration = opts.LameDuckDuration;
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    // Transfer Raft leaders (stub returns false).
    if (TransferRaftLeaders())
        Thread.Sleep(1000);

    ShutdownJetStream();
    ShutdownRaftNodes();

    // Wait for accept loops. ReadAsync() returns a ValueTask, which must not be blocked
    // on directly before it completes; convert to a Task before waiting.
    for (int i = 0; i < expected; i++)
        ldmCh.Reader.ReadAsync().AsTask().GetAwaiter().GetResult();

    List<ClientConnection> clients;
    bool nothingToDrain;
    var si = TimeSpan.Zero; // pause between client closes
    int batch = 1;          // clients closed per pause when si would fall below one tick

    _mu.EnterWriteLock();
    try
    {
        clients = new List<ClientConnection>(_clients.Values);
        nothingToDrain = IsShuttingDown() || clients.Count == 0;

        if (!nothingToDrain)
        {
            var dur = lameDuckDuration - gp;
            if (dur <= TimeSpan.Zero) dur = TimeSpan.FromSeconds(1);

            long numClients = clients.Count;
            si = dur / numClients;

            if (si < TimeSpan.FromTicks(1))
            {
                si = TimeSpan.FromTicks(1);
                batch = (int)(numClients / dur.Ticks);
            }
            else if (si > TimeSpan.FromSeconds(1))
            {
                si = TimeSpan.FromSeconds(1);
            }

            SendLDMToRoutes();
            SendLDMToClients();
        }
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    if (nothingToDrain)
    {
        // No clients (or a shutdown already in progress): Shutdown() completes lame duck mode.
        Shutdown();
        return;
    }

    // Grace-period delay.
    var token = _quitCts.Token;
    try { Task.Delay(gp, token).GetAwaiter().GetResult(); }
    catch (OperationCanceledException) { return; }

    Noticef("Closing existing clients");
    for (int i = 0; i < clients.Count; i++)
    {
        clients[i].CloseConnection(ClosedState.ServerShutdown);
        if (i == clients.Count - 1) break;
        if (batch == 1 || i % batch == 0)
        {
            // Random delay in [si/2, si).
            var jitter = (long)(Random.Shared.NextDouble() * si.Ticks);
            if (jitter < si.Ticks / 2) jitter = si.Ticks / 2;
            try { Task.Delay(TimeSpan.FromTicks(jitter), token).GetAwaiter().GetResult(); }
            catch (OperationCanceledException) { return; }
        }
    }

    Shutdown();
    WaitForShutdown();
}
|
||
|
||
/// <summary>
/// Sends an LDM INFO to all routes: the route info is serialized with the lame-duck
/// flag set, queued on every remote, and the flag is cleared again afterwards.
/// Server lock must be held on entry.
/// Mirrors Go <c>Server.sendLDMToRoutes()</c>.
/// </summary>
private void SendLDMToRoutes()
{
    _routeInfo.LameDuckMode = true;
    var payload = GenerateInfoJson(_routeInfo);

    ForEachRemote(remote =>
    {
        lock (remote)
        {
            remote.EnqueueProto(payload);
        }
    });

    _routeInfo.LameDuckMode = false;
}
|
||
|
||
/// <summary>
/// Sends an LDM INFO to all connected clients. The server's own connect URLs are cleared;
/// URLs from the connect-URL map are included only when cluster advertising is allowed.
/// The lame-duck flag on the info is cleared again after sending.
/// Server lock must be held on entry.
/// Mirrors Go <c>Server.sendLDMToClients()</c>.
/// </summary>
private void SendLDMToClients()
{
    _info.LameDuckMode = true;
    _clientConnectUrls.Clear();

    // Reset advertised URLs first.
    _info.ClientConnectUrls = null;
    _info.WsConnectUrls = null;

    var advertisingDisabled = GetOpts().Cluster.NoAdvertise;
    if (!advertisingDisabled)
    {
        var urls = _clientConnectUrlsMap.GetAsStringSlice();
        if (urls.Length > 0)
        {
            _info.ClientConnectUrls = urls;
        }
        else
        {
            _info.ClientConnectUrls = null;
        }
    }

    SendAsyncInfoToClients(true, true);
    _info.LameDuckMode = false;
}
|
||
|
||
// =========================================================================
|
||
// Rate-limit logging (features 3144–3145)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Starts the background goroutine that expires rate-limit log entries.
/// Each cycle waits one interval (initially one second), removes entries whose timestamp is
/// at least one interval old, then picks up a new interval if one has been posted.
/// The loop ends when the quit token is cancelled.
/// Mirrors Go <c>Server.startRateLimitLogExpiration()</c>.
/// </summary>
internal void StartRateLimitLogExpiration()
{
    StartGoRoutine(() =>
    {
        var quit = _quitCts.Token;
        var window = TimeSpan.FromSeconds(1);

        while (!quit.IsCancellationRequested)
        {
            try
            {
                Task.Delay(window, quit).GetAwaiter().GetResult();
            }
            catch (OperationCanceledException)
            {
                return;
            }

            var sweepTime = DateTime.UtcNow;
            foreach (var key in _rateLimitLogging.Keys)
            {
                if (!_rateLimitLogging.TryGetValue(key, out var stamp))
                {
                    continue;
                }
                if (stamp is DateTime since && sweepTime - since >= window)
                {
                    _rateLimitLogging.TryRemove(key, out _);
                }
            }

            // Check for a new interval value.
            if (_rateLimitLoggingCh.Reader.TryRead(out var updated))
            {
                window = updated;
            }
        }
    });
}
|
||
|
||
/// <summary>
/// Posts a new rate-limit logging interval to the expiration loop's channel.
/// Zero or negative values are ignored.
/// Mirrors Go <c>Server.changeRateLimitLogInterval()</c>.
/// </summary>
internal void ChangeRateLimitLogInterval(TimeSpan d)
{
    if (d > TimeSpan.Zero)
    {
        _rateLimitLoggingCh.Writer.TryWrite(d);
    }
}
|
||
|
||
// =========================================================================
|
||
// DisconnectClientByID / LDMClientByID (features 3146–3147)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Forcibly disconnects the client or leaf node with the given ID. Clients are
/// searched first; leaf nodes are consulted only when no client matches.
/// Mirrors Go <c>Server.DisconnectClientByID()</c>.
/// </summary>
/// <returns>Null on success, or an <see cref="InvalidOperationException"/> when the ID is unknown.</returns>
public Exception? DisconnectClientByID(ulong id)
{
    var target = GetClientInternal(id) ?? GetLeafNode(id);
    if (target == null)
    {
        return new InvalidOperationException("no such client or leafnode id");
    }

    target.CloseConnection(ClosedState.Kicked);
    return null;
}
|
||
|
||
/// <summary>
/// Sends a Lame Duck Mode INFO message to the specified client.
/// Mirrors Go <c>Server.LDMClientByID()</c>.
/// </summary>
/// <param name="id">Connection ID of the client to notify.</param>
/// <returns>
/// Null when the INFO was queued; otherwise an <see cref="InvalidOperationException"/> when the
/// ID is unknown, or when the client's protocol level is below <c>Info</c> or its first PONG
/// has not been sent yet.
/// </returns>
public Exception? LDMClientByID(ulong id)
{
    ClientConnection? c;
    ServerInfo info;

    // The read lock is released in a finally so a failure in CopyInfo() cannot leak it.
    _mu.EnterReadLock();
    try
    {
        if (!_clients.TryGetValue(id, out c) || c == null)
            return new InvalidOperationException("no such client id");

        info = CopyInfo();
        info.LameDuckMode = true;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    lock (c)
    {
        if (c.Opts.Protocol >= ClientProtocol.Info &&
            (c.Flags & ClientFlags.FirstPongSent) != 0)
        {
            c.Debugf("Sending Lame Duck Mode info to client");
            c.EnqueueProto(c.GenerateClientInfoJSON(info, true).Span);
            return null;
        }
    }

    return new InvalidOperationException(
        "client does not support Lame Duck Mode or is not ready to receive the notification");
}
|
||
|
||
// =========================================================================
|
||
// updateRemoteSubscription / shouldReportConnectErr (features 3142–3143)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Notifies routes, gateways, and leaf nodes about a subscription change. The gateway
/// step runs only when gateways are enabled.
/// Mirrors Go <c>Server.updateRemoteSubscription()</c>.
/// </summary>
internal void UpdateRemoteSubscription(Account acc, Subscription sub, int delta)
{
    UpdateRouteSubscriptionMap(acc, sub, delta);

    var gatewaysEnabled = _gateway.Enabled;
    if (gatewaysEnabled)
    {
        GatewayUpdateSubInterest(acc.Name, sub, delta);
    }

    acc.UpdateLeafNodes(sub, delta);
}
|
||
|
||
/// <summary>
/// Returns true if a connect error at this attempt count should be reported.
/// Mirrors Go <c>Server.shouldReportConnectErr()</c>.
/// </summary>
/// <param name="firstConnect">Selects <c>ConnectErrorReports</c> (true) or <c>ReconnectErrorReports</c> (false) as the threshold.</param>
/// <param name="attempts">Number of the current attempt.</param>
/// <returns>
/// True for the first attempt and for every attempt that is a multiple of the threshold.
/// A threshold of zero reports only the first attempt instead of throwing
/// <see cref="DivideByZeroException"/>.
/// </returns>
internal bool ShouldReportConnectErr(bool firstConnect, int attempts)
{
    var opts = GetOpts();
    int threshold = firstConnect ? opts.ConnectErrorReports : opts.ReconnectErrorReports;

    if (attempts == 1)
        return true;

    // Guard the modulo: a zero threshold would otherwise raise DivideByZeroException.
    return threshold != 0 && attempts % threshold == 0;
}
|
||
|
||
// =========================================================================
|
||
// Session 10 stubs for cross-session calls
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Stub — JetStream pull-consumer signalling (session 19).
/// Calls <c>FlushSignal()</c> on every registered connection of kind <c>JetStream</c>.
/// </summary>
/// <remarks>
/// The client map is snapshotted under the server read lock, because other paths mutate it
/// under the write lock; the signals are sent after the lock is released.
/// Must not be called while the caller already holds the server lock.
/// </remarks>
private void SignalPullConsumers()
{
    List<ClientConnection> snapshot;
    _mu.EnterReadLock();
    try
    {
        snapshot = new List<ClientConnection>(_clients.Values);
    }
    finally
    {
        _mu.ExitReadLock();
    }

    foreach (var c in snapshot)
    {
        if (c.Kind == ClientKind.JetStream)
            c.FlushSignal();
    }
}
|
||
|
||
/// <summary>
/// Stub — Raft step-down (session 20).
/// For each registered Raft node, invokes a parameterless <c>StepDown</c> method via reflection
/// if the node's type has one; otherwise a <c>StepDown(string[])</c> overload, with an empty
/// array. Nodes with neither are skipped.
/// </summary>
private void StepdownRaftNodes()
{
    foreach (var node in _raftNodes.Values)
    {
        var nodeType = node.GetType();

        var parameterless = nodeType.GetMethod("StepDown", Type.EmptyTypes);
        if (parameterless != null)
        {
            parameterless.Invoke(node, null);
            continue;
        }

        var withPreferred = nodeType.GetMethod("StepDown", new Type[] { typeof(string[]) });
        if (withPreferred != null)
        {
            withPreferred.Invoke(node, new object[] { Array.Empty<string>() });
        }
    }
}
|
||
|
||
/// <summary>
/// Stub — eventing shutdown (session 12).
/// When a system state object exists, disposes its sweeper and stats timer, clears its
/// reply map, and drops the reference. Does nothing otherwise.
/// </summary>
private void ShutdownEventing()
{
    if (_sys != null)
    {
        _sys.Sweeper?.Dispose();
        _sys.Sweeper = null;

        _sys.StatsMsgTimer?.Dispose();
        _sys.StatsMsgTimer = null;

        _sys.Replies.Clear();
        _sys = null;
    }
}
|
||
|
||
/// <summary>
/// Stub — JetStream shutdown (session 19).
/// Currently only clears the JetStream flag on the server info.
/// </summary>
private void ShutdownJetStream() => _info.JetStream = false;
|
||
|
||
/// <summary>
/// Stub — Raft nodes shutdown (session 20).
/// Invokes a parameterless <c>Stop</c> method via reflection on every registered Raft node
/// whose type has one.
/// </summary>
private void ShutdownRaftNodes()
{
    foreach (var node in _raftNodes.Values)
    {
        var stopMethod = node.GetType().GetMethod("Stop", Type.EmptyTypes);
        if (stopMethod != null)
        {
            stopMethod.Invoke(node, null);
        }
    }
}
|
||
|
||
/// <summary>
/// Stub — Raft leader transfer (session 20).
/// Always returns false (no leaders to transfer).
/// </summary>
private bool TransferRaftLeaders()
{
    return false;
}
|
||
|
||
/// <summary>
/// Stub — LDM shutdown event (session 12).
/// Sets the lame-duck flag and logs a notice; no event is published yet.
/// </summary>
private void SendLDMShutdownEventLocked()
{
    _ldm = true;

    Noticef("Lame duck shutdown event emitted");
}
|
||
|
||
/// <summary>
/// Stub — closes WebSocket server if running (session 23).
/// Returns the number of done-channel signals to expect, which is always zero for now.
/// </summary>
private int CloseWebsocketServer()
{
    const int expectedSignals = 0;
    return expectedSignals;
}
|
||
|
||
/// <summary>
/// Iterates over all route connections. Stub — session 14.
/// Invokes <paramref name="fn"/> once per distinct connection ID across every list in the
/// routes map. A null callback is ignored.
/// Server lock must be held on entry.
/// </summary>
internal void ForEachRoute(Action<ClientConnection> fn)
{
    if (fn == null)
    {
        return;
    }

    var visited = new HashSet<ulong>();
    foreach (var bucket in _routes.Values)
    {
        foreach (var conn in bucket)
        {
            // Add returns false for an ID that was already visited.
            if (!visited.Add(conn.Cid))
            {
                continue;
            }
            fn(conn);
        }
    }
}
|
||
|
||
/// <summary>
/// Iterates over all remote (outbound route) connections. Stub — session 14.
/// Currently forwards to <see cref="ForEachRoute"/>.
/// Server lock must be held on entry.
/// </summary>
private void ForEachRemote(Action<ClientConnection> fn)
{
    ForEachRoute(fn);
}
|
||
|
||
/// <summary>
/// Stub — collects all gateway connections (session 16).
/// Adds every outbound, then every inbound, gateway connection to <paramref name="conns"/>,
/// keyed by connection ID.
/// </summary>
private void GetAllGatewayConnections(Dictionary<ulong, ClientConnection> conns)
{
    void Register(ClientConnection gw) => conns[gw.Cid] = gw;

    foreach (var outbound in _gateway.Out.Values)
    {
        Register(outbound);
    }
    foreach (var inbound in _gateway.In.Values)
    {
        Register(inbound);
    }
}
|
||
|
||
/// <summary>
/// Stub — removes a route connection (session 14).
/// Removes <paramref name="c"/> from every route list (dropping lists that become empty)
/// and from the client map.
/// </summary>
/// <remarks>
/// Takes the server write lock itself, since the maps it mutates are read under that lock
/// elsewhere in this class. Must not be called while the caller already holds the server lock.
/// </remarks>
private void RemoveRoute(ClientConnection c)
{
    _mu.EnterWriteLock();
    try
    {
        // Iterate over a copy of the keys so entries can be removed during the loop.
        foreach (var key in _routes.Keys.ToArray())
        {
            var list = _routes[key];
            list.RemoveAll(rc => rc.Cid == c.Cid);
            if (list.Count == 0)
                _routes.Remove(key);
        }
        _clients.Remove(c.Cid);
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
|
||
|
||
/// <summary>
/// Stub — removes a remote gateway connection (session 16).
/// Removes <paramref name="c"/> from the outbound map and ordered outbound list, the inbound
/// map, and the client map.
/// </summary>
/// <remarks>
/// Takes the server write lock itself, since <see cref="Shutdown"/> reads the gateway maps under
/// that lock. Must not be called while the caller already holds the server lock.
/// NOTE(review): if the gateway state has a dedicated lock elsewhere, confirm which lock
/// should guard these maps.
/// </remarks>
private void RemoveRemoteGatewayConnection(ClientConnection c)
{
    _mu.EnterWriteLock();
    try
    {
        // Iterate over a copy of the keys so entries can be removed during the loop.
        foreach (var key in _gateway.Out.Keys.ToArray())
        {
            if (_gateway.Out[key].Cid == c.Cid)
                _gateway.Out.Remove(key);
        }
        _gateway.Outo.RemoveAll(gc => gc.Cid == c.Cid);
        _gateway.In.Remove(c.Cid);
        _clients.Remove(c.Cid);
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
|
||
|
||
/// <summary>
/// Stub — removes a leaf-node connection (session 15).
/// Removes <paramref name="c"/> from the leaf map and the client map.
/// </summary>
/// <remarks>
/// Takes the server write lock itself, since both maps are read under that lock elsewhere in
/// this class. Must not be called while the caller already holds the server lock.
/// </remarks>
private void RemoveLeafNodeConnection(ClientConnection c)
{
    _mu.EnterWriteLock();
    try
    {
        _leafs.Remove(c.Cid);
        _clients.Remove(c.Cid);
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
|
||
|
||
/// <summary>
/// Stub — sends async INFO to clients (session 10/11).
/// When either flag is set, calls <c>FlushSignal()</c> on every registered client;
/// does nothing when both flags are false.
/// </summary>
private void SendAsyncInfoToClients(bool cliUpdated, bool wsUpdated)
{
    var anythingChanged = cliUpdated || wsUpdated;
    if (anythingChanged)
    {
        foreach (var client in _clients.Values)
        {
            client.FlushSignal();
        }
    }
}
|
||
|
||
/// <summary>
/// Stub — updates route subscription map (session 14).
/// Currently validates its arguments and performs no update.
/// </summary>
private void UpdateRouteSubscriptionMap(Account acc, Subscription sub, int delta)
{
    var nothingToDo = acc == null || sub == null || delta == 0;
    if (nothingToDo)
    {
        return;
    }
}
|
||
|
||
/// <summary>
/// Stub — updates gateway sub interest (session 16).
/// Adjusts the per-account interest tally for the subscription's subject (or
/// "subject queue" for queue subscriptions) by <paramref name="delta"/>. Tallies that drop to
/// zero or below are removed, as are accounts left with no tallies.
/// Does nothing for an empty account name, a null subscription, a zero delta, or an empty subject.
/// </summary>
private void GatewayUpdateSubInterest(string accName, Subscription sub, int delta)
{
    if (string.IsNullOrEmpty(accName) || sub == null || delta == 0 || sub.Subject.Length == 0)
    {
        return;
    }

    var utf8 = System.Text.Encoding.UTF8;
    var subject = utf8.GetString(sub.Subject);

    // Queue subscriptions are keyed by "subject queue"; plain ones by the subject alone.
    string key;
    if (sub.Queue is { Length: > 0 })
    {
        key = $"{subject} {utf8.GetString(sub.Queue)}";
    }
    else
    {
        key = subject;
    }

    lock (_gateway.PasiLock)
    {
        if (!_gateway.Pasi.TryGetValue(accName, out var perAccount))
        {
            perAccount = new Dictionary<string, SitAlly>(StringComparer.Ordinal);
            _gateway.Pasi[accName] = perAccount;
        }

        if (!perAccount.TryGetValue(key, out var entry))
        {
            entry = new SitAlly { N = 0, Q = sub.Queue is { Length: > 0 } };
        }

        entry.N += delta;
        if (entry.N > 0)
        {
            perAccount[key] = entry;
        }
        else
        {
            perAccount.Remove(key);
        }

        if (perAccount.Count == 0)
        {
            _gateway.Pasi.Remove(accName);
        }
    }
}
|
||
|
||
/// <summary>
/// Stub — account disconnect event (session 12).
/// Currently only writes a debug log line with the connection ID, account name (empty when
/// the connection has no account), reason, and timestamp.
/// </summary>
private void AccountDisconnectEvent(ClientConnection c, DateTime now, string reason)
{
    var accName = string.Empty;
    if (c.GetAccount() is Account acc)
    {
        accName = acc.Name;
    }

    Debugf("Account disconnect: cid={0} account={1} reason={2} at={3:o}", c.Cid, accName, reason, now);
}
|
||
}
|