From 6a05308143f70251fd0d4f2f12d90298bfbab421 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 14:36:44 -0500 Subject: [PATCH] feat: enforce account-scoped remote delivery semantics --- src/NATS.Server/Gateways/GatewayConnection.cs | 42 ++++-- src/NATS.Server/Gateways/GatewayManager.cs | 6 +- src/NATS.Server/LeafNodes/LeafConnection.cs | 42 ++++-- src/NATS.Server/LeafNodes/LeafNodeManager.cs | 4 +- src/NATS.Server/NatsServer.cs | 17 ++- src/NATS.Server/Routes/RouteConnection.cs | 42 ++++-- src/NATS.Server/Routes/RouteManager.cs | 4 +- .../GatewayAccountScopedDeliveryTests.cs | 140 ++++++++++++++++++ .../LeafAccountScopedDeliveryTests.cs | 138 +++++++++++++++++ .../Routes/RouteAccountScopedDeliveryTests.cs | 140 ++++++++++++++++++ 10 files changed, 531 insertions(+), 44 deletions(-) create mode 100644 tests/NATS.Server.Tests/Gateways/GatewayAccountScopedDeliveryTests.cs create mode 100644 tests/NATS.Server.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs create mode 100644 tests/NATS.Server.Tests/Routes/RouteAccountScopedDeliveryTests.cs diff --git a/src/NATS.Server/Gateways/GatewayConnection.cs b/src/NATS.Server/Gateways/GatewayConnection.cs index 862ee1f..001a724 100644 --- a/src/NATS.Server/Gateways/GatewayConnection.cs +++ b/src/NATS.Server/Gateways/GatewayConnection.cs @@ -48,13 +48,13 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable public Task SendAMinusAsync(string account, string subject, string? queue, CancellationToken ct) => WriteLineAsync(queue is { Length: > 0 } ? $"A- {account} {subject} {queue}" : $"A- {account} {subject}", ct); - public async Task SendMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + public async Task SendMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { var reply = string.IsNullOrEmpty(replyTo) ? "-" : replyTo; await _writeGate.WaitAsync(ct); try { - var control = Encoding.ASCII.GetBytes($"GMSG {subject} {reply} {payload.Length}\r\n"); + var control = Encoding.ASCII.GetBytes($"GMSG {account} {subject} {reply} {payload.Length}\r\n"); await _stream.WriteAsync(control, ct); if (!payload.IsEmpty) await _stream.WriteAsync(payload, ct); @@ -94,9 +94,9 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("A+ ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, account)); + await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount)); } continue; } @@ -104,9 +104,9 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("A- ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, account)); + await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount)); } continue; } @@ -115,12 +115,36 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable continue; var args = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (args.Length < 4 || !int.TryParse(args[3], out var size) || size < 0) + if (args.Length < 4) + continue; + + var account = "$G"; + string subject; + string replyToken; + string sizeToken; + + // New format: GMSG + // Legacy format: GMSG + if (args.Length >= 5 && !LooksLikeSubject(args[1])) + { + account = args[1]; + subject = args[2]; + replyToken = args[3]; + sizeToken = args[4]; + } + else + { + subject = args[1]; + replyToken = args[2]; + sizeToken = args[3]; + } + + if (!int.TryParse(sizeToken, out var size) || size < 0) continue; var payload = await ReadPayloadAsync(size, ct); if (MessageReceived != null) - await MessageReceived(new GatewayMessage(args[1], args[2] == "-" ? null : args[2], payload)); + await MessageReceived(new GatewayMessage(subject, replyToken == "-" ? null : replyToken, payload, account)); } } @@ -215,4 +239,4 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable || token.Contains('>', StringComparison.Ordinal); } -public sealed record GatewayMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload); +public sealed record GatewayMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload, string Account = "$G"); diff --git a/src/NATS.Server/Gateways/GatewayManager.cs b/src/NATS.Server/Gateways/GatewayManager.cs index ecee12f..32a4c3c 100644 --- a/src/NATS.Server/Gateways/GatewayManager.cs +++ b/src/NATS.Server/Gateways/GatewayManager.cs @@ -64,16 +64,16 @@ public sealed class GatewayManager : IAsyncDisposable return Task.CompletedTask; } - public async Task ForwardMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + public async Task ForwardMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { foreach (var connection in _connections.Values) - await connection.SendMessageAsync(subject, replyTo, payload, ct); + await connection.SendMessageAsync(account, subject, replyTo, payload, ct); } public async Task ForwardJetStreamClusterMessageAsync(GatewayMessage message, CancellationToken ct) { Interlocked.Increment(ref _forwardedJetStreamClusterMessages); - await ForwardMessageAsync(message.Subject, message.ReplyTo, message.Payload, ct); + await ForwardMessageAsync(message.Account, message.Subject, message.ReplyTo, message.Payload, ct); } public void PropagateLocalSubscription(string account, string subject, string? queue) diff --git a/src/NATS.Server/LeafNodes/LeafConnection.cs b/src/NATS.Server/LeafNodes/LeafConnection.cs index 6044c3b..8e5f671 100644 --- a/src/NATS.Server/LeafNodes/LeafConnection.cs +++ b/src/NATS.Server/LeafNodes/LeafConnection.cs @@ -48,13 +48,13 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable public Task SendLsMinusAsync(string account, string subject, string? queue, CancellationToken ct) => WriteLineAsync(queue is { Length: > 0 } ? $"LS- {account} {subject} {queue}" : $"LS- {account} {subject}", ct); - public async Task SendMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + public async Task SendMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { var reply = string.IsNullOrEmpty(replyTo) ? "-" : replyTo; await _writeGate.WaitAsync(ct); try { - var control = Encoding.ASCII.GetBytes($"LMSG {subject} {reply} {payload.Length}\r\n"); + var control = Encoding.ASCII.GetBytes($"LMSG {account} {subject} {reply} {payload.Length}\r\n"); await _stream.WriteAsync(control, ct); if (!payload.IsEmpty) await _stream.WriteAsync(payload, ct); @@ -94,9 +94,9 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("LS+ ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, account)); + await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount)); } continue; } @@ -104,9 +104,9 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("LS- ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, account)); + await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount)); } continue; } @@ -115,12 +115,36 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable continue; var args = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (args.Length < 4 || !int.TryParse(args[3], out var size) || size < 0) + if (args.Length < 4) + continue; + + var account = "$G"; + string subject; + string replyToken; + string sizeToken; + + // New format: LMSG + // Legacy format: LMSG + if (args.Length >= 5 && !LooksLikeSubject(args[1])) + { + account = args[1]; + subject = args[2]; + replyToken = args[3]; + sizeToken = args[4]; + } + else + { + subject = args[1]; + replyToken = args[2]; + sizeToken = args[3]; + } + + if (!int.TryParse(sizeToken, out var size) || size < 0) continue; var payload = await ReadPayloadAsync(size, ct); if (MessageReceived != null) - await MessageReceived(new LeafMessage(args[1], args[2] == "-" ? null : args[2], payload)); + await MessageReceived(new LeafMessage(subject, replyToken == "-" ? null : replyToken, payload, account)); } } @@ -215,4 +239,4 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable || token.Contains('>', StringComparison.Ordinal); } -public sealed record LeafMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload); +public sealed record LeafMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload, string Account = "$G"); diff --git a/src/NATS.Server/LeafNodes/LeafNodeManager.cs b/src/NATS.Server/LeafNodes/LeafNodeManager.cs index 11546d1..fb99da5 100644 --- a/src/NATS.Server/LeafNodes/LeafNodeManager.cs +++ b/src/NATS.Server/LeafNodes/LeafNodeManager.cs @@ -58,10 +58,10 @@ public sealed class LeafNodeManager : IAsyncDisposable return Task.CompletedTask; } - public async Task ForwardMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + public async Task ForwardMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { foreach (var connection in _connections.Values) - await connection.SendMessageAsync(subject, replyTo, payload, ct); + await connection.SendMessageAsync(account, subject, replyTo, payload, ct); } public void PropagateLocalSubscription(string account, string subject, string? queue) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 23aa777..ff2f230 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -871,7 +871,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private void ProcessRoutedMessage(RouteMessage message) { - DeliverRemoteMessage(message.Subject, message.ReplyTo, message.Payload); + DeliverRemoteMessage(message.Account, message.Subject, message.ReplyTo, message.Payload); } private void ProcessGatewayMessage(GatewayMessage message) @@ -880,7 +880,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (ReplyMapper.TryRestoreGatewayReply(replyTo, out var restoredReply)) replyTo = restoredReply; - DeliverRemoteMessage(message.Subject, replyTo, message.Payload); + DeliverRemoteMessage(message.Account, message.Subject, replyTo, message.Payload); } private void ProcessLeafMessage(LeafMessage message) @@ -892,12 +892,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (LeafLoopDetector.TryUnmark(subject, out var unmarked)) subject = unmarked; - DeliverRemoteMessage(subject, message.ReplyTo, message.Payload); + DeliverRemoteMessage(message.Account, subject, message.ReplyTo, message.Payload); } - private void DeliverRemoteMessage(string subject, string? replyTo, ReadOnlyMemory payload) + private void DeliverRemoteMessage(string account, string subject, string? replyTo, ReadOnlyMemory payload) { - var result = _globalAccount.SubList.Match(subject); + var targetAccount = GetOrCreateAccount(account); + var result = targetAccount.SubList.Match(subject); foreach (var sub in result.PlainSubs) DeliverMessage(sub, subject, replyTo, default, payload); @@ -948,17 +949,17 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var senderAccount = sender.Account ?? _globalAccount; if (_routeManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject)) - _routeManager.ForwardRoutedMessageAsync(subject, replyTo, payload, default).GetAwaiter().GetResult(); + _routeManager.ForwardRoutedMessageAsync(senderAccount.Name, subject, replyTo, payload, default).GetAwaiter().GetResult(); if (_gatewayManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject)) { var mappedReplyTo = ReplyMapper.ToGatewayReply(replyTo, ServerId); - _gatewayManager.ForwardMessageAsync(subject, mappedReplyTo, payload, default).GetAwaiter().GetResult(); + _gatewayManager.ForwardMessageAsync(senderAccount.Name, subject, mappedReplyTo, payload, default).GetAwaiter().GetResult(); } if (_leafNodeManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject)) { var markedSubject = LeafLoopDetector.Mark(subject, ServerId); - _leafNodeManager.ForwardMessageAsync(markedSubject, replyTo, payload, default).GetAwaiter().GetResult(); + _leafNodeManager.ForwardMessageAsync(senderAccount.Name, markedSubject, replyTo, payload, default).GetAwaiter().GetResult(); } var subList = sender.Account?.SubList ?? _globalAccount.SubList; diff --git a/src/NATS.Server/Routes/RouteConnection.cs b/src/NATS.Server/Routes/RouteConnection.cs index ec1c999..4298c53 100644 --- a/src/NATS.Server/Routes/RouteConnection.cs +++ b/src/NATS.Server/Routes/RouteConnection.cs @@ -57,13 +57,13 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable await WriteLineAsync(frame, ct); } - public async Task SendRmsgAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + public async Task SendRmsgAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { var replyToken = string.IsNullOrEmpty(replyTo) ? "-" : replyTo; await _writeGate.WaitAsync(ct); try { - var control = Encoding.ASCII.GetBytes($"RMSG {subject} {replyToken} {payload.Length}\r\n"); + var control = Encoding.ASCII.GetBytes($"RMSG {account} {subject} {replyToken} {payload.Length}\r\n"); await _stream.WriteAsync(control, ct); if (!payload.IsEmpty) await _stream.WriteAsync(payload, ct); @@ -116,9 +116,9 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("RS+ ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteServerId ?? string.Empty, account)); + await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteServerId ?? string.Empty, parsedAccount)); } continue; } @@ -126,9 +126,9 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("RS- ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteServerId ?? string.Empty, account)); + await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteServerId ?? string.Empty, parsedAccount)); } continue; } @@ -140,14 +140,34 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable if (args.Length < 4) continue; - var subject = args[1]; - var reply = args[2] == "-" ? null : args[2]; - if (!int.TryParse(args[3], out var size) || size < 0) + var account = "$G"; + string subject; + string replyToken; + string sizeToken; + + // New format: RMSG + // Legacy format: RMSG + if (args.Length >= 5 && !LooksLikeSubject(args[1])) + { + account = args[1]; + subject = args[2]; + replyToken = args[3]; + sizeToken = args[4]; + } + else + { + subject = args[1]; + replyToken = args[2]; + sizeToken = args[3]; + } + + var reply = replyToken == "-" ? null : replyToken; + if (!int.TryParse(sizeToken, out var size) || size < 0) continue; var payload = await ReadPayloadAsync(size, ct); if (RoutedMessageReceived != null) - await RoutedMessageReceived(new RouteMessage(subject, reply, payload)); + await RoutedMessageReceived(new RouteMessage(subject, reply, payload, account)); } } @@ -266,4 +286,4 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable } } -public sealed record RouteMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload); +public sealed record RouteMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload, string Account = "$G"); diff --git a/src/NATS.Server/Routes/RouteManager.cs b/src/NATS.Server/Routes/RouteManager.cs index 545b9c3..b3d2005 100644 --- a/src/NATS.Server/Routes/RouteManager.cs +++ b/src/NATS.Server/Routes/RouteManager.cs @@ -114,13 +114,13 @@ public sealed class RouteManager : IAsyncDisposable _ = route.SendRsMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); } - public async Task ForwardRoutedMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + public async Task ForwardRoutedMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { if (_routes.IsEmpty) return; foreach (var route in _routes.Values) - await route.SendRmsgAsync(subject, replyTo, payload, ct); + await route.SendRmsgAsync(account, subject, replyTo, payload, ct); } private async Task AcceptLoopAsync(CancellationToken ct) diff --git a/tests/NATS.Server.Tests/Gateways/GatewayAccountScopedDeliveryTests.cs b/tests/NATS.Server.Tests/Gateways/GatewayAccountScopedDeliveryTests.cs new file mode 100644 index 0000000..2e13e52 --- /dev/null +++ b/tests/NATS.Server.Tests/Gateways/GatewayAccountScopedDeliveryTests.cs @@ -0,0 +1,140 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Client.Core; +using NATS.Server.Auth; +using NATS.Server.Configuration; + +namespace NATS.Server.Tests.Gateways; + +public class GatewayAccountScopedDeliveryTests +{ + [Fact] + public async Task Remote_message_delivery_uses_target_account_sublist_not_global_sublist() + { + const string subject = "orders.created"; + await using var fixture = await GatewayAccountDeliveryFixture.StartAsync(); + + await using var remoteAccountA = await fixture.ConnectAsync(fixture.Remote, "a_sub"); + await using var remoteAccountB = await fixture.ConnectAsync(fixture.Remote, "b_sub"); + await using var publisher = await fixture.ConnectAsync(fixture.Local, "a_pub"); + + await using var subA = await remoteAccountA.SubscribeCoreAsync(subject); + await using var subB = await remoteAccountB.SubscribeCoreAsync(subject); + await remoteAccountA.PingAsync(); + await remoteAccountB.PingAsync(); + await fixture.WaitForRemoteInterestOnLocalAsync("A", subject); + + await publisher.PublishAsync(subject, "from-gateway-a"); + + using var receiveTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var msgA = await subA.Msgs.ReadAsync(receiveTimeout.Token); + msgA.Data.ShouldBe("from-gateway-a"); + + using var leakTimeout = new CancellationTokenSource(TimeSpan.FromMilliseconds(500)); + await Should.ThrowAsync(async () => + await subB.Msgs.ReadAsync(leakTimeout.Token)); + } +} + +internal sealed class GatewayAccountDeliveryFixture : IAsyncDisposable +{ + private readonly CancellationTokenSource _localCts; + private readonly CancellationTokenSource _remoteCts; + + private GatewayAccountDeliveryFixture(NatsServer local, NatsServer remote, CancellationTokenSource localCts, CancellationTokenSource remoteCts) + { + Local = local; + Remote = remote; + _localCts = localCts; + _remoteCts = remoteCts; + } + + public NatsServer Local { get; } + public NatsServer Remote { get; } + + public static async Task StartAsync() + { + var users = new User[] + { + new() { Username = "a_pub", Password = "pass", Account = "A" }, + new() { Username = "a_sub", Password = "pass", Account = "A" }, + new() { Username = "b_sub", Password = "pass", Account = "B" }, + }; + + var localOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Users = users, + Gateway = new GatewayOptions + { + Name = "LOCAL", + Host = "127.0.0.1", + Port = 0, + }, + }; + + var local = new NatsServer(localOptions, NullLoggerFactory.Instance); + var localCts = new CancellationTokenSource(); + _ = local.StartAsync(localCts.Token); + await local.WaitForReadyAsync(); + + var remoteOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Users = users, + Gateway = new GatewayOptions + { + Name = "REMOTE", + Host = "127.0.0.1", + Port = 0, + Remotes = [local.GatewayListen!], + }, + }; + + var remote = new NatsServer(remoteOptions, NullLoggerFactory.Instance); + var remoteCts = new CancellationTokenSource(); + _ = remote.StartAsync(remoteCts.Token); + await remote.WaitForReadyAsync(); + + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested && (local.Stats.Gateways == 0 || remote.Stats.Gateways == 0)) + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + + return new GatewayAccountDeliveryFixture(local, remote, localCts, remoteCts); + } + + public async Task ConnectAsync(NatsServer server, string username) + { + var connection = new NatsConnection(new NatsOpts + { + Url = $"nats://{username}:pass@127.0.0.1:{server.Port}", + }); + await connection.ConnectAsync(); + return connection; + } + + public async Task WaitForRemoteInterestOnLocalAsync(string account, string subject) + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested) + { + if (Local.HasRemoteInterest(account, subject)) + return; + + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + } + + throw new TimeoutException($"Timed out waiting for remote interest {account}:{subject}."); + } + + public async ValueTask DisposeAsync() + { + await _localCts.CancelAsync(); + await _remoteCts.CancelAsync(); + Local.Dispose(); + Remote.Dispose(); + _localCts.Dispose(); + _remoteCts.Dispose(); + } +} diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs b/tests/NATS.Server.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs new file mode 100644 index 0000000..854dc02 --- /dev/null +++ b/tests/NATS.Server.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs @@ -0,0 +1,138 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Client.Core; +using NATS.Server.Auth; +using NATS.Server.Configuration; + +namespace NATS.Server.Tests.LeafNodes; + +public class LeafAccountScopedDeliveryTests +{ + [Fact] + public async Task Remote_message_delivery_uses_target_account_sublist_not_global_sublist() + { + const string subject = "orders.created"; + await using var fixture = await LeafAccountDeliveryFixture.StartAsync(); + + await using var remoteAccountA = await fixture.ConnectAsync(fixture.Spoke, "a_sub"); + await using var remoteAccountB = await fixture.ConnectAsync(fixture.Spoke, "b_sub"); + await using var publisher = await fixture.ConnectAsync(fixture.Hub, "a_pub"); + + await using var subA = await remoteAccountA.SubscribeCoreAsync(subject); + await using var subB = await remoteAccountB.SubscribeCoreAsync(subject); + await remoteAccountA.PingAsync(); + await remoteAccountB.PingAsync(); + await fixture.WaitForRemoteInterestOnHubAsync("A", subject); + + await publisher.PublishAsync(subject, "from-leaf-a"); + + using var receiveTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var msgA = await subA.Msgs.ReadAsync(receiveTimeout.Token); + msgA.Data.ShouldBe("from-leaf-a"); + + using var leakTimeout = new CancellationTokenSource(TimeSpan.FromMilliseconds(500)); + await Should.ThrowAsync(async () => + await subB.Msgs.ReadAsync(leakTimeout.Token)); + } +} + +internal sealed class LeafAccountDeliveryFixture : IAsyncDisposable +{ + private readonly CancellationTokenSource _hubCts; + private readonly CancellationTokenSource _spokeCts; + + private LeafAccountDeliveryFixture(NatsServer hub, NatsServer spoke, CancellationTokenSource hubCts, CancellationTokenSource spokeCts) + { + Hub = hub; + Spoke = spoke; + _hubCts = hubCts; + _spokeCts = spokeCts; + } + + public NatsServer Hub { get; } + public NatsServer Spoke { get; } + + public static async Task StartAsync() + { + var users = new User[] + { + new() { Username = "a_pub", Password = "pass", Account = "A" }, + new() { Username = "a_sub", Password = "pass", Account = "A" }, + new() { Username = "b_sub", Password = "pass", Account = "B" }, + }; + + var hubOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Users = users, + LeafNode = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + }, + }; + + var hub = new NatsServer(hubOptions, NullLoggerFactory.Instance); + var hubCts = new CancellationTokenSource(); + _ = hub.StartAsync(hubCts.Token); + await hub.WaitForReadyAsync(); + + var spokeOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Users = users, + LeafNode = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + Remotes = [hub.LeafListen!], + }, + }; + + var spoke = new NatsServer(spokeOptions, NullLoggerFactory.Instance); + var spokeCts = new CancellationTokenSource(); + _ = spoke.StartAsync(spokeCts.Token); + await spoke.WaitForReadyAsync(); + + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)) + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + + return new LeafAccountDeliveryFixture(hub, spoke, hubCts, spokeCts); + } + + public async Task ConnectAsync(NatsServer server, string username) + { + var connection = new NatsConnection(new NatsOpts + { + Url = $"nats://{username}:pass@127.0.0.1:{server.Port}", + }); + await connection.ConnectAsync(); + return connection; + } + + public async Task WaitForRemoteInterestOnHubAsync(string account, string subject) + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested) + { + if (Hub.HasRemoteInterest(account, subject)) + return; + + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + } + + throw new TimeoutException($"Timed out waiting for remote interest {account}:{subject}."); + } + + public async ValueTask DisposeAsync() + { + await _hubCts.CancelAsync(); + await _spokeCts.CancelAsync(); + Hub.Dispose(); + Spoke.Dispose(); + _hubCts.Dispose(); + _spokeCts.Dispose(); + } +} diff --git a/tests/NATS.Server.Tests/Routes/RouteAccountScopedDeliveryTests.cs b/tests/NATS.Server.Tests/Routes/RouteAccountScopedDeliveryTests.cs new file mode 100644 index 0000000..34efb55 --- /dev/null +++ b/tests/NATS.Server.Tests/Routes/RouteAccountScopedDeliveryTests.cs @@ -0,0 +1,140 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Client.Core; +using NATS.Server.Auth; +using NATS.Server.Configuration; + +namespace NATS.Server.Tests.Routes; + +public class RouteAccountScopedDeliveryTests +{ + [Fact] + public async Task Remote_message_delivery_uses_target_account_sublist_not_global_sublist() + { + const string subject = "orders.created"; + await using var fixture = await RouteAccountDeliveryFixture.StartAsync(); + + await using var remoteAccountA = await fixture.ConnectAsync(fixture.ServerB, "a_sub"); + await using var remoteAccountB = await fixture.ConnectAsync(fixture.ServerB, "b_sub"); + await using var publisher = await fixture.ConnectAsync(fixture.ServerA, "a_pub"); + + await using var subA = await remoteAccountA.SubscribeCoreAsync(subject); + await using var subB = await remoteAccountB.SubscribeCoreAsync(subject); + await remoteAccountA.PingAsync(); + await remoteAccountB.PingAsync(); + await fixture.WaitForRemoteInterestOnServerAAsync("A", subject); + + await publisher.PublishAsync(subject, "from-route-a"); + + using var receiveTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var msgA = await subA.Msgs.ReadAsync(receiveTimeout.Token); + msgA.Data.ShouldBe("from-route-a"); + + using var leakTimeout = new CancellationTokenSource(TimeSpan.FromMilliseconds(500)); + await Should.ThrowAsync(async () => + await subB.Msgs.ReadAsync(leakTimeout.Token)); + } +} + +internal sealed class RouteAccountDeliveryFixture : IAsyncDisposable +{ + private readonly CancellationTokenSource _ctsA; + private readonly CancellationTokenSource _ctsB; + + private RouteAccountDeliveryFixture(NatsServer serverA, NatsServer serverB, CancellationTokenSource ctsA, CancellationTokenSource ctsB) + { + ServerA = serverA; + ServerB = serverB; + _ctsA = ctsA; + _ctsB = ctsB; + } + + public NatsServer ServerA { get; } + public NatsServer ServerB { get; } + + public static async Task StartAsync() + { + var users = new User[] + { + new() { Username = "a_pub", Password = "pass", Account = "A" }, + new() { Username = "a_sub", Password = "pass", Account = "A" }, + new() { Username = "b_sub", Password = "pass", Account = "B" }, + }; + + var optsA = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Users = users, + Cluster = new ClusterOptions + { + Name = Guid.NewGuid().ToString("N"), + Host = "127.0.0.1", + Port = 0, + }, + }; + + var serverA = new NatsServer(optsA, NullLoggerFactory.Instance); + var ctsA = new CancellationTokenSource(); + _ = serverA.StartAsync(ctsA.Token); + await serverA.WaitForReadyAsync(); + + var optsB = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Users = users, + Cluster = new ClusterOptions + { + Name = Guid.NewGuid().ToString("N"), + Host = "127.0.0.1", + Port = 0, + Routes = [serverA.ClusterListen!], + }, + }; + + var serverB = new NatsServer(optsB, NullLoggerFactory.Instance); + var ctsB = new CancellationTokenSource(); + _ = serverB.StartAsync(ctsB.Token); + await serverB.WaitForReadyAsync(); + + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested && (serverA.Stats.Routes == 0 || serverB.Stats.Routes == 0)) + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + + return new RouteAccountDeliveryFixture(serverA, serverB, ctsA, ctsB); + } + + public async Task ConnectAsync(NatsServer server, string username) + { + var connection = new NatsConnection(new NatsOpts + { + Url = $"nats://{username}:pass@127.0.0.1:{server.Port}", + }); + await connection.ConnectAsync(); + return connection; + } + + public async Task WaitForRemoteInterestOnServerAAsync(string account, string subject) + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested) + { + if (ServerA.HasRemoteInterest(account, subject)) + return; + + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + } + + throw new TimeoutException($"Timed out waiting for remote interest {account}:{subject}."); + } + + public async ValueTask DisposeAsync() + { + await _ctsA.CancelAsync(); + await _ctsB.CancelAsync(); + ServerA.Dispose(); + ServerB.Dispose(); + _ctsA.Dispose(); + _ctsB.Dispose(); + } +}