feat: add per-account SubList isolation for message routing
Subscriptions and message routing now go through account-specific SubLists instead of a single global SubList. Clients in different accounts cannot see each other's messages. When no account is specified (or auth is not configured), all clients share the global $G account.
This commit is contained in:
@@ -260,6 +260,13 @@ public sealed class NatsClient : IDisposable
|
||||
_logger.LogDebug("Client {ClientId} authenticated as {Identity}", Id, result.Identity);
|
||||
}
|
||||
|
||||
// If no account was assigned by auth, assign global account
|
||||
if (Account == null && Router is NatsServer server2)
|
||||
{
|
||||
Account = server2.GetOrCreateAccount(Account.GlobalAccountName);
|
||||
Account.AddClient(Id);
|
||||
}
|
||||
|
||||
ConnectReceived = true;
|
||||
_logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
|
||||
}
|
||||
@@ -286,8 +293,7 @@ public sealed class NatsClient : IDisposable
|
||||
|
||||
_logger.LogDebug("SUB {Subject} {Sid} from client {ClientId}", cmd.Subject, cmd.Sid, Id);
|
||||
|
||||
if (Router is ISubListAccess sl)
|
||||
sl.SubList.Insert(sub);
|
||||
Account?.SubList.Insert(sub);
|
||||
}
|
||||
|
||||
private void ProcessUnsub(ParsedCommand cmd)
|
||||
@@ -306,8 +312,7 @@ public sealed class NatsClient : IDisposable
|
||||
|
||||
_subs.Remove(cmd.Sid!);
|
||||
|
||||
if (Router is ISubListAccess sl)
|
||||
sl.SubList.Remove(sub);
|
||||
Account?.SubList.Remove(sub);
|
||||
}
|
||||
|
||||
private async ValueTask ProcessPubAsync(ParsedCommand cmd)
|
||||
@@ -488,6 +493,7 @@ public sealed class NatsClient : IDisposable
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_permissions?.Dispose();
|
||||
_clientCts?.Dispose();
|
||||
_stream.Dispose();
|
||||
_socket.Dispose();
|
||||
|
||||
@@ -13,7 +13,6 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
{
|
||||
private readonly NatsOptions _options;
|
||||
private readonly ConcurrentDictionary<ulong, NatsClient> _clients = new();
|
||||
private readonly SubList _subList = new();
|
||||
private readonly ServerInfo _serverInfo;
|
||||
private readonly ILogger<NatsServer> _logger;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
@@ -24,7 +23,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
private Socket? _listener;
|
||||
private ulong _nextClientId;
|
||||
|
||||
public SubList SubList => _subList;
|
||||
public SubList SubList => _globalAccount.SubList;
|
||||
|
||||
public Task WaitForReadyAsync() => _listeningStarted.Task;
|
||||
|
||||
@@ -148,7 +147,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
|
||||
ReadOnlyMemory<byte> payload, NatsClient sender)
|
||||
{
|
||||
var result = _subList.Match(subject);
|
||||
var subList = sender.Account?.SubList ?? _globalAccount.SubList;
|
||||
var result = subList.Match(subject);
|
||||
|
||||
// Deliver to plain subscribers
|
||||
foreach (var sub in result.PlainSubs)
|
||||
@@ -205,7 +205,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
{
|
||||
_clients.TryRemove(client.Id, out _);
|
||||
_logger.LogDebug("Removed client {ClientId}", client.Id);
|
||||
client.RemoveAllSubscriptions(_subList);
|
||||
var subList = client.Account?.SubList ?? _globalAccount.SubList;
|
||||
client.RemoveAllSubscriptions(subList);
|
||||
client.Account?.RemoveClient(client.Id);
|
||||
}
|
||||
|
||||
@@ -214,7 +215,6 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
_listener?.Dispose();
|
||||
foreach (var client in _clients.Values)
|
||||
client.Dispose();
|
||||
_subList.Dispose();
|
||||
foreach (var account in _accounts.Values)
|
||||
account.Dispose();
|
||||
}
|
||||
|
||||
106
tests/NATS.Server.Tests/AccountIsolationTests.cs
Normal file
106
tests/NATS.Server.Tests/AccountIsolationTests.cs
Normal file
@@ -0,0 +1,106 @@
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Client.Core;
|
||||
using NATS.Server.Auth;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class AccountIsolationTests : IAsyncLifetime
|
||||
{
|
||||
private NatsServer _server = null!;
|
||||
private int _port;
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
private Task _serverTask = null!;
|
||||
|
||||
private static int GetFreePort()
|
||||
{
|
||||
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
|
||||
return ((IPEndPoint)sock.LocalEndPoint!).Port;
|
||||
}
|
||||
|
||||
public async Task InitializeAsync()
|
||||
{
|
||||
_port = GetFreePort();
|
||||
_server = new NatsServer(new NatsOptions
|
||||
{
|
||||
Port = _port,
|
||||
Users =
|
||||
[
|
||||
new User { Username = "alice", Password = "pass", Account = "acct-a" },
|
||||
new User { Username = "bob", Password = "pass", Account = "acct-b" },
|
||||
new User { Username = "charlie", Password = "pass", Account = "acct-a" },
|
||||
],
|
||||
}, NullLoggerFactory.Instance);
|
||||
|
||||
_serverTask = _server.StartAsync(_cts.Token);
|
||||
await _server.WaitForReadyAsync();
|
||||
}
|
||||
|
||||
public async Task DisposeAsync()
|
||||
{
|
||||
await _cts.CancelAsync();
|
||||
_server.Dispose();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Same_account_receives_messages()
|
||||
{
|
||||
// Alice and Charlie are in acct-a
|
||||
await using var alice = new NatsConnection(new NatsOpts
|
||||
{
|
||||
Url = $"nats://alice:pass@127.0.0.1:{_port}",
|
||||
});
|
||||
await using var charlie = new NatsConnection(new NatsOpts
|
||||
{
|
||||
Url = $"nats://charlie:pass@127.0.0.1:{_port}",
|
||||
});
|
||||
|
||||
await alice.ConnectAsync();
|
||||
await charlie.ConnectAsync();
|
||||
|
||||
await using var sub = await charlie.SubscribeCoreAsync<string>("test.subject");
|
||||
await charlie.PingAsync();
|
||||
|
||||
await alice.PublishAsync("test.subject", "from-alice");
|
||||
|
||||
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
var msg = await sub.Msgs.ReadAsync(timeout.Token);
|
||||
msg.Data.ShouldBe("from-alice");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Different_account_does_not_receive_messages()
|
||||
{
|
||||
// Alice is in acct-a, Bob is in acct-b
|
||||
await using var alice = new NatsConnection(new NatsOpts
|
||||
{
|
||||
Url = $"nats://alice:pass@127.0.0.1:{_port}",
|
||||
});
|
||||
await using var bob = new NatsConnection(new NatsOpts
|
||||
{
|
||||
Url = $"nats://bob:pass@127.0.0.1:{_port}",
|
||||
});
|
||||
|
||||
await alice.ConnectAsync();
|
||||
await bob.ConnectAsync();
|
||||
|
||||
await using var sub = await bob.SubscribeCoreAsync<string>("test.subject");
|
||||
await bob.PingAsync();
|
||||
|
||||
await alice.PublishAsync("test.subject", "from-alice");
|
||||
|
||||
// Bob should NOT receive this — wait briefly then verify nothing arrived
|
||||
using var timeout = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));
|
||||
try
|
||||
{
|
||||
await sub.Msgs.ReadAsync(timeout.Token);
|
||||
throw new Exception("Bob should not have received a message from a different account");
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Expected — no message received (timeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user