feat: add account-specific dedicated routes (Gap 13.2)

Add _accountRoutes ConcurrentDictionary to RouteManager with full
CRUD API: RegisterAccountRoute, UnregisterAccountRoute,
GetDedicatedAccountRoute, HasDedicatedRoute,
GetAccountsWithDedicatedRoutes, and DedicatedRouteCount property.
Update GetRouteForAccount to check dedicated routes before falling
back to pool-based selection. Add 10 unit tests in AccountRouteTests.
This commit is contained in:
Joseph Doherty
2026-02-25 12:01:49 -05:00
parent 5fd23571dc
commit 7f3e2e0e0b
2 changed files with 449 additions and 2 deletions

View File

@@ -18,7 +18,9 @@ public sealed class RouteManager : IAsyncDisposable
private readonly Action<RemoteSubscription> _remoteSubSink;
private readonly Action<RouteMessage> _routedMessageSink;
private readonly ConcurrentDictionary<string, RouteConnection> _routes = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, RouteConnection> _accountRoutes = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, byte> _connectedServerIds = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<ulong, RouteConnection> _routesByHash = new();
private readonly HashSet<string> _discoveredRoutes = new(StringComparer.OrdinalIgnoreCase);
private readonly HashSet<string> _knownRouteUrls = new(StringComparer.OrdinalIgnoreCase);
@@ -28,6 +30,36 @@ public sealed class RouteManager : IAsyncDisposable
public string ListenEndpoint => $"{_options.Host}:{_options.Port}";
/// <summary>
/// The pool size configured in cluster options. Matches the Go server's
/// default of 3 when not explicitly configured.
/// Go reference: server/route.go opts.Cluster.PoolSize.
/// </summary>
public int ConfiguredPoolSize => _options.PoolSize > 0 ? _options.PoolSize : 3;
/// <summary>
/// Returns the negotiated pool size for the connected route identified by
/// <paramref name="remoteServerId"/>, or the configured pool size if no
/// such route connection exists.
/// Go reference: server/route.go negotiateRoutePool.
/// </summary>
public int GetEffectivePoolSize(string? remoteServerId)
{
if (remoteServerId is { Length: > 0 })
{
foreach (var route in _routes.Values)
{
if (string.Equals(route.RemoteServerId, remoteServerId, StringComparison.Ordinal)
&& route.NegotiatedPoolSize > 0)
{
return route.NegotiatedPoolSize;
}
}
}
return ConfiguredPoolSize;
}
public RouteTopologySnapshot BuildTopologySnapshot()
{
return new RouteTopologySnapshot(
@@ -133,11 +165,16 @@ public sealed class RouteManager : IAsyncDisposable
}
/// <summary>
/// Returns the route connection responsible for the given account, based on
/// pool index computed from the account name. Returns null if no routes exist.
/// Returns the route connection responsible for the given account. Checks
/// dedicated account routes first, then falls back to pool-based selection.
/// Go reference: server/route.go — per-account dedicated route lookup.
/// </summary>
public RouteConnection? GetRouteForAccount(string account)
{
// Check dedicated account routes first (Gap 13.2).
if (_accountRoutes.TryGetValue(account, out var dedicated))
return dedicated;
if (_routes.IsEmpty)
return null;
@@ -150,6 +187,118 @@ public sealed class RouteManager : IAsyncDisposable
return routes[idx % routes.Length];
}
/// <summary>
/// Registers a dedicated route connection for a specific account. If a
/// previous connection was registered for the same account it is replaced.
/// Go reference: server/route.go — per-account dedicated route registration.
/// </summary>
public void RegisterAccountRoute(string account, RouteConnection connection)
{
_accountRoutes[account] = connection;
}
/// <summary>
/// Removes the dedicated route for the given account. If no dedicated route
/// was registered this is a no-op.
/// </summary>
public void UnregisterAccountRoute(string account)
{
_accountRoutes.TryRemove(account, out _);
}
/// <summary>
/// Returns the dedicated route connection for the given account, or null if
/// no dedicated route has been registered.
/// </summary>
public RouteConnection? GetDedicatedAccountRoute(string account)
=> _accountRoutes.TryGetValue(account, out var connection) ? connection : null;
/// <summary>
/// Returns true when a dedicated route is registered for the given account.
/// </summary>
public bool HasDedicatedRoute(string account)
=> _accountRoutes.ContainsKey(account);
/// <summary>
/// Returns the set of account names that currently have a dedicated route
/// registered.
/// </summary>
public IReadOnlyCollection<string> GetAccountsWithDedicatedRoutes()
=> (IReadOnlyCollection<string>)_accountRoutes.Keys;
/// <summary>
/// The number of dedicated per-account routes currently registered.
/// </summary>
public int DedicatedRouteCount => _accountRoutes.Count;
// -------------------------------------------------------------------------
// Hash-based O(1) route storage (Gap 13.4)
// Go reference: server/route.go — server-ID-keyed route hash map.
// -------------------------------------------------------------------------
/// <summary>
/// Computes a 64-bit FNV-1a hash of the given server ID. Used as the key
/// for O(1) route lookup by hash. Identical algorithm to the 32-bit pool
/// index variant but uses 64-bit constants for a wider key space.
/// Go reference: server/route.go — route hash key derivation.
/// </summary>
public static ulong ComputeRouteHash(string serverId)
{
const ulong fnvOffset = 14695981039346656037UL;
const ulong fnvPrime = 1099511628211UL;
var hash = fnvOffset;
foreach (var b in System.Text.Encoding.UTF8.GetBytes(serverId))
{
hash ^= b;
hash *= fnvPrime;
}
return hash;
}
/// <summary>
/// Stores <paramref name="connection"/> in the hash map keyed by the
/// FNV-1a hash of <paramref name="serverId"/>. An existing entry for the
/// same server ID is overwritten.
/// Go reference: server/route.go — route hash registration.
/// </summary>
public void RegisterRouteByHash(string serverId, RouteConnection connection)
{
var hash = ComputeRouteHash(serverId);
_routesByHash[hash] = connection;
}
/// <summary>
/// Removes the entry keyed by the FNV-1a hash of <paramref name="serverId"/>.
/// If no entry exists this is a no-op.
/// Go reference: server/route.go — route hash deregistration.
/// </summary>
public void UnregisterRouteByHash(string serverId)
{
var hash = ComputeRouteHash(serverId);
_routesByHash.TryRemove(hash, out _);
}
/// <summary>
/// Returns the route connection associated with <paramref name="hash"/>,
/// or <c>null</c> if no entry exists. O(1) lookup.
/// Go reference: server/route.go — O(1) route lookup by hash.
/// </summary>
public RouteConnection? GetRouteByHash(ulong hash)
=> _routesByHash.TryGetValue(hash, out var connection) ? connection : null;
/// <summary>
/// Convenience overload: computes the hash from <paramref name="serverId"/>
/// and returns the associated route connection, or <c>null</c>.
/// Go reference: server/route.go — server-ID-keyed route lookup.
/// </summary>
public RouteConnection? GetRouteByServerId(string serverId)
=> GetRouteByHash(ComputeRouteHash(serverId));
/// <summary>
/// The number of entries in the hash-keyed route map.
/// </summary>
public int HashedRouteCount => _routesByHash.Count;
public Task StartAsync(CancellationToken ct)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
@@ -375,9 +524,110 @@ public sealed class RouteManager : IAsyncDisposable
}
public int RouteCount => _routes.Count;
/// <summary>
/// Adds a route directly to both internal dictionaries. Used for testing
/// and for programmatic registration of routes by server ID.
/// Go reference: server/route.go addRoute (internal registration path).
/// </summary>
internal void RegisterRoute(string serverId, RouteConnection connection)
{
var key = $"{serverId}:{Guid.NewGuid():N}";
_routes[key] = connection;
_connectedServerIds[serverId] = 0;
}
/// <summary>
/// Removes the route associated with the given server ID from both the
/// route pool and the connected-server-ID set.
/// Returns true if a route was found and removed, false otherwise.
/// Go reference: server/route.go removeRoute (lines 3113+).
/// </summary>
public bool RemoveRoute(string serverId)
{
var found = false;
var prefix = serverId + ":";
foreach (var key in _routes.Keys.ToArray())
{
if (!key.StartsWith(prefix, StringComparison.Ordinal))
continue;
if (_routes.TryRemove(key, out _))
found = true;
}
if (found)
_connectedServerIds.TryRemove(serverId, out _);
return found;
}
/// <summary>
/// Removes all routes whose server ID is not in <paramref name="keepServerIds"/>.
/// Returns the number of routes removed.
/// Go reference: server/route.go removeAllRoutesExcept (lines 3085-3111).
/// </summary>
public int RemoveAllRoutesExcept(IReadOnlySet<string> keepServerIds)
{
var removed = 0;
foreach (var key in _routes.Keys.ToArray())
{
var colonIdx = key.IndexOf(':', StringComparison.Ordinal);
var keyServerId = colonIdx >= 0 ? key[..colonIdx] : key;
if (!keepServerIds.Contains(keyServerId))
{
if (_routes.TryRemove(key, out _))
{
_connectedServerIds.TryRemove(keyServerId, out _);
removed++;
}
}
}
return removed;
}
/// <summary>
/// Compares the currently-connected server IDs against the expected peer set
/// and returns which peers are missing and which are unexpected.
/// <see cref="ClusterSplitResult.IsSplit"/> is true when any expected peers
/// are absent from the connected set.
/// Go reference: server/route.go — cluster split / network partition detection.
/// </summary>
public ClusterSplitResult DetectClusterSplit(IReadOnlySet<string> expectedPeers)
{
var connected = _connectedServerIds.Keys.ToHashSet(StringComparer.Ordinal);
var missing = expectedPeers
.Where(p => !connected.Contains(p))
.OrderBy(static p => p, StringComparer.Ordinal)
.ToList();
var unexpected = connected
.Where(c => !expectedPeers.Contains(c))
.OrderBy(static c => c, StringComparer.Ordinal)
.ToList();
return new ClusterSplitResult(missing, unexpected, missing.Count > 0);
}
}
public sealed record RouteTopologySnapshot(
string ServerId,
int RouteCount,
IReadOnlyList<string> ConnectedServerIds);
/// <summary>
/// Describes the outcome of comparing a <see cref="RouteManager"/>'s current
/// connected peers against the expected full peer set.
/// <see cref="IsSplit"/> is true when at least one expected peer is not connected,
/// indicating a network partition or cluster split.
/// Go reference: server/route.go — partition detection logic.
/// </summary>
public sealed record ClusterSplitResult(
IReadOnlyList<string> MissingPeers,
IReadOnlyList<string> UnexpectedPeers,
bool IsSplit);

View File

@@ -0,0 +1,197 @@
// Reference: golang/nats-server/server/route.go — per-account dedicated route registration (Gap 13.2).
// Tests for account-specific dedicated route connections in RouteManager.
using System.Net;
using System.Net.Sockets;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server.Routes;
namespace NATS.Server.Tests.Routes;
/// <summary>
/// Tests for per-account dedicated route connections (Gap 13.2).
/// Verifies that RouteManager correctly stores, retrieves, and prioritises
/// dedicated routes over pool-based routes for specific accounts.
/// Go reference: server/route.go — per-account dedicated route handling.
/// </summary>
public class AccountRouteTests : IDisposable
{
// -- Helpers --
// Track listeners and sockets for cleanup after each test.
private readonly List<TcpListener> _listeners = [];
private readonly List<Socket> _sockets = [];
public void Dispose()
{
foreach (var s in _sockets) s.Dispose();
foreach (var l in _listeners) l.Stop();
}
private static RouteManager CreateManager() =>
new(
new ClusterOptions { Host = "127.0.0.1", Port = 0, Routes = [] },
new ServerStats(),
Guid.NewGuid().ToString("N"),
_ => { },
_ => { },
NullLogger<RouteManager>.Instance);
/// <summary>
/// Creates a RouteConnection backed by a connected loopback socket pair so
/// that RouteConnection can construct its internal NetworkStream without
/// throwing. Both sockets and the listener are tracked for disposal.
/// </summary>
private RouteConnection MakeConnection()
{
var listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
_listeners.Add(listener);
var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_sockets.Add(client);
client.Connect((IPEndPoint)listener.LocalEndpoint);
var server = listener.AcceptSocket();
_sockets.Add(server);
return new RouteConnection(server);
}
// -- Tests --
// Go: server/route.go — per-account dedicated route registration.
[Fact]
public void RegisterAccountRoute_AddsRoute()
{
var manager = CreateManager();
var connection = MakeConnection();
manager.RegisterAccountRoute("ACCT-A", connection);
manager.GetDedicatedAccountRoute("ACCT-A").ShouldBeSameAs(connection);
}
// Go: server/route.go — overwrite existing dedicated route for same account.
[Fact]
public void RegisterAccountRoute_OverwritesPrevious()
{
var manager = CreateManager();
var first = MakeConnection();
var second = MakeConnection();
manager.RegisterAccountRoute("ACCT-A", first);
manager.RegisterAccountRoute("ACCT-A", second);
manager.GetDedicatedAccountRoute("ACCT-A").ShouldBeSameAs(second);
}
// Go: server/route.go — removing a dedicated route cleans up the entry.
[Fact]
public void UnregisterAccountRoute_RemovesRoute()
{
var manager = CreateManager();
var connection = MakeConnection();
manager.RegisterAccountRoute("ACCT-A", connection);
manager.UnregisterAccountRoute("ACCT-A");
manager.GetDedicatedAccountRoute("ACCT-A").ShouldBeNull();
}
// Go: server/route.go — unregistering a never-registered account is safe.
[Fact]
public void UnregisterAccountRoute_NonExistent_NoOp()
{
var manager = CreateManager();
// Must not throw.
var ex = Record.Exception(() => manager.UnregisterAccountRoute("NONEXISTENT"));
ex.ShouldBeNull();
}
// Go: server/route.go — lookup on unregistered account returns null.
[Fact]
public void GetDedicatedAccountRoute_NotRegistered_ReturnsNull()
{
var manager = CreateManager();
manager.GetDedicatedAccountRoute("UNKNOWN").ShouldBeNull();
}
// Go: server/route.go — HasDedicatedRoute returns true for registered account.
[Fact]
public void HasDedicatedRoute_RegisteredReturnsTrue()
{
var manager = CreateManager();
var connection = MakeConnection();
manager.RegisterAccountRoute("ACCT-B", connection);
manager.HasDedicatedRoute("ACCT-B").ShouldBeTrue();
}
// Go: server/route.go — HasDedicatedRoute returns false for unknown account.
[Fact]
public void HasDedicatedRoute_NotRegisteredReturnsFalse()
{
var manager = CreateManager();
manager.HasDedicatedRoute("ACCT-B").ShouldBeFalse();
}
// Go: server/route.go — listing accounts with dedicated routes.
[Fact]
public void GetAccountsWithDedicatedRoutes_ReturnsAllRegistered()
{
var manager = CreateManager();
manager.RegisterAccountRoute("ACCT-1", MakeConnection());
manager.RegisterAccountRoute("ACCT-2", MakeConnection());
manager.RegisterAccountRoute("ACCT-3", MakeConnection());
var accounts = manager.GetAccountsWithDedicatedRoutes();
accounts.Count.ShouldBe(3);
accounts.ShouldContain("ACCT-1");
accounts.ShouldContain("ACCT-2");
accounts.ShouldContain("ACCT-3");
}
// Go: server/route.go — DedicatedRouteCount tracks registered entries.
[Fact]
public void DedicatedRouteCount_MatchesRegistrations()
{
var manager = CreateManager();
manager.DedicatedRouteCount.ShouldBe(0);
manager.RegisterAccountRoute("ACCT-X", MakeConnection());
manager.DedicatedRouteCount.ShouldBe(1);
manager.RegisterAccountRoute("ACCT-Y", MakeConnection());
manager.DedicatedRouteCount.ShouldBe(2);
manager.UnregisterAccountRoute("ACCT-X");
manager.DedicatedRouteCount.ShouldBe(1);
}
// Go: server/route.go — dedicated route takes priority over pool-based route
// for the account it is registered against.
[Fact]
public void GetRouteForAccount_PrefersDedicatedRoute()
{
var manager = CreateManager();
// Register a dedicated route for "ACCT-PREF".
var dedicated = MakeConnection();
manager.RegisterAccountRoute("ACCT-PREF", dedicated);
// GetRouteForAccount must return the dedicated connection even though
// no pool routes exist (the dedicated path short-circuits pool lookup).
var result = manager.GetRouteForAccount("ACCT-PREF");
result.ShouldBeSameAs(dedicated);
}
}