using System.Collections.Concurrent; using System.Net; using System.Net.Security; using System.Net.Sockets; using System.Runtime.InteropServices; using System.Security.Cryptography.X509Certificates; using System.Text; using Microsoft.Extensions.Logging; using NATS.NKeys; using NATS.Server.Auth; using NATS.Server.Monitoring; using NATS.Server.Protocol; using NATS.Server.Subscriptions; using NATS.Server.Tls; namespace NATS.Server; public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { private readonly NatsOptions _options; private readonly ConcurrentDictionary _clients = new(); private readonly ServerInfo _serverInfo; private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; private readonly ServerStats _stats = new(); private readonly TaskCompletionSource _listeningStarted = new(TaskCreationOptions.RunContinuationsAsynchronously); private readonly AuthService _authService; private readonly ConcurrentDictionary _accounts = new(StringComparer.Ordinal); private readonly Account _globalAccount; private readonly Account _systemAccount; private readonly SslServerAuthenticationOptions? _sslOptions; private readonly TlsRateLimiter? _tlsRateLimiter; private Socket? _listener; private MonitorServer? _monitorServer; private ulong _nextClientId; private long _startTimeTicks; private readonly CancellationTokenSource _quitCts = new(); private readonly TaskCompletionSource _shutdownComplete = new(TaskCreationOptions.RunContinuationsAsynchronously); private readonly TaskCompletionSource _acceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously); private int _shutdown; private int _activeClientCount; private int _lameDuck; private byte[] _cachedInfoLine = []; private readonly List _signalRegistrations = []; private string? _portsFilePath; private static readonly TimeSpan AcceptMinSleep = TimeSpan.FromMilliseconds(10); private static readonly TimeSpan AcceptMaxSleep = TimeSpan.FromSeconds(1); public SubList SubList => _globalAccount.SubList; public byte[] CachedInfoLine => _cachedInfoLine; public ServerStats Stats => _stats; public DateTime StartTime => new(Interlocked.Read(ref _startTimeTicks), DateTimeKind.Utc); public string ServerId => _serverInfo.ServerId; public string ServerName => _serverInfo.ServerName; public int ClientCount => _clients.Count; public int Port => _options.Port; public Account SystemAccount => _systemAccount; public string ServerNKey { get; } public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0; public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0; public IEnumerable GetClients() => _clients.Values; public Task WaitForReadyAsync() => _listeningStarted.Task; public void WaitForShutdown() => _shutdownComplete.Task.GetAwaiter().GetResult(); public async Task ShutdownAsync() { if (Interlocked.CompareExchange(ref _shutdown, 1, 0) != 0) return; // Already shutting down _logger.LogInformation("Initiating Shutdown..."); // Signal all internal loops to stop await _quitCts.CancelAsync(); // Close listener to stop accept loop _listener?.Close(); // Wait for accept loop to exit await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); // Close all client connections — flush first, then mark closed var flushTasks = new List(); foreach (var client in _clients.Values) { client.MarkClosed(ClientClosedReason.ServerShutdown); flushTasks.Add(client.FlushAndCloseAsync(minimalFlush: true)); } await Task.WhenAll(flushTasks).WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); // Wait for active client tasks to drain (with timeout) if (Volatile.Read(ref _activeClientCount) > 0) { using var drainCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); try { while (Volatile.Read(ref _activeClientCount) > 0 && !drainCts.IsCancellationRequested) await Task.Delay(50, drainCts.Token); } catch (OperationCanceledException) { } } // Stop monitor server if (_monitorServer != null) await _monitorServer.DisposeAsync(); DeletePidFile(); DeletePortsFile(); _logger.LogInformation("Server Exiting.."); _shutdownComplete.TrySetResult(); } public async Task LameDuckShutdownAsync() { if (IsShuttingDown || Interlocked.CompareExchange(ref _lameDuck, 1, 0) != 0) return; _logger.LogInformation("Entering lame duck mode, stop accepting new clients"); // Close listener to stop accepting new connections _listener?.Close(); // Wait for accept loop to exit await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); var gracePeriod = _options.LameDuckGracePeriod; if (gracePeriod < TimeSpan.Zero) gracePeriod = -gracePeriod; // If no clients, go straight to shutdown if (_clients.IsEmpty) { await ShutdownAsync(); return; } // Wait grace period for clients to drain naturally _logger.LogInformation("Waiting {GracePeriod}ms grace period", gracePeriod.TotalMilliseconds); try { await Task.Delay(gracePeriod, _quitCts.Token); } catch (OperationCanceledException) { return; } if (_clients.IsEmpty) { await ShutdownAsync(); return; } // Stagger-close remaining clients var dur = _options.LameDuckDuration - gracePeriod; if (dur <= TimeSpan.Zero) dur = TimeSpan.FromSeconds(1); var clients = _clients.Values.ToList(); var numClients = clients.Count; if (numClients > 0) { _logger.LogInformation("Closing {Count} existing clients over {Duration}ms", numClients, dur.TotalMilliseconds); var sleepInterval = dur.Ticks / numClients; if (sleepInterval < TimeSpan.TicksPerMillisecond) sleepInterval = TimeSpan.TicksPerMillisecond; if (sleepInterval > TimeSpan.TicksPerSecond) sleepInterval = TimeSpan.TicksPerSecond; for (int i = 0; i < clients.Count; i++) { clients[i].MarkClosed(ClientClosedReason.ServerShutdown); await clients[i].FlushAndCloseAsync(minimalFlush: true); if (i < clients.Count - 1) { var jitter = Random.Shared.NextInt64(sleepInterval / 2, sleepInterval); try { await Task.Delay(TimeSpan.FromTicks(jitter), _quitCts.Token); } catch (OperationCanceledException) { break; } } } } await ShutdownAsync(); } /// /// Registers Unix signal handlers. /// SIGTERM → shutdown, SIGUSR2 → lame duck, SIGUSR1 → log reopen (stub), SIGHUP → reload (stub). /// public void HandleSignals() { _signalRegistrations.Add(PosixSignalRegistration.Create(PosixSignal.SIGTERM, ctx => { ctx.Cancel = true; _logger.LogInformation("Trapped SIGTERM signal"); _ = Task.Run(async () => await ShutdownAsync()); })); _signalRegistrations.Add(PosixSignalRegistration.Create(PosixSignal.SIGQUIT, ctx => { ctx.Cancel = true; _logger.LogInformation("Trapped SIGQUIT signal"); _ = Task.Run(async () => await ShutdownAsync()); })); _signalRegistrations.Add(PosixSignalRegistration.Create(PosixSignal.SIGHUP, ctx => { ctx.Cancel = true; _logger.LogWarning("Trapped SIGHUP signal — config reload not yet supported"); })); // SIGUSR1 and SIGUSR2 only on non-Windows if (!OperatingSystem.IsWindows()) { _signalRegistrations.Add(PosixSignalRegistration.Create((PosixSignal)10, ctx => { ctx.Cancel = true; _logger.LogWarning("Trapped SIGUSR1 signal — log reopen not yet supported"); })); _signalRegistrations.Add(PosixSignalRegistration.Create((PosixSignal)12, ctx => { ctx.Cancel = true; _logger.LogInformation("Trapped SIGUSR2 signal — entering lame duck mode"); _ = Task.Run(async () => await LameDuckShutdownAsync()); })); } } public NatsServer(NatsOptions options, ILoggerFactory loggerFactory) { _options = options; _loggerFactory = loggerFactory; _logger = loggerFactory.CreateLogger(); _authService = AuthService.Build(options); _globalAccount = new Account(Account.GlobalAccountName); _accounts[Account.GlobalAccountName] = _globalAccount; // Create $SYS system account (stub -- no internal subscriptions yet) _systemAccount = new Account("$SYS"); _accounts["$SYS"] = _systemAccount; // Generate Ed25519 server NKey identity using var serverKeyPair = KeyPair.CreatePair(PrefixByte.Server); ServerNKey = serverKeyPair.GetPublicKey(); _serverInfo = new ServerInfo { ServerId = Guid.NewGuid().ToString("N")[..20].ToUpperInvariant(), ServerName = options.ServerName ?? $"nats-dotnet-{Environment.MachineName}", Version = NatsProtocol.Version, Host = options.Host, Port = options.Port, MaxPayload = options.MaxPayload, AuthRequired = _authService.IsAuthRequired, }; if (options.HasTls) { _sslOptions = TlsHelper.BuildServerAuthOptions(options); _serverInfo.TlsRequired = !options.AllowNonTls; _serverInfo.TlsAvailable = options.AllowNonTls; _serverInfo.TlsVerify = options.TlsVerify; if (options.TlsRateLimit > 0) _tlsRateLimiter = new TlsRateLimiter(options.TlsRateLimit); } BuildCachedInfo(); } private void BuildCachedInfo() { var infoJson = System.Text.Json.JsonSerializer.Serialize(_serverInfo); _cachedInfoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n"); } public async Task StartAsync(CancellationToken ct) { using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, _quitCts.Token); _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); _listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); _listener.Bind(new IPEndPoint( _options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host), _options.Port)); Interlocked.Exchange(ref _startTimeTicks, DateTime.UtcNow.Ticks); _listener.Listen(128); // Resolve ephemeral port if port=0 if (_options.Port == 0) { var actualPort = ((IPEndPoint)_listener.LocalEndPoint!).Port; _options.Port = actualPort; _serverInfo.Port = actualPort; BuildCachedInfo(); } _listeningStarted.TrySetResult(); _logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port); // Warn about stub features if (_options.ConfigFile != null) _logger.LogWarning("Config file parsing not yet supported (file: {ConfigFile})", _options.ConfigFile); if (_options.ProfPort > 0) _logger.LogWarning("Profiling endpoint not yet supported (port: {ProfPort})", _options.ProfPort); if (_options.MonitorPort > 0) { _monitorServer = new MonitorServer(this, _options, _stats, _loggerFactory); await _monitorServer.StartAsync(linked.Token); } WritePidFile(); WritePortsFile(); var tmpDelay = AcceptMinSleep; try { while (!linked.Token.IsCancellationRequested) { Socket socket; try { socket = await _listener.AcceptAsync(linked.Token); tmpDelay = AcceptMinSleep; // Reset on success } catch (OperationCanceledException) { break; } catch (ObjectDisposedException) { break; } catch (SocketException ex) { if (IsShuttingDown || IsLameDuckMode) break; _logger.LogError(ex, "Temporary accept error, sleeping {Delay}ms", tmpDelay.TotalMilliseconds); try { await Task.Delay(tmpDelay, linked.Token); } catch (OperationCanceledException) { break; } tmpDelay = TimeSpan.FromTicks(Math.Min(tmpDelay.Ticks * 2, AcceptMaxSleep.Ticks)); continue; } // Check MaxConnections if (_options.MaxConnections > 0 && _clients.Count >= _options.MaxConnections) { _logger.LogWarning("Client connection rejected: maximum connections ({MaxConnections}) exceeded", _options.MaxConnections); try { var stream = new NetworkStream(socket, ownsSocket: false); var errBytes = Encoding.ASCII.GetBytes( $"-ERR '{NatsProtocol.ErrMaxConnectionsExceeded}'\r\n"); await stream.WriteAsync(errBytes, linked.Token); await stream.FlushAsync(linked.Token); stream.Dispose(); } catch (Exception ex2) { _logger.LogDebug(ex2, "Failed to send -ERR to rejected client"); } finally { socket.Dispose(); } continue; } var clientId = Interlocked.Increment(ref _nextClientId); Interlocked.Increment(ref _stats.TotalConnections); Interlocked.Increment(ref _activeClientCount); _logger.LogDebug("Client {ClientId} connected from {RemoteEndpoint}", clientId, socket.RemoteEndPoint); _ = AcceptClientAsync(socket, clientId, linked.Token); } } catch (OperationCanceledException) { _logger.LogDebug("Accept loop cancelled, server shutting down"); } finally { _acceptLoopExited.TrySetResult(); } } private async Task AcceptClientAsync(Socket socket, ulong clientId, CancellationToken ct) { try { // Rate limit TLS handshakes if (_tlsRateLimiter != null) await _tlsRateLimiter.WaitAsync(ct); var networkStream = new NetworkStream(socket, ownsSocket: false); // TLS negotiation (no-op if not configured) var (stream, infoAlreadySent) = await TlsConnectionWrapper.NegotiateAsync( socket, networkStream, _options, _sslOptions, _serverInfo, _loggerFactory.CreateLogger("NATS.Server.Tls"), ct); // Extract TLS state TlsConnectionState? tlsState = null; if (stream is SslStream ssl) { tlsState = new TlsConnectionState( ssl.SslProtocol.ToString(), ssl.NegotiatedCipherSuite.ToString(), ssl.RemoteCertificate as X509Certificate2); } // Build per-client ServerInfo with nonce if NKey auth is configured byte[]? nonce = null; var clientInfo = _serverInfo; if (_authService.NonceRequired) { var rawNonce = _authService.GenerateNonce(); var nonceStr = _authService.EncodeNonce(rawNonce); // The client signs the nonce string (ASCII), not the raw bytes nonce = Encoding.ASCII.GetBytes(nonceStr); clientInfo = new ServerInfo { ServerId = _serverInfo.ServerId, ServerName = _serverInfo.ServerName, Version = _serverInfo.Version, Host = _serverInfo.Host, Port = _serverInfo.Port, MaxPayload = _serverInfo.MaxPayload, AuthRequired = _serverInfo.AuthRequired, TlsRequired = _serverInfo.TlsRequired, TlsAvailable = _serverInfo.TlsAvailable, TlsVerify = _serverInfo.TlsVerify, Nonce = nonceStr, }; } var clientLogger = _loggerFactory.CreateLogger($"NATS.Server.NatsClient[{clientId}]"); var client = new NatsClient(clientId, stream, socket, _options, clientInfo, _authService, nonce, clientLogger, _stats); client.Router = this; client.TlsState = tlsState; client.InfoAlreadySent = infoAlreadySent; _clients[clientId] = client; await RunClientAsync(client, ct); } catch (Exception ex) { _logger.LogDebug(ex, "Failed to accept client {ClientId}", clientId); try { socket.Shutdown(SocketShutdown.Both); } catch { } socket.Dispose(); } } private async Task RunClientAsync(NatsClient client, CancellationToken ct) { try { await client.RunAsync(ct); } catch (Exception ex) { _logger.LogDebug(ex, "Client {ClientId} disconnected with error", client.Id); } finally { _logger.LogDebug("Client {ClientId} disconnected (reason: {CloseReason})", client.Id, client.CloseReason); RemoveClient(client); Interlocked.Decrement(ref _activeClientCount); } } public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload, NatsClient sender) { var subList = sender.Account?.SubList ?? _globalAccount.SubList; var result = subList.Match(subject); var delivered = false; // Deliver to plain subscribers foreach (var sub in result.PlainSubs) { if (sub.Client == null || sub.Client == sender && !(sender.ClientOpts?.Echo ?? true)) continue; DeliverMessage(sub, subject, replyTo, headers, payload); delivered = true; } // Deliver to one member of each queue group (round-robin) foreach (var queueGroup in result.QueueSubs) { if (queueGroup.Length == 0) continue; // Simple round-robin -- pick based on total delivered across group var idx = Math.Abs((int)Interlocked.Increment(ref sender.OutMsgs)) % queueGroup.Length; // Undo the OutMsgs increment -- it will be incremented properly in SendMessage Interlocked.Decrement(ref sender.OutMsgs); for (int attempt = 0; attempt < queueGroup.Length; attempt++) { var sub = queueGroup[(idx + attempt) % queueGroup.Length]; if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true))) { DeliverMessage(sub, subject, replyTo, headers, payload); delivered = true; break; } } } // No-responders: if nobody received the message and the publisher // opted in, send back a 503 status HMSG on the reply subject. if (!delivered && replyTo != null && sender.ClientOpts?.NoResponders == true) { SendNoResponders(sender, replyTo); } } private void DeliverMessage(Subscription sub, string subject, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload) { var client = sub.Client; if (client == null) return; // Check auto-unsub var count = Interlocked.Increment(ref sub.MessageCount); if (sub.MaxMessages > 0 && count > sub.MaxMessages) { // Clean up exhausted subscription from trie and client tracking var subList = client.Account?.SubList ?? _globalAccount.SubList; subList.Remove(sub); client.RemoveSubscription(sub.Sid); return; } // Deny-list delivery filter if (client.Permissions?.IsDeliveryAllowed(subject) == false) return; client.SendMessage(subject, sub.Sid, replyTo, headers, payload); // Track reply subject for response permissions if (replyTo != null && client.Permissions?.ResponseTracker != null) { if (client.Permissions.IsPublishAllowed(replyTo) == false) client.Permissions.ResponseTracker.RegisterReply(replyTo); } } private static void SendNoResponders(NatsClient sender, string replyTo) { // Find the sid for a subscription matching the reply subject var sid = string.Empty; foreach (var sub in sender.Subscriptions.Values) { if (SubjectMatch.MatchLiteral(replyTo, sub.Subject)) { sid = sub.Sid; break; } } // Build: HMSG {replyTo} {sid} {hdrLen} {hdrLen}\r\n{headers}\r\n var headerBlock = "NATS/1.0 503\r\n\r\n"u8; var hdrLen = headerBlock.Length; var controlLine = Encoding.ASCII.GetBytes($"HMSG {replyTo} {sid} {hdrLen} {hdrLen}\r\n"); var totalLen = controlLine.Length + hdrLen + NatsProtocol.CrLf.Length; var msg = new byte[totalLen]; var offset = 0; controlLine.CopyTo(msg.AsSpan(offset)); offset += controlLine.Length; headerBlock.CopyTo(msg.AsSpan(offset)); offset += hdrLen; NatsProtocol.CrLf.CopyTo(msg.AsSpan(offset)); sender.QueueOutbound(msg); } public Account GetOrCreateAccount(string name) { return _accounts.GetOrAdd(name, n => { var acc = new Account(n); if (_options.Accounts != null && _options.Accounts.TryGetValue(n, out var config)) { acc.MaxConnections = config.MaxConnections; acc.MaxSubscriptions = config.MaxSubscriptions; acc.DefaultPermissions = config.DefaultPermissions; } return acc; }); } public void RemoveClient(NatsClient client) { _clients.TryRemove(client.Id, out _); _logger.LogDebug("Removed client {ClientId}", client.Id); var subList = client.Account?.SubList ?? _globalAccount.SubList; client.RemoveAllSubscriptions(subList); client.Account?.RemoveClient(client.Id); } private void WritePidFile() { if (string.IsNullOrEmpty(_options.PidFile)) return; try { File.WriteAllText(_options.PidFile, Environment.ProcessId.ToString()); _logger.LogDebug("Wrote PID file {PidFile}", _options.PidFile); } catch (Exception ex) { _logger.LogError(ex, "Error writing PID file {PidFile}", _options.PidFile); } } private void DeletePidFile() { if (string.IsNullOrEmpty(_options.PidFile)) return; try { if (File.Exists(_options.PidFile)) File.Delete(_options.PidFile); } catch (Exception ex) { _logger.LogError(ex, "Error deleting PID file {PidFile}", _options.PidFile); } } private void WritePortsFile() { if (string.IsNullOrEmpty(_options.PortsFileDir)) return; try { var exeName = Path.GetFileNameWithoutExtension(Environment.ProcessPath ?? "nats-server"); var fileName = $"{exeName}_{Environment.ProcessId}.ports"; _portsFilePath = Path.Combine(_options.PortsFileDir, fileName); var ports = new { client = _options.Port, monitor = _options.MonitorPort > 0 ? _options.MonitorPort : (int?)null }; var json = System.Text.Json.JsonSerializer.Serialize(ports); File.WriteAllText(_portsFilePath, json); _logger.LogDebug("Wrote ports file {PortsFile}", _portsFilePath); } catch (Exception ex) { _logger.LogError(ex, "Error writing ports file to {PortsFileDir}", _options.PortsFileDir); } } private void DeletePortsFile() { if (_portsFilePath == null) return; try { if (File.Exists(_portsFilePath)) File.Delete(_portsFilePath); } catch (Exception ex) { _logger.LogError(ex, "Error deleting ports file {PortsFile}", _portsFilePath); } } public void Dispose() { if (!IsShuttingDown) ShutdownAsync().GetAwaiter().GetResult(); foreach (var reg in _signalRegistrations) reg.Dispose(); _quitCts.Dispose(); _tlsRateLimiter?.Dispose(); _listener?.Dispose(); foreach (var client in _clients.Values) client.Dispose(); foreach (var account in _accounts.Values) account.Dispose(); } }