From d0aa6a5fdd729d939a2a6b4e0f22fe909dbe6a6a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 23:33:44 -0500 Subject: [PATCH 01/11] feat: add lifecycle options (lame duck, PID file, ports file, config stub) --- src/NATS.Server/NatsOptions.cs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/NATS.Server/NatsOptions.cs b/src/NATS.Server/NatsOptions.cs index 1aced93..49e80af 100644 --- a/src/NATS.Server/NatsOptions.cs +++ b/src/NATS.Server/NatsOptions.cs @@ -36,6 +36,18 @@ public sealed class NatsOptions // 0 = disabled public int MonitorHttpsPort { get; set; } + // Lifecycle / lame-duck mode + public TimeSpan LameDuckDuration { get; set; } = TimeSpan.FromMinutes(2); + public TimeSpan LameDuckGracePeriod { get; set; } = TimeSpan.FromSeconds(10); + + // File paths + public string? PidFile { get; set; } + public string? PortsFileDir { get; set; } + public string? ConfigFile { get; set; } + + // Profiling (0 = disabled) + public int ProfPort { get; set; } + // TLS public string? TlsCert { get; set; } public string? TlsKey { get; set; } From 9ae75207fcf135c083ed4c9d7760c376181b4a57 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 23:34:05 -0500 Subject: [PATCH 02/11] feat: add ClosedState enum ported from Go client.go --- src/NATS.Server/ClosedState.cs | 52 ++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 src/NATS.Server/ClosedState.cs diff --git a/src/NATS.Server/ClosedState.cs b/src/NATS.Server/ClosedState.cs new file mode 100644 index 0000000..40b44a1 --- /dev/null +++ b/src/NATS.Server/ClosedState.cs @@ -0,0 +1,52 @@ +// Ported from Go: server/client.go:188-228 + +namespace NATS.Server; + +/// +/// Reason a client connection was closed. Stored in connection info for monitoring +/// and passed to close handlers during connection teardown. +/// +/// +/// Values start at 1 (matching Go's iota + 1) so that the default zero value +/// is distinct from any valid close reason. +/// +public enum ClosedState +{ + ClientClosed = 1, + AuthenticationTimeout, + AuthenticationViolation, + TLSHandshakeError, + SlowConsumerPendingBytes, + SlowConsumerWriteDeadline, + WriteError, + ReadError, + ParseError, + StaleConnection, + ProtocolViolation, + BadClientProtocolVersion, + WrongPort, + MaxAccountConnectionsExceeded, + MaxConnectionsExceeded, + MaxPayloadExceeded, + MaxControlLineExceeded, + MaxSubscriptionsExceeded, + DuplicateRoute, + RouteRemoved, + ServerShutdown, + AuthenticationExpired, + WrongGateway, + MissingAccount, + Revocation, + InternalClient, + MsgHeaderViolation, + NoRespondersRequiresHeaders, + ClusterNameConflict, + DuplicateRemoteLeafnodeConnection, + DuplicateClientID, + DuplicateServerName, + MinimumVersionRequired, + ClusterNamesIdentical, + Kicked, + ProxyNotTrusted, + ProxyRequired, +} From 38eaaa8b8341200c1b7958a92a1cbd9695dbf98b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 23:36:01 -0500 Subject: [PATCH 03/11] feat: add ephemeral port (port=0) support --- src/NATS.Server/NatsServer.cs | 10 +++ tests/NATS.Server.Tests/ServerTests.cs | 102 +++++++++++++++++++++++++ 2 files changed, 112 insertions(+) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 87f6b97..157cc21 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -38,6 +38,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public string ServerId => _serverInfo.ServerId; public string ServerName => _serverInfo.ServerName; public int ClientCount => _clients.Count; + public int Port => _options.Port; public IEnumerable GetClients() => _clients.Values; public Task WaitForReadyAsync() => _listeningStarted.Task; @@ -82,6 +83,15 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _options.Port)); Interlocked.Exchange(ref _startTimeTicks, DateTime.UtcNow.Ticks); _listener.Listen(128); + + // Resolve ephemeral port if port=0 + if (_options.Port == 0) + { + var actualPort = ((IPEndPoint)_listener.LocalEndPoint!).Port; + _options.Port = actualPort; + _serverInfo.Port = actualPort; + } + _listeningStarted.TrySetResult(); _logger.LogInformation("Listening on {Host}:{Port}", _options.Host, _options.Port); diff --git a/tests/NATS.Server.Tests/ServerTests.cs b/tests/NATS.Server.Tests/ServerTests.cs index 3b49d3f..e3a6757 100644 --- a/tests/NATS.Server.Tests/ServerTests.cs +++ b/tests/NATS.Server.Tests/ServerTests.cs @@ -213,6 +213,39 @@ public class ServerTests : IAsyncLifetime } } +public class EphemeralPortTests +{ + [Fact] + public async Task Server_resolves_ephemeral_port() + { + using var cts = new CancellationTokenSource(); + var server = new NatsServer(new NatsOptions { Port = 0 }, NullLoggerFactory.Instance); + + _ = server.StartAsync(cts.Token); + await server.WaitForReadyAsync(); + + try + { + // Port should have been resolved to a real port + server.Port.ShouldBeGreaterThan(0); + + // Connect a raw socket to prove the port actually works + using var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client.ConnectAsync(IPAddress.Loopback, server.Port); + + var buf = new byte[4096]; + var n = await client.ReceiveAsync(buf, SocketFlags.None); + var response = Encoding.ASCII.GetString(buf, 0, n); + response.ShouldStartWith("INFO "); + } + finally + { + await cts.CancelAsync(); + server.Dispose(); + } + } +} + public class MaxConnectionsTests : IAsyncLifetime { private readonly NatsServer _server; @@ -423,3 +456,72 @@ public class PingKeepaliveTests : IAsyncLifetime client.Dispose(); } } + +public class CloseReasonTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _port; + private readonly CancellationTokenSource _cts = new(); + + public CloseReasonTests() + { + _port = GetFreePort(); + _server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + } + + public async Task DisposeAsync() + { + await _cts.CancelAsync(); + _server.Dispose(); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + [Fact] + public async Task Client_close_reason_set_on_normal_disconnect() + { + // Connect a raw TCP client + using var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client.ConnectAsync(IPAddress.Loopback, _port); + + // Read INFO + var buf = new byte[4096]; + await client.ReceiveAsync(buf, SocketFlags.None); + + // Send CONNECT + PING, wait for PONG + await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n")); + var sb = new StringBuilder(); + using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!sb.ToString().Contains("PONG")) + { + var n = await client.ReceiveAsync(buf, SocketFlags.None, readCts.Token); + if (n == 0) break; + sb.Append(Encoding.ASCII.GetString(buf, 0, n)); + } + sb.ToString().ShouldContain("PONG"); + + // Get the NatsClient from the server + var natsClient = _server.GetClients().First(); + + // Close the TCP socket (normal client disconnect) + client.Shutdown(SocketShutdown.Both); + client.Close(); + + // Wait for the server to detect the disconnect + await Task.Delay(500); + + // The close reason should be ClientClosed (normal disconnect falls through to finally) + natsClient.CloseReason.ShouldBe(ClosedState.ClientClosed); + } +} From 086b4f50e894129a761a881982af0823639fb394 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 23:36:55 -0500 Subject: [PATCH 04/11] feat: add close reason tracking to NatsClient --- src/NATS.Server/NatsClient.cs | 41 +++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 69cde4c..0e58d82 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -62,6 +62,12 @@ public sealed class NatsClient : IDisposable public long InBytes; public long OutBytes; + // Close reason tracking + private int _closeReason; // stores ClosedState as int for atomics + private int _skipFlushOnClose; + public ClosedState CloseReason => (ClosedState)Volatile.Read(ref _closeReason); + public bool ShouldSkipFlush => Volatile.Read(ref _skipFlushOnClose) != 0; + // PING keepalive state private int _pingsOut; private long _lastIn; @@ -139,13 +145,24 @@ public sealed class NatsClient : IDisposable catch (OperationCanceledException) { _logger.LogDebug("Client {ClientId} operation cancelled", Id); + MarkClosed(ClosedState.ServerShutdown); + } + catch (IOException) + { + MarkClosed(ClosedState.ReadError); + } + catch (SocketException) + { + MarkClosed(ClosedState.ReadError); } catch (Exception ex) { _logger.LogDebug(ex, "Client {ClientId} connection error", Id); + MarkClosed(ClosedState.ReadError); } finally { + MarkClosed(ClosedState.ClientClosed); try { _socket.Shutdown(SocketShutdown.Both); } catch (SocketException) { } catch (ObjectDisposedException) { } @@ -522,6 +539,30 @@ public sealed class NatsClient : IDisposable } } + /// + /// Marks this connection as closed with the given reason. + /// Sets skip-flush flag for error-related reasons. + /// Thread-safe — only the first call sets the reason. + /// + public void MarkClosed(ClosedState reason) + { + if (Interlocked.CompareExchange(ref _closeReason, (int)reason, 0) != 0) + return; + + switch (reason) + { + case ClosedState.ReadError: + case ClosedState.WriteError: + case ClosedState.SlowConsumerPendingBytes: + case ClosedState.SlowConsumerWriteDeadline: + case ClosedState.TLSHandshakeError: + Volatile.Write(ref _skipFlushOnClose, 1); + break; + } + + _logger.LogDebug("Client {ClientId} connection closed: {CloseReason}", Id, reason); + } + public void RemoveAllSubscriptions(SubList subList) { foreach (var sub in _subs.Values) From 600c6f9e5a0384a448566d3909f17941fbd5e800 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 23:39:22 -0500 Subject: [PATCH 05/11] feat: add system account ($SYS) and server NKey identity stubs --- src/NATS.Server/NatsServer.cs | 13 +++++++++++++ tests/NATS.Server.Tests/ServerTests.cs | 22 ++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 157cc21..3b509aa 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -5,6 +5,7 @@ using System.Net.Sockets; using System.Security.Cryptography.X509Certificates; using System.Text; using Microsoft.Extensions.Logging; +using NATS.NKeys; using NATS.Server.Auth; using NATS.Server.Monitoring; using NATS.Server.Protocol; @@ -25,6 +26,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private readonly AuthService _authService; private readonly ConcurrentDictionary _accounts = new(StringComparer.Ordinal); private readonly Account _globalAccount; + private readonly Account _systemAccount; private readonly SslServerAuthenticationOptions? _sslOptions; private readonly TlsRateLimiter? _tlsRateLimiter; private Socket? _listener; @@ -39,6 +41,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public string ServerName => _serverInfo.ServerName; public int ClientCount => _clients.Count; public int Port => _options.Port; + public Account SystemAccount => _systemAccount; + public string ServerNKey { get; } public IEnumerable GetClients() => _clients.Values; public Task WaitForReadyAsync() => _listeningStarted.Task; @@ -51,6 +55,15 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _authService = AuthService.Build(options); _globalAccount = new Account(Account.GlobalAccountName); _accounts[Account.GlobalAccountName] = _globalAccount; + + // Create $SYS system account (stub -- no internal subscriptions yet) + _systemAccount = new Account("$SYS"); + _accounts["$SYS"] = _systemAccount; + + // Generate Ed25519 server NKey identity + using var serverKeyPair = KeyPair.CreatePair(PrefixByte.Server); + ServerNKey = serverKeyPair.GetPublicKey(); + _serverInfo = new ServerInfo { ServerId = Guid.NewGuid().ToString("N")[..20].ToUpperInvariant(), diff --git a/tests/NATS.Server.Tests/ServerTests.cs b/tests/NATS.Server.Tests/ServerTests.cs index e3a6757..b4f3618 100644 --- a/tests/NATS.Server.Tests/ServerTests.cs +++ b/tests/NATS.Server.Tests/ServerTests.cs @@ -525,3 +525,25 @@ public class CloseReasonTests : IAsyncLifetime natsClient.CloseReason.ShouldBe(ClosedState.ClientClosed); } } + +public class ServerIdentityTests +{ + [Fact] + public void Server_creates_system_account() + { + var server = new NatsServer(new NatsOptions { Port = 0 }, NullLoggerFactory.Instance); + server.SystemAccount.ShouldNotBeNull(); + server.SystemAccount.Name.ShouldBe("$SYS"); + server.Dispose(); + } + + [Fact] + public void Server_generates_nkey_identity() + { + var server = new NatsServer(new NatsOptions { Port = 0 }, NullLoggerFactory.Instance); + server.ServerNKey.ShouldNotBeNullOrEmpty(); + // Server NKey public keys start with 'N' + server.ServerNKey[0].ShouldBe('N'); + server.Dispose(); + } +} From b68f898fa0eafa9acb248c983d61c48e579471cf Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 23:43:25 -0500 Subject: [PATCH 06/11] feat: add graceful shutdown, accept loop backoff, and task tracking --- src/NATS.Server/NatsServer.cs | 133 ++++++++++++++++++++++--- tests/NATS.Server.Tests/ServerTests.cs | 112 +++++++++++++++++++++ 2 files changed, 232 insertions(+), 13 deletions(-) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 3b509aa..2a33091 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -34,6 +34,25 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private ulong _nextClientId; private long _startTimeTicks; + private readonly CancellationTokenSource _quitCts = new(); + private readonly TaskCompletionSource _shutdownComplete = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _acceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously); + private int _shutdown; + private int _activeClientCount; + + // Used by future lame duck mode implementation +#pragma warning disable CS0649 // Field is never assigned to + private int _lameDuck; +#pragma warning restore CS0649 + + // Used by future ports file implementation +#pragma warning disable CS0169 // Field is never used + private string? _portsFilePath; +#pragma warning restore CS0169 + + private static readonly TimeSpan AcceptMinSleep = TimeSpan.FromMilliseconds(10); + private static readonly TimeSpan AcceptMaxSleep = TimeSpan.FromSeconds(1); + public SubList SubList => _globalAccount.SubList; public ServerStats Stats => _stats; public DateTime StartTime => new(Interlocked.Read(ref _startTimeTicks), DateTimeKind.Utc); @@ -43,10 +62,56 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public int Port => _options.Port; public Account SystemAccount => _systemAccount; public string ServerNKey { get; } + public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0; + public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0; public IEnumerable GetClients() => _clients.Values; public Task WaitForReadyAsync() => _listeningStarted.Task; + public void WaitForShutdown() => _shutdownComplete.Task.GetAwaiter().GetResult(); + + public async Task ShutdownAsync() + { + if (Interlocked.CompareExchange(ref _shutdown, 1, 0) != 0) + return; // Already shutting down + + _logger.LogInformation("Initiating Shutdown..."); + + // Signal all internal loops to stop + await _quitCts.CancelAsync(); + + // Close listener to stop accept loop + _listener?.Close(); + + // Wait for accept loop to exit + await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + + // Close all client connections + foreach (var client in _clients.Values) + { + client.MarkClosed(ClosedState.ServerShutdown); + } + + // Wait for active client tasks to drain (with timeout) + if (Volatile.Read(ref _activeClientCount) > 0) + { + using var drainCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + try + { + while (Volatile.Read(ref _activeClientCount) > 0 && !drainCts.IsCancellationRequested) + await Task.Delay(50, drainCts.Token); + } + catch (OperationCanceledException) { } + } + + // Stop monitor server + if (_monitorServer != null) + await _monitorServer.DisposeAsync(); + + _logger.LogInformation("Server Exiting.."); + _shutdownComplete.TrySetResult(); + } + public NatsServer(NatsOptions options, ILoggerFactory loggerFactory) { _options = options; @@ -89,6 +154,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public async Task StartAsync(CancellationToken ct) { + using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, _quitCts.Token); + _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); _listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); _listener.Bind(new IPEndPoint( @@ -107,21 +174,54 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _listeningStarted.TrySetResult(); - _logger.LogInformation("Listening on {Host}:{Port}", _options.Host, _options.Port); + _logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port); + + // Warn about stub features + if (_options.ConfigFile != null) + _logger.LogWarning("Config file parsing not yet supported (file: {ConfigFile})", _options.ConfigFile); + if (_options.ProfPort > 0) + _logger.LogWarning("Profiling endpoint not yet supported (port: {ProfPort})", _options.ProfPort); if (_options.MonitorPort > 0) { _monitorServer = new MonitorServer(this, _options, _stats, _loggerFactory); - await _monitorServer.StartAsync(ct); + await _monitorServer.StartAsync(linked.Token); } + var tmpDelay = AcceptMinSleep; + try { - while (!ct.IsCancellationRequested) + while (!linked.Token.IsCancellationRequested) { - var socket = await _listener.AcceptAsync(ct); + Socket socket; + try + { + socket = await _listener.AcceptAsync(linked.Token); + tmpDelay = AcceptMinSleep; // Reset on success + } + catch (OperationCanceledException) + { + break; + } + catch (ObjectDisposedException) + { + break; + } + catch (SocketException ex) + { + if (IsShuttingDown || IsLameDuckMode) + break; - // Check MaxConnections before creating the client + _logger.LogError(ex, "Temporary accept error, sleeping {Delay}ms", tmpDelay.TotalMilliseconds); + try { await Task.Delay(tmpDelay, linked.Token); } + catch (OperationCanceledException) { break; } + + tmpDelay = TimeSpan.FromTicks(Math.Min(tmpDelay.Ticks * 2, AcceptMaxSleep.Ticks)); + continue; + } + + // Check MaxConnections if (_options.MaxConnections > 0 && _clients.Count >= _options.MaxConnections) { _logger.LogWarning("Client connection rejected: maximum connections ({MaxConnections}) exceeded", @@ -131,13 +231,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var stream = new NetworkStream(socket, ownsSocket: false); var errBytes = Encoding.ASCII.GetBytes( $"-ERR '{NatsProtocol.ErrMaxConnectionsExceeded}'\r\n"); - await stream.WriteAsync(errBytes, ct); - await stream.FlushAsync(ct); + await stream.WriteAsync(errBytes, linked.Token); + await stream.FlushAsync(linked.Token); stream.Dispose(); } - catch (Exception ex) + catch (Exception ex2) { - _logger.LogDebug(ex, "Failed to send -ERR to rejected client"); + _logger.LogDebug(ex2, "Failed to send -ERR to rejected client"); } finally { @@ -148,16 +248,21 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var clientId = Interlocked.Increment(ref _nextClientId); Interlocked.Increment(ref _stats.TotalConnections); + Interlocked.Increment(ref _activeClientCount); _logger.LogDebug("Client {ClientId} connected from {RemoteEndpoint}", clientId, socket.RemoteEndPoint); - _ = AcceptClientAsync(socket, clientId, ct); + _ = AcceptClientAsync(socket, clientId, linked.Token); } } catch (OperationCanceledException) { _logger.LogDebug("Accept loop cancelled, server shutting down"); } + finally + { + _acceptLoopExited.TrySetResult(); + } } private async Task AcceptClientAsync(Socket socket, ulong clientId, CancellationToken ct) @@ -240,8 +345,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } finally { - _logger.LogDebug("Client {ClientId} disconnected", client.Id); + _logger.LogDebug("Client {ClientId} disconnected (reason: {CloseReason})", client.Id, client.CloseReason); RemoveClient(client); + Interlocked.Decrement(ref _activeClientCount); } } @@ -313,8 +419,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public void Dispose() { - if (_monitorServer != null) - _monitorServer.DisposeAsync().AsTask().GetAwaiter().GetResult(); + if (!IsShuttingDown) + ShutdownAsync().GetAwaiter().GetResult(); + _quitCts.Dispose(); _tlsRateLimiter?.Dispose(); _listener?.Dispose(); foreach (var client in _clients.Values) diff --git a/tests/NATS.Server.Tests/ServerTests.cs b/tests/NATS.Server.Tests/ServerTests.cs index b4f3618..caa4d36 100644 --- a/tests/NATS.Server.Tests/ServerTests.cs +++ b/tests/NATS.Server.Tests/ServerTests.cs @@ -547,3 +547,115 @@ public class ServerIdentityTests server.Dispose(); } } + +public class GracefulShutdownTests +{ + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + [Fact] + public async Task ShutdownAsync_disconnects_all_clients() + { + var port = GetFreePort(); + var server = new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + // Connect 2 raw TCP clients + using var client1 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client1.ConnectAsync(IPAddress.Loopback, port); + var buf = new byte[4096]; + await client1.ReceiveAsync(buf, SocketFlags.None); // INFO + + using var client2 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client2.ConnectAsync(IPAddress.Loopback, port); + await client2.ReceiveAsync(buf, SocketFlags.None); // INFO + + // Send CONNECT so both are registered + await client1.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n")); + await client2.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n")); + + // Wait for PONG from both (confirming they are registered) + using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await client1.ReceiveAsync(buf, SocketFlags.None, readCts.Token); + await client2.ReceiveAsync(buf, SocketFlags.None, readCts.Token); + + server.ClientCount.ShouldBe(2); + + await server.ShutdownAsync(); + + server.ClientCount.ShouldBe(0); + server.Dispose(); + } + + [Fact] + public async Task WaitForShutdown_blocks_until_shutdown() + { + var port = GetFreePort(); + var server = new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + // Start WaitForShutdown in background + var waitTask = Task.Run(() => server.WaitForShutdown()); + + // Give it a moment -- it should NOT complete yet + await Task.Delay(200); + waitTask.IsCompleted.ShouldBeFalse(); + + // Trigger shutdown + await server.ShutdownAsync(); + + // WaitForShutdown should complete within 5 seconds + var completed = await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(5))); + completed.ShouldBe(waitTask); + + server.Dispose(); + } + + [Fact] + public async Task ShutdownAsync_is_idempotent() + { + var port = GetFreePort(); + var server = new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + // Call ShutdownAsync 3 times -- should not throw + await server.ShutdownAsync(); + await server.ShutdownAsync(); + await server.ShutdownAsync(); + + server.IsShuttingDown.ShouldBeTrue(); + server.Dispose(); + } + + [Fact] + public async Task Accept_loop_waits_for_active_clients() + { + var port = GetFreePort(); + var server = new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + // Connect a client + using var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client.ConnectAsync(IPAddress.Loopback, port); + var buf = new byte[4096]; + await client.ReceiveAsync(buf, SocketFlags.None); // INFO + await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n")); + using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await client.ReceiveAsync(buf, SocketFlags.None, readCts.Token); // PONG + + // ShutdownAsync should complete within 10 seconds (doesn't hang) + var shutdownTask = server.ShutdownAsync(); + var completed = await Task.WhenAny(shutdownTask, Task.Delay(TimeSpan.FromSeconds(10))); + completed.ShouldBe(shutdownTask); + + server.Dispose(); + } +} From 45de110a84888b0d4a5dad92a8b3d58e0b73c3f1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 23:45:26 -0500 Subject: [PATCH 07/11] feat: add flush-before-close for graceful client shutdown --- src/NATS.Server/NatsClient.cs | 25 ++++++++++ src/NATS.Server/NatsServer.cs | 5 +- tests/NATS.Server.Tests/ServerTests.cs | 69 ++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 1 deletion(-) diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 0e58d82..43f2e28 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -563,6 +563,31 @@ public sealed class NatsClient : IDisposable _logger.LogDebug("Client {ClientId} connection closed: {CloseReason}", Id, reason); } + /// + /// Flushes pending data (unless skip-flush is set) and closes the connection. + /// + public async Task FlushAndCloseAsync(bool minimalFlush = false) + { + if (!ShouldSkipFlush) + { + try + { + using var flushCts = new CancellationTokenSource(minimalFlush + ? TimeSpan.FromMilliseconds(100) + : TimeSpan.FromSeconds(1)); + await _stream.FlushAsync(flushCts.Token); + } + catch (Exception) + { + // Best effort flush — don't let it prevent close + } + } + + try { _socket.Shutdown(SocketShutdown.Both); } + catch (SocketException) { } + catch (ObjectDisposedException) { } + } + public void RemoveAllSubscriptions(SubList subList) { foreach (var sub in _subs.Values) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 2a33091..199f3cd 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -86,11 +86,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable // Wait for accept loop to exit await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); - // Close all client connections + // Close all client connections — flush first, then mark closed + var flushTasks = new List(); foreach (var client in _clients.Values) { client.MarkClosed(ClosedState.ServerShutdown); + flushTasks.Add(client.FlushAndCloseAsync(minimalFlush: true)); } + await Task.WhenAll(flushTasks).WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); // Wait for active client tasks to drain (with timeout) if (Volatile.Read(ref _activeClientCount) > 0) diff --git a/tests/NATS.Server.Tests/ServerTests.cs b/tests/NATS.Server.Tests/ServerTests.cs index caa4d36..8f39a8c 100644 --- a/tests/NATS.Server.Tests/ServerTests.cs +++ b/tests/NATS.Server.Tests/ServerTests.cs @@ -548,6 +548,75 @@ public class ServerIdentityTests } } +public class FlushBeforeCloseTests +{ + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + private static async Task ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000) + { + using var cts = new CancellationTokenSource(timeoutMs); + var sb = new StringBuilder(); + var buf = new byte[4096]; + while (!sb.ToString().Contains(expected)) + { + var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token); + if (n == 0) break; + sb.Append(Encoding.ASCII.GetString(buf, 0, n)); + } + return sb.ToString(); + } + + [Fact] + public async Task Shutdown_flushes_pending_data_to_clients() + { + var port = GetFreePort(); + var server = new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + try + { + // Connect a subscriber via raw socket + using var sub = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sub.ConnectAsync(IPAddress.Loopback, port); + + // Read INFO + var buf = new byte[4096]; + await sub.ReceiveAsync(buf, SocketFlags.None); + + // Subscribe to "foo" + await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo 1\r\nPING\r\n")); + var pong = await ReadUntilAsync(sub, "PONG"); + pong.ShouldContain("PONG"); + + // Connect a publisher + using var pub = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await pub.ConnectAsync(IPAddress.Loopback, port); + await pub.ReceiveAsync(buf, SocketFlags.None); // INFO + + // Publish "Hello" to "foo" + await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo 5\r\nHello\r\n")); + + // Wait briefly for delivery + await Task.Delay(200); + + // Read from subscriber to verify MSG was received + var msg = await ReadUntilAsync(sub, "Hello\r\n"); + msg.ShouldContain("MSG foo 1 5\r\nHello\r\n"); + } + finally + { + await server.ShutdownAsync(); + server.Dispose(); + } + } +} + public class GracefulShutdownTests { private static int GetFreePort() From 34067f2b9b4353a99df07f11c44e86273fc44bd0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 23:48:06 -0500 Subject: [PATCH 08/11] feat: add lame duck mode with staggered client shutdown --- src/NATS.Server/NatsServer.cs | 78 ++++++++++++++++- tests/NATS.Server.Tests/ServerTests.cs | 114 +++++++++++++++++++++++++ 2 files changed, 189 insertions(+), 3 deletions(-) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 199f3cd..88ae76c 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -40,10 +40,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private int _shutdown; private int _activeClientCount; - // Used by future lame duck mode implementation -#pragma warning disable CS0649 // Field is never assigned to private int _lameDuck; -#pragma warning restore CS0649 // Used by future ports file implementation #pragma warning disable CS0169 // Field is never used @@ -115,6 +112,81 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _shutdownComplete.TrySetResult(); } + public async Task LameDuckShutdownAsync() + { + if (IsShuttingDown || Interlocked.CompareExchange(ref _lameDuck, 1, 0) != 0) + return; + + _logger.LogInformation("Entering lame duck mode, stop accepting new clients"); + + // Close listener to stop accepting new connections + _listener?.Close(); + + // Wait for accept loop to exit + await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + + var gracePeriod = _options.LameDuckGracePeriod; + if (gracePeriod < TimeSpan.Zero) gracePeriod = -gracePeriod; + + // If no clients, go straight to shutdown + if (_clients.IsEmpty) + { + await ShutdownAsync(); + return; + } + + // Wait grace period for clients to drain naturally + _logger.LogInformation("Waiting {GracePeriod}ms grace period", gracePeriod.TotalMilliseconds); + try + { + await Task.Delay(gracePeriod, _quitCts.Token); + } + catch (OperationCanceledException) { return; } + + if (_clients.IsEmpty) + { + await ShutdownAsync(); + return; + } + + // Stagger-close remaining clients + var dur = _options.LameDuckDuration - gracePeriod; + if (dur <= TimeSpan.Zero) dur = TimeSpan.FromSeconds(1); + + var clients = _clients.Values.ToList(); + var numClients = clients.Count; + + if (numClients > 0) + { + _logger.LogInformation("Closing {Count} existing clients over {Duration}ms", + numClients, dur.TotalMilliseconds); + + var sleepInterval = dur.Ticks / numClients; + if (sleepInterval < TimeSpan.TicksPerMillisecond) + sleepInterval = TimeSpan.TicksPerMillisecond; + if (sleepInterval > TimeSpan.TicksPerSecond) + sleepInterval = TimeSpan.TicksPerSecond; + + for (int i = 0; i < clients.Count; i++) + { + clients[i].MarkClosed(ClosedState.ServerShutdown); + await clients[i].FlushAndCloseAsync(minimalFlush: true); + + if (i < clients.Count - 1) + { + var jitter = Random.Shared.NextInt64(sleepInterval / 2, sleepInterval); + try + { + await Task.Delay(TimeSpan.FromTicks(jitter), _quitCts.Token); + } + catch (OperationCanceledException) { break; } + } + } + } + + await ShutdownAsync(); + } + public NatsServer(NatsOptions options, ILoggerFactory loggerFactory) { _options = options; diff --git a/tests/NATS.Server.Tests/ServerTests.cs b/tests/NATS.Server.Tests/ServerTests.cs index 8f39a8c..1897fb1 100644 --- a/tests/NATS.Server.Tests/ServerTests.cs +++ b/tests/NATS.Server.Tests/ServerTests.cs @@ -728,3 +728,117 @@ public class GracefulShutdownTests server.Dispose(); } } + +public class LameDuckTests +{ + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + [Fact] + public async Task LameDuckShutdown_stops_accepting_new_connections() + { + var port = GetFreePort(); + var server = new NatsServer( + new NatsOptions + { + Port = port, + LameDuckDuration = TimeSpan.FromSeconds(3), + LameDuckGracePeriod = TimeSpan.FromMilliseconds(500), + }, + NullLoggerFactory.Instance); + + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + try + { + // Connect 1 client + using var client1 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client1.ConnectAsync(IPAddress.Loopback, port); + var buf = new byte[4096]; + await client1.ReceiveAsync(buf, SocketFlags.None); // INFO + await client1.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n")); + using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await client1.ReceiveAsync(buf, SocketFlags.None, readCts.Token); // PONG + + // Start lame duck (don't await yet) + var lameDuckTask = server.LameDuckShutdownAsync(); + + // Wait briefly for listener to close + await Task.Delay(300); + + // Verify lame duck mode is active + server.IsLameDuckMode.ShouldBeTrue(); + + // Try connecting a new client -- should fail (connection refused) + using var client2 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + var connectAction = async () => + { + await client2.ConnectAsync(IPAddress.Loopback, port); + }; + await connectAction.ShouldThrowAsync(); + + // Await the lame duck task with timeout + var completed = await Task.WhenAny(lameDuckTask, Task.Delay(TimeSpan.FromSeconds(15))); + completed.ShouldBe(lameDuckTask); + } + finally + { + server.Dispose(); + } + } + + [Fact] + public async Task LameDuckShutdown_eventually_closes_all_clients() + { + var port = GetFreePort(); + var server = new NatsServer( + new NatsOptions + { + Port = port, + LameDuckDuration = TimeSpan.FromSeconds(2), + LameDuckGracePeriod = TimeSpan.FromMilliseconds(200), + }, + NullLoggerFactory.Instance); + + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + try + { + // Connect 3 clients via raw sockets + var clients = new List(); + var buf = new byte[4096]; + for (int i = 0; i < 3; i++) + { + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(IPAddress.Loopback, port); + await sock.ReceiveAsync(buf, SocketFlags.None); // INFO + await sock.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n")); + using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await sock.ReceiveAsync(buf, SocketFlags.None, readCts.Token); // PONG + clients.Add(sock); + } + + server.ClientCount.ShouldBe(3); + + // Await LameDuckShutdownAsync + var lameDuckTask = server.LameDuckShutdownAsync(); + var completed = await Task.WhenAny(lameDuckTask, Task.Delay(TimeSpan.FromSeconds(15))); + completed.ShouldBe(lameDuckTask); + + server.ClientCount.ShouldBe(0); + + foreach (var sock in clients) + sock.Dispose(); + } + finally + { + server.Dispose(); + } + } +} From e57605f090102a9c8896b425ae68174960686182 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 23:50:22 -0500 Subject: [PATCH 09/11] feat: add PID file and ports file support --- src/NATS.Server/NatsServer.cs | 71 ++++++++++++++++++++++++-- tests/NATS.Server.Tests/ServerTests.cs | 59 +++++++++++++++++++++ 2 files changed, 127 insertions(+), 3 deletions(-) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 88ae76c..f21f0c2 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -42,10 +42,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private int _lameDuck; - // Used by future ports file implementation -#pragma warning disable CS0169 // Field is never used private string? _portsFilePath; -#pragma warning restore CS0169 private static readonly TimeSpan AcceptMinSleep = TimeSpan.FromMilliseconds(10); private static readonly TimeSpan AcceptMaxSleep = TimeSpan.FromSeconds(1); @@ -108,6 +105,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (_monitorServer != null) await _monitorServer.DisposeAsync(); + DeletePidFile(); + DeletePortsFile(); + _logger.LogInformation("Server Exiting.."); _shutdownComplete.TrySetResult(); } @@ -263,6 +263,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable await _monitorServer.StartAsync(linked.Token); } + WritePidFile(); + WritePortsFile(); + var tmpDelay = AcceptMinSleep; try @@ -492,6 +495,68 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable client.Account?.RemoveClient(client.Id); } + private void WritePidFile() + { + if (string.IsNullOrEmpty(_options.PidFile)) return; + try + { + File.WriteAllText(_options.PidFile, Environment.ProcessId.ToString()); + _logger.LogDebug("Wrote PID file {PidFile}", _options.PidFile); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error writing PID file {PidFile}", _options.PidFile); + } + } + + private void DeletePidFile() + { + if (string.IsNullOrEmpty(_options.PidFile)) return; + try + { + if (File.Exists(_options.PidFile)) + File.Delete(_options.PidFile); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error deleting PID file {PidFile}", _options.PidFile); + } + } + + private void WritePortsFile() + { + if (string.IsNullOrEmpty(_options.PortsFileDir)) return; + try + { + var exeName = Path.GetFileNameWithoutExtension(Environment.ProcessPath ?? "nats-server"); + var fileName = $"{exeName}_{Environment.ProcessId}.ports"; + _portsFilePath = Path.Combine(_options.PortsFileDir, fileName); + + var ports = new { client = _options.Port, monitor = _options.MonitorPort > 0 ? _options.MonitorPort : (int?)null }; + var json = System.Text.Json.JsonSerializer.Serialize(ports); + File.WriteAllText(_portsFilePath, json); + _logger.LogDebug("Wrote ports file {PortsFile}", _portsFilePath); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error writing ports file to {PortsFileDir}", _options.PortsFileDir); + } + } + + private void DeletePortsFile() + { + if (_portsFilePath == null) return; + try + { + if (File.Exists(_portsFilePath)) + File.Delete(_portsFilePath); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error deleting ports file {PortsFile}", _portsFilePath); + } + } + public void Dispose() { if (!IsShuttingDown) diff --git a/tests/NATS.Server.Tests/ServerTests.cs b/tests/NATS.Server.Tests/ServerTests.cs index 1897fb1..dd90668 100644 --- a/tests/NATS.Server.Tests/ServerTests.cs +++ b/tests/NATS.Server.Tests/ServerTests.cs @@ -842,3 +842,62 @@ public class LameDuckTests } } } + +public class PidFileTests : IDisposable +{ + private readonly string _tempDir = Path.Combine(Path.GetTempPath(), $"nats-test-{Guid.NewGuid():N}"); + + public PidFileTests() => Directory.CreateDirectory(_tempDir); + + public void Dispose() + { + if (Directory.Exists(_tempDir)) + Directory.Delete(_tempDir, recursive: true); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + [Fact] + public async Task Server_writes_pid_file_on_startup() + { + var pidFile = Path.Combine(_tempDir, "nats.pid"); + var port = GetFreePort(); + var server = new NatsServer(new NatsOptions { Port = port, PidFile = pidFile }, NullLoggerFactory.Instance); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + File.Exists(pidFile).ShouldBeTrue(); + var content = await File.ReadAllTextAsync(pidFile); + int.Parse(content).ShouldBe(Environment.ProcessId); + + await server.ShutdownAsync(); + File.Exists(pidFile).ShouldBeFalse(); + + server.Dispose(); + } + + [Fact] + public async Task Server_writes_ports_file_on_startup() + { + var port = GetFreePort(); + var server = new NatsServer(new NatsOptions { Port = port, PortsFileDir = _tempDir }, NullLoggerFactory.Instance); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var portsFiles = Directory.GetFiles(_tempDir, "*.ports"); + portsFiles.Length.ShouldBe(1); + + var content = await File.ReadAllTextAsync(portsFiles[0]); + content.ShouldContain($"\"client\":{port}"); + + await server.ShutdownAsync(); + Directory.GetFiles(_tempDir, "*.ports").Length.ShouldBe(0); + + server.Dispose(); + } +} From df39ebdc58e7694bcdd192b54ebdbc82451e31a6 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 23:52:49 -0500 Subject: [PATCH 10/11] feat: add signal handling (SIGTERM, SIGUSR2, SIGHUP) and CLI stubs --- src/NATS.Server.Host/Program.cs | 23 +++++++++++++--- src/NATS.Server/NatsServer.cs | 49 +++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/src/NATS.Server.Host/Program.cs b/src/NATS.Server.Host/Program.cs index bdb6827..4dde566 100644 --- a/src/NATS.Server.Host/Program.cs +++ b/src/NATS.Server.Host/Program.cs @@ -32,6 +32,15 @@ for (int i = 0; i < args.Length; i++) case "--https_port" when i + 1 < args.Length: options.MonitorHttpsPort = int.Parse(args[++i]); break; + case "-c" when i + 1 < args.Length: + options.ConfigFile = args[++i]; + break; + case "--pid" when i + 1 < args.Length: + options.PidFile = args[++i]; + break; + case "--ports_file_dir" when i + 1 < args.Length: + options.PortsFileDir = args[++i]; + break; case "--tls": break; case "--tlscert" when i + 1 < args.Length: @@ -50,18 +59,24 @@ for (int i = 0; i < args.Length; i++) } using var loggerFactory = new Serilog.Extensions.Logging.SerilogLoggerFactory(Log.Logger); -var server = new NatsServer(options, loggerFactory); +using var server = new NatsServer(options, loggerFactory); -var cts = new CancellationTokenSource(); +// Register signal handlers +server.HandleSignals(); + +// Ctrl+C triggers graceful shutdown Console.CancelKeyPress += (_, e) => { e.Cancel = true; - cts.Cancel(); + Log.Information("Trapped SIGINT signal"); + _ = Task.Run(async () => await server.ShutdownAsync()); }; try { - await server.StartAsync(cts.Token); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + server.WaitForShutdown(); } catch (OperationCanceledException) { diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index f21f0c2..1222559 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Net; using System.Net.Security; using System.Net.Sockets; +using System.Runtime.InteropServices; using System.Security.Cryptography.X509Certificates; using System.Text; using Microsoft.Extensions.Logging; @@ -42,6 +43,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private int _lameDuck; + private readonly List _signalRegistrations = []; + private string? _portsFilePath; private static readonly TimeSpan AcceptMinSleep = TimeSpan.FromMilliseconds(10); @@ -187,6 +190,50 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable await ShutdownAsync(); } + /// + /// Registers Unix signal handlers. + /// SIGTERM → shutdown, SIGUSR2 → lame duck, SIGUSR1 → log reopen (stub), SIGHUP → reload (stub). + /// + public void HandleSignals() + { + _signalRegistrations.Add(PosixSignalRegistration.Create(PosixSignal.SIGTERM, ctx => + { + ctx.Cancel = true; + _logger.LogInformation("Trapped SIGTERM signal"); + _ = Task.Run(async () => await ShutdownAsync()); + })); + + _signalRegistrations.Add(PosixSignalRegistration.Create(PosixSignal.SIGQUIT, ctx => + { + ctx.Cancel = true; + _logger.LogInformation("Trapped SIGQUIT signal"); + _ = Task.Run(async () => await ShutdownAsync()); + })); + + _signalRegistrations.Add(PosixSignalRegistration.Create(PosixSignal.SIGHUP, ctx => + { + ctx.Cancel = true; + _logger.LogWarning("Trapped SIGHUP signal — config reload not yet supported"); + })); + + // SIGUSR1 and SIGUSR2 only on non-Windows + if (!OperatingSystem.IsWindows()) + { + _signalRegistrations.Add(PosixSignalRegistration.Create((PosixSignal)10, ctx => + { + ctx.Cancel = true; + _logger.LogWarning("Trapped SIGUSR1 signal — log reopen not yet supported"); + })); + + _signalRegistrations.Add(PosixSignalRegistration.Create((PosixSignal)12, ctx => + { + ctx.Cancel = true; + _logger.LogInformation("Trapped SIGUSR2 signal — entering lame duck mode"); + _ = Task.Run(async () => await LameDuckShutdownAsync()); + })); + } + } + public NatsServer(NatsOptions options, ILoggerFactory loggerFactory) { _options = options; @@ -561,6 +608,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { if (!IsShuttingDown) ShutdownAsync().GetAwaiter().GetResult(); + foreach (var reg in _signalRegistrations) + reg.Dispose(); _quitCts.Dispose(); _tlsRateLimiter?.Dispose(); _listener?.Dispose(); From 5323c8bb3056f2ccade0ac463ca03882b3e6c304 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 23:56:57 -0500 Subject: [PATCH 11/11] docs: update differences.md section 1 to reflect core lifecycle implementation --- differences.md | 402 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 402 insertions(+) create mode 100644 differences.md diff --git a/differences.md b/differences.md new file mode 100644 index 0000000..38ba8bd --- /dev/null +++ b/differences.md @@ -0,0 +1,402 @@ +# Go vs .NET NATS Server: Functionality Differences + +> Excludes clustering/routes, gateways, leaf nodes, and JetStream. +> Generated 2026-02-22 by comparing `golang/nats-server/server/` against `src/NATS.Server/`. + +--- + +## 1. Core Server Lifecycle + +### Server Initialization +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| NKey generation (server identity) | Y | Y | Ed25519 key pair via NATS.NKeys at startup | +| System account setup | Y | Y | `$SYS` account created; no event publishing yet (stub) | +| Config file validation on startup | Y | Stub | `-c` flag parsed, `ConfigFile` stored, but no config parser | +| PID file writing | Y | Y | Written on startup, deleted on shutdown | +| Profiling HTTP endpoint (`/debug/pprof`) | Y | Stub | `ProfPort` option exists but endpoint not implemented | +| Ports file output | Y | Y | JSON ports file written to `PortsFileDir` on startup | + +### Accept Loop +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Exponential backoff on accept errors | Y | Y | .NET backs off from 10ms to 1s on repeated failures | +| Config reload lock during client creation | Y | N | Go holds `reloadMu` around `createClient` | +| Goroutine/task tracking (WaitGroup) | Y | Y | `Interlocked` counter + drain with 10s timeout on shutdown | +| Callback-based error handling | Y | N | Go uses `errFunc` callback pattern | +| Random/ephemeral port (port=0) | Y | Y | Port resolved after `Bind`+`Listen`, stored in `_options.Port` | + +### Shutdown +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Graceful shutdown with `WaitForShutdown()` | Y | Y | Idempotent CAS-guarded `ShutdownAsync()` + blocking `WaitForShutdown()` | +| Close reason tracking per connection | Y | Y | 37-value `ClosedState` enum, CAS-based first-writer-wins `MarkClosed()` | +| Lame duck mode (stop new, drain existing) | Y | Y | `LameDuckShutdownAsync()` with grace period + stagger-close with jitter | +| Wait for accept loop completion | Y | Y | `TaskCompletionSource` signaled in accept loop `finally` | +| Flush pending data before close | Y | Y | `FlushAndCloseAsync()` with best-effort flush, skip-flush for error conditions | + +### Signal Handling +| Signal | Go | .NET | Notes | +|--------|:--:|:----:|-------| +| SIGINT (Ctrl+C) | Y | Y | Both handle graceful shutdown | +| SIGTERM | Y | Y | `PosixSignalRegistration` triggers `ShutdownAsync()` | +| SIGUSR1 (reopen logs) | Y | Stub | Signal registered, handler logs "not yet implemented" | +| SIGUSR2 (lame duck mode) | Y | Y | Triggers `LameDuckShutdownAsync()` | +| SIGHUP (config reload) | Y | Stub | Signal registered, handler logs "not yet implemented" | +| Windows Service integration | Y | N | | + +--- + +## 2. Client / Connection Handling + +### Concurrency Model +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Separate read + write loops | Y | Partial | Go has dedicated writeLoop goroutine; .NET writes inline | +| Write coalescing / batch flush | Y | N | Go queues messages, flushes in batch via writeLoop | +| Dynamic buffer sizing (512B-64KB) | Y | N | .NET delegates to `System.IO.Pipelines` | +| Output buffer pooling (3-tier) | Y | N | Go pools at 512B, 4KB, 64KB | + +### Connection Types +| Type | Go | .NET | Notes | +|------|:--:|:----:|-------| +| CLIENT | Y | Y | | +| ROUTER | Y | N | Excluded per scope | +| GATEWAY | Y | N | Excluded per scope | +| LEAF | Y | N | Excluded per scope | +| SYSTEM (internal) | Y | N | | +| JETSTREAM (internal) | Y | N | | +| ACCOUNT (internal) | Y | N | | +| WebSocket clients | Y | N | | +| MQTT clients | Y | N | | + +### Client Features +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Echo suppression (`echo: false`) | Y | Y | .NET checks echo in delivery path (NatsServer.cs:234,253) | +| Verbose mode (`+OK` responses) | Y | N | .NET never sends `+OK` | +| No-responders validation | Y | N | Flag parsed but not validated/enforced | +| Slow consumer detection | Y | N | **Critical gap** — Go tracks pending bytes, stalls, and closes | +| Write deadline / timeout policies | Y | N | Go has Close vs Retry policies | +| RTT measurement | Y | N | Go tracks round-trip time per client | +| Per-client trace mode | Y | N | | +| Detailed close reason tracking | Y | Y | 37-value `ClosedState` enum with CAS-based `MarkClosed()` | +| Connection state flags (16 flags) | Y | Partial | .NET only tracks `_connectReceived` | + +### Slow Consumer Handling (Go-only) +Go implements a sophisticated slow consumer detection system: +- Tracks `pendingBytes` per client output buffer +- If pending exceeds `maxPending`, enters stall mode (2-5ms waits) +- Total stall capped at 10ms per read cycle +- Closes with `SlowConsumerPendingBytes` or `SlowConsumerWriteDeadline` +- Sets `isSlowConsumer` flag for monitoring + +.NET has no equivalent — writes are synchronous and unbounded. + +### Stats Tracking +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Per-connection atomic stats | Y | Y | .NET uses `Interlocked` for stats access | +| Per-read-cycle stat batching | Y | N | Go accumulates then flushes atomically | +| Per-account stats | Y | N | | +| Slow consumer counters | Y | N | Stat fields exist but never incremented | + +--- + +## 3. Protocol Parsing + +### Parser Architecture +| Aspect | Go | .NET | +|--------|-----|------| +| Approach | Byte-by-byte state machine (74 states) | Two-phase: line extraction + command dispatch | +| Case handling | Per-state character checks | Bit-mask lowercase normalization (`\| 0x20`) | +| Buffer strategy | Jump-ahead optimization for payloads | Direct size-based reads via Pipe | +| Split-buffer handling | argBuf accumulation with scratch buffer | State variables (`_awaitingPayload`, etc.) | +| Error model | Inline error sending + error return | Exception-based (`ProtocolViolationException`) | +| CRLF in payload | Included in message buffer | Excluded by design | + +### Protocol Operations +| Operation | Go | .NET | Notes | +|-----------|:--:|:----:|-------| +| PUB | Y | Y | | +| HPUB (headers) | Y | Y | | +| SUB | Y | Y | | +| UNSUB | Y | Y | | +| CONNECT | Y | Y | | +| INFO | Y | Y | | +| PING / PONG | Y | Y | | +| MSG / HMSG | Y | Y | | +| +OK / -ERR | Y | Y | | +| RS+/RS-/RMSG (routes) | Y | N | Excluded per scope | +| A+/A- (accounts) | Y | N | Excluded per scope | +| LS+/LS-/LMSG (leaf) | Y | N | Excluded per scope | + +### Protocol Parsing Gaps +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Multi-client-type command routing | Y | N | Go checks `c.kind` to allow/reject commands | +| Protocol tracing in parser | Y | N | Go calls `traceInOp()` per operation | +| Subject mapping (input→output) | Y | N | Go transforms subjects via mapping rules | +| MIME header parsing | Y | N | .NET delegates header handling to client layer | +| Message trace event initialization | Y | N | | + +### Protocol Writing +| Aspect | Go | .NET | Notes | +|--------|:--:|:----:|-------| +| INFO serialization | Once at startup | Every send | .NET re-serializes JSON each time | +| MSG/HMSG construction | Direct buffer write | String interpolation → byte encode | More allocations in .NET | +| Pre-encoded constants | Y | Y | Both pre-encode PING/PONG/OK | + +--- + +## 4. Subscriptions & Subject Matching + +### Trie Implementation +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Basic trie with `*`/`>` wildcards | Y | Y | Core matching identical | +| Queue group support | Y | Y | | +| Result caching (1024 max) | Y | Y | Same limits | +| `plist` optimization (>256 subs) | Y | N | Go converts high-fanout nodes to array | +| Async cache sweep (background) | Y | N | .NET sweeps inline under write lock | +| Atomic generation ID for invalidation | Y | N | .NET clears cache explicitly | +| Cache eviction strategy | Random | First-N | Semantic difference minimal | + +### Missing SubList Features +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| `Stats()` — comprehensive statistics | Y | N | Matches, cache hits, inserts, removes, fanout | +| `HasInterest()` — fast bool check | Y | N | | +| `NumInterest()` — fast count | Y | N | | +| `ReverseMatch()` — pattern→literal query | Y | N | | +| `RemoveBatch()` — efficient bulk removal | Y | N | | +| `All()` — enumerate all subscriptions | Y | N | | +| Notification system (interest changes) | Y | N | | +| Local/remote subscription filtering | Y | N | | +| Queue weight expansion (remote subs) | Y | N | | +| `MatchBytes()` — zero-copy byte API | Y | N | | + +### Subject Validation +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Basic validation (empty tokens, wildcards) | Y | Y | | +| Literal subject check | Y | Y | | +| UTF-8/null rune validation | Y | N | Go has `checkRunes` parameter | +| Collision detection (`SubjectsCollide`) | Y | N | | +| Token utilities (`tokenAt`, `numTokens`) | Y | N | | +| Stack-allocated token buffer | Y | N | Go uses `[32]string{}` on stack | + +### Subscription Lifecycle +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Per-account subscription limit | Y | N | | +| Auto-unsubscribe on max messages | Y | Y | .NET enforces at delivery time (NatsServer.cs:269-270) | +| Subscription routing propagation | Y | N | For clusters | +| Queue weight (`qw`) field | Y | N | For remote queue load balancing | + +--- + +## 5. Authentication & Authorization + +### Auth Mechanisms +| Mechanism | Go | .NET | Notes | +|-----------|:--:|:----:|-------| +| Username/password | Y | Y | | +| Token | Y | Y | | +| NKeys (Ed25519) | Y | Y | .NET has framework but integration is basic | +| JWT validation | Y | N | | +| Bcrypt password hashing | Y | Y | .NET supports bcrypt (`$2*` prefix) with constant-time fallback | +| TLS certificate mapping | Y | N | Property exists but no implementation | +| Custom auth interface | Y | N | | +| External auth callout | Y | N | | +| Proxy authentication | Y | N | | +| Bearer tokens | Y | N | | +| User revocation tracking | Y | N | | + +### Account System +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Per-account SubList isolation | Y | Y | | +| Multi-account user resolution | Y | N | .NET has basic account, no resolution | +| Account exports/imports | Y | N | | +| Per-account connection limits | Y | N | | +| Per-account subscription limits | Y | N | | +| Account JetStream limits | Y | N | Excluded per scope | + +### Permissions +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Publish allow list | Y | Y | | +| Subscribe allow list | Y | Y | | +| Publish deny list | Y | Partial | .NET has deny in struct but limited enforcement | +| Subscribe deny list | Y | Partial | Same | +| Message-level deny filtering | Y | N | Go filters at delivery time | +| Permission caching (128 entries) | Y | N | | +| Response permissions (reply tracking) | Y | N | Dynamic reply subject authorization | +| Permission templates (JWT) | Y | N | e.g., `{{name()}}`, `{{account-tag(...)}}` | + +--- + +## 6. Configuration + +### CLI Flags +| Flag | Go | .NET | Notes | +|------|:--:|:----:|-------| +| `-p/--port` | Y | Y | | +| `-a/--addr` | Y | Y | | +| `-n/--name` (ServerName) | Y | Y | | +| `-m/--http_port` (monitoring) | Y | Y | | +| `-c` (config file) | Y | Stub | Flag parsed, stored in `ConfigFile`, no config parser | +| `-D/-V/-DV` (debug/trace) | Y | N | | +| `--tlscert/--tlskey/--tlscacert` | Y | Y | | +| `--tlsverify` | Y | Y | | +| `--http_base_path` | Y | Y | | +| `--https_port` | Y | Y | | + +### Configuration System +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Config file parsing | Y | N | Go has custom `conf` parser with includes | +| Hot reload (SIGHUP) | Y | N | | +| Config change detection | Y | N | Go tracks `inConfig`/`inCmdLine` origins | +| ~450 option fields | Y | ~54 | .NET covers core options only | + +### Missing Options Categories +- Logging options (file, rotation, syslog, trace levels) +- Advanced limits (MaxSubs, MaxSubTokens, MaxPending, WriteDeadline) +- Tags/metadata +- OCSP configuration +- WebSocket/MQTT options +- Operator mode / account resolver + +--- + +## 7. Monitoring + +### HTTP Endpoints +| Endpoint | Go | .NET | Notes | +|----------|:--:|:----:|-------| +| `/healthz` | Y | Y | | +| `/varz` | Y | Y | | +| `/connz` | Y | Y | | +| `/` (root listing) | Y | Y | | +| `/routez` | Y | Stub | Returns empty response | +| `/gatewayz` | Y | Stub | Returns empty response | +| `/leafz` | Y | Stub | Returns empty response | +| `/subz` / `/subscriptionsz` | Y | Stub | Returns empty response | +| `/accountz` | Y | Stub | Returns empty response | +| `/accstatz` | Y | Stub | Returns empty response | +| `/jsz` | Y | Stub | Returns empty response | + +### Varz Response +| Field Category | Go | .NET | Notes | +|----------------|:--:|:----:|-------| +| Identity (ID, Name, Version) | Y | Y | | +| Network (Host, Port, URLs) | Y | Y | | +| Security (AuthRequired, TLS) | Y | Y | | +| Limits (MaxConn, MaxPayload) | Y | Y | | +| Timing (Start, Now, Uptime) | Y | Y | | +| Runtime (Mem, CPU, Cores) | Y | Y | | +| Connections (current, total) | Y | Y | | +| Messages (in/out msgs/bytes) | Y | Y | | +| SlowConsumer breakdown | Y | N | Go tracks per connection type | +| Cluster/Gateway/Leaf blocks | Y | N | Excluded per scope | +| JetStream block | Y | N | Excluded per scope | +| TLS cert expiry info | Y | N | | + +### Connz Response +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Filtering by CID, user, account | Y | Partial | | +| Sorting (11 options) | Y | Y | .NET missing ByStop, ByReason | +| Pagination (offset, limit) | Y | Y | | +| Subscription detail mode | Y | N | | +| TLS peer certificate info | Y | N | | +| JWT/IssuerKey/Tags fields | Y | N | | +| MQTT client ID filtering | Y | N | | +| Proxy info | Y | N | | + +--- + +## 8. TLS + +### TLS Modes +| Mode | Go | .NET | Notes | +|------|:--:|:----:|-------| +| No TLS | Y | Y | | +| INFO-first (default NATS) | Y | Y | | +| TLS-first (before INFO) | Y | Y | | +| Mixed/Fallback | Y | Y | | +| TLS-required | Y | Y | | + +### TLS Features +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| PEM cert/key loading | Y | Y | | +| CA chain validation | Y | Y | | +| Mutual TLS (client certs) | Y | Y | | +| Certificate pinning (SHA256 SPKI) | Y | Y | | +| TLS handshake timeout | Y | Y | | +| TLS rate limiting | Y | Property only | .NET has the option but enforcement is partial | +| First-byte peeking (0x16 detection) | Y | Y | | +| Cert subject→user mapping | Y | N | `TlsMap` property exists, no implementation | +| OCSP stapling | Y | N | | +| Min TLS version control | Y | Y | | + +--- + +## 9. Logging + +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Structured logging | Partial | Y | .NET uses Serilog with ILogger | +| File logging with rotation | Y | N | | +| Syslog (local and remote) | Y | N | | +| Log reopening (SIGUSR1) | Y | N | | +| Trace mode (protocol-level) | Y | N | | +| Debug mode | Y | N | | +| Per-subsystem log control | Y | N | | +| Color output on TTY | Y | N | | +| Timestamp format control | Y | N | | + +--- + +## 10. Ping/Pong & Keepalive + +| Feature | Go | .NET | Notes | +|---------|:--:|:----:|-------| +| Server-initiated PING | Y | Y | | +| Configurable interval | Y | Y | PingInterval option | +| Max pings out | Y | Y | MaxPingsOut option | +| Stale connection close | Y | Y | | +| RTT-based first PING delay | Y | N | Go delays first PING based on RTT | +| RTT tracking | Y | N | | +| Stale connection watcher | Y | N | Go has dedicated watcher goroutine | + +--- + +## Summary: Critical Gaps for Production Use + +### High Priority +1. **Slow consumer detection** — unbounded writes can exhaust memory (stat fields exist but no detection logic) +2. **Write coalescing / batch flush** — performance gap for high-throughput scenarios + +### Medium Priority +3. **Verbose mode** — clients expect `+OK` when `verbose: true` +4. **Permission deny enforcement at delivery** — deny lists checked at SUB/PUB time but not during message delivery +5. **Config file parsing** — needed for production deployment (CLI stub exists) +6. **Hot reload** — needed for zero-downtime config changes (SIGHUP stub exists) +7. **File logging with rotation** — needed for production logging +8. **No-responders validation** — flag parsed but not enforced + +### Lower Priority +9. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections +10. **JWT authentication** — needed for operator mode +11. **TLS certificate mapping** — property exists, not implemented +12. **OCSP support** — certificate revocation checking +13. **Subject mapping** — input→output subject transformation +14. **Protocol tracing** — no trace-level logging +15. **Subscription statistics** — SubList has no stats collection +16. **Per-account limits** — connections, subscriptions per account +17. **Reply subject tracking** — dynamic response permissions +18. **Windows Service integration** — needed for Windows deployment