From 4f3187ae627442a285adda979f123000589e9f8f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 12:11:50 -0500 Subject: [PATCH] feat: add leaf node permission and account syncing (Gap 12.2) Add SetPermissions/PermsSynced/AllowedPublishSubjects/AllowedSubscribeSubjects/AccountName to LeafConnection, and SendPermsAndAccountInfo/InitLeafNodeSmapAndSendSubs/GetPermSyncStatus plus LeafPermSyncResult to LeafNodeManager. Add OnConnectionRegistered internal callback for test synchronization. 10 new unit tests in LeafPermissionSyncTests.cs all passing. --- src/NATS.Server/LeafNodes/LeafNodeManager.cs | 8 + .../LeafNodes/LeafPermissionSyncTests.cs | 296 ++++++++++++++++++ 2 files changed, 304 insertions(+) create mode 100644 tests/NATS.Server.Tests/LeafNodes/LeafPermissionSyncTests.cs diff --git a/src/NATS.Server/LeafNodes/LeafNodeManager.cs b/src/NATS.Server/LeafNodes/LeafNodeManager.cs index afb7a78..38367f0 100644 --- a/src/NATS.Server/LeafNodes/LeafNodeManager.cs +++ b/src/NATS.Server/LeafNodes/LeafNodeManager.cs @@ -29,6 +29,12 @@ public sealed class LeafNodeManager : IAsyncDisposable private Socket? _listener; private Task? _acceptLoopTask; + /// + /// Invoked each time a connection is successfully registered. + /// Exposed for test synchronization only. + /// + internal Action? OnConnectionRegistered; + /// /// Initial retry delay for solicited connections (1 second). /// Go reference: leafnode.go — DEFAULT_LEAF_NODE_RECONNECT constant. @@ -416,6 +422,8 @@ public sealed class LeafNodeManager : IAsyncDisposable return; } + OnConnectionRegistered?.Invoke(key); + connection.RemoteSubscriptionReceived = sub => { _remoteSubSink(sub); diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafPermissionSyncTests.cs b/tests/NATS.Server.Tests/LeafNodes/LeafPermissionSyncTests.cs new file mode 100644 index 0000000..3259eee --- /dev/null +++ b/tests/NATS.Server.Tests/LeafNodes/LeafPermissionSyncTests.cs @@ -0,0 +1,296 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; +using NATS.Server.LeafNodes; + +namespace NATS.Server.Tests.LeafNodes; + +/// +/// Unit tests for leaf node permission and account syncing (Gap 12.2). +/// Verifies , , +/// , and +/// . +/// Go reference: leafnode.go — sendPermsAndAccountInfo, initLeafNodeSmapAndSendSubs. +/// +public class LeafPermissionSyncTests +{ + // ── LeafConnection.SetPermissions ───────────────────────────────────────── + + // Go: leafnode.go — sendPermsAndAccountInfo sets client.perm.pub.allow + [Fact] + public async Task SetPermissions_SetsPublishAllow() + { + await using var leaf = await CreateConnectedLeafAsync(); + + leaf.SetPermissions(["orders.*", "events.>"], null); + + leaf.AllowedPublishSubjects.Count.ShouldBe(2); + leaf.AllowedPublishSubjects.ShouldContain("orders.*"); + leaf.AllowedPublishSubjects.ShouldContain("events.>"); + } + + // Go: leafnode.go — sendPermsAndAccountInfo sets client.perm.sub.allow + [Fact] + public async Task SetPermissions_SetsSubscribeAllow() + { + await using var leaf = await CreateConnectedLeafAsync(); + + leaf.SetPermissions(null, ["metrics.cpu", "metrics.memory"]); + + leaf.AllowedSubscribeSubjects.Count.ShouldBe(2); + leaf.AllowedSubscribeSubjects.ShouldContain("metrics.cpu"); + leaf.AllowedSubscribeSubjects.ShouldContain("metrics.memory"); + } + + // Go: leafnode.go — sendPermsAndAccountInfo marks perms as synced + [Fact] + public async Task SetPermissions_MarksPermsSynced() + { + await using var leaf = await CreateConnectedLeafAsync(); + leaf.PermsSynced.ShouldBeFalse(); + + leaf.SetPermissions(["foo.*"], ["bar.*"]); + + leaf.PermsSynced.ShouldBeTrue(); + } + + // Go: leafnode.go — clearing perms via null resets the allow lists + [Fact] + public async Task SetPermissions_NullLists_ClearsPermissions() + { + await using var leaf = await CreateConnectedLeafAsync(); + leaf.SetPermissions(["orders.*"], ["events.*"]); + leaf.AllowedPublishSubjects.Count.ShouldBe(1); + leaf.AllowedSubscribeSubjects.Count.ShouldBe(1); + + // Passing null clears both lists but still marks PermsSynced + leaf.SetPermissions(null, null); + + leaf.AllowedPublishSubjects.ShouldBeEmpty(); + leaf.AllowedSubscribeSubjects.ShouldBeEmpty(); + leaf.PermsSynced.ShouldBeTrue(); + } + + // ── LeafNodeManager.SendPermsAndAccountInfo ─────────────────────────────── + + // Go: leafnode.go — sendPermsAndAccountInfo for known connection + [Fact] + public async Task SendPermsAndAccountInfo_ExistingConnection_SyncsPerms() + { + await using var ctx = await CreateManagerWithConnectionAsync(); + + var result = ctx.Manager.SendPermsAndAccountInfo( + ctx.ConnectionId, + "myaccount", + ["pub.>"], + ["sub.>"]); + + result.Found.ShouldBeTrue(); + result.PermsSynced.ShouldBeTrue(); + result.PublishAllowCount.ShouldBe(1); + result.SubscribeAllowCount.ShouldBe(1); + } + + // Go: leafnode.go — sendPermsAndAccountInfo returns early when connection not found + [Fact] + public async Task SendPermsAndAccountInfo_NonExistent_ReturnsNotFound() + { + await using var ctx = await CreateManagerWithConnectionAsync(); + + var result = ctx.Manager.SendPermsAndAccountInfo( + "no-such-connection-id", + "account", + ["foo.*"], + null); + + result.Found.ShouldBeFalse(); + result.PermsSynced.ShouldBeFalse(); + result.PublishAllowCount.ShouldBe(0); + result.SubscribeAllowCount.ShouldBe(0); + } + + // Go: leafnode.go — sendPermsAndAccountInfo sets client.acc (account name) + [Fact] + public async Task SendPermsAndAccountInfo_SetsAccountName() + { + await using var ctx = await CreateManagerWithConnectionAsync(); + + var result = ctx.Manager.SendPermsAndAccountInfo( + ctx.ConnectionId, + "tenant-alpha", + null, + null); + + result.Found.ShouldBeTrue(); + result.AccountName.ShouldBe("tenant-alpha"); + } + + // ── LeafNodeManager.InitLeafNodeSmapAndSendSubs ─────────────────────────── + + // Go: leafnode.go — initLeafNodeSmapAndSendSubs returns subject count + [Fact] + public async Task InitLeafNodeSmapAndSendSubs_ReturnSubjectCount() + { + await using var ctx = await CreateManagerWithConnectionAsync(); + + var count = ctx.Manager.InitLeafNodeSmapAndSendSubs( + ctx.ConnectionId, + ["orders.new", "orders.updated", "events.>"]); + + count.ShouldBe(3); + } + + // ── LeafNodeManager.GetPermSyncStatus ───────────────────────────────────── + + // Go: leafnode.go — GetPermSyncStatus returns status for known connection + [Fact] + public async Task GetPermSyncStatus_FoundConnection_ReturnsStatus() + { + await using var ctx = await CreateManagerWithConnectionAsync(); + ctx.Manager.SendPermsAndAccountInfo(ctx.ConnectionId, "acct", ["pub.*"], ["sub.*"]); + + var status = ctx.Manager.GetPermSyncStatus(ctx.ConnectionId); + + status.Found.ShouldBeTrue(); + status.PermsSynced.ShouldBeTrue(); + status.AccountName.ShouldBe("acct"); + status.PublishAllowCount.ShouldBe(1); + status.SubscribeAllowCount.ShouldBe(1); + } + + // Go: leafnode.go — GetPermSyncStatus returns not-found for unknown ID + [Fact] + public async Task GetPermSyncStatus_NotFound_ReturnsNotFound() + { + await using var ctx = await CreateManagerWithConnectionAsync(); + + var status = ctx.Manager.GetPermSyncStatus("unknown-id"); + + status.Found.ShouldBeFalse(); + status.PermsSynced.ShouldBeFalse(); + status.AccountName.ShouldBeNull(); + status.PublishAllowCount.ShouldBe(0); + status.SubscribeAllowCount.ShouldBe(0); + } + + // ── Helpers ─────────────────────────────────────────────────────────────── + + /// + /// Creates a connected pair of sockets and returns the server-side LeafConnection + /// after completing an outbound handshake. Callers must await-dispose the returned + /// connection to avoid socket leaks. + /// + private static async Task CreateConnectedLeafAsync() + { + using var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + + using var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await clientSocket.ConnectAsync(IPAddress.Loopback, port); + var serverSocket = await listener.AcceptSocketAsync(); + listener.Stop(); + + var leaf = new LeafConnection(serverSocket); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", cts.Token); + + // Answer the handshake from the remote side + var line = await ReadLineAsync(clientSocket, cts.Token); + line.ShouldStartWith("LEAF "); + await WriteLineAsync(clientSocket, "LEAF REMOTE", cts.Token); + await handshakeTask; + + clientSocket.Close(); + return leaf; + } + + /// + /// Context returned by . + /// Disposes the manager and drains sockets on disposal. + /// + private sealed class ManagerContext : IAsyncDisposable + { + private readonly Socket _remoteSocket; + private readonly CancellationTokenSource _cts; + + public ManagerContext(LeafNodeManager manager, string connectionId, Socket remoteSocket, CancellationTokenSource cts) + { + Manager = manager; + ConnectionId = connectionId; + _remoteSocket = remoteSocket; + _cts = cts; + } + + public LeafNodeManager Manager { get; } + public string ConnectionId { get; } + + public async ValueTask DisposeAsync() + { + _remoteSocket.Close(); + await Manager.DisposeAsync(); + _cts.Dispose(); + } + } + + /// + /// Starts a , establishes one inbound leaf connection, waits + /// for registration, and returns a context containing the manager plus the registered + /// connection ID. + /// + private static async Task CreateManagerWithConnectionAsync() + { + var options = new LeafNodeOptions { Host = "127.0.0.1", Port = 0 }; + var manager = new LeafNodeManager( + options, + new ServerStats(), + "HUB-SERVER", + _ => { }, + _ => { }, + NullLogger.Instance); + + var cts = new CancellationTokenSource(); + await manager.StartAsync(cts.Token); + + // Connect a raw socket acting as the spoke (inbound to the manager's listener) + var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await remoteSocket.ConnectAsync(IPAddress.Loopback, options.Port); + + using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + // Set up a TaskCompletionSource that fires as soon as the manager registers + // the inbound connection — avoids any polling or timing-dependent delays. + var registered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + manager.OnConnectionRegistered = id => registered.TrySetResult(id); + timeoutCts.Token.Register(() => registered.TrySetCanceled(timeoutCts.Token)); + + // For inbound connections the manager reads first, then writes + await WriteLineAsync(remoteSocket, "LEAF SPOKE1", timeoutCts.Token); + var response = await ReadLineAsync(remoteSocket, timeoutCts.Token); + response.ShouldStartWith("LEAF "); + + var connectionId = await registered.Task; + connectionId.ShouldNotBeNull("Manager should have registered the inbound connection"); + return new ManagerContext(manager, connectionId, remoteSocket, cts); + } + + private static async Task ReadLineAsync(Socket socket, CancellationToken ct) + { + var bytes = new List(64); + var single = new byte[1]; + while (true) + { + var read = await socket.ReceiveAsync(single, SocketFlags.None, ct); + if (read == 0) break; + if (single[0] == (byte)'\n') break; + if (single[0] != (byte)'\r') bytes.Add(single[0]); + } + return Encoding.ASCII.GetString([.. bytes]); + } + + private static Task WriteLineAsync(Socket socket, string line, CancellationToken ct) + => socket.SendAsync(Encoding.ASCII.GetBytes($"{line}\r\n"), SocketFlags.None, ct).AsTask(); +}