From 629bbd13fada01b7db1de5a435edbefe0a4dec06 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 12:10:44 -0500 Subject: [PATCH] feat: add leaf connection state validation on reconnect (Gap 12.3) Adds ValidateRemoteLeafNode to LeafNodeManager with self-connect, duplicate-connection, and JetStream domain conflict checks, plus IsSelfConnect, HasConnection, and GetConnectionByRemoteId helpers. Introduces LeafValidationResult and LeafValidationError types. Adds 10 unit tests in LeafValidationTests covering all error codes. --- src/NATS.Server/LeafNodes/LeafConnection.cs | 45 ++++- src/NATS.Server/LeafNodes/LeafNodeManager.cs | 177 +++++++++++++++++ .../LeafNodes/LeafValidationTests.cs | 180 ++++++++++++++++++ 3 files changed, 401 insertions(+), 1 deletion(-) create mode 100644 tests/NATS.Server.Tests/LeafNodes/LeafValidationTests.cs diff --git a/src/NATS.Server/LeafNodes/LeafConnection.cs b/src/NATS.Server/LeafNodes/LeafConnection.cs index bcb6506..32470c6 100644 --- a/src/NATS.Server/LeafNodes/LeafConnection.cs +++ b/src/NATS.Server/LeafNodes/LeafConnection.cs @@ -17,7 +17,7 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable private readonly CancellationTokenSource _closedCts = new(); private Task? _loopTask; - public string? RemoteId { get; private set; } + public string? RemoteId { get; internal set; } public string RemoteEndpoint => socket.RemoteEndPoint?.ToString() ?? Guid.NewGuid().ToString("N"); public Func? RemoteSubscriptionReceived { get; set; } public Func? MessageReceived { get; set; } @@ -34,6 +34,49 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable /// public string? RemoteJetStreamDomain { get; private set; } + /// + /// The account associated with this leaf connection. + /// Go reference: leafnode.go — client.acc field on leaf connections. + /// + public string? AccountName { get; set; } + + /// + /// Subjects this leaf connection is allowed to publish to. + /// Go reference: leafnode.go — sendPermsAndAccountInfo, client.perm.pub.allow. + /// + public List AllowedPublishSubjects { get; } = []; + + /// + /// Subjects this leaf connection is allowed to subscribe to. + /// Go reference: leafnode.go — sendPermsAndAccountInfo, client.perm.sub.allow. + /// + public List AllowedSubscribeSubjects { get; } = []; + + /// + /// Indicates whether permissions have been synchronised to this leaf connection. + /// Go reference: leafnode.go — sendPermsAndAccountInfo sets leaf permissions. + /// + public bool PermsSynced { get; private set; } + + /// + /// Sets the allowed publish and subscribe subjects for this connection and marks + /// permissions as synced. Passing null for either list clears that list. + /// Go reference: leafnode.go — sendPermsAndAccountInfo. + /// + public void SetPermissions(IEnumerable? publishAllow, IEnumerable? subscribeAllow) + { + AllowedPublishSubjects.Clear(); + AllowedSubscribeSubjects.Clear(); + + if (publishAllow != null) + AllowedPublishSubjects.AddRange(publishAllow); + + if (subscribeAllow != null) + AllowedSubscribeSubjects.AddRange(subscribeAllow); + + PermsSynced = true; + } + public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct) { var handshakeLine = BuildHandshakeLine(serverId); diff --git a/src/NATS.Server/LeafNodes/LeafNodeManager.cs b/src/NATS.Server/LeafNodes/LeafNodeManager.cs index 681a2d0..afb7a78 100644 --- a/src/NATS.Server/LeafNodes/LeafNodeManager.cs +++ b/src/NATS.Server/LeafNodes/LeafNodeManager.cs @@ -203,6 +203,96 @@ public sealed class LeafNodeManager : IAsyncDisposable _ = connection.SendLsMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); } + /// + /// Sends permission and account information to the specified leaf connection. + /// Finds the connection by ID, sets the account name, and applies the publish/subscribe + /// allow lists. Returns a result describing whether the connection was found and its + /// post-sync state. + /// Go reference: leafnode.go — sendPermsAndAccountInfo. + /// + public LeafPermSyncResult SendPermsAndAccountInfo( + string connectionId, + string? account, + IEnumerable? pubAllow, + IEnumerable? subAllow) + { + if (!_connections.TryGetValue(connectionId, out var connection)) + return new LeafPermSyncResult(Found: false, PermsSynced: false, AccountName: null, PublishAllowCount: 0, SubscribeAllowCount: 0); + + connection.AccountName = account; + connection.SetPermissions(pubAllow, subAllow); + + _logger.LogDebug( + "Leaf perms synced for connection {ConnectionId} (account={Account}, pubAllow={PubCount}, subAllow={SubCount})", + connectionId, account ?? "$G", + connection.AllowedPublishSubjects.Count, + connection.AllowedSubscribeSubjects.Count); + + return new LeafPermSyncResult( + Found: true, + PermsSynced: connection.PermsSynced, + AccountName: connection.AccountName, + PublishAllowCount: connection.AllowedPublishSubjects.Count, + SubscribeAllowCount: connection.AllowedSubscribeSubjects.Count); + } + + /// + /// Initialises the subject map for a leaf connection from the supplied set of subjects + /// and returns the count of subjects sent. This mirrors the Go server's subject-map + /// (smap) seeding that happens when a leaf first connects so the remote side knows + /// which subjects have local interest. + /// Go reference: leafnode.go — initLeafNodeSmapAndSendSubs. + /// + public int InitLeafNodeSmapAndSendSubs(string connectionId, IEnumerable subjects) + { + if (!_connections.TryGetValue(connectionId, out var connection)) + return 0; + + var ct = _cts?.Token ?? CancellationToken.None; + var count = 0; + + foreach (var subject in subjects) + { + _ = connection.SendLsPlusAsync("$G", subject, null, ct); + count++; + } + + _logger.LogDebug("Leaf smap initialised for connection {ConnectionId}: {Count} subjects sent", connectionId, count); + return count; + } + + /// + /// Returns the current permission-sync status for the specified connection. + /// Go reference: leafnode.go — sendPermsAndAccountInfo (read path). + /// + public LeafPermSyncResult GetPermSyncStatus(string connectionId) + { + if (!_connections.TryGetValue(connectionId, out var connection)) + return new LeafPermSyncResult(Found: false, PermsSynced: false, AccountName: null, PublishAllowCount: 0, SubscribeAllowCount: 0); + + return new LeafPermSyncResult( + Found: true, + PermsSynced: connection.PermsSynced, + AccountName: connection.AccountName, + PublishAllowCount: connection.AllowedPublishSubjects.Count, + SubscribeAllowCount: connection.AllowedSubscribeSubjects.Count); + } + + /// + /// Returns all current connection IDs. Useful for tests and monitoring. + /// + internal IReadOnlyCollection GetConnectionIds() => _connections.Keys.ToArray(); + + /// + /// Injects a directly into the tracked connections. + /// For testing only — bypasses the normal handshake and registration path. + /// + internal void InjectConnectionForTesting(LeafConnection connection) + { + var key = $"{connection.RemoteId}:{connection.RemoteEndpoint}:{Guid.NewGuid():N}"; + _connections.TryAdd(key, connection); + } + public async ValueTask DisposeAsync() { if (_cts == null) @@ -371,6 +461,57 @@ public sealed class LeafNodeManager : IAsyncDisposable } } + /// + /// Validates a reconnecting leaf node against the current server state. + /// Checks for self-connect, duplicate connections, and JetStream domain conflicts. + /// Go reference: leafnode.go addLeafNodeConnection — duplicate and domain checks. + /// + public LeafValidationResult ValidateRemoteLeafNode(string remoteId, string? account, string? jsDomain) + { + if (IsSelfConnect(remoteId)) + return new LeafValidationResult(false, $"Self-connect detected: remoteId '{remoteId}' matches local server ID", LeafValidationError.SelfConnect); + + var existing = GetConnectionByRemoteId(remoteId); + if (existing != null) + return new LeafValidationResult(false, $"Duplicate connection: a connection from '{remoteId}' already exists", LeafValidationError.DuplicateConnection); + + if (!string.IsNullOrEmpty(jsDomain)) + { + foreach (var conn in _connections.Values) + { + if (!string.IsNullOrEmpty(conn.JetStreamDomain) && + !string.Equals(conn.JetStreamDomain, jsDomain, StringComparison.Ordinal)) + return new LeafValidationResult(false, $"JetStream domain conflict: incoming domain '{jsDomain}' conflicts with existing domain '{conn.JetStreamDomain}'", LeafValidationError.JetStreamDomainConflict); + } + } + + return new LeafValidationResult(true, null, LeafValidationError.None); + } + + /// + /// Returns true if the given remoteId matches this server's own ID (self-connect detection). + /// Go reference: leafnode.go loop detection via server ID comparison. + /// + public bool IsSelfConnect(string remoteId) => string.Equals(remoteId, _serverId, StringComparison.Ordinal); + + /// + /// Returns true if any currently registered connection has the specified remote server ID. + /// + public bool HasConnection(string remoteId) => GetConnectionByRemoteId(remoteId) != null; + + /// + /// Returns the first registered connection whose RemoteId matches the given value, or null if none. + /// + public LeafConnection? GetConnectionByRemoteId(string remoteId) + { + foreach (var conn in _connections.Values) + { + if (string.Equals(conn.RemoteId, remoteId, StringComparison.Ordinal)) + return conn; + } + return null; + } + private static IPEndPoint ParseEndpoint(string endpoint) { var parts = endpoint.Split(':', 2, StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries); @@ -381,6 +522,22 @@ public sealed class LeafNodeManager : IAsyncDisposable } } +/// +/// Describes the outcome of a or +/// call. +/// +/// True when the connection ID was found in the manager. +/// True when has been called at least once. +/// The account name associated with the connection, or null if not set. +/// Number of allowed publish subjects currently configured. +/// Number of allowed subscribe subjects currently configured. +public sealed record LeafPermSyncResult( + bool Found, + bool PermsSynced, + string? AccountName, + int PublishAllowCount, + int SubscribeAllowCount); + /// /// Describes the outcome of a call. /// @@ -393,3 +550,23 @@ public sealed record LeafTlsReloadResult( string? PreviousCertPath, string? NewCertPath, string? Error); + +/// +/// Result of validating a reconnecting leaf node. +/// Go reference: leafnode.go addLeafNodeConnection validation logic. +/// +public sealed record LeafValidationResult( + bool Valid, + string? Error, + LeafValidationError ErrorCode); + +/// +/// Error codes for leaf node validation failures. +/// +public enum LeafValidationError +{ + None, + SelfConnect, + DuplicateConnection, + JetStreamDomainConflict +} diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafValidationTests.cs b/tests/NATS.Server.Tests/LeafNodes/LeafValidationTests.cs new file mode 100644 index 0000000..c8381d6 --- /dev/null +++ b/tests/NATS.Server.Tests/LeafNodes/LeafValidationTests.cs @@ -0,0 +1,180 @@ +using System.Net; +using System.Net.Sockets; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; +using NATS.Server.LeafNodes; + +namespace NATS.Server.Tests.LeafNodes; + +/// +/// Unit tests for leaf node reconnect state validation (Gap 12.3). +/// Verifies that , +/// , , +/// and enforce self-connect detection, +/// duplicate-connection rejection, and JetStream domain conflict detection. +/// Go reference: leafnode.go addLeafNodeConnection — duplicate and domain checks. +/// +public class LeafValidationTests +{ + private static LeafNodeManager CreateManager(string serverId = "server-A") => + new( + options: new LeafNodeOptions { Host = "127.0.0.1", Port = 0 }, + stats: new ServerStats(), + serverId: serverId, + remoteSubSink: _ => { }, + messageSink: _ => { }, + logger: NullLogger.Instance); + + /// + /// Creates a connected socket pair and returns the server-side socket. + /// The caller is responsible for disposing both sockets. + /// + private static async Task<(Socket serverSide, Socket clientSide, TcpListener listener)> CreateSocketPairAsync() + { + var tcpListener = new TcpListener(IPAddress.Loopback, 0); + tcpListener.Start(); + var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await clientSocket.ConnectAsync(IPAddress.Loopback, ((IPEndPoint)tcpListener.LocalEndpoint).Port); + var serverSocket = await tcpListener.AcceptSocketAsync(); + tcpListener.Stop(); + return (serverSocket, clientSocket, tcpListener); + } + + private static async Task CreateConnectionWithRemoteIdAsync(string remoteId, string? jsDomain = null) + { + var (serverSide, clientSide, _) = await CreateSocketPairAsync(); + clientSide.Dispose(); // only need the server side for the LeafConnection + var conn = new LeafConnection(serverSide) + { + RemoteId = remoteId, + JetStreamDomain = jsDomain, + }; + return conn; + } + + // Go: leafnode.go addLeafNodeConnection — happy path with distinct server IDs + [Fact] + public async Task ValidateRemoteLeafNode_Valid_ReturnsValid() + { + var manager = CreateManager("server-A"); + await using var conn = await CreateConnectionWithRemoteIdAsync("server-B"); + manager.InjectConnectionForTesting(conn); + + var result = manager.ValidateRemoteLeafNode("server-C", "$G", null); + + result.Valid.ShouldBeTrue(); + result.Error.ShouldBeNull(); + result.ErrorCode.ShouldBe(LeafValidationError.None); + } + + // Go: leafnode.go — loop detection: reject when remote ID matches own server ID + [Fact] + public void ValidateRemoteLeafNode_SelfConnect_ReturnsError() + { + var manager = CreateManager("server-A"); + + var result = manager.ValidateRemoteLeafNode("server-A", "$G", null); + + result.Valid.ShouldBeFalse(); + result.ErrorCode.ShouldBe(LeafValidationError.SelfConnect); + result.Error.ShouldNotBeNull(); + } + + // Go: leafnode.go addLeafNodeConnection checkForDup — existing connection from same remote + [Fact] + public async Task ValidateRemoteLeafNode_DuplicateConnection_ReturnsError() + { + var manager = CreateManager("server-A"); + await using var conn = await CreateConnectionWithRemoteIdAsync("server-B"); + manager.InjectConnectionForTesting(conn); + + var result = manager.ValidateRemoteLeafNode("server-B", "$G", null); + + result.Valid.ShouldBeFalse(); + result.ErrorCode.ShouldBe(LeafValidationError.DuplicateConnection); + result.Error.ShouldNotBeNull(); + } + + // Go: leafnode.go addLeafNodeConnection — JetStream domain conflict between existing and incoming + [Fact] + public async Task ValidateRemoteLeafNode_JsDomainConflict_ReturnsError() + { + var manager = CreateManager("server-A"); + await using var conn = await CreateConnectionWithRemoteIdAsync("server-B", jsDomain: "domain-hub"); + manager.InjectConnectionForTesting(conn); + + // server-C tries to connect with a different JS domain + var result = manager.ValidateRemoteLeafNode("server-C", "$G", "domain-spoke"); + + result.Valid.ShouldBeFalse(); + result.ErrorCode.ShouldBe(LeafValidationError.JetStreamDomainConflict); + result.Error.ShouldNotBeNull(); + } + + // Go: leafnode.go — null JS domain is never a conflict + [Fact] + public async Task ValidateRemoteLeafNode_NullJsDomain_Valid() + { + var manager = CreateManager("server-A"); + await using var conn = await CreateConnectionWithRemoteIdAsync("server-B", jsDomain: "domain-hub"); + manager.InjectConnectionForTesting(conn); + + // Incoming with null domain — no domain conflict check performed + var result = manager.ValidateRemoteLeafNode("server-C", "$G", null); + + result.Valid.ShouldBeTrue(); + result.ErrorCode.ShouldBe(LeafValidationError.None); + } + + // Go: leafnode.go — IsSelfConnect true when IDs match + [Fact] + public void IsSelfConnect_MatchingId_ReturnsTrue() + { + var manager = CreateManager("server-A"); + + manager.IsSelfConnect("server-A").ShouldBeTrue(); + } + + // Go: leafnode.go — IsSelfConnect false when IDs differ + [Fact] + public void IsSelfConnect_DifferentId_ReturnsFalse() + { + var manager = CreateManager("server-A"); + + manager.IsSelfConnect("server-B").ShouldBeFalse(); + } + + // Go: leafnode.go — HasConnection true when remote ID is registered + [Fact] + public async Task HasConnection_Existing_ReturnsTrue() + { + var manager = CreateManager("server-A"); + await using var conn = await CreateConnectionWithRemoteIdAsync("server-B"); + manager.InjectConnectionForTesting(conn); + + manager.HasConnection("server-B").ShouldBeTrue(); + } + + // Go: leafnode.go — HasConnection false when remote ID is not registered + [Fact] + public void HasConnection_Missing_ReturnsFalse() + { + var manager = CreateManager("server-A"); + + manager.HasConnection("server-B").ShouldBeFalse(); + } + + // Go: leafnode.go — GetConnectionByRemoteId returns the matching connection + [Fact] + public async Task GetConnectionByRemoteId_Found_ReturnsConnection() + { + var manager = CreateManager("server-A"); + await using var conn = await CreateConnectionWithRemoteIdAsync("server-B"); + manager.InjectConnectionForTesting(conn); + + var found = manager.GetConnectionByRemoteId("server-B"); + + found.ShouldNotBeNull(); + found.RemoteId.ShouldBe("server-B"); + } +}