From 37d3cc29ea405a691e6b36c362c03cc3417d128f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Feb 2026 16:07:33 -0500 Subject: [PATCH] feat(networking): add leaf subject filtering and port networking Go tests (D6+D7) D6: Add ExportSubjects/ImportSubjects allow-lists to LeafHubSpokeMapper alongside existing DenyExports/DenyImports deny-lists. When an allow-list is non-empty, subjects must match at least one allow pattern; deny always takes precedence. Updated LeafNodeOptions, LeafHubSpokeMapper (5-arg constructor), and LeafNodeManager to wire through the new allow-lists. Added 13 new unit + integration tests covering allow-list semantics, deny precedence, bidirectional filtering, and wire-level propagation. D7: Existing NetworkingGoParityTests.cs (50 tests) covers gateway interest mode, route pool accounting, and leaf node connections. Parity DB already up to date. --- .../Configuration/LeafNodeOptions.cs | 18 + .../LeafNodes/LeafHubSpokeMapper.cs | 68 ++- src/NATS.Server/LeafNodes/LeafNodeManager.cs | 4 +- .../LeafNodes/LeafSubjectFilterTests.cs | 396 +++++++++++++++++- 4 files changed, 467 insertions(+), 19 deletions(-) diff --git a/src/NATS.Server/Configuration/LeafNodeOptions.cs b/src/NATS.Server/Configuration/LeafNodeOptions.cs index c01a857..1ab5577 100644 --- a/src/NATS.Server/Configuration/LeafNodeOptions.cs +++ b/src/NATS.Server/Configuration/LeafNodeOptions.cs @@ -28,4 +28,22 @@ public sealed class LeafNodeOptions /// Go reference: leafnode.go — DenyImports in RemoteLeafOpts (opts.go:230). /// public List DenyImports { get; set; } = []; + + /// + /// Explicit allow-list for exported subjects (hub→leaf direction). When non-empty, + /// only messages matching at least one of these patterns will be forwarded from + /// the hub to the leaf. Deny patterns () take precedence. + /// Supports wildcards (* and >). + /// Go reference: auth.go — SubjectPermission.Allow (Publish allow list). + /// + public List ExportSubjects { get; set; } = []; + + /// + /// Explicit allow-list for imported subjects (leaf→hub direction). When non-empty, + /// only messages matching at least one of these patterns will be forwarded from + /// the leaf to the hub. Deny patterns () take precedence. + /// Supports wildcards (* and >). + /// Go reference: auth.go — SubjectPermission.Allow (Subscribe allow list). + /// + public List ImportSubjects { get; set; } = []; } diff --git a/src/NATS.Server/LeafNodes/LeafHubSpokeMapper.cs b/src/NATS.Server/LeafNodes/LeafHubSpokeMapper.cs index 688ed91..53e79d0 100644 --- a/src/NATS.Server/LeafNodes/LeafHubSpokeMapper.cs +++ b/src/NATS.Server/LeafNodes/LeafHubSpokeMapper.cs @@ -12,10 +12,16 @@ public sealed record LeafMappingResult(string Account, string Subject); /// /// Maps accounts between hub and spoke, and applies subject-level export/import -/// filtering on leaf connections. In the Go server, DenyExports restricts what -/// flows hub→leaf (Publish permission) and DenyImports restricts what flows -/// leaf→hub (Subscribe permission). -/// Go reference: leafnode.go:470-507 (newLeafNodeCfg), opts.go:230-231. +/// filtering on leaf connections. Supports both allow-lists and deny-lists: +/// +/// - ExportSubjects (allow) + DenyExports (deny): controls hub→leaf flow. +/// - ImportSubjects (allow) + DenyImports (deny): controls leaf→hub flow. +/// +/// When an allow-list is non-empty, a subject must match at least one allow pattern. +/// A subject matching any deny pattern is always rejected (deny takes precedence). +/// +/// Go reference: leafnode.go:470-507 (newLeafNodeCfg), opts.go:230-231, +/// auth.go:127 (SubjectPermission with Allow + Deny). /// public sealed class LeafHubSpokeMapper { @@ -23,27 +29,46 @@ public sealed class LeafHubSpokeMapper private readonly IReadOnlyDictionary _spokeToHub; private readonly IReadOnlyList _denyExports; private readonly IReadOnlyList _denyImports; + private readonly IReadOnlyList _allowExports; + private readonly IReadOnlyList _allowImports; public LeafHubSpokeMapper(IReadOnlyDictionary hubToSpoke) - : this(hubToSpoke, [], []) + : this(hubToSpoke, [], [], [], []) { } /// - /// Creates a mapper with account mapping and subject deny filters. + /// Creates a mapper with account mapping and subject deny filters (legacy constructor). /// - /// Account mapping from hub account names to spoke account names. - /// Subject patterns to deny in hub→leaf (outbound) direction. - /// Subject patterns to deny in leaf→hub (inbound) direction. public LeafHubSpokeMapper( IReadOnlyDictionary hubToSpoke, IReadOnlyList denyExports, IReadOnlyList denyImports) + : this(hubToSpoke, denyExports, denyImports, [], []) + { + } + + /// + /// Creates a mapper with account mapping, deny filters, and allow-list filters. + /// + /// Account mapping from hub account names to spoke account names. + /// Subject patterns to deny in hub→leaf (outbound) direction. + /// Subject patterns to deny in leaf→hub (inbound) direction. + /// Subject patterns to allow in hub→leaf (outbound) direction. Empty = allow all. + /// Subject patterns to allow in leaf→hub (inbound) direction. Empty = allow all. + public LeafHubSpokeMapper( + IReadOnlyDictionary hubToSpoke, + IReadOnlyList denyExports, + IReadOnlyList denyImports, + IReadOnlyList allowExports, + IReadOnlyList allowImports) { _hubToSpoke = hubToSpoke; _spokeToHub = hubToSpoke.ToDictionary(static p => p.Value, static p => p.Key, StringComparer.Ordinal); _denyExports = denyExports; _denyImports = denyImports; + _allowExports = allowExports; + _allowImports = allowImports; } /// @@ -61,23 +86,36 @@ public sealed class LeafHubSpokeMapper /// /// Returns true if the subject is allowed to flow in the given direction. /// A subject is denied if it matches any pattern in the corresponding deny list. - /// Go reference: leafnode.go:475-484 (DenyExports → Publish deny, DenyImports → Subscribe deny). + /// When an allow-list is set, the subject must also match at least one allow pattern. + /// Deny takes precedence over allow (Go reference: auth.go SubjectPermission semantics). /// public bool IsSubjectAllowed(string subject, LeafMapDirection direction) { - var denyList = direction switch + var (denyList, allowList) = direction switch { - LeafMapDirection.Outbound => _denyExports, - LeafMapDirection.Inbound => _denyImports, - _ => [], + LeafMapDirection.Outbound => (_denyExports, _allowExports), + LeafMapDirection.Inbound => (_denyImports, _allowImports), + _ => ((IReadOnlyList)[], (IReadOnlyList)[]), }; + // Deny takes precedence: if subject matches any deny pattern, reject it. for (var i = 0; i < denyList.Count; i++) { if (SubjectMatch.MatchLiteral(subject, denyList[i])) return false; } - return true; + // If allow-list is empty, everything not denied is allowed. + if (allowList.Count == 0) + return true; + + // With a non-empty allow-list, subject must match at least one allow pattern. + for (var i = 0; i < allowList.Count; i++) + { + if (SubjectMatch.MatchLiteral(subject, allowList[i])) + return true; + } + + return false; } } diff --git a/src/NATS.Server/LeafNodes/LeafNodeManager.cs b/src/NATS.Server/LeafNodes/LeafNodeManager.cs index 38392b3..6aa300a 100644 --- a/src/NATS.Server/LeafNodes/LeafNodeManager.cs +++ b/src/NATS.Server/LeafNodes/LeafNodeManager.cs @@ -59,7 +59,9 @@ public sealed class LeafNodeManager : IAsyncDisposable _subjectFilter = new LeafHubSpokeMapper( new Dictionary(), options.DenyExports, - options.DenyImports); + options.DenyImports, + options.ExportSubjects, + options.ImportSubjects); } public Task StartAsync(CancellationToken ct) diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafSubjectFilterTests.cs b/tests/NATS.Server.Tests/LeafNodes/LeafSubjectFilterTests.cs index a679b5a..9e9e4ed 100644 --- a/tests/NATS.Server.Tests/LeafNodes/LeafSubjectFilterTests.cs +++ b/tests/NATS.Server.Tests/LeafNodes/LeafSubjectFilterTests.cs @@ -10,9 +10,11 @@ using NATS.Server.Subscriptions; namespace NATS.Server.Tests.LeafNodes; /// -/// Tests for leaf node subject filtering via DenyExports and DenyImports. -/// Go reference: leafnode.go:470-507 (newLeafNodeCfg), opts.go:230-231 -/// (DenyImports/DenyExports fields in RemoteLeafOpts). +/// Tests for leaf node subject filtering via DenyExports/DenyImports (deny-lists) and +/// ExportSubjects/ImportSubjects (allow-lists). When an allow-list is non-empty, only +/// subjects matching at least one allow pattern are permitted; deny takes precedence. +/// Go reference: leafnode.go:470-507 (newLeafNodeCfg), opts.go:230-231, +/// auth.go:127 (SubjectPermission with Allow + Deny). /// public class LeafSubjectFilterTests { @@ -472,6 +474,394 @@ public class LeafSubjectFilterTests } } + // ── ExportSubjects/ImportSubjects allow-list Unit Tests ──────────── + + // Go: auth.go:127 SubjectPermission.Allow semantics + [Fact] + public void Allow_export_restricts_outbound_to_matching_subjects() + { + var mapper = new LeafHubSpokeMapper( + new Dictionary(), + denyExports: [], + denyImports: [], + allowExports: ["orders.*", "events.>"], + allowImports: []); + + mapper.IsSubjectAllowed("orders.created", LeafMapDirection.Outbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("orders.updated", LeafMapDirection.Outbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("events.system.boot", LeafMapDirection.Outbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("users.created", LeafMapDirection.Outbound).ShouldBeFalse(); + mapper.IsSubjectAllowed("admin.config", LeafMapDirection.Outbound).ShouldBeFalse(); + } + + // Go: auth.go:127 SubjectPermission.Allow semantics + [Fact] + public void Allow_import_restricts_inbound_to_matching_subjects() + { + var mapper = new LeafHubSpokeMapper( + new Dictionary(), + denyExports: [], + denyImports: [], + allowExports: [], + allowImports: ["metrics.*"]); + + mapper.IsSubjectAllowed("metrics.cpu", LeafMapDirection.Inbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("metrics.memory", LeafMapDirection.Inbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("logs.app", LeafMapDirection.Inbound).ShouldBeFalse(); + } + + // Go: auth.go:127 SubjectPermission — deny takes precedence over allow + [Fact] + public void Deny_takes_precedence_over_allow() + { + var mapper = new LeafHubSpokeMapper( + new Dictionary(), + denyExports: ["orders.secret"], + denyImports: [], + allowExports: ["orders.*"], + allowImports: []); + + // orders.created matches allow and not deny → permitted + mapper.IsSubjectAllowed("orders.created", LeafMapDirection.Outbound).ShouldBeTrue(); + // orders.secret matches both allow and deny → deny wins + mapper.IsSubjectAllowed("orders.secret", LeafMapDirection.Outbound).ShouldBeFalse(); + } + + // Go: auth.go:127 SubjectPermission — deny takes precedence over allow (import direction) + [Fact] + public void Deny_import_takes_precedence_over_allow_import() + { + var mapper = new LeafHubSpokeMapper( + new Dictionary(), + denyExports: [], + denyImports: ["metrics.secret"], + allowExports: [], + allowImports: ["metrics.*"]); + + mapper.IsSubjectAllowed("metrics.cpu", LeafMapDirection.Inbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("metrics.secret", LeafMapDirection.Inbound).ShouldBeFalse(); + } + + // Go: auth.go:127 SubjectPermission.Allow — empty allow-list means allow all + [Fact] + public void Empty_allow_lists_allow_everything_not_denied() + { + var mapper = new LeafHubSpokeMapper( + new Dictionary(), + denyExports: [], + denyImports: [], + allowExports: [], + allowImports: []); + + mapper.IsSubjectAllowed("any.subject", LeafMapDirection.Outbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("any.subject", LeafMapDirection.Inbound).ShouldBeTrue(); + } + + // Go: auth.go:127 SubjectPermission.Allow — wildcard patterns in allow-list + [Fact] + public void Allow_export_with_fwc_matches_deep_hierarchy() + { + var mapper = new LeafHubSpokeMapper( + new Dictionary(), + denyExports: [], + denyImports: [], + allowExports: ["data.>"], + allowImports: []); + + mapper.IsSubjectAllowed("data.x", LeafMapDirection.Outbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("data.x.y.z", LeafMapDirection.Outbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("other.x", LeafMapDirection.Outbound).ShouldBeFalse(); + } + + // Go: auth.go:127 SubjectPermission.Allow — bidirectional allow-lists are independent + [Fact] + public void Allow_lists_are_direction_independent() + { + var mapper = new LeafHubSpokeMapper( + new Dictionary(), + denyExports: [], + denyImports: [], + allowExports: ["export.only"], + allowImports: ["import.only"]); + + // export.only is allowed outbound, not restricted inbound (no inbound allow match required for it) + mapper.IsSubjectAllowed("export.only", LeafMapDirection.Outbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("export.only", LeafMapDirection.Inbound).ShouldBeFalse(); + mapper.IsSubjectAllowed("import.only", LeafMapDirection.Outbound).ShouldBeFalse(); + mapper.IsSubjectAllowed("import.only", LeafMapDirection.Inbound).ShouldBeTrue(); + } + + // Go: auth.go:127 SubjectPermission.Allow — multiple allow patterns + [Fact] + public void Multiple_allow_patterns_any_match_permits() + { + var mapper = new LeafHubSpokeMapper( + new Dictionary(), + denyExports: [], + denyImports: [], + allowExports: ["orders.*", "events.*", "metrics.>"], + allowImports: []); + + mapper.IsSubjectAllowed("orders.new", LeafMapDirection.Outbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("events.created", LeafMapDirection.Outbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("metrics.cpu.avg", LeafMapDirection.Outbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("users.list", LeafMapDirection.Outbound).ShouldBeFalse(); + } + + // Go: auth.go:127 SubjectPermission — allow + deny combined with account mapping + [Fact] + public void Allow_with_account_mapping_and_deny() + { + var mapper = new LeafHubSpokeMapper( + new Dictionary { ["HUB"] = "SPOKE" }, + denyExports: ["orders.secret"], + denyImports: [], + allowExports: ["orders.*"], + allowImports: []); + + var result = mapper.Map("HUB", "orders.new", LeafMapDirection.Outbound); + result.Account.ShouldBe("SPOKE"); + result.Subject.ShouldBe("orders.new"); + + mapper.IsSubjectAllowed("orders.new", LeafMapDirection.Outbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("orders.secret", LeafMapDirection.Outbound).ShouldBeFalse(); + mapper.IsSubjectAllowed("users.new", LeafMapDirection.Outbound).ShouldBeFalse(); + } + + // Go: auth.go:127 SubjectPermission.Allow — literal subjects in allow-list + [Fact] + public void Allow_export_with_literal_subject() + { + var mapper = new LeafHubSpokeMapper( + new Dictionary(), + denyExports: [], + denyImports: [], + allowExports: ["status.health"], + allowImports: []); + + mapper.IsSubjectAllowed("status.health", LeafMapDirection.Outbound).ShouldBeTrue(); + mapper.IsSubjectAllowed("status.ready", LeafMapDirection.Outbound).ShouldBeFalse(); + } + + // ── Integration: ExportSubjects allow-list blocks hub→leaf ──────── + + // Go: auth.go:127 SubjectPermission.Allow — integration with server + [Fact] + public async Task ExportSubjects_allow_list_restricts_hub_to_leaf_forwarding() + { + var hubOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + LeafNode = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + ExportSubjects = ["allowed.>"], + }, + }; + + var hub = new NatsServer(hubOptions, NullLoggerFactory.Instance); + var hubCts = new CancellationTokenSource(); + _ = hub.StartAsync(hubCts.Token); + await hub.WaitForReadyAsync(); + + try + { + var spokeOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + LeafNode = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + Remotes = [hub.LeafListen!], + }, + }; + + var spoke = new NatsServer(spokeOptions, NullLoggerFactory.Instance); + var spokeCts = new CancellationTokenSource(); + _ = spoke.StartAsync(spokeCts.Token); + await spoke.WaitForReadyAsync(); + + try + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)) + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + + await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" }); + await leafConn.ConnectAsync(); + await using var hubConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{hub.Port}" }); + await hubConn.ConnectAsync(); + + await using var allowedSub = await leafConn.SubscribeCoreAsync("allowed.data"); + await using var blockedSub = await leafConn.SubscribeCoreAsync("blocked.data"); + await leafConn.PingAsync(); + await Task.Delay(500); + + await hubConn.PublishAsync("allowed.data", "yes"); + await hubConn.PublishAsync("blocked.data", "no"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + (await allowedSub.Msgs.ReadAsync(cts.Token)).Data.ShouldBe("yes"); + + using var leakCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500)); + await Should.ThrowAsync(async () => + await blockedSub.Msgs.ReadAsync(leakCts.Token)); + } + finally + { + await spokeCts.CancelAsync(); + spoke.Dispose(); + spokeCts.Dispose(); + } + } + finally + { + await hubCts.CancelAsync(); + hub.Dispose(); + hubCts.Dispose(); + } + } + + // Go: auth.go:127 SubjectPermission.Allow — import allow-list integration + [Fact] + public async Task ImportSubjects_allow_list_restricts_leaf_to_hub_forwarding() + { + var hubOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + LeafNode = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + ImportSubjects = ["allowed.>"], + }, + }; + + var hub = new NatsServer(hubOptions, NullLoggerFactory.Instance); + var hubCts = new CancellationTokenSource(); + _ = hub.StartAsync(hubCts.Token); + await hub.WaitForReadyAsync(); + + try + { + var spokeOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + LeafNode = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + Remotes = [hub.LeafListen!], + }, + }; + + var spoke = new NatsServer(spokeOptions, NullLoggerFactory.Instance); + var spokeCts = new CancellationTokenSource(); + _ = spoke.StartAsync(spokeCts.Token); + await spoke.WaitForReadyAsync(); + + try + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)) + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + + await using var hubConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{hub.Port}" }); + await hubConn.ConnectAsync(); + await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" }); + await leafConn.ConnectAsync(); + + await using var allowedSub = await hubConn.SubscribeCoreAsync("allowed.data"); + await using var blockedSub = await hubConn.SubscribeCoreAsync("blocked.data"); + await hubConn.PingAsync(); + await Task.Delay(500); + + await leafConn.PublishAsync("allowed.data", "yes"); + await leafConn.PublishAsync("blocked.data", "no"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + (await allowedSub.Msgs.ReadAsync(cts.Token)).Data.ShouldBe("yes"); + + using var leakCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500)); + await Should.ThrowAsync(async () => + await blockedSub.Msgs.ReadAsync(leakCts.Token)); + } + finally + { + await spokeCts.CancelAsync(); + spoke.Dispose(); + spokeCts.Dispose(); + } + } + finally + { + await hubCts.CancelAsync(); + hub.Dispose(); + hubCts.Dispose(); + } + } + + // ── Wire-level: ExportSubjects blocks LS+ propagation ──────────── + + // Go: auth.go:127 SubjectPermission.Allow — subscription propagation filtered by allow-list + [Fact] + public async Task ExportSubjects_blocks_subscription_propagation_for_non_allowed() + { + var options = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + ExportSubjects = ["allowed.*"], + }; + + var manager = new LeafNodeManager( + options, + new ServerStats(), + "HUB1", + _ => { }, + _ => { }, + NullLogger.Instance); + + await manager.StartAsync(CancellationToken.None); + try + { + using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await remoteSocket.ConnectAsync(IPAddress.Loopback, options.Port); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await WriteLineAsync(remoteSocket, "LEAF SPOKE1", cts.Token); + var line = await ReadLineAsync(remoteSocket, cts.Token); + line.ShouldStartWith("LEAF "); + + await Task.Delay(200); + + // Propagate allowed subscription + manager.PropagateLocalSubscription("$G", "allowed.data", null); + await Task.Delay(100); + var lsLine = await ReadLineAsync(remoteSocket, cts.Token); + lsLine.ShouldBe("LS+ $G allowed.data"); + + // Propagate non-allowed subscription — should NOT appear on wire + manager.PropagateLocalSubscription("$G", "blocked.data", null); + + // Verify by sending another allowed subscription + manager.PropagateLocalSubscription("$G", "allowed.check", null); + await Task.Delay(100); + var nextLine = await ReadLineAsync(remoteSocket, cts.Token); + nextLine.ShouldBe("LS+ $G allowed.check"); + } + finally + { + await manager.DisposeAsync(); + } + } + // ── Helpers ──────────────────────────────────────────────────────── private static async Task ReadLineAsync(Socket socket, CancellationToken ct)