From 9cb3e2fe0f3d2ea9d1fc0a5f103ad4f4145cc448 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 23:00:59 -0500 Subject: [PATCH] feat: add per-account SubList isolation for message routing Subscriptions and message routing now go through account-specific SubLists instead of a single global SubList. Clients in different accounts cannot see each other's messages. When no account is specified (or auth is not configured), all clients share the global $G account. --- src/NATS.Server/NatsClient.cs | 14 ++- src/NATS.Server/NatsServer.cs | 10 +- .../AccountIsolationTests.cs | 106 ++++++++++++++++++ 3 files changed, 121 insertions(+), 9 deletions(-) create mode 100644 tests/NATS.Server.Tests/AccountIsolationTests.cs diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 4e62313..e69e3d5 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -260,6 +260,13 @@ public sealed class NatsClient : IDisposable _logger.LogDebug("Client {ClientId} authenticated as {Identity}", Id, result.Identity); } + // If no account was assigned by auth, assign global account + if (Account == null && Router is NatsServer server2) + { + Account = server2.GetOrCreateAccount(Account.GlobalAccountName); + Account.AddClient(Id); + } + ConnectReceived = true; _logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name); } @@ -286,8 +293,7 @@ public sealed class NatsClient : IDisposable _logger.LogDebug("SUB {Subject} {Sid} from client {ClientId}", cmd.Subject, cmd.Sid, Id); - if (Router is ISubListAccess sl) - sl.SubList.Insert(sub); + Account?.SubList.Insert(sub); } private void ProcessUnsub(ParsedCommand cmd) @@ -306,8 +312,7 @@ public sealed class NatsClient : IDisposable _subs.Remove(cmd.Sid!); - if (Router is ISubListAccess sl) - sl.SubList.Remove(sub); + Account?.SubList.Remove(sub); } private async ValueTask ProcessPubAsync(ParsedCommand cmd) @@ -488,6 +493,7 @@ public sealed class NatsClient : IDisposable public void Dispose() { + _permissions?.Dispose(); _clientCts?.Dispose(); _stream.Dispose(); _socket.Dispose(); diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index ae1637e..6297f1b 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -13,7 +13,6 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { private readonly NatsOptions _options; private readonly ConcurrentDictionary _clients = new(); - private readonly SubList _subList = new(); private readonly ServerInfo _serverInfo; private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; @@ -24,7 +23,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private Socket? _listener; private ulong _nextClientId; - public SubList SubList => _subList; + public SubList SubList => _globalAccount.SubList; public Task WaitForReadyAsync() => _listeningStarted.Task; @@ -148,7 +147,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload, NatsClient sender) { - var result = _subList.Match(subject); + var subList = sender.Account?.SubList ?? _globalAccount.SubList; + var result = subList.Match(subject); // Deliver to plain subscribers foreach (var sub in result.PlainSubs) @@ -205,7 +205,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { _clients.TryRemove(client.Id, out _); _logger.LogDebug("Removed client {ClientId}", client.Id); - client.RemoveAllSubscriptions(_subList); + var subList = client.Account?.SubList ?? _globalAccount.SubList; + client.RemoveAllSubscriptions(subList); client.Account?.RemoveClient(client.Id); } @@ -214,7 +215,6 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _listener?.Dispose(); foreach (var client in _clients.Values) client.Dispose(); - _subList.Dispose(); foreach (var account in _accounts.Values) account.Dispose(); } diff --git a/tests/NATS.Server.Tests/AccountIsolationTests.cs b/tests/NATS.Server.Tests/AccountIsolationTests.cs new file mode 100644 index 0000000..8f6be47 --- /dev/null +++ b/tests/NATS.Server.Tests/AccountIsolationTests.cs @@ -0,0 +1,106 @@ +using System.Net; +using System.Net.Sockets; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Client.Core; +using NATS.Server.Auth; + +namespace NATS.Server.Tests; + +public class AccountIsolationTests : IAsyncLifetime +{ + private NatsServer _server = null!; + private int _port; + private readonly CancellationTokenSource _cts = new(); + private Task _serverTask = null!; + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + public async Task InitializeAsync() + { + _port = GetFreePort(); + _server = new NatsServer(new NatsOptions + { + Port = _port, + Users = + [ + new User { Username = "alice", Password = "pass", Account = "acct-a" }, + new User { Username = "bob", Password = "pass", Account = "acct-b" }, + new User { Username = "charlie", Password = "pass", Account = "acct-a" }, + ], + }, NullLoggerFactory.Instance); + + _serverTask = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + } + + public async Task DisposeAsync() + { + await _cts.CancelAsync(); + _server.Dispose(); + } + + [Fact] + public async Task Same_account_receives_messages() + { + // Alice and Charlie are in acct-a + await using var alice = new NatsConnection(new NatsOpts + { + Url = $"nats://alice:pass@127.0.0.1:{_port}", + }); + await using var charlie = new NatsConnection(new NatsOpts + { + Url = $"nats://charlie:pass@127.0.0.1:{_port}", + }); + + await alice.ConnectAsync(); + await charlie.ConnectAsync(); + + await using var sub = await charlie.SubscribeCoreAsync("test.subject"); + await charlie.PingAsync(); + + await alice.PublishAsync("test.subject", "from-alice"); + + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var msg = await sub.Msgs.ReadAsync(timeout.Token); + msg.Data.ShouldBe("from-alice"); + } + + [Fact] + public async Task Different_account_does_not_receive_messages() + { + // Alice is in acct-a, Bob is in acct-b + await using var alice = new NatsConnection(new NatsOpts + { + Url = $"nats://alice:pass@127.0.0.1:{_port}", + }); + await using var bob = new NatsConnection(new NatsOpts + { + Url = $"nats://bob:pass@127.0.0.1:{_port}", + }); + + await alice.ConnectAsync(); + await bob.ConnectAsync(); + + await using var sub = await bob.SubscribeCoreAsync("test.subject"); + await bob.PingAsync(); + + await alice.PublishAsync("test.subject", "from-alice"); + + // Bob should NOT receive this — wait briefly then verify nothing arrived + using var timeout = new CancellationTokenSource(TimeSpan.FromMilliseconds(500)); + try + { + await sub.Msgs.ReadAsync(timeout.Token); + throw new Exception("Bob should not have received a message from a different account"); + } + catch (OperationCanceledException) + { + // Expected — no message received (timeout) + } + } +}