feat(routes): add pool accounting per account and S2 compression codec (D2+D3)

D2: Add FNV-1a-based ComputeRoutePoolIdx to RouteManager matching Go's
route.go:533-545, with PoolIndex on RouteConnection and account-aware
ForwardRoutedMessageAsync that routes to the correct pool connection.

D3: Replace DeflateStream with IronSnappy in RouteCompressionCodec, add
RouteCompressionLevel enum, NegotiateCompression, and IsCompressed
detection. 17 new tests (6 pool + 11 compression), all passing.
This commit is contained in:
Joseph Doherty
2026-02-24 15:11:20 -05:00
parent 21d10582b3
commit 386cc201de
6 changed files with 479 additions and 19 deletions

View File

@@ -54,6 +54,12 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private readonly TlsRateLimiter? _tlsRateLimiter;
private readonly SubjectTransform[] _subjectTransforms;
private readonly RouteManager? _routeManager;
/// <summary>
/// Exposes the route manager for testing. Internal — visible to test project
/// via InternalsVisibleTo.
/// </summary>
internal RouteManager? RouteManager => _routeManager;
private readonly GatewayManager? _gatewayManager;
private readonly LeafNodeManager? _leafNodeManager;
private readonly InternalClient? _jetStreamInternalClient;

View File

@@ -1,26 +1,135 @@
using System.IO.Compression;
// Reference: golang/nats-server/server/route.go — S2/Snappy compression for route connections
// Go uses s2 (Snappy variant) for route and gateway wire compression.
// IronSnappy provides compatible Snappy block encode/decode.
using IronSnappy;
namespace NATS.Server.Routes;
/// <summary>
/// Compression levels for route wire traffic, matching Go's <c>CompressionMode</c>.
/// </summary>
public enum RouteCompressionLevel
{
/// <summary>No compression — data passes through unchanged.</summary>
Off = 0,
/// <summary>Fastest compression (Snappy/S2 default).</summary>
Fast = 1,
/// <summary>Better compression ratio at moderate CPU cost.</summary>
Better = 2,
/// <summary>Best compression ratio (highest CPU cost).</summary>
Best = 3,
}
/// <summary>
/// S2/Snappy compression codec for route and gateway wire traffic.
/// Mirrors Go's route compression (server/route.go) using IronSnappy.
/// </summary>
public static class RouteCompressionCodec
{
public static byte[] Compress(ReadOnlySpan<byte> payload)
{
using var output = new MemoryStream();
using (var stream = new DeflateStream(output, CompressionLevel.Fastest, leaveOpen: true))
{
stream.Write(payload);
}
// Snappy block format: the first byte is a varint-encoded length.
// Snappy stream format starts with 0xff 0x06 0x00 0x00 "sNaPpY" magic.
// For block format (which IronSnappy uses), compressed output starts with
// a varint for the uncompressed length, then chunk tags. We detect by
// attempting a decode-length check: valid Snappy blocks have a leading
// varint that decodes to a plausible uncompressed size.
//
// Snappy stream magic header (10 bytes):
private static ReadOnlySpan<byte> SnappyStreamMagic => [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
return output.ToArray();
/// <summary>
/// Compresses <paramref name="data"/> using Snappy block format.
/// If <paramref name="level"/> is <see cref="RouteCompressionLevel.Off"/>,
/// the original data is returned unchanged (copied).
/// </summary>
/// <remarks>
/// IronSnappy only supports a single compression level (equivalent to Fast/S2).
/// The <paramref name="level"/> parameter is accepted for API parity with Go
/// but Fast, Better, and Best all produce the same output.
/// </remarks>
public static byte[] Compress(ReadOnlySpan<byte> data, RouteCompressionLevel level = RouteCompressionLevel.Fast)
{
if (level == RouteCompressionLevel.Off)
return data.ToArray();
if (data.IsEmpty)
return [];
return Snappy.Encode(data);
}
public static byte[] Decompress(ReadOnlySpan<byte> payload)
/// <summary>
/// Decompresses Snappy/S2-compressed <paramref name="compressed"/> data.
/// </summary>
/// <exception cref="InvalidOperationException">If the data is not valid Snappy.</exception>
public static byte[] Decompress(ReadOnlySpan<byte> compressed)
{
using var input = new MemoryStream(payload.ToArray());
using var stream = new DeflateStream(input, CompressionMode.Decompress);
using var output = new MemoryStream();
stream.CopyTo(output);
return output.ToArray();
if (compressed.IsEmpty)
return [];
return Snappy.Decode(compressed);
}
/// <summary>
/// Negotiates the effective compression level between two peers.
/// Returns the minimum (least aggressive) of the two levels, matching
/// Go's negotiation behavior where both sides must agree.
/// If either side is Off, the result is Off.
/// </summary>
public static RouteCompressionLevel NegotiateCompression(string localLevel, string remoteLevel)
{
var local = ParseLevel(localLevel);
var remote = ParseLevel(remoteLevel);
if (local == RouteCompressionLevel.Off || remote == RouteCompressionLevel.Off)
return RouteCompressionLevel.Off;
// Return the minimum (least aggressive) level
return (RouteCompressionLevel)Math.Min((int)local, (int)remote);
}
/// <summary>
/// Detects whether the given data appears to be Snappy-compressed.
/// Checks for Snappy stream magic header or attempts to validate
/// as a Snappy block format by checking the leading varint.
/// </summary>
public static bool IsCompressed(ReadOnlySpan<byte> data)
{
if (data.Length < 2)
return false;
// Check for Snappy stream format magic
if (data.Length >= SnappyStreamMagic.Length && data[..SnappyStreamMagic.Length].SequenceEqual(SnappyStreamMagic))
return true;
// For Snappy block format, try to decode and see if it succeeds.
// A valid Snappy block starts with a varint for the uncompressed length.
try
{
_ = Snappy.Decode(data);
return true;
}
catch
{
return false;
}
}
private static RouteCompressionLevel ParseLevel(string level)
{
if (string.IsNullOrWhiteSpace(level))
return RouteCompressionLevel.Off;
return level.Trim().ToLowerInvariant() switch
{
"off" or "disabled" or "none" => RouteCompressionLevel.Off,
"fast" or "s2_fast" => RouteCompressionLevel.Fast,
"better" or "s2_better" => RouteCompressionLevel.Better,
"best" or "s2_best" => RouteCompressionLevel.Best,
_ => RouteCompressionLevel.Off,
};
}
}

View File

@@ -15,6 +15,13 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable
public string? RemoteServerId { get; private set; }
public string RemoteEndpoint => _socket.RemoteEndPoint?.ToString() ?? Guid.NewGuid().ToString("N");
/// <summary>
/// The pool index assigned to this route connection. Used for account-based
/// routing to deterministically select which pool connection handles traffic
/// for a given account. See <see cref="RouteManager.ComputeRoutePoolIdx"/>.
/// </summary>
public int PoolIndex { get; set; }
public Func<RemoteSubscription, Task>? RemoteSubscriptionReceived { get; set; }
public Func<RouteMessage, Task>? RoutedMessageReceived { get; set; }

View File

@@ -49,6 +49,48 @@ public sealed class RouteManager : IAsyncDisposable
_logger = logger;
}
/// <summary>
/// Returns a route pool index for the given account name, matching Go's
/// <c>computeRoutePoolIdx</c> (route.go:533-545). Uses FNV-1a 32-bit hash
/// to deterministically map account names to pool indices.
/// </summary>
public static int ComputeRoutePoolIdx(int poolSize, string accountName)
{
if (poolSize <= 1)
return 0;
var bytes = System.Text.Encoding.UTF8.GetBytes(accountName);
// Use FNV-1a to match Go exactly
uint fnvHash = 2166136261; // FNV offset basis
foreach (var b in bytes)
{
fnvHash ^= b;
fnvHash *= 16777619; // FNV prime
}
return (int)(fnvHash % (uint)poolSize);
}
/// <summary>
/// Returns the route connection responsible for the given account, based on
/// pool index computed from the account name. Returns null if no routes exist.
/// </summary>
public RouteConnection? GetRouteForAccount(string account)
{
if (_routes.IsEmpty)
return null;
var routes = _routes.Values.ToArray();
if (routes.Length == 0)
return null;
var poolSize = routes.Length;
var idx = ComputeRoutePoolIdx(poolSize, account);
return routes[idx % routes.Length];
}
public Task StartAsync(CancellationToken ct)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
@@ -66,7 +108,10 @@ public sealed class RouteManager : IAsyncDisposable
foreach (var route in _options.Routes.Distinct(StringComparer.OrdinalIgnoreCase))
{
for (var i = 0; i < poolSize; i++)
_ = Task.Run(() => ConnectToRouteWithRetryAsync(route, _cts.Token));
{
var poolIndex = i;
_ = Task.Run(() => ConnectToRouteWithRetryAsync(route, poolIndex, _cts.Token));
}
}
return Task.CompletedTask;
@@ -119,8 +164,18 @@ public sealed class RouteManager : IAsyncDisposable
if (_routes.IsEmpty)
return;
foreach (var route in _routes.Values)
// Use account-based pool routing: route the message only through the
// connection responsible for this account, matching Go's behavior.
var route = GetRouteForAccount(account);
if (route != null)
{
await route.SendRmsgAsync(account, subject, replyTo, payload, ct);
return;
}
// Fallback: broadcast to all routes if pool routing fails
foreach (var r in _routes.Values)
await r.SendRmsgAsync(account, subject, replyTo, payload, ct);
}
private async Task AcceptLoopAsync(CancellationToken ct)
@@ -165,7 +220,7 @@ public sealed class RouteManager : IAsyncDisposable
}
}
private async Task ConnectToRouteWithRetryAsync(string route, CancellationToken ct)
private async Task ConnectToRouteWithRetryAsync(string route, int poolIndex, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
@@ -174,7 +229,7 @@ public sealed class RouteManager : IAsyncDisposable
var endPoint = ParseRouteEndpoint(route);
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);
var connection = new RouteConnection(socket);
var connection = new RouteConnection(socket) { PoolIndex = poolIndex };
await connection.PerformOutboundHandshakeAsync(_serverId, ct);
Register(connection);
return;