diff --git a/src/NATS.Server/Routes/RouteManager.cs b/src/NATS.Server/Routes/RouteManager.cs index f41353b..51f2008 100644 --- a/src/NATS.Server/Routes/RouteManager.cs +++ b/src/NATS.Server/Routes/RouteManager.cs @@ -18,7 +18,9 @@ public sealed class RouteManager : IAsyncDisposable private readonly Action _remoteSubSink; private readonly Action _routedMessageSink; private readonly ConcurrentDictionary _routes = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _accountRoutes = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _connectedServerIds = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _routesByHash = new(); private readonly HashSet _discoveredRoutes = new(StringComparer.OrdinalIgnoreCase); private readonly HashSet _knownRouteUrls = new(StringComparer.OrdinalIgnoreCase); @@ -28,6 +30,36 @@ public sealed class RouteManager : IAsyncDisposable public string ListenEndpoint => $"{_options.Host}:{_options.Port}"; + /// + /// The pool size configured in cluster options. Matches the Go server's + /// default of 3 when not explicitly configured. + /// Go reference: server/route.go opts.Cluster.PoolSize. + /// + public int ConfiguredPoolSize => _options.PoolSize > 0 ? _options.PoolSize : 3; + + /// + /// Returns the negotiated pool size for the connected route identified by + /// , or the configured pool size if no + /// such route connection exists. + /// Go reference: server/route.go negotiateRoutePool. + /// + public int GetEffectivePoolSize(string? remoteServerId) + { + if (remoteServerId is { Length: > 0 }) + { + foreach (var route in _routes.Values) + { + if (string.Equals(route.RemoteServerId, remoteServerId, StringComparison.Ordinal) + && route.NegotiatedPoolSize > 0) + { + return route.NegotiatedPoolSize; + } + } + } + + return ConfiguredPoolSize; + } + public RouteTopologySnapshot BuildTopologySnapshot() { return new RouteTopologySnapshot( @@ -133,11 +165,16 @@ public sealed class RouteManager : IAsyncDisposable } /// - /// Returns the route connection responsible for the given account, based on - /// pool index computed from the account name. Returns null if no routes exist. + /// Returns the route connection responsible for the given account. Checks + /// dedicated account routes first, then falls back to pool-based selection. + /// Go reference: server/route.go — per-account dedicated route lookup. /// public RouteConnection? GetRouteForAccount(string account) { + // Check dedicated account routes first (Gap 13.2). + if (_accountRoutes.TryGetValue(account, out var dedicated)) + return dedicated; + if (_routes.IsEmpty) return null; @@ -150,6 +187,118 @@ public sealed class RouteManager : IAsyncDisposable return routes[idx % routes.Length]; } + /// + /// Registers a dedicated route connection for a specific account. If a + /// previous connection was registered for the same account it is replaced. + /// Go reference: server/route.go — per-account dedicated route registration. + /// + public void RegisterAccountRoute(string account, RouteConnection connection) + { + _accountRoutes[account] = connection; + } + + /// + /// Removes the dedicated route for the given account. If no dedicated route + /// was registered this is a no-op. + /// + public void UnregisterAccountRoute(string account) + { + _accountRoutes.TryRemove(account, out _); + } + + /// + /// Returns the dedicated route connection for the given account, or null if + /// no dedicated route has been registered. + /// + public RouteConnection? GetDedicatedAccountRoute(string account) + => _accountRoutes.TryGetValue(account, out var connection) ? connection : null; + + /// + /// Returns true when a dedicated route is registered for the given account. + /// + public bool HasDedicatedRoute(string account) + => _accountRoutes.ContainsKey(account); + + /// + /// Returns the set of account names that currently have a dedicated route + /// registered. + /// + public IReadOnlyCollection GetAccountsWithDedicatedRoutes() + => (IReadOnlyCollection)_accountRoutes.Keys; + + /// + /// The number of dedicated per-account routes currently registered. + /// + public int DedicatedRouteCount => _accountRoutes.Count; + + // ------------------------------------------------------------------------- + // Hash-based O(1) route storage (Gap 13.4) + // Go reference: server/route.go — server-ID-keyed route hash map. + // ------------------------------------------------------------------------- + + /// + /// Computes a 64-bit FNV-1a hash of the given server ID. Used as the key + /// for O(1) route lookup by hash. Identical algorithm to the 32-bit pool + /// index variant but uses 64-bit constants for a wider key space. + /// Go reference: server/route.go — route hash key derivation. + /// + public static ulong ComputeRouteHash(string serverId) + { + const ulong fnvOffset = 14695981039346656037UL; + const ulong fnvPrime = 1099511628211UL; + var hash = fnvOffset; + foreach (var b in System.Text.Encoding.UTF8.GetBytes(serverId)) + { + hash ^= b; + hash *= fnvPrime; + } + return hash; + } + + /// + /// Stores in the hash map keyed by the + /// FNV-1a hash of . An existing entry for the + /// same server ID is overwritten. + /// Go reference: server/route.go — route hash registration. + /// + public void RegisterRouteByHash(string serverId, RouteConnection connection) + { + var hash = ComputeRouteHash(serverId); + _routesByHash[hash] = connection; + } + + /// + /// Removes the entry keyed by the FNV-1a hash of . + /// If no entry exists this is a no-op. + /// Go reference: server/route.go — route hash deregistration. + /// + public void UnregisterRouteByHash(string serverId) + { + var hash = ComputeRouteHash(serverId); + _routesByHash.TryRemove(hash, out _); + } + + /// + /// Returns the route connection associated with , + /// or null if no entry exists. O(1) lookup. + /// Go reference: server/route.go — O(1) route lookup by hash. + /// + public RouteConnection? GetRouteByHash(ulong hash) + => _routesByHash.TryGetValue(hash, out var connection) ? connection : null; + + /// + /// Convenience overload: computes the hash from + /// and returns the associated route connection, or null. + /// Go reference: server/route.go — server-ID-keyed route lookup. + /// + public RouteConnection? GetRouteByServerId(string serverId) + => GetRouteByHash(ComputeRouteHash(serverId)); + + /// + /// The number of entries in the hash-keyed route map. + /// + public int HashedRouteCount => _routesByHash.Count; + public Task StartAsync(CancellationToken ct) { _cts = CancellationTokenSource.CreateLinkedTokenSource(ct); @@ -375,9 +524,110 @@ public sealed class RouteManager : IAsyncDisposable } public int RouteCount => _routes.Count; + + /// + /// Adds a route directly to both internal dictionaries. Used for testing + /// and for programmatic registration of routes by server ID. + /// Go reference: server/route.go addRoute (internal registration path). + /// + internal void RegisterRoute(string serverId, RouteConnection connection) + { + var key = $"{serverId}:{Guid.NewGuid():N}"; + _routes[key] = connection; + _connectedServerIds[serverId] = 0; + } + + /// + /// Removes the route associated with the given server ID from both the + /// route pool and the connected-server-ID set. + /// Returns true if a route was found and removed, false otherwise. + /// Go reference: server/route.go removeRoute (lines 3113+). + /// + public bool RemoveRoute(string serverId) + { + var found = false; + var prefix = serverId + ":"; + + foreach (var key in _routes.Keys.ToArray()) + { + if (!key.StartsWith(prefix, StringComparison.Ordinal)) + continue; + + if (_routes.TryRemove(key, out _)) + found = true; + } + + if (found) + _connectedServerIds.TryRemove(serverId, out _); + + return found; + } + + /// + /// Removes all routes whose server ID is not in . + /// Returns the number of routes removed. + /// Go reference: server/route.go removeAllRoutesExcept (lines 3085-3111). + /// + public int RemoveAllRoutesExcept(IReadOnlySet keepServerIds) + { + var removed = 0; + + foreach (var key in _routes.Keys.ToArray()) + { + var colonIdx = key.IndexOf(':', StringComparison.Ordinal); + var keyServerId = colonIdx >= 0 ? key[..colonIdx] : key; + + if (!keepServerIds.Contains(keyServerId)) + { + if (_routes.TryRemove(key, out _)) + { + _connectedServerIds.TryRemove(keyServerId, out _); + removed++; + } + } + } + + return removed; + } + + /// + /// Compares the currently-connected server IDs against the expected peer set + /// and returns which peers are missing and which are unexpected. + /// is true when any expected peers + /// are absent from the connected set. + /// Go reference: server/route.go — cluster split / network partition detection. + /// + public ClusterSplitResult DetectClusterSplit(IReadOnlySet expectedPeers) + { + var connected = _connectedServerIds.Keys.ToHashSet(StringComparer.Ordinal); + + var missing = expectedPeers + .Where(p => !connected.Contains(p)) + .OrderBy(static p => p, StringComparer.Ordinal) + .ToList(); + + var unexpected = connected + .Where(c => !expectedPeers.Contains(c)) + .OrderBy(static c => c, StringComparer.Ordinal) + .ToList(); + + return new ClusterSplitResult(missing, unexpected, missing.Count > 0); + } } public sealed record RouteTopologySnapshot( string ServerId, int RouteCount, IReadOnlyList ConnectedServerIds); + +/// +/// Describes the outcome of comparing a 's current +/// connected peers against the expected full peer set. +/// is true when at least one expected peer is not connected, +/// indicating a network partition or cluster split. +/// Go reference: server/route.go — partition detection logic. +/// +public sealed record ClusterSplitResult( + IReadOnlyList MissingPeers, + IReadOnlyList UnexpectedPeers, + bool IsSplit); diff --git a/tests/NATS.Server.Tests/Routes/AccountRouteTests.cs b/tests/NATS.Server.Tests/Routes/AccountRouteTests.cs new file mode 100644 index 0000000..b94de27 --- /dev/null +++ b/tests/NATS.Server.Tests/Routes/AccountRouteTests.cs @@ -0,0 +1,197 @@ +// Reference: golang/nats-server/server/route.go — per-account dedicated route registration (Gap 13.2). +// Tests for account-specific dedicated route connections in RouteManager. + +using System.Net; +using System.Net.Sockets; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; +using NATS.Server.Routes; + +namespace NATS.Server.Tests.Routes; + +/// +/// Tests for per-account dedicated route connections (Gap 13.2). +/// Verifies that RouteManager correctly stores, retrieves, and prioritises +/// dedicated routes over pool-based routes for specific accounts. +/// Go reference: server/route.go — per-account dedicated route handling. +/// +public class AccountRouteTests : IDisposable +{ + // -- Helpers -- + + // Track listeners and sockets for cleanup after each test. + private readonly List _listeners = []; + private readonly List _sockets = []; + + public void Dispose() + { + foreach (var s in _sockets) s.Dispose(); + foreach (var l in _listeners) l.Stop(); + } + + private static RouteManager CreateManager() => + new( + new ClusterOptions { Host = "127.0.0.1", Port = 0, Routes = [] }, + new ServerStats(), + Guid.NewGuid().ToString("N"), + _ => { }, + _ => { }, + NullLogger.Instance); + + /// + /// Creates a RouteConnection backed by a connected loopback socket pair so + /// that RouteConnection can construct its internal NetworkStream without + /// throwing. Both sockets and the listener are tracked for disposal. + /// + private RouteConnection MakeConnection() + { + var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + _listeners.Add(listener); + + var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + _sockets.Add(client); + client.Connect((IPEndPoint)listener.LocalEndpoint); + + var server = listener.AcceptSocket(); + _sockets.Add(server); + + return new RouteConnection(server); + } + + // -- Tests -- + + // Go: server/route.go — per-account dedicated route registration. + [Fact] + public void RegisterAccountRoute_AddsRoute() + { + var manager = CreateManager(); + var connection = MakeConnection(); + + manager.RegisterAccountRoute("ACCT-A", connection); + + manager.GetDedicatedAccountRoute("ACCT-A").ShouldBeSameAs(connection); + } + + // Go: server/route.go — overwrite existing dedicated route for same account. + [Fact] + public void RegisterAccountRoute_OverwritesPrevious() + { + var manager = CreateManager(); + var first = MakeConnection(); + var second = MakeConnection(); + + manager.RegisterAccountRoute("ACCT-A", first); + manager.RegisterAccountRoute("ACCT-A", second); + + manager.GetDedicatedAccountRoute("ACCT-A").ShouldBeSameAs(second); + } + + // Go: server/route.go — removing a dedicated route cleans up the entry. + [Fact] + public void UnregisterAccountRoute_RemovesRoute() + { + var manager = CreateManager(); + var connection = MakeConnection(); + + manager.RegisterAccountRoute("ACCT-A", connection); + manager.UnregisterAccountRoute("ACCT-A"); + + manager.GetDedicatedAccountRoute("ACCT-A").ShouldBeNull(); + } + + // Go: server/route.go — unregistering a never-registered account is safe. + [Fact] + public void UnregisterAccountRoute_NonExistent_NoOp() + { + var manager = CreateManager(); + + // Must not throw. + var ex = Record.Exception(() => manager.UnregisterAccountRoute("NONEXISTENT")); + ex.ShouldBeNull(); + } + + // Go: server/route.go — lookup on unregistered account returns null. + [Fact] + public void GetDedicatedAccountRoute_NotRegistered_ReturnsNull() + { + var manager = CreateManager(); + + manager.GetDedicatedAccountRoute("UNKNOWN").ShouldBeNull(); + } + + // Go: server/route.go — HasDedicatedRoute returns true for registered account. + [Fact] + public void HasDedicatedRoute_RegisteredReturnsTrue() + { + var manager = CreateManager(); + var connection = MakeConnection(); + + manager.RegisterAccountRoute("ACCT-B", connection); + + manager.HasDedicatedRoute("ACCT-B").ShouldBeTrue(); + } + + // Go: server/route.go — HasDedicatedRoute returns false for unknown account. + [Fact] + public void HasDedicatedRoute_NotRegisteredReturnsFalse() + { + var manager = CreateManager(); + + manager.HasDedicatedRoute("ACCT-B").ShouldBeFalse(); + } + + // Go: server/route.go — listing accounts with dedicated routes. + [Fact] + public void GetAccountsWithDedicatedRoutes_ReturnsAllRegistered() + { + var manager = CreateManager(); + + manager.RegisterAccountRoute("ACCT-1", MakeConnection()); + manager.RegisterAccountRoute("ACCT-2", MakeConnection()); + manager.RegisterAccountRoute("ACCT-3", MakeConnection()); + + var accounts = manager.GetAccountsWithDedicatedRoutes(); + + accounts.Count.ShouldBe(3); + accounts.ShouldContain("ACCT-1"); + accounts.ShouldContain("ACCT-2"); + accounts.ShouldContain("ACCT-3"); + } + + // Go: server/route.go — DedicatedRouteCount tracks registered entries. + [Fact] + public void DedicatedRouteCount_MatchesRegistrations() + { + var manager = CreateManager(); + + manager.DedicatedRouteCount.ShouldBe(0); + + manager.RegisterAccountRoute("ACCT-X", MakeConnection()); + manager.DedicatedRouteCount.ShouldBe(1); + + manager.RegisterAccountRoute("ACCT-Y", MakeConnection()); + manager.DedicatedRouteCount.ShouldBe(2); + + manager.UnregisterAccountRoute("ACCT-X"); + manager.DedicatedRouteCount.ShouldBe(1); + } + + // Go: server/route.go — dedicated route takes priority over pool-based route + // for the account it is registered against. + [Fact] + public void GetRouteForAccount_PrefersDedicatedRoute() + { + var manager = CreateManager(); + + // Register a dedicated route for "ACCT-PREF". + var dedicated = MakeConnection(); + manager.RegisterAccountRoute("ACCT-PREF", dedicated); + + // GetRouteForAccount must return the dedicated connection even though + // no pool routes exist (the dedicated path short-circuits pool lookup). + var result = manager.GetRouteForAccount("ACCT-PREF"); + + result.ShouldBeSameAs(dedicated); + } +}