using System.Collections.Concurrent;
using System.Globalization;
using System.Net;
using System.Net.Sockets;
using Microsoft.Extensions.Logging;
using NATS.Server.Configuration;
using NATS.Server.Subscriptions;

namespace NATS.Server.Routes;

/// <summary>
/// Owns the cluster route listener and the set of inbound/outbound route
/// connections for one server instance. Registered connections are tracked in
/// <c>_routes</c>; remote server ids seen on those connections are tracked in
/// <c>_connectedServerIds</c>.
/// </summary>
public sealed class RouteManager : IAsyncDisposable
{
    /// <summary>Backlog passed to <see cref="Socket.Listen(int)"/> for the route listener.</summary>
    private const int ListenBacklog = 128;

    /// <summary>Pause between outbound connection attempts to a configured route.</summary>
    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromMilliseconds(250);

    /// <summary>Started managers keyed by server id (added in StartAsync, removed in DisposeAsync).</summary>
    private static readonly ConcurrentDictionary<string, RouteManager> Managers = new(StringComparer.Ordinal);

    private readonly ClusterOptions _options;
    private readonly ServerStats _stats;
    private readonly string _serverId;
    private readonly ILogger _logger;

    // NOTE(review): both sinks are invoked with a single argument in Register, so these
    // declarations presumably need a type argument (Action<T>). The element types are not
    // visible in this file - confirm against RouteConnection's callback signatures.
    private readonly Action _remoteSubSink;
    private readonly Action _routedMessageSink;

    /// <summary>Live route connections keyed by a unique "serverId:endpoint:guid" string.</summary>
    private readonly ConcurrentDictionary<string, RouteConnection> _routes = new(StringComparer.Ordinal);

    /// <summary>Set of remote server ids that currently have at least one registered route (value unused).</summary>
    private readonly ConcurrentDictionary<string, byte> _connectedServerIds = new(StringComparer.Ordinal);

    /// <summary>
    /// Serializes "add route + mark server connected" against "remove route + prune server id"
    /// so that a peer is never dropped from the topology while a fresh route to it is being added.
    /// </summary>
    private readonly object _topologyGate = new();

    private CancellationTokenSource? _cts;

    /// <summary>
    /// Token of <c>_cts</c>, captured once in StartAsync. Background work uses this copy rather
    /// than reading <c>_cts.Token</c> lazily, because DisposeAsync disposes and nulls <c>_cts</c>.
    /// </summary>
    private CancellationToken _shutdownToken;

    private Socket? _listener;
    private Task? _acceptLoopTask;

    /// <summary>
    /// Creates a manager. Nothing is bound or connected until <see cref="StartAsync"/> is called.
    /// </summary>
    /// <param name="options">Cluster configuration (host, port, pool size, route list).</param>
    /// <param name="stats">Shared counters; the <c>Routes</c> counter is maintained by this class.</param>
    /// <param name="serverId">Id of the local server, sent during route handshakes.</param>
    /// <param name="remoteSubSink">Callback invoked for each remote subscription received on a route.</param>
    /// <param name="routedMessageSink">Callback invoked for each routed message received on a route.</param>
    /// <param name="logger">Logger for route diagnostics.</param>
    public RouteManager(
        ClusterOptions options,
        ServerStats stats,
        string serverId,
        Action remoteSubSink,
        Action routedMessageSink,
        ILogger logger)
    {
        _options = options;
        _stats = stats;
        _serverId = serverId;
        _remoteSubSink = remoteSubSink;
        _routedMessageSink = routedMessageSink;
        _logger = logger;
    }

    /// <summary>The configured "host:port" of the route listener.</summary>
    public string ListenEndpoint => $"{_options.Host}:{_options.Port}";

    /// <summary>Number of currently registered route connections.</summary>
    public int RouteCount => _routes.Count;

    /// <summary>
    /// Builds a point-in-time view of this server's routes: local server id, route count,
    /// and the ordinally sorted ids of connected remote servers.
    /// </summary>
    public RouteTopologySnapshot BuildTopologySnapshot()
    {
        return new RouteTopologySnapshot(
            _serverId,
            _routes.Count,
            _connectedServerIds.Keys.OrderBy(static k => k, StringComparer.Ordinal).ToArray());
    }

    /// <summary>
    /// Returns a route pool index for the given account name, matching Go's
    /// computeRoutePoolIdx (route.go:533-545). Uses FNV-1a 32-bit hash
    /// to deterministically map account names to pool indices.
    /// </summary>
    /// <param name="poolSize">Number of pool slots; values of 1 or less always yield index 0.</param>
    /// <param name="accountName">Account name, hashed over its UTF-8 bytes.</param>
    /// <returns>An index in the range [0, <paramref name="poolSize"/>), or 0 when the pool has at most one slot.</returns>
    public static int ComputeRoutePoolIdx(int poolSize, string accountName)
    {
        if (poolSize <= 1)
        {
            return 0;
        }

        const uint fnvOffsetBasis = 2166136261;
        const uint fnvPrime = 16777619;

        var hash = fnvOffsetBasis;
        foreach (var b in System.Text.Encoding.UTF8.GetBytes(accountName))
        {
            hash ^= b;

            // FNV relies on 32-bit wrap-around; make that explicit so the result
            // is the same even if the project is compiled with overflow checking.
            hash = unchecked(hash * fnvPrime);
        }

        return (int)(hash % (uint)poolSize);
    }

    /// <summary>
    /// Returns the route connection responsible for the given account, based on
    /// pool index computed from the account name. Returns null if no routes exist.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the pool is the current snapshot of registered routes, and
    /// ConcurrentDictionary does not guarantee enumeration order, so the
    /// account-to-route mapping can change as routes come and go.
    /// </remarks>
    public RouteConnection? GetRouteForAccount(string account)
    {
        if (_routes.IsEmpty)
        {
            return null;
        }

        var routes = _routes.Values.ToArray();
        if (routes.Length == 0)
        {
            return null;
        }

        // ComputeRoutePoolIdx already returns a value in [0, routes.Length).
        return routes[ComputeRoutePoolIdx(routes.Length, account)];
    }

    /// <summary>
    /// Binds the route listener, starts the accept loop, and starts one background
    /// connect-with-retry task per configured route and pool slot.
    /// </summary>
    /// <param name="ct">Token linked into the manager's own shutdown token.</param>
    /// <returns>A completed task; all ongoing work runs in the background.</returns>
    public Task StartAsync(CancellationToken ct)
    {
        var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);

        // Capture the token once. Background lambdas must not read _cts.Token lazily:
        // DisposeAsync disposes and nulls _cts, which would surface as
        // ObjectDisposedException / NullReferenceException on a late-starting work item.
        var token = cts.Token;
        _cts = cts;
        _shutdownToken = token;
        Managers[_serverId] = this;

        _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        _listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
        _listener.Bind(new IPEndPoint(IPAddress.Parse(_options.Host), _options.Port));
        _listener.Listen(ListenBacklog);

        // Port 0 asks the OS for an ephemeral port; publish the one actually assigned.
        if (_options.Port == 0)
        {
            _options.Port = ((IPEndPoint)_listener.LocalEndPoint!).Port;
        }

        _acceptLoopTask = Task.Run(() => AcceptLoopAsync(token));

        var poolSize = Math.Max(_options.PoolSize, 1);
        foreach (var route in _options.Routes.Distinct(StringComparer.OrdinalIgnoreCase))
        {
            for (var i = 0; i < poolSize; i++)
            {
                var poolIndex = i; // per-iteration copy for the closure
                _ = Task.Run(() => ConnectToRouteWithRetryAsync(route, poolIndex, token));
            }
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Cancels background work, closes the listener, waits for the accept loop,
    /// disposes every registered route, and resets the route counter.
    /// Does nothing if the manager was never started (or was already disposed).
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_cts == null)
        {
            return;
        }

        await _cts.CancelAsync();
        _listener?.Dispose();

        if (_acceptLoopTask != null)
        {
            // The loop's outcome is irrelevant during shutdown; just wait for it to finish.
            await _acceptLoopTask.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
        }

        foreach (var route in _routes.Values)
        {
            await route.DisposeAsync();
        }

        _routes.Clear();
        _connectedServerIds.Clear();
        Managers.TryRemove(_serverId, out _);
        Interlocked.Exchange(ref _stats.Routes, 0);

        _cts.Dispose();
        _cts = null;
    }

    /// <summary>
    /// Sends an RS+ for a new local subscription to every registered route (fire-and-forget).
    /// </summary>
    public void PropagateLocalSubscription(string account, string subject, string? queue)
    {
        if (_routes.IsEmpty)
        {
            return;
        }

        var token = _shutdownToken;
        foreach (var route in _routes.Values)
        {
            _ = route.SendRsPlusAsync(account, subject, queue, token);
        }
    }

    /// <summary>
    /// Sends an RS- for a removed local subscription to every registered route (fire-and-forget).
    /// </summary>
    public void PropagateLocalUnsubscription(string account, string subject, string? queue)
    {
        if (_routes.IsEmpty)
        {
            return;
        }

        var token = _shutdownToken;
        foreach (var route in _routes.Values)
        {
            _ = route.SendRsMinusAsync(account, subject, queue, token);
        }
    }

    /// <summary>
    /// Forwards a message to the route selected for <paramref name="account"/>; if no route
    /// can be selected while routes exist, sends it to every route instead.
    /// </summary>
    /// <param name="account">Account the message belongs to; selects the pooled route.</param>
    /// <param name="subject">Message subject.</param>
    /// <param name="replyTo">Optional reply subject.</param>
    /// <param name="payload">Message body (assumed to be raw bytes - confirm against RouteConnection.SendRmsgAsync).</param>
    /// <param name="ct">Cancellation token for the send.</param>
    public async Task ForwardRoutedMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
    {
        if (_routes.IsEmpty)
        {
            return;
        }

        // Use account-based pool routing: route the message only through the
        // connection responsible for this account, matching Go's behavior.
        var route = GetRouteForAccount(account);
        if (route != null)
        {
            await route.SendRmsgAsync(account, subject, replyTo, payload, ct);
            return;
        }

        // Fallback: broadcast to all routes if pool routing fails.
        foreach (var r in _routes.Values)
        {
            await r.SendRmsgAsync(account, subject, replyTo, payload, ct);
        }
    }

    /// <summary>
    /// Accepts inbound route sockets until cancellation, listener disposal, or an accept error,
    /// handing each socket to <see cref="HandleInboundRouteAsync"/> on the thread pool.
    /// </summary>
    private async Task AcceptLoopAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            Socket socket;
            try
            {
                socket = await _listener!.AcceptAsync(ct);
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (ObjectDisposedException)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogDebug(ex, "Route accept loop error");
                break;
            }

            // Deliberately not passing ct to Task.Run: if cancellation landed before the work
            // item started, the delegate would never run and the accepted socket would never
            // be disposed. The handler observes ct itself and disposes the route on failure.
            _ = Task.Run(() => HandleInboundRouteAsync(socket, ct));
        }
    }

    /// <summary>
    /// Performs the inbound handshake on an accepted socket and registers the route;
    /// on any failure the route is disposed.
    /// </summary>
    private async Task HandleInboundRouteAsync(Socket socket, CancellationToken ct)
    {
        var route = new RouteConnection(socket);
        try
        {
            await route.PerformInboundHandshakeAsync(_serverId, ct);
            Register(route);
        }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "Inbound route handshake failed");
            await route.DisposeAsync();
        }
    }

    /// <summary>
    /// Repeatedly tries to connect to a configured route until it succeeds or
    /// <paramref name="ct"/> is cancelled, pausing between attempts.
    /// </summary>
    /// <param name="route">Configured route in "address:port" form.</param>
    /// <param name="poolIndex">Pool slot assigned to the resulting connection.</param>
    /// <param name="ct">Shutdown token.</param>
    private async Task ConnectToRouteWithRetryAsync(string route, int poolIndex, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            Socket? socket = null;
            RouteConnection? connection = null;
            var registered = false;

            try
            {
                var endPoint = ParseRouteEndpoint(route);
                socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);

                connection = new RouteConnection(socket) { PoolIndex = poolIndex };
                await connection.PerformOutboundHandshakeAsync(_serverId, ct);

                Register(connection);
                registered = true;
                return;
            }
            catch (OperationCanceledException)
            {
                return;
            }
            catch (Exception ex)
            {
                _logger.LogDebug(ex, "Failed to connect route seed {Route}", route);
            }
            finally
            {
                // Every failed attempt used to leak its socket. Release whatever this attempt
                // created unless ownership was handed to Register.
                if (!registered)
                {
                    if (connection is not null)
                    {
                        // Assumes RouteConnection.DisposeAsync releases its socket, as the
                        // inbound failure path also relies on - confirm.
                        await connection.DisposeAsync();
                    }
                    else
                    {
                        socket?.Dispose();
                    }
                }
            }

            try
            {
                await Task.Delay(ReconnectDelay, ct);
            }
            catch (OperationCanceledException)
            {
                return;
            }
        }
    }

    /// <summary>
    /// Adds a handshaken route to the registry, wires its callbacks to the sinks,
    /// starts its frame loop, and starts a watcher that unregisters it when it closes.
    /// </summary>
    private void Register(RouteConnection route)
    {
        var key = $"{route.RemoteServerId}:{route.RemoteEndpoint}:{Guid.NewGuid():N}";

        bool added;
        lock (_topologyGate)
        {
            added = _routes.TryAdd(key, route);
            if (added && route.RemoteServerId is { Length: > 0 } remoteServerId)
            {
                _connectedServerIds[remoteServerId] = 0;
            }
        }

        if (!added)
        {
            _ = route.DisposeAsync();
            return;
        }

        route.RemoteSubscriptionReceived = sub =>
        {
            _remoteSubSink(sub);
            return Task.CompletedTask;
        };

        route.RoutedMessageReceived = msg =>
        {
            _routedMessageSink(msg);
            return Task.CompletedTask;
        };

        // Use the token captured at start-up; _cts may already be disposed or null if
        // a handshake completes while DisposeAsync is running.
        var token = _shutdownToken;
        route.StartFrameLoop(token);
        Interlocked.Increment(ref _stats.Routes);
        _ = Task.Run(() => WatchRouteAsync(key, route, token));
    }

    /// <summary>
    /// Waits for a route to close, then unregisters it, prunes its remote server id when no
    /// other route to that server remains, updates the route counter, and disposes the route.
    /// </summary>
    private async Task WatchRouteAsync(string key, RouteConnection route, CancellationToken ct)
    {
        try
        {
            await route.WaitUntilClosedAsync(ct);
        }
        catch (OperationCanceledException)
        {
            // Shutdown path.
        }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "Route {RouteKey} closed with error", key);
        }
        finally
        {
            bool removed;
            lock (_topologyGate)
            {
                removed = _routes.TryRemove(key, out _);

                // Without this the topology snapshot would keep reporting a peer as
                // connected after its last route went away.
                if (removed
                    && route.RemoteServerId is { Length: > 0 } remoteServerId
                    && !HasRouteTo(remoteServerId))
                {
                    _connectedServerIds.TryRemove(remoteServerId, out _);
                }
            }

            if (removed)
            {
                Interlocked.Decrement(ref _stats.Routes);
            }

            await route.DisposeAsync();
        }
    }

    /// <summary>Returns true if any registered route reports the given remote server id.</summary>
    private bool HasRouteTo(string remoteServerId)
    {
        foreach (var candidate in _routes.Values)
        {
            if (string.Equals(candidate.RemoteServerId, remoteServerId, StringComparison.Ordinal))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Parses an "address:port" route string into an endpoint.
    /// </summary>
    /// <exception cref="FormatException">The string is not of the form "address:port", or a part cannot be parsed.</exception>
    private static IPEndPoint ParseRouteEndpoint(string route)
    {
        var trimmed = route.Trim();
        var parts = trimmed.Split(':', 2, StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length != 2)
        {
            throw new FormatException($"Invalid route endpoint: '{route}'");
        }

        // Ports are machine-readable configuration: parse them culture-invariantly.
        var port = int.Parse(parts[1], NumberStyles.Integer, CultureInfo.InvariantCulture);
        return new IPEndPoint(IPAddress.Parse(parts[0]), port);
    }
}

/// <summary>
/// Point-in-time view of a server's route topology.
/// </summary>
/// <param name="ServerId">Id of the local server.</param>
/// <param name="RouteCount">Number of registered route connections at snapshot time.</param>
/// <param name="ConnectedServerIds">Ordinally sorted ids of connected remote servers.</param>
public sealed record RouteTopologySnapshot(
    string ServerId,
    int RouteCount,
    IReadOnlyList<string> ConnectedServerIds);