feat(networking): add leaf subject filtering and port networking Go tests (D6+D7)
D6: Add ExportSubjects/ImportSubjects allow-lists to LeafHubSpokeMapper alongside existing DenyExports/DenyImports deny-lists. When an allow-list is non-empty, subjects must match at least one allow pattern; deny always takes precedence. Updated LeafNodeOptions, LeafHubSpokeMapper (5-arg constructor), and LeafNodeManager to wire through the new allow-lists. Added 13 new unit + integration tests covering allow-list semantics, deny precedence, bidirectional filtering, and wire-level propagation. D7: Existing NetworkingGoParityTests.cs (50 tests) covers gateway interest mode, route pool accounting, and leaf node connections. Parity DB already up to date.
This commit is contained in:
@@ -28,4 +28,22 @@ public sealed class LeafNodeOptions
|
||||
/// Go reference: leafnode.go — DenyImports in RemoteLeafOpts (opts.go:230).
|
||||
/// </summary>
|
||||
public List<string> DenyImports { get; set; } = [];
|
||||
|
||||
/// <summary>
|
||||
/// Explicit allow-list for exported subjects (hub→leaf direction). When non-empty,
|
||||
/// only messages matching at least one of these patterns will be forwarded from
|
||||
/// the hub to the leaf. Deny patterns (<see cref="DenyExports"/>) take precedence.
|
||||
/// Supports wildcards (* and >).
|
||||
/// Go reference: auth.go — SubjectPermission.Allow (Publish allow list).
|
||||
/// </summary>
|
||||
public List<string> ExportSubjects { get; set; } = [];
|
||||
|
||||
/// <summary>
|
||||
/// Explicit allow-list for imported subjects (leaf→hub direction). When non-empty,
|
||||
/// only messages matching at least one of these patterns will be forwarded from
|
||||
/// the leaf to the hub. Deny patterns (<see cref="DenyImports"/>) take precedence.
|
||||
/// Supports wildcards (* and >).
|
||||
/// Go reference: auth.go — SubjectPermission.Allow (Subscribe allow list).
|
||||
/// </summary>
|
||||
public List<string> ImportSubjects { get; set; } = [];
|
||||
}
|
||||
|
||||
@@ -12,10 +12,16 @@ public sealed record LeafMappingResult(string Account, string Subject);
|
||||
|
||||
/// <summary>
|
||||
/// Maps accounts between hub and spoke, and applies subject-level export/import
|
||||
/// filtering on leaf connections. In the Go server, DenyExports restricts what
|
||||
/// flows hub→leaf (Publish permission) and DenyImports restricts what flows
|
||||
/// leaf→hub (Subscribe permission).
|
||||
/// Go reference: leafnode.go:470-507 (newLeafNodeCfg), opts.go:230-231.
|
||||
/// filtering on leaf connections. Supports both allow-lists and deny-lists:
|
||||
///
|
||||
/// - <b>ExportSubjects</b> (allow) + <b>DenyExports</b> (deny): controls hub→leaf flow.
|
||||
/// - <b>ImportSubjects</b> (allow) + <b>DenyImports</b> (deny): controls leaf→hub flow.
|
||||
///
|
||||
/// When an allow-list is non-empty, a subject must match at least one allow pattern.
|
||||
/// A subject matching any deny pattern is always rejected (deny takes precedence).
|
||||
///
|
||||
/// Go reference: leafnode.go:470-507 (newLeafNodeCfg), opts.go:230-231,
|
||||
/// auth.go:127 (SubjectPermission with Allow + Deny).
|
||||
/// </summary>
|
||||
public sealed class LeafHubSpokeMapper
|
||||
{
|
||||
@@ -23,27 +29,46 @@ public sealed class LeafHubSpokeMapper
|
||||
private readonly IReadOnlyDictionary<string, string> _spokeToHub;
|
||||
private readonly IReadOnlyList<string> _denyExports;
|
||||
private readonly IReadOnlyList<string> _denyImports;
|
||||
private readonly IReadOnlyList<string> _allowExports;
|
||||
private readonly IReadOnlyList<string> _allowImports;
|
||||
|
||||
public LeafHubSpokeMapper(IReadOnlyDictionary<string, string> hubToSpoke)
|
||||
: this(hubToSpoke, [], [])
|
||||
: this(hubToSpoke, [], [], [], [])
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a mapper with account mapping and subject deny filters.
|
||||
/// Creates a mapper with account mapping and subject deny filters (legacy constructor).
|
||||
/// </summary>
|
||||
/// <param name="hubToSpoke">Account mapping from hub account names to spoke account names.</param>
|
||||
/// <param name="denyExports">Subject patterns to deny in hub→leaf (outbound) direction.</param>
|
||||
/// <param name="denyImports">Subject patterns to deny in leaf→hub (inbound) direction.</param>
|
||||
public LeafHubSpokeMapper(
|
||||
IReadOnlyDictionary<string, string> hubToSpoke,
|
||||
IReadOnlyList<string> denyExports,
|
||||
IReadOnlyList<string> denyImports)
|
||||
: this(hubToSpoke, denyExports, denyImports, [], [])
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a mapper with account mapping, deny filters, and allow-list filters.
|
||||
/// </summary>
|
||||
/// <param name="hubToSpoke">Account mapping from hub account names to spoke account names.</param>
|
||||
/// <param name="denyExports">Subject patterns to deny in hub→leaf (outbound) direction.</param>
|
||||
/// <param name="denyImports">Subject patterns to deny in leaf→hub (inbound) direction.</param>
|
||||
/// <param name="allowExports">Subject patterns to allow in hub→leaf (outbound) direction. Empty = allow all.</param>
|
||||
/// <param name="allowImports">Subject patterns to allow in leaf→hub (inbound) direction. Empty = allow all.</param>
|
||||
public LeafHubSpokeMapper(
|
||||
IReadOnlyDictionary<string, string> hubToSpoke,
|
||||
IReadOnlyList<string> denyExports,
|
||||
IReadOnlyList<string> denyImports,
|
||||
IReadOnlyList<string> allowExports,
|
||||
IReadOnlyList<string> allowImports)
|
||||
{
|
||||
_hubToSpoke = hubToSpoke;
|
||||
_spokeToHub = hubToSpoke.ToDictionary(static p => p.Value, static p => p.Key, StringComparer.Ordinal);
|
||||
_denyExports = denyExports;
|
||||
_denyImports = denyImports;
|
||||
_allowExports = allowExports;
|
||||
_allowImports = allowImports;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -61,23 +86,36 @@ public sealed class LeafHubSpokeMapper
|
||||
/// <summary>
|
||||
/// Returns true if the subject is allowed to flow in the given direction.
|
||||
/// A subject is denied if it matches any pattern in the corresponding deny list.
|
||||
/// Go reference: leafnode.go:475-484 (DenyExports → Publish deny, DenyImports → Subscribe deny).
|
||||
/// When an allow-list is set, the subject must also match at least one allow pattern.
|
||||
/// Deny takes precedence over allow (Go reference: auth.go SubjectPermission semantics).
|
||||
/// </summary>
|
||||
public bool IsSubjectAllowed(string subject, LeafMapDirection direction)
|
||||
{
|
||||
var denyList = direction switch
|
||||
var (denyList, allowList) = direction switch
|
||||
{
|
||||
LeafMapDirection.Outbound => _denyExports,
|
||||
LeafMapDirection.Inbound => _denyImports,
|
||||
_ => [],
|
||||
LeafMapDirection.Outbound => (_denyExports, _allowExports),
|
||||
LeafMapDirection.Inbound => (_denyImports, _allowImports),
|
||||
_ => ((IReadOnlyList<string>)[], (IReadOnlyList<string>)[]),
|
||||
};
|
||||
|
||||
// Deny takes precedence: if subject matches any deny pattern, reject it.
|
||||
for (var i = 0; i < denyList.Count; i++)
|
||||
{
|
||||
if (SubjectMatch.MatchLiteral(subject, denyList[i]))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
// If allow-list is empty, everything not denied is allowed.
|
||||
if (allowList.Count == 0)
|
||||
return true;
|
||||
|
||||
// With a non-empty allow-list, subject must match at least one allow pattern.
|
||||
for (var i = 0; i < allowList.Count; i++)
|
||||
{
|
||||
if (SubjectMatch.MatchLiteral(subject, allowList[i]))
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,7 +59,9 @@ public sealed class LeafNodeManager : IAsyncDisposable
|
||||
_subjectFilter = new LeafHubSpokeMapper(
|
||||
new Dictionary<string, string>(),
|
||||
options.DenyExports,
|
||||
options.DenyImports);
|
||||
options.DenyImports,
|
||||
options.ExportSubjects,
|
||||
options.ImportSubjects);
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken ct)
|
||||
|
||||
@@ -10,9 +10,11 @@ using NATS.Server.Subscriptions;
|
||||
namespace NATS.Server.Tests.LeafNodes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for leaf node subject filtering via DenyExports and DenyImports.
|
||||
/// Go reference: leafnode.go:470-507 (newLeafNodeCfg), opts.go:230-231
|
||||
/// (DenyImports/DenyExports fields in RemoteLeafOpts).
|
||||
/// Tests for leaf node subject filtering via DenyExports/DenyImports (deny-lists) and
|
||||
/// ExportSubjects/ImportSubjects (allow-lists). When an allow-list is non-empty, only
|
||||
/// subjects matching at least one allow pattern are permitted; deny takes precedence.
|
||||
/// Go reference: leafnode.go:470-507 (newLeafNodeCfg), opts.go:230-231,
|
||||
/// auth.go:127 (SubjectPermission with Allow + Deny).
|
||||
/// </summary>
|
||||
public class LeafSubjectFilterTests
|
||||
{
|
||||
@@ -472,6 +474,394 @@ public class LeafSubjectFilterTests
|
||||
}
|
||||
}
|
||||
|
||||
// ── ExportSubjects/ImportSubjects allow-list Unit Tests ────────────
|
||||
|
||||
// Go: auth.go:127 SubjectPermission.Allow semantics
|
||||
[Fact]
|
||||
public void Allow_export_restricts_outbound_to_matching_subjects()
|
||||
{
|
||||
var mapper = new LeafHubSpokeMapper(
|
||||
new Dictionary<string, string>(),
|
||||
denyExports: [],
|
||||
denyImports: [],
|
||||
allowExports: ["orders.*", "events.>"],
|
||||
allowImports: []);
|
||||
|
||||
mapper.IsSubjectAllowed("orders.created", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("orders.updated", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("events.system.boot", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("users.created", LeafMapDirection.Outbound).ShouldBeFalse();
|
||||
mapper.IsSubjectAllowed("admin.config", LeafMapDirection.Outbound).ShouldBeFalse();
|
||||
}
|
||||
|
||||
// Go: auth.go:127 SubjectPermission.Allow semantics
|
||||
[Fact]
|
||||
public void Allow_import_restricts_inbound_to_matching_subjects()
|
||||
{
|
||||
var mapper = new LeafHubSpokeMapper(
|
||||
new Dictionary<string, string>(),
|
||||
denyExports: [],
|
||||
denyImports: [],
|
||||
allowExports: [],
|
||||
allowImports: ["metrics.*"]);
|
||||
|
||||
mapper.IsSubjectAllowed("metrics.cpu", LeafMapDirection.Inbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("metrics.memory", LeafMapDirection.Inbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("logs.app", LeafMapDirection.Inbound).ShouldBeFalse();
|
||||
}
|
||||
|
||||
// Go: auth.go:127 SubjectPermission — deny takes precedence over allow
|
||||
[Fact]
|
||||
public void Deny_takes_precedence_over_allow()
|
||||
{
|
||||
var mapper = new LeafHubSpokeMapper(
|
||||
new Dictionary<string, string>(),
|
||||
denyExports: ["orders.secret"],
|
||||
denyImports: [],
|
||||
allowExports: ["orders.*"],
|
||||
allowImports: []);
|
||||
|
||||
// orders.created matches allow and not deny → permitted
|
||||
mapper.IsSubjectAllowed("orders.created", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
// orders.secret matches both allow and deny → deny wins
|
||||
mapper.IsSubjectAllowed("orders.secret", LeafMapDirection.Outbound).ShouldBeFalse();
|
||||
}
|
||||
|
||||
// Go: auth.go:127 SubjectPermission — deny takes precedence over allow (import direction)
|
||||
[Fact]
|
||||
public void Deny_import_takes_precedence_over_allow_import()
|
||||
{
|
||||
var mapper = new LeafHubSpokeMapper(
|
||||
new Dictionary<string, string>(),
|
||||
denyExports: [],
|
||||
denyImports: ["metrics.secret"],
|
||||
allowExports: [],
|
||||
allowImports: ["metrics.*"]);
|
||||
|
||||
mapper.IsSubjectAllowed("metrics.cpu", LeafMapDirection.Inbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("metrics.secret", LeafMapDirection.Inbound).ShouldBeFalse();
|
||||
}
|
||||
|
||||
// Go: auth.go:127 SubjectPermission.Allow — empty allow-list means allow all
|
||||
[Fact]
|
||||
public void Empty_allow_lists_allow_everything_not_denied()
|
||||
{
|
||||
var mapper = new LeafHubSpokeMapper(
|
||||
new Dictionary<string, string>(),
|
||||
denyExports: [],
|
||||
denyImports: [],
|
||||
allowExports: [],
|
||||
allowImports: []);
|
||||
|
||||
mapper.IsSubjectAllowed("any.subject", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("any.subject", LeafMapDirection.Inbound).ShouldBeTrue();
|
||||
}
|
||||
|
||||
// Go: auth.go:127 SubjectPermission.Allow — wildcard patterns in allow-list
|
||||
[Fact]
|
||||
public void Allow_export_with_fwc_matches_deep_hierarchy()
|
||||
{
|
||||
var mapper = new LeafHubSpokeMapper(
|
||||
new Dictionary<string, string>(),
|
||||
denyExports: [],
|
||||
denyImports: [],
|
||||
allowExports: ["data.>"],
|
||||
allowImports: []);
|
||||
|
||||
mapper.IsSubjectAllowed("data.x", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("data.x.y.z", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("other.x", LeafMapDirection.Outbound).ShouldBeFalse();
|
||||
}
|
||||
|
||||
// Go: auth.go:127 SubjectPermission.Allow — bidirectional allow-lists are independent
|
||||
[Fact]
|
||||
public void Allow_lists_are_direction_independent()
|
||||
{
|
||||
var mapper = new LeafHubSpokeMapper(
|
||||
new Dictionary<string, string>(),
|
||||
denyExports: [],
|
||||
denyImports: [],
|
||||
allowExports: ["export.only"],
|
||||
allowImports: ["import.only"]);
|
||||
|
||||
// export.only is allowed outbound, not restricted inbound (no inbound allow match required for it)
|
||||
mapper.IsSubjectAllowed("export.only", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("export.only", LeafMapDirection.Inbound).ShouldBeFalse();
|
||||
mapper.IsSubjectAllowed("import.only", LeafMapDirection.Outbound).ShouldBeFalse();
|
||||
mapper.IsSubjectAllowed("import.only", LeafMapDirection.Inbound).ShouldBeTrue();
|
||||
}
|
||||
|
||||
// Go: auth.go:127 SubjectPermission.Allow — multiple allow patterns
|
||||
[Fact]
|
||||
public void Multiple_allow_patterns_any_match_permits()
|
||||
{
|
||||
var mapper = new LeafHubSpokeMapper(
|
||||
new Dictionary<string, string>(),
|
||||
denyExports: [],
|
||||
denyImports: [],
|
||||
allowExports: ["orders.*", "events.*", "metrics.>"],
|
||||
allowImports: []);
|
||||
|
||||
mapper.IsSubjectAllowed("orders.new", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("events.created", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("metrics.cpu.avg", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("users.list", LeafMapDirection.Outbound).ShouldBeFalse();
|
||||
}
|
||||
|
||||
// Go: auth.go:127 SubjectPermission — allow + deny combined with account mapping
|
||||
[Fact]
|
||||
public void Allow_with_account_mapping_and_deny()
|
||||
{
|
||||
var mapper = new LeafHubSpokeMapper(
|
||||
new Dictionary<string, string> { ["HUB"] = "SPOKE" },
|
||||
denyExports: ["orders.secret"],
|
||||
denyImports: [],
|
||||
allowExports: ["orders.*"],
|
||||
allowImports: []);
|
||||
|
||||
var result = mapper.Map("HUB", "orders.new", LeafMapDirection.Outbound);
|
||||
result.Account.ShouldBe("SPOKE");
|
||||
result.Subject.ShouldBe("orders.new");
|
||||
|
||||
mapper.IsSubjectAllowed("orders.new", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("orders.secret", LeafMapDirection.Outbound).ShouldBeFalse();
|
||||
mapper.IsSubjectAllowed("users.new", LeafMapDirection.Outbound).ShouldBeFalse();
|
||||
}
|
||||
|
||||
// Go: auth.go:127 SubjectPermission.Allow — literal subjects in allow-list
|
||||
[Fact]
|
||||
public void Allow_export_with_literal_subject()
|
||||
{
|
||||
var mapper = new LeafHubSpokeMapper(
|
||||
new Dictionary<string, string>(),
|
||||
denyExports: [],
|
||||
denyImports: [],
|
||||
allowExports: ["status.health"],
|
||||
allowImports: []);
|
||||
|
||||
mapper.IsSubjectAllowed("status.health", LeafMapDirection.Outbound).ShouldBeTrue();
|
||||
mapper.IsSubjectAllowed("status.ready", LeafMapDirection.Outbound).ShouldBeFalse();
|
||||
}
|
||||
|
||||
// ── Integration: ExportSubjects allow-list blocks hub→leaf ────────
|
||||
|
||||
// Go: auth.go:127 SubjectPermission.Allow — integration with server
|
||||
[Fact]
|
||||
public async Task ExportSubjects_allow_list_restricts_hub_to_leaf_forwarding()
|
||||
{
|
||||
var hubOptions = new NatsOptions
|
||||
{
|
||||
Host = "127.0.0.1",
|
||||
Port = 0,
|
||||
LeafNode = new LeafNodeOptions
|
||||
{
|
||||
Host = "127.0.0.1",
|
||||
Port = 0,
|
||||
ExportSubjects = ["allowed.>"],
|
||||
},
|
||||
};
|
||||
|
||||
var hub = new NatsServer(hubOptions, NullLoggerFactory.Instance);
|
||||
var hubCts = new CancellationTokenSource();
|
||||
_ = hub.StartAsync(hubCts.Token);
|
||||
await hub.WaitForReadyAsync();
|
||||
|
||||
try
|
||||
{
|
||||
var spokeOptions = new NatsOptions
|
||||
{
|
||||
Host = "127.0.0.1",
|
||||
Port = 0,
|
||||
LeafNode = new LeafNodeOptions
|
||||
{
|
||||
Host = "127.0.0.1",
|
||||
Port = 0,
|
||||
Remotes = [hub.LeafListen!],
|
||||
},
|
||||
};
|
||||
|
||||
var spoke = new NatsServer(spokeOptions, NullLoggerFactory.Instance);
|
||||
var spokeCts = new CancellationTokenSource();
|
||||
_ = spoke.StartAsync(spokeCts.Token);
|
||||
await spoke.WaitForReadyAsync();
|
||||
|
||||
try
|
||||
{
|
||||
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
|
||||
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
|
||||
|
||||
await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" });
|
||||
await leafConn.ConnectAsync();
|
||||
await using var hubConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{hub.Port}" });
|
||||
await hubConn.ConnectAsync();
|
||||
|
||||
await using var allowedSub = await leafConn.SubscribeCoreAsync<string>("allowed.data");
|
||||
await using var blockedSub = await leafConn.SubscribeCoreAsync<string>("blocked.data");
|
||||
await leafConn.PingAsync();
|
||||
await Task.Delay(500);
|
||||
|
||||
await hubConn.PublishAsync("allowed.data", "yes");
|
||||
await hubConn.PublishAsync("blocked.data", "no");
|
||||
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
|
||||
(await allowedSub.Msgs.ReadAsync(cts.Token)).Data.ShouldBe("yes");
|
||||
|
||||
using var leakCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));
|
||||
await Should.ThrowAsync<OperationCanceledException>(async () =>
|
||||
await blockedSub.Msgs.ReadAsync(leakCts.Token));
|
||||
}
|
||||
finally
|
||||
{
|
||||
await spokeCts.CancelAsync();
|
||||
spoke.Dispose();
|
||||
spokeCts.Dispose();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
await hubCts.CancelAsync();
|
||||
hub.Dispose();
|
||||
hubCts.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
// Go: auth.go:127 SubjectPermission.Allow — import allow-list integration
|
||||
[Fact]
|
||||
public async Task ImportSubjects_allow_list_restricts_leaf_to_hub_forwarding()
|
||||
{
|
||||
var hubOptions = new NatsOptions
|
||||
{
|
||||
Host = "127.0.0.1",
|
||||
Port = 0,
|
||||
LeafNode = new LeafNodeOptions
|
||||
{
|
||||
Host = "127.0.0.1",
|
||||
Port = 0,
|
||||
ImportSubjects = ["allowed.>"],
|
||||
},
|
||||
};
|
||||
|
||||
var hub = new NatsServer(hubOptions, NullLoggerFactory.Instance);
|
||||
var hubCts = new CancellationTokenSource();
|
||||
_ = hub.StartAsync(hubCts.Token);
|
||||
await hub.WaitForReadyAsync();
|
||||
|
||||
try
|
||||
{
|
||||
var spokeOptions = new NatsOptions
|
||||
{
|
||||
Host = "127.0.0.1",
|
||||
Port = 0,
|
||||
LeafNode = new LeafNodeOptions
|
||||
{
|
||||
Host = "127.0.0.1",
|
||||
Port = 0,
|
||||
Remotes = [hub.LeafListen!],
|
||||
},
|
||||
};
|
||||
|
||||
var spoke = new NatsServer(spokeOptions, NullLoggerFactory.Instance);
|
||||
var spokeCts = new CancellationTokenSource();
|
||||
_ = spoke.StartAsync(spokeCts.Token);
|
||||
await spoke.WaitForReadyAsync();
|
||||
|
||||
try
|
||||
{
|
||||
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
|
||||
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
|
||||
|
||||
await using var hubConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{hub.Port}" });
|
||||
await hubConn.ConnectAsync();
|
||||
await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" });
|
||||
await leafConn.ConnectAsync();
|
||||
|
||||
await using var allowedSub = await hubConn.SubscribeCoreAsync<string>("allowed.data");
|
||||
await using var blockedSub = await hubConn.SubscribeCoreAsync<string>("blocked.data");
|
||||
await hubConn.PingAsync();
|
||||
await Task.Delay(500);
|
||||
|
||||
await leafConn.PublishAsync("allowed.data", "yes");
|
||||
await leafConn.PublishAsync("blocked.data", "no");
|
||||
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
|
||||
(await allowedSub.Msgs.ReadAsync(cts.Token)).Data.ShouldBe("yes");
|
||||
|
||||
using var leakCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));
|
||||
await Should.ThrowAsync<OperationCanceledException>(async () =>
|
||||
await blockedSub.Msgs.ReadAsync(leakCts.Token));
|
||||
}
|
||||
finally
|
||||
{
|
||||
await spokeCts.CancelAsync();
|
||||
spoke.Dispose();
|
||||
spokeCts.Dispose();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
await hubCts.CancelAsync();
|
||||
hub.Dispose();
|
||||
hubCts.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
// ── Wire-level: ExportSubjects blocks LS+ propagation ────────────
|
||||
|
||||
// Go: auth.go:127 SubjectPermission.Allow — subscription propagation filtered by allow-list
|
||||
[Fact]
|
||||
public async Task ExportSubjects_blocks_subscription_propagation_for_non_allowed()
|
||||
{
|
||||
var options = new LeafNodeOptions
|
||||
{
|
||||
Host = "127.0.0.1",
|
||||
Port = 0,
|
||||
ExportSubjects = ["allowed.*"],
|
||||
};
|
||||
|
||||
var manager = new LeafNodeManager(
|
||||
options,
|
||||
new ServerStats(),
|
||||
"HUB1",
|
||||
_ => { },
|
||||
_ => { },
|
||||
NullLogger<LeafNodeManager>.Instance);
|
||||
|
||||
await manager.StartAsync(CancellationToken.None);
|
||||
try
|
||||
{
|
||||
using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
await remoteSocket.ConnectAsync(IPAddress.Loopback, options.Port);
|
||||
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
await WriteLineAsync(remoteSocket, "LEAF SPOKE1", cts.Token);
|
||||
var line = await ReadLineAsync(remoteSocket, cts.Token);
|
||||
line.ShouldStartWith("LEAF ");
|
||||
|
||||
await Task.Delay(200);
|
||||
|
||||
// Propagate allowed subscription
|
||||
manager.PropagateLocalSubscription("$G", "allowed.data", null);
|
||||
await Task.Delay(100);
|
||||
var lsLine = await ReadLineAsync(remoteSocket, cts.Token);
|
||||
lsLine.ShouldBe("LS+ $G allowed.data");
|
||||
|
||||
// Propagate non-allowed subscription — should NOT appear on wire
|
||||
manager.PropagateLocalSubscription("$G", "blocked.data", null);
|
||||
|
||||
// Verify by sending another allowed subscription
|
||||
manager.PropagateLocalSubscription("$G", "allowed.check", null);
|
||||
await Task.Delay(100);
|
||||
var nextLine = await ReadLineAsync(remoteSocket, cts.Token);
|
||||
nextLine.ShouldBe("LS+ $G allowed.check");
|
||||
}
|
||||
finally
|
||||
{
|
||||
await manager.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
// ── Helpers ────────────────────────────────────────────────────────
|
||||
|
||||
private static async Task<string> ReadLineAsync(Socket socket, CancellationToken ct)
|
||||
|
||||
Reference in New Issue
Block a user