diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs
index 7746dc0..3c7c91c 100644
--- a/src/NATS.Server/NatsServer.cs
+++ b/src/NATS.Server/NatsServer.cs
@@ -54,6 +54,12 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private readonly TlsRateLimiter? _tlsRateLimiter;
private readonly SubjectTransform[] _subjectTransforms;
private readonly RouteManager? _routeManager;
+
+ ///
+ /// Exposes the route manager for testing. Internal — visible to test project
+ /// via InternalsVisibleTo.
+ ///
+ internal RouteManager? RouteManager => _routeManager;
private readonly GatewayManager? _gatewayManager;
private readonly LeafNodeManager? _leafNodeManager;
private readonly InternalClient? _jetStreamInternalClient;
diff --git a/src/NATS.Server/Routes/RouteCompressionCodec.cs b/src/NATS.Server/Routes/RouteCompressionCodec.cs
index e9588c9..5ba8a71 100644
--- a/src/NATS.Server/Routes/RouteCompressionCodec.cs
+++ b/src/NATS.Server/Routes/RouteCompressionCodec.cs
@@ -1,26 +1,135 @@
-using System.IO.Compression;
+// Reference: golang/nats-server/server/route.go — S2/Snappy compression for route connections
+// Go uses s2 (Snappy variant) for route and gateway wire compression.
+// IronSnappy provides compatible Snappy block encode/decode.
+
+using IronSnappy;
namespace NATS.Server.Routes;
+///
+/// Compression levels for route wire traffic, matching Go's CompressionMode.
+///
+public enum RouteCompressionLevel
+{
+ /// No compression — data passes through unchanged.
+ Off = 0,
+
+ /// Fastest compression (Snappy/S2 default).
+ Fast = 1,
+
+ /// Better compression ratio at moderate CPU cost.
+ Better = 2,
+
+ /// Best compression ratio (highest CPU cost).
+ Best = 3,
+}
+
+///
+/// S2/Snappy compression codec for route and gateway wire traffic.
+/// Mirrors Go's route compression (server/route.go) using IronSnappy.
+///
public static class RouteCompressionCodec
{
- public static byte[] Compress(ReadOnlySpan payload)
- {
- using var output = new MemoryStream();
- using (var stream = new DeflateStream(output, CompressionLevel.Fastest, leaveOpen: true))
- {
- stream.Write(payload);
- }
+ // Snappy block format: the first byte is a varint-encoded length.
+ // Snappy stream format starts with 0xff 0x06 0x00 0x00 "sNaPpY" magic.
+ // For block format (which IronSnappy uses), compressed output starts with
+ // a varint for the uncompressed length, then chunk tags. We detect by
+ // attempting a decode-length check: valid Snappy blocks have a leading
+ // varint that decodes to a plausible uncompressed size.
+ //
+ // Snappy stream magic header (10 bytes):
+ private static ReadOnlySpan SnappyStreamMagic => [0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59];
- return output.ToArray();
+ ///
+ /// Compresses using Snappy block format.
+ /// If is ,
+ /// the original data is returned unchanged (copied).
+ ///
+ ///
+ /// IronSnappy only supports a single compression level (equivalent to Fast/S2).
+ /// The parameter is accepted for API parity with Go
+ /// but Fast, Better, and Best all produce the same output.
+ ///
+ public static byte[] Compress(ReadOnlySpan data, RouteCompressionLevel level = RouteCompressionLevel.Fast)
+ {
+ if (level == RouteCompressionLevel.Off)
+ return data.ToArray();
+
+ if (data.IsEmpty)
+ return [];
+
+ return Snappy.Encode(data);
}
- public static byte[] Decompress(ReadOnlySpan payload)
+ ///
+ /// Decompresses Snappy/S2-compressed data.
+ ///
+ /// If the data is not valid Snappy.
+ public static byte[] Decompress(ReadOnlySpan compressed)
{
- using var input = new MemoryStream(payload.ToArray());
- using var stream = new DeflateStream(input, CompressionMode.Decompress);
- using var output = new MemoryStream();
- stream.CopyTo(output);
- return output.ToArray();
+ if (compressed.IsEmpty)
+ return [];
+
+ return Snappy.Decode(compressed);
+ }
+
+ ///
+ /// Negotiates the effective compression level between two peers.
+ /// Returns the minimum (least aggressive) of the two levels, matching
+ /// Go's negotiation behavior where both sides must agree.
+ /// If either side is Off, the result is Off.
+ ///
+ public static RouteCompressionLevel NegotiateCompression(string localLevel, string remoteLevel)
+ {
+ var local = ParseLevel(localLevel);
+ var remote = ParseLevel(remoteLevel);
+
+ if (local == RouteCompressionLevel.Off || remote == RouteCompressionLevel.Off)
+ return RouteCompressionLevel.Off;
+
+ // Return the minimum (least aggressive) level
+ return (RouteCompressionLevel)Math.Min((int)local, (int)remote);
+ }
+
+ ///
+ /// Detects whether the given data appears to be Snappy-compressed.
+ /// Checks for Snappy stream magic header or attempts to validate
+ /// as a Snappy block format by checking the leading varint.
+ ///
+ public static bool IsCompressed(ReadOnlySpan data)
+ {
+ if (data.Length < 2)
+ return false;
+
+ // Check for Snappy stream format magic
+ if (data.Length >= SnappyStreamMagic.Length && data[..SnappyStreamMagic.Length].SequenceEqual(SnappyStreamMagic))
+ return true;
+
+ // For Snappy block format, try to decode and see if it succeeds.
+ // A valid Snappy block starts with a varint for the uncompressed length.
+ try
+ {
+ _ = Snappy.Decode(data);
+ return true;
+ }
+ catch
+ {
+ return false;
+ }
+ }
+
+ private static RouteCompressionLevel ParseLevel(string level)
+ {
+ if (string.IsNullOrWhiteSpace(level))
+ return RouteCompressionLevel.Off;
+
+ return level.Trim().ToLowerInvariant() switch
+ {
+ "off" or "disabled" or "none" => RouteCompressionLevel.Off,
+ "fast" or "s2_fast" => RouteCompressionLevel.Fast,
+ "better" or "s2_better" => RouteCompressionLevel.Better,
+ "best" or "s2_best" => RouteCompressionLevel.Best,
+ _ => RouteCompressionLevel.Off,
+ };
}
}
diff --git a/src/NATS.Server/Routes/RouteConnection.cs b/src/NATS.Server/Routes/RouteConnection.cs
index 4298c53..c518867 100644
--- a/src/NATS.Server/Routes/RouteConnection.cs
+++ b/src/NATS.Server/Routes/RouteConnection.cs
@@ -15,6 +15,13 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable
public string? RemoteServerId { get; private set; }
public string RemoteEndpoint => _socket.RemoteEndPoint?.ToString() ?? Guid.NewGuid().ToString("N");
+
+ ///
+ /// The pool index assigned to this route connection. Used for account-based
+ /// routing to deterministically select which pool connection handles traffic
+ /// for a given account. See .
+ ///
+ public int PoolIndex { get; set; }
public Func? RemoteSubscriptionReceived { get; set; }
public Func? RoutedMessageReceived { get; set; }
diff --git a/src/NATS.Server/Routes/RouteManager.cs b/src/NATS.Server/Routes/RouteManager.cs
index b3d2005..4437116 100644
--- a/src/NATS.Server/Routes/RouteManager.cs
+++ b/src/NATS.Server/Routes/RouteManager.cs
@@ -49,6 +49,48 @@ public sealed class RouteManager : IAsyncDisposable
_logger = logger;
}
+
+ ///
+ /// Returns a route pool index for the given account name, matching Go's
+ /// computeRoutePoolIdx (route.go:533-545). Uses FNV-1a 32-bit hash
+ /// to deterministically map account names to pool indices.
+ ///
+ public static int ComputeRoutePoolIdx(int poolSize, string accountName)
+ {
+ if (poolSize <= 1)
+ return 0;
+
+ var bytes = System.Text.Encoding.UTF8.GetBytes(accountName);
+
+ // Use FNV-1a to match Go exactly
+ uint fnvHash = 2166136261; // FNV offset basis
+ foreach (var b in bytes)
+ {
+ fnvHash ^= b;
+ fnvHash *= 16777619; // FNV prime
+ }
+
+ return (int)(fnvHash % (uint)poolSize);
+ }
+
+ ///
+ /// Returns the route connection responsible for the given account, based on
+ /// pool index computed from the account name. Returns null if no routes exist.
+ ///
+ public RouteConnection? GetRouteForAccount(string account)
+ {
+ if (_routes.IsEmpty)
+ return null;
+
+ var routes = _routes.Values.ToArray();
+ if (routes.Length == 0)
+ return null;
+
+ var poolSize = routes.Length;
+ var idx = ComputeRoutePoolIdx(poolSize, account);
+ return routes[idx % routes.Length];
+ }
+
public Task StartAsync(CancellationToken ct)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
@@ -66,7 +108,10 @@ public sealed class RouteManager : IAsyncDisposable
foreach (var route in _options.Routes.Distinct(StringComparer.OrdinalIgnoreCase))
{
for (var i = 0; i < poolSize; i++)
- _ = Task.Run(() => ConnectToRouteWithRetryAsync(route, _cts.Token));
+ {
+ var poolIndex = i;
+ _ = Task.Run(() => ConnectToRouteWithRetryAsync(route, poolIndex, _cts.Token));
+ }
}
return Task.CompletedTask;
@@ -119,8 +164,18 @@ public sealed class RouteManager : IAsyncDisposable
if (_routes.IsEmpty)
return;
- foreach (var route in _routes.Values)
+ // Use account-based pool routing: route the message only through the
+ // connection responsible for this account, matching Go's behavior.
+ var route = GetRouteForAccount(account);
+ if (route != null)
+ {
await route.SendRmsgAsync(account, subject, replyTo, payload, ct);
+ return;
+ }
+
+ // Fallback: broadcast to all routes if pool routing fails
+ foreach (var r in _routes.Values)
+ await r.SendRmsgAsync(account, subject, replyTo, payload, ct);
}
private async Task AcceptLoopAsync(CancellationToken ct)
@@ -165,7 +220,7 @@ public sealed class RouteManager : IAsyncDisposable
}
}
- private async Task ConnectToRouteWithRetryAsync(string route, CancellationToken ct)
+ private async Task ConnectToRouteWithRetryAsync(string route, int poolIndex, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
@@ -174,7 +229,7 @@ public sealed class RouteManager : IAsyncDisposable
var endPoint = ParseRouteEndpoint(route);
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);
- var connection = new RouteConnection(socket);
+ var connection = new RouteConnection(socket) { PoolIndex = poolIndex };
await connection.PerformOutboundHandshakeAsync(_serverId, ct);
Register(connection);
return;
diff --git a/tests/NATS.Server.Tests/Routes/RoutePoolAccountTests.cs b/tests/NATS.Server.Tests/Routes/RoutePoolAccountTests.cs
new file mode 100644
index 0000000..2135646
--- /dev/null
+++ b/tests/NATS.Server.Tests/Routes/RoutePoolAccountTests.cs
@@ -0,0 +1,147 @@
+// Reference: golang/nats-server/server/route.go:533-545 — computeRoutePoolIdx
+// Tests for account-based route pool index computation and message routing.
+
+using System.Text;
+using Microsoft.Extensions.Logging.Abstractions;
+using NATS.Server.Configuration;
+using NATS.Server.Routes;
+
+namespace NATS.Server.Tests.Routes;
+
+///
+/// Tests for route pool accounting per account, matching Go's
+/// computeRoutePoolIdx behavior (route.go:533-545).
+///
+public class RoutePoolAccountTests
+{
+ [Fact]
+ public void ComputeRoutePoolIdx_SinglePool_AlwaysReturnsZero()
+ {
+ RouteManager.ComputeRoutePoolIdx(1, "account-A").ShouldBe(0);
+ RouteManager.ComputeRoutePoolIdx(1, "account-B").ShouldBe(0);
+ RouteManager.ComputeRoutePoolIdx(1, "$G").ShouldBe(0);
+ RouteManager.ComputeRoutePoolIdx(0, "anything").ShouldBe(0);
+ }
+
+ [Fact]
+ public void ComputeRoutePoolIdx_DeterministicForSameAccount()
+ {
+ const int poolSize = 5;
+ const string account = "my-test-account";
+
+ var first = RouteManager.ComputeRoutePoolIdx(poolSize, account);
+ var second = RouteManager.ComputeRoutePoolIdx(poolSize, account);
+ var third = RouteManager.ComputeRoutePoolIdx(poolSize, account);
+
+ first.ShouldBe(second);
+ second.ShouldBe(third);
+ first.ShouldBeGreaterThanOrEqualTo(0);
+ first.ShouldBeLessThan(poolSize);
+ }
+
+ [Fact]
+ public void ComputeRoutePoolIdx_DistributesAcrossPool()
+ {
+ const int poolSize = 3;
+ var usedIndices = new HashSet();
+
+ for (var i = 0; i < 100; i++)
+ {
+ var idx = RouteManager.ComputeRoutePoolIdx(poolSize, $"account-{i}");
+ idx.ShouldBeGreaterThanOrEqualTo(0);
+ idx.ShouldBeLessThan(poolSize);
+ usedIndices.Add(idx);
+ }
+
+ usedIndices.Count.ShouldBe(poolSize);
+ }
+
+ [Fact]
+ public void ComputeRoutePoolIdx_EmptyAccount_ReturnsValid()
+ {
+ const int poolSize = 4;
+ var idx = RouteManager.ComputeRoutePoolIdx(poolSize, string.Empty);
+ idx.ShouldBeGreaterThanOrEqualTo(0);
+ idx.ShouldBeLessThan(poolSize);
+ }
+
+ [Fact]
+ public void ComputeRoutePoolIdx_DefaultGlobalAccount_ReturnsValid()
+ {
+ const int poolSize = 3;
+ var idx = RouteManager.ComputeRoutePoolIdx(poolSize, "$G");
+ idx.ShouldBeGreaterThanOrEqualTo(0);
+ idx.ShouldBeLessThan(poolSize);
+
+ var idx2 = RouteManager.ComputeRoutePoolIdx(poolSize, "$G");
+ idx.ShouldBe(idx2);
+ }
+
+ [Fact]
+ public async Task ForwardRoutedMessage_UsesCorrectPoolConnection()
+ {
+ var clusterName = Guid.NewGuid().ToString("N");
+
+ var optsA = new NatsOptions
+ {
+ Host = "127.0.0.1",
+ Port = 0,
+ Cluster = new ClusterOptions
+ {
+ Name = clusterName,
+ Host = "127.0.0.1",
+ Port = 0,
+ PoolSize = 1,
+ Routes = [],
+ },
+ };
+
+ var serverA = new NatsServer(optsA, NullLoggerFactory.Instance);
+ var ctsA = new CancellationTokenSource();
+ _ = serverA.StartAsync(ctsA.Token);
+ await serverA.WaitForReadyAsync();
+
+ var optsB = new NatsOptions
+ {
+ Host = "127.0.0.1",
+ Port = 0,
+ Cluster = new ClusterOptions
+ {
+ Name = clusterName,
+ Host = "127.0.0.1",
+ Port = 0,
+ PoolSize = 1,
+ Routes = [$"127.0.0.1:{optsA.Cluster.Port}"],
+ },
+ };
+
+ var serverB = new NatsServer(optsB, NullLoggerFactory.Instance);
+ var ctsB = new CancellationTokenSource();
+ _ = serverB.StartAsync(ctsB.Token);
+ await serverB.WaitForReadyAsync();
+
+ using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+ while (!timeout.IsCancellationRequested &&
+ (Interlocked.Read(ref serverA.Stats.Routes) == 0 ||
+ Interlocked.Read(ref serverB.Stats.Routes) == 0))
+ {
+ await Task.Delay(50, timeout.Token);
+ }
+
+ Interlocked.Read(ref serverA.Stats.Routes).ShouldBeGreaterThan(0);
+
+ var payload = Encoding.UTF8.GetBytes("hello");
+ await serverA.RouteManager!.ForwardRoutedMessageAsync(
+ "$G", "test.subject", null, payload, CancellationToken.None);
+
+ var poolIdx = RouteManager.ComputeRoutePoolIdx(1, "$G");
+ poolIdx.ShouldBe(0);
+
+ await ctsA.CancelAsync();
+ await ctsB.CancelAsync();
+ serverA.Dispose();
+ serverB.Dispose();
+ ctsA.Dispose();
+ ctsB.Dispose();
+ }
+}
diff --git a/tests/NATS.Server.Tests/Routes/RouteS2CompressionTests.cs b/tests/NATS.Server.Tests/Routes/RouteS2CompressionTests.cs
new file mode 100644
index 0000000..235fdb4
--- /dev/null
+++ b/tests/NATS.Server.Tests/Routes/RouteS2CompressionTests.cs
@@ -0,0 +1,136 @@
+// Reference: golang/nats-server/server/route.go — S2/Snappy compression for routes
+// Tests for RouteCompressionCodec: compression, decompression, negotiation, detection.
+
+using System.Text;
+using NATS.Server.Routes;
+
+namespace NATS.Server.Tests.Routes;
+
+///
+/// Tests for route S2/Snappy compression codec, matching Go's route compression
+/// behavior using IronSnappy.
+///
+public class RouteS2CompressionTests
+{
+ [Fact]
+ public void Compress_Fast_ProducesValidOutput()
+ {
+ var data = Encoding.UTF8.GetBytes("NATS route compression test payload");
+ var compressed = RouteCompressionCodec.Compress(data, RouteCompressionLevel.Fast);
+
+ compressed.ShouldNotBeNull();
+ compressed.Length.ShouldBeGreaterThan(0);
+
+ // Compressed output should be decompressible
+ var decompressed = RouteCompressionCodec.Decompress(compressed);
+ decompressed.ShouldBe(data);
+ }
+
+ [Fact]
+ public void Compress_Decompress_RoundTrips()
+ {
+ var original = Encoding.UTF8.GetBytes("Hello NATS! This is a test of round-trip compression.");
+
+ foreach (var level in new[] { RouteCompressionLevel.Fast, RouteCompressionLevel.Better, RouteCompressionLevel.Best })
+ {
+ var compressed = RouteCompressionCodec.Compress(original, level);
+ var restored = RouteCompressionCodec.Decompress(compressed);
+ restored.ShouldBe(original, $"Round-trip failed for level {level}");
+ }
+ }
+
+ [Fact]
+ public void Compress_EmptyData_ReturnsEmpty()
+ {
+ var result = RouteCompressionCodec.Compress(ReadOnlySpan.Empty, RouteCompressionLevel.Fast);
+ result.ShouldBeEmpty();
+ }
+
+ [Fact]
+ public void Compress_Off_ReturnsOriginal()
+ {
+ var data = Encoding.UTF8.GetBytes("uncompressed payload");
+ var result = RouteCompressionCodec.Compress(data, RouteCompressionLevel.Off);
+
+ result.ShouldBe(data);
+ }
+
+ [Fact]
+ public void Decompress_CorruptedData_Throws()
+ {
+ var garbage = new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0x01, 0x02, 0x03, 0x04 };
+
+ Should.Throw(() => RouteCompressionCodec.Decompress(garbage));
+ }
+
+ [Fact]
+ public void NegotiateCompression_BothOff_ReturnsOff()
+ {
+ var result = RouteCompressionCodec.NegotiateCompression("off", "off");
+ result.ShouldBe(RouteCompressionLevel.Off);
+ }
+
+ [Fact]
+ public void NegotiateCompression_OneFast_ReturnsFast()
+ {
+ // When both are fast, result is fast
+ var result = RouteCompressionCodec.NegotiateCompression("fast", "fast");
+ result.ShouldBe(RouteCompressionLevel.Fast);
+
+ // When one is off, result is off (off wins)
+ var result2 = RouteCompressionCodec.NegotiateCompression("fast", "off");
+ result2.ShouldBe(RouteCompressionLevel.Off);
+ }
+
+ [Fact]
+ public void NegotiateCompression_MismatchLevels_ReturnsMinimum()
+ {
+ // fast (1) vs best (3) => fast (minimum)
+ var result = RouteCompressionCodec.NegotiateCompression("fast", "best");
+ result.ShouldBe(RouteCompressionLevel.Fast);
+
+ // better (2) vs best (3) => better (minimum)
+ var result2 = RouteCompressionCodec.NegotiateCompression("better", "best");
+ result2.ShouldBe(RouteCompressionLevel.Better);
+
+ // fast (1) vs better (2) => fast (minimum)
+ var result3 = RouteCompressionCodec.NegotiateCompression("fast", "better");
+ result3.ShouldBe(RouteCompressionLevel.Fast);
+ }
+
+ [Fact]
+ public void IsCompressed_ValidSnappy_ReturnsTrue()
+ {
+ var data = Encoding.UTF8.GetBytes("This is test data for Snappy compression detection");
+ var compressed = RouteCompressionCodec.Compress(data, RouteCompressionLevel.Fast);
+
+ RouteCompressionCodec.IsCompressed(compressed).ShouldBeTrue();
+ }
+
+ [Fact]
+ public void IsCompressed_PlainText_ReturnsFalse()
+ {
+ var plainText = Encoding.UTF8.GetBytes("PUB test.subject 5\r\nhello\r\n");
+
+ RouteCompressionCodec.IsCompressed(plainText).ShouldBeFalse();
+ }
+
+ [Fact]
+ public void RoundTrip_LargePayload_Compresses()
+ {
+ // 10KB payload of repeated data should compress well
+ var largePayload = new byte[10240];
+ var pattern = Encoding.UTF8.GetBytes("NATS route payload ");
+ for (var i = 0; i < largePayload.Length; i++)
+ largePayload[i] = pattern[i % pattern.Length];
+
+ var compressed = RouteCompressionCodec.Compress(largePayload, RouteCompressionLevel.Fast);
+
+ // Compressed should be smaller than original for repetitive data
+ compressed.Length.ShouldBeLessThan(largePayload.Length);
+
+ // Round-trip should restore original
+ var restored = RouteCompressionCodec.Decompress(compressed);
+ restored.ShouldBe(largePayload);
+ }
+}