diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafJetStreamMigrationTests.cs b/tests/NATS.Server.Tests/LeafNodes/LeafJetStreamMigrationTests.cs new file mode 100644 index 0000000..6c5ce2f --- /dev/null +++ b/tests/NATS.Server.Tests/LeafNodes/LeafJetStreamMigrationTests.cs @@ -0,0 +1,231 @@ +using System.Net; +using System.Net.Sockets; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; +using NATS.Server.LeafNodes; + +namespace NATS.Server.Tests.LeafNodes; + +/// +/// Unit tests for JetStream migration checks on leaf node connections (Gap 12.4). +/// Verifies , +/// , +/// , and +/// . +/// Go reference: leafnode.go checkJetStreamMigrate. +/// +public class LeafJetStreamMigrationTests +{ + private static LeafNodeManager CreateManager(string serverId = "server-A") => + new( + options: new LeafNodeOptions { Host = "127.0.0.1", Port = 0 }, + stats: new ServerStats(), + serverId: serverId, + remoteSubSink: _ => { }, + messageSink: _ => { }, + logger: NullLogger.Instance); + + /// + /// Creates a connected socket pair using a loopback TcpListener and returns both sockets. + /// The caller is responsible for disposing both. + /// + private static async Task<(Socket serverSide, Socket clientSide)> CreateSocketPairAsync() + { + var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await clientSocket.ConnectAsync(IPAddress.Loopback, port); + var serverSocket = await listener.AcceptSocketAsync(); + listener.Stop(); + return (serverSocket, clientSocket); + } + + private static async Task CreateConnectionAsync(string remoteId, string? jsDomain = null) + { + var (serverSide, clientSide) = await CreateSocketPairAsync(); + clientSide.Dispose(); + var conn = new LeafConnection(serverSide) + { + RemoteId = remoteId, + JetStreamDomain = jsDomain, + }; + return conn; + } + + // Go: leafnode.go checkJetStreamMigrate — valid migration to a new unused domain + [Fact] + public async Task CheckJetStreamMigrate_Valid_NewDomain() + { + var manager = CreateManager(); + await using var conn = await CreateConnectionAsync("server-B", jsDomain: "domain-old"); + manager.InjectConnectionForTesting(conn); + var connectionId = manager.GetConnectionIds().Single(); + + var result = manager.CheckJetStreamMigrate(connectionId, "domain-new"); + + result.Valid.ShouldBeTrue(); + result.Status.ShouldBe(JetStreamMigrationStatus.Valid); + result.Error.ShouldBeNull(); + } + + // Go: leafnode.go checkJetStreamMigrate — unknown connection ID + [Fact] + public void CheckJetStreamMigrate_ConnectionNotFound() + { + var manager = CreateManager(); + + var result = manager.CheckJetStreamMigrate("no-such-id", "some-domain"); + + result.Valid.ShouldBeFalse(); + result.Status.ShouldBe(JetStreamMigrationStatus.ConnectionNotFound); + result.Error.ShouldNotBeNull(); + } + + // Go: leafnode.go checkJetStreamMigrate — clearing the domain (null) is always valid + [Fact] + public async Task CheckJetStreamMigrate_NullDomain_AlwaysValid() + { + var manager = CreateManager(); + await using var conn = await CreateConnectionAsync("server-B", jsDomain: "domain-existing"); + manager.InjectConnectionForTesting(conn); + var connectionId = manager.GetConnectionIds().Single(); + + var result = manager.CheckJetStreamMigrate(connectionId, null); + + result.Valid.ShouldBeTrue(); + result.Status.ShouldBe(JetStreamMigrationStatus.Valid); + result.Error.ShouldBeNull(); + } + + // Go: leafnode.go checkJetStreamMigrate — proposed domain matches current, no migration needed + [Fact] + public async Task CheckJetStreamMigrate_SameDomain_NoChangeNeeded() + { + var manager = CreateManager(); + await using var conn = await CreateConnectionAsync("server-B", jsDomain: "domain-hub"); + manager.InjectConnectionForTesting(conn); + var connectionId = manager.GetConnectionIds().Single(); + + var result = manager.CheckJetStreamMigrate(connectionId, "domain-hub"); + + result.Valid.ShouldBeTrue(); + result.Status.ShouldBe(JetStreamMigrationStatus.NoChangeNeeded); + result.Error.ShouldBeNull(); + } + + // Go: leafnode.go checkJetStreamMigrate — another connection already uses the proposed domain + [Fact] + public async Task CheckJetStreamMigrate_DomainConflict() + { + var manager = CreateManager(); + + // conn-A already owns "domain-hub" + await using var connA = await CreateConnectionAsync("server-B", jsDomain: "domain-hub"); + manager.InjectConnectionForTesting(connA); + + // conn-B is on a different domain and wants to migrate to "domain-hub" + await using var connB = await CreateConnectionAsync("server-C", jsDomain: "domain-spoke"); + manager.InjectConnectionForTesting(connB); + + // Retrieve connection ID for server-C (conn-B) + var connectionIdB = manager.GetConnectionIds() + .Single(id => manager.GetConnectionByRemoteId("server-C") != null + && _connections_ContainsKey(manager, id, "server-C")); + + var result = manager.CheckJetStreamMigrate(connectionIdB, "domain-hub"); + + result.Valid.ShouldBeFalse(); + result.Status.ShouldBe(JetStreamMigrationStatus.DomainConflict); + result.Error.ShouldNotBeNull(); + } + + // Go: leafnode.go — GetActiveJetStreamDomains returns distinct domains + [Fact] + public async Task GetActiveJetStreamDomains_ReturnsDistinct() + { + var manager = CreateManager(); + + await using var connA = await CreateConnectionAsync("server-B", jsDomain: "domain-hub"); + await using var connB = await CreateConnectionAsync("server-C", jsDomain: "domain-hub"); + await using var connC = await CreateConnectionAsync("server-D", jsDomain: "domain-spoke"); + manager.InjectConnectionForTesting(connA); + manager.InjectConnectionForTesting(connB); + manager.InjectConnectionForTesting(connC); + + var domains = manager.GetActiveJetStreamDomains(); + + domains.Count.ShouldBe(2); + domains.ShouldContain("domain-hub"); + domains.ShouldContain("domain-spoke"); + } + + // Go: leafnode.go — GetActiveJetStreamDomains excludes connections without a domain + [Fact] + public async Task GetActiveJetStreamDomains_SkipsNull() + { + var manager = CreateManager(); + + await using var connWithDomain = await CreateConnectionAsync("server-B", jsDomain: "domain-hub"); + await using var connWithoutDomain = await CreateConnectionAsync("server-C", jsDomain: null); + manager.InjectConnectionForTesting(connWithDomain); + manager.InjectConnectionForTesting(connWithoutDomain); + + var domains = manager.GetActiveJetStreamDomains(); + + domains.Count.ShouldBe(1); + domains.ShouldContain("domain-hub"); + } + + // Go: leafnode.go — IsJetStreamDomainInUse returns true when domain is active + [Fact] + public async Task IsJetStreamDomainInUse_True() + { + var manager = CreateManager(); + await using var conn = await CreateConnectionAsync("server-B", jsDomain: "domain-hub"); + manager.InjectConnectionForTesting(conn); + + manager.IsJetStreamDomainInUse("domain-hub").ShouldBeTrue(); + } + + // Go: leafnode.go — IsJetStreamDomainInUse returns false when domain is not active + [Fact] + public async Task IsJetStreamDomainInUse_False() + { + var manager = CreateManager(); + await using var conn = await CreateConnectionAsync("server-B", jsDomain: "domain-hub"); + manager.InjectConnectionForTesting(conn); + + manager.IsJetStreamDomainInUse("domain-unknown").ShouldBeFalse(); + } + + // Go: leafnode.go — JetStreamEnabledConnectionCount counts only connections with a domain + [Fact] + public async Task JetStreamEnabledConnectionCount_CountsNonNull() + { + var manager = CreateManager(); + + await using var connA = await CreateConnectionAsync("server-B", jsDomain: "domain-hub"); + await using var connB = await CreateConnectionAsync("server-C", jsDomain: null); + await using var connC = await CreateConnectionAsync("server-D", jsDomain: "domain-spoke"); + manager.InjectConnectionForTesting(connA); + manager.InjectConnectionForTesting(connB); + manager.InjectConnectionForTesting(connC); + + manager.JetStreamEnabledConnectionCount.ShouldBe(2); + } + + // ── Internal helper ──────────────────────────────────────────────────────── + + /// + /// Checks whether the given connection key in the manager corresponds to the connection + /// with the specified RemoteId. Uses + /// as an indirect lookup since the key is internal. + /// + private static bool _connections_ContainsKey(LeafNodeManager manager, string key, string remoteId) + { + // We use GetConnectionIds() and GetConnectionByRemoteId() — both public/internal. + // The key format is "remoteId:endpoint:guid" so we can prefix-check. + return key.StartsWith(remoteId + ":", StringComparison.Ordinal); + } +}