feat: add graceful shutdown, accept loop backoff, and task tracking

This commit is contained in:
Joseph Doherty
2026-02-22 23:43:25 -05:00
parent 600c6f9e5a
commit b68f898fa0
2 changed files with 232 additions and 13 deletions

View File

@@ -34,6 +34,25 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private ulong _nextClientId;
private long _startTimeTicks;
// Cancelled by ShutdownAsync to tell the server's internal loops to stop;
// StartAsync links it with the caller-supplied token.
private readonly CancellationTokenSource _quitCts = new();
// Completed at the end of ShutdownAsync; WaitForShutdown blocks on this task.
private readonly TaskCompletionSource _shutdownComplete = new(TaskCreationOptions.RunContinuationsAsynchronously);
// Completed when the accept loop in StartAsync exits; ShutdownAsync waits on it with a timeout.
private readonly TaskCompletionSource _acceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
// 0 while running; set to 1 exactly once via Interlocked.CompareExchange in ShutdownAsync.
private int _shutdown;
// Incremented for each accepted connection and decremented when that client's handler finishes;
// ShutdownAsync polls it while draining clients.
private int _activeClientCount;
// Used by future lame duck mode implementation
#pragma warning disable CS0649 // Field is never assigned to
private int _lameDuck;
#pragma warning restore CS0649
// Used by future ports file implementation
#pragma warning disable CS0169 // Field is never used
private string? _portsFilePath;
#pragma warning restore CS0169
// Bounds for the accept-loop retry delay: the delay starts at AcceptMinSleep, doubles after each
// SocketException, is capped at AcceptMaxSleep, and resets to AcceptMinSleep on a successful accept.
private static readonly TimeSpan AcceptMinSleep = TimeSpan.FromMilliseconds(10);
private static readonly TimeSpan AcceptMaxSleep = TimeSpan.FromSeconds(1);
public SubList SubList => _globalAccount.SubList;
public ServerStats Stats => _stats;
public DateTime StartTime => new(Interlocked.Read(ref _startTimeTicks), DateTimeKind.Utc);
@@ -43,10 +62,56 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public int Port => _options.Port;
public Account SystemAccount => _systemAccount;
public string ServerNKey { get; }
/// <summary>Gets whether shutdown has been initiated (the shutdown flag is non-zero).</summary>
public bool IsShuttingDown
{
    get { return Volatile.Read(ref _shutdown) != 0; }
}

/// <summary>Gets whether the lame duck flag is non-zero.</summary>
public bool IsLameDuckMode
{
    get { return Volatile.Read(ref _lameDuck) != 0; }
}

/// <summary>Returns the values of the server's client collection.</summary>
public IEnumerable<NatsClient> GetClients()
{
    return _clients.Values;
}

/// <summary>Returns the task that StartAsync completes once the listener is up.</summary>
public Task WaitForReadyAsync()
{
    return _listeningStarted.Task;
}

/// <summary>Blocks the calling thread until the shutdown-complete task has finished.</summary>
public void WaitForShutdown()
{
    _shutdownComplete.Task.GetAwaiter().GetResult();
}
/// <summary>
/// Gracefully stops the server. Only the first call performs the shutdown sequence;
/// subsequent calls return immediately.
/// </summary>
/// <remarks>
/// Sequence: cancel the quit token, close the listener, wait up to 5 seconds for the accept
/// loop to exit, mark every client as closed, wait up to 10 seconds for active client
/// handlers to finish, then dispose the monitor server (if any).
/// The shutdown-complete signal is set in a <c>finally</c> block, so <see cref="WaitForShutdown"/>
/// is released even if one of the steps throws; the exception still propagates to the caller.
/// </remarks>
/// <returns>A task that completes when the shutdown sequence has finished.</returns>
public async Task ShutdownAsync()
{
    if (Interlocked.CompareExchange(ref _shutdown, 1, 0) != 0)
        return; // Already shutting down

    try
    {
        _logger.LogInformation("Initiating Shutdown...");

        // Signal all internal loops to stop
        await _quitCts.CancelAsync();

        // Close listener to stop accept loop
        _listener?.Close();

        // Wait for accept loop to exit. SuppressThrowing turns a timeout into a plain
        // continuation so a stuck accept loop cannot abort the rest of the shutdown.
        await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);

        // Close all client connections
        foreach (var client in _clients.Values)
        {
            client.MarkClosed(ClosedState.ServerShutdown);
        }

        // Wait for active client tasks to drain (with timeout)
        if (Volatile.Read(ref _activeClientCount) > 0)
        {
            using var drainCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
            try
            {
                while (Volatile.Read(ref _activeClientCount) > 0 && !drainCts.IsCancellationRequested)
                    await Task.Delay(50, drainCts.Token);
            }
            catch (OperationCanceledException)
            {
                // Drain timeout elapsed; continue shutting down with clients still active.
            }
        }

        // Stop monitor server
        if (_monitorServer != null)
            await _monitorServer.DisposeAsync();

        _logger.LogInformation("Server Exiting..");
    }
    finally
    {
        // The _shutdown flag is already latched, so a retry would return early without
        // ever signalling completion. Always release WaitForShutdown callers here.
        _shutdownComplete.TrySetResult();
    }
}
public NatsServer(NatsOptions options, ILoggerFactory loggerFactory)
{
_options = options;
@@ -89,6 +154,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public async Task StartAsync(CancellationToken ct)
{
using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, _quitCts.Token);
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
_listener.Bind(new IPEndPoint(
@@ -107,21 +174,54 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_listeningStarted.TrySetResult();
_logger.LogInformation("Listening on {Host}:{Port}", _options.Host, _options.Port);
_logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port);
// Warn about stub features
if (_options.ConfigFile != null)
_logger.LogWarning("Config file parsing not yet supported (file: {ConfigFile})", _options.ConfigFile);
if (_options.ProfPort > 0)
_logger.LogWarning("Profiling endpoint not yet supported (port: {ProfPort})", _options.ProfPort);
if (_options.MonitorPort > 0)
{
_monitorServer = new MonitorServer(this, _options, _stats, _loggerFactory);
await _monitorServer.StartAsync(ct);
await _monitorServer.StartAsync(linked.Token);
}
var tmpDelay = AcceptMinSleep;
try
{
while (!ct.IsCancellationRequested)
while (!linked.Token.IsCancellationRequested)
{
var socket = await _listener.AcceptAsync(ct);
Socket socket;
try
{
socket = await _listener.AcceptAsync(linked.Token);
tmpDelay = AcceptMinSleep; // Reset on success
}
catch (OperationCanceledException)
{
break;
}
catch (ObjectDisposedException)
{
break;
}
catch (SocketException ex)
{
if (IsShuttingDown || IsLameDuckMode)
break;
// Check MaxConnections before creating the client
_logger.LogError(ex, "Temporary accept error, sleeping {Delay}ms", tmpDelay.TotalMilliseconds);
try { await Task.Delay(tmpDelay, linked.Token); }
catch (OperationCanceledException) { break; }
tmpDelay = TimeSpan.FromTicks(Math.Min(tmpDelay.Ticks * 2, AcceptMaxSleep.Ticks));
continue;
}
// Check MaxConnections
if (_options.MaxConnections > 0 && _clients.Count >= _options.MaxConnections)
{
_logger.LogWarning("Client connection rejected: maximum connections ({MaxConnections}) exceeded",
@@ -131,13 +231,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
var stream = new NetworkStream(socket, ownsSocket: false);
var errBytes = Encoding.ASCII.GetBytes(
$"-ERR '{NatsProtocol.ErrMaxConnectionsExceeded}'\r\n");
await stream.WriteAsync(errBytes, ct);
await stream.FlushAsync(ct);
await stream.WriteAsync(errBytes, linked.Token);
await stream.FlushAsync(linked.Token);
stream.Dispose();
}
catch (Exception ex)
catch (Exception ex2)
{
_logger.LogDebug(ex, "Failed to send -ERR to rejected client");
_logger.LogDebug(ex2, "Failed to send -ERR to rejected client");
}
finally
{
@@ -148,16 +248,21 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
var clientId = Interlocked.Increment(ref _nextClientId);
Interlocked.Increment(ref _stats.TotalConnections);
Interlocked.Increment(ref _activeClientCount);
_logger.LogDebug("Client {ClientId} connected from {RemoteEndpoint}", clientId, socket.RemoteEndPoint);
_ = AcceptClientAsync(socket, clientId, ct);
_ = AcceptClientAsync(socket, clientId, linked.Token);
}
}
catch (OperationCanceledException)
{
_logger.LogDebug("Accept loop cancelled, server shutting down");
}
finally
{
_acceptLoopExited.TrySetResult();
}
}
private async Task AcceptClientAsync(Socket socket, ulong clientId, CancellationToken ct)
@@ -240,8 +345,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
finally
{
_logger.LogDebug("Client {ClientId} disconnected", client.Id);
_logger.LogDebug("Client {ClientId} disconnected (reason: {CloseReason})", client.Id, client.CloseReason);
RemoveClient(client);
Interlocked.Decrement(ref _activeClientCount);
}
}
@@ -313,8 +419,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public void Dispose()
{
if (_monitorServer != null)
_monitorServer.DisposeAsync().AsTask().GetAwaiter().GetResult();
if (!IsShuttingDown)
ShutdownAsync().GetAwaiter().GetResult();
_quitCts.Dispose();
_tlsRateLimiter?.Dispose();
_listener?.Dispose();
foreach (var client in _clients.Values)