using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core;
using NATS.Server.Configuration;

namespace NATS.Server.Tests.LeafNodes;

/// <summary>
/// Tests for JetStream behavior over leaf node connections.
/// Reference: golang/nats-server/server/leafnode_test.go — TestLeafNodeJetStreamDomainMapCrossTalk, etc.
/// </summary>
public class LeafNodeJetStreamTests
{
    private const string Loopback = "127.0.0.1";

    /// <summary>Upper bound for each polling wait and for receiving a forwarded message.</summary>
    private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Delay between two evaluations of a polled condition.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(50);

    // Go: TestLeafNodeJetStreamDomainMapCrossTalk server/leafnode_test.go:5948
    [Fact]
    public async Task JetStream_API_requests_reach_hub_with_JS_enabled()
    {
        await using var pair = await LeafPair.StartAsync(NewStoreDir("nats-js-hub"), spokeStoreDir: null);
        await pair.WaitForLeafConnectionAsync();

        pair.Hub.Stats.JetStreamEnabled.ShouldBeTrue();

        // Verify hub counts leaf
        Interlocked.Read(ref pair.Hub.Stats.Leafs).ShouldBe(1);
    }

    // Go: TestLeafNodeJetStreamDomainMapCrossTalk server/leafnode_test.go:5948
    [Fact]
    public async Task JetStream_on_hub_receives_messages_published_from_leaf()
    {
        await using var pair = await LeafPair.StartAsync(NewStoreDir("nats-js-leaf"), spokeStoreDir: null);
        await pair.WaitForLeafConnectionAsync();

        // Subscribe on hub, publish from spoke.
        await AssertForwardedAsync(pair.Hub, pair.Spoke, "js.leaf.test", "from-leaf-to-js");
    }

    // Go: TestLeafNodeStreamImport server/leafnode_test.go:3441
    [Fact]
    public async Task Leaf_node_with_JetStream_disabled_spoke_still_forwards_messages()
    {
        // Spoke without JetStream
        await using var pair = await LeafPair.StartAsync(NewStoreDir("nats-js-fwd"), spokeStoreDir: null);
        await pair.WaitForLeafConnectionAsync();

        pair.Hub.Stats.JetStreamEnabled.ShouldBeTrue();
        pair.Spoke.Stats.JetStreamEnabled.ShouldBeFalse();

        // Subscribe on hub, publish from spoke.
        await AssertForwardedAsync(pair.Hub, pair.Spoke, "njs.forward", "no-js-spoke");
    }

    // Go: TestLeafNodeJetStreamDomainMapCrossTalk server/leafnode_test.go:5948
    [Fact]
    public async Task Both_hub_and_spoke_with_JetStream_enabled_connect_successfully()
    {
        await using var pair = await LeafPair.StartAsync(
            NewStoreDir("nats-js-hub2"),
            NewStoreDir("nats-js-spoke2"));
        await pair.WaitForLeafConnectionAsync();

        pair.Hub.Stats.JetStreamEnabled.ShouldBeTrue();
        pair.Spoke.Stats.JetStreamEnabled.ShouldBeTrue();
        Interlocked.Read(ref pair.Hub.Stats.Leafs).ShouldBe(1);
    }

    // Go: TestLeafNodeStreamAndShadowSubs server/leafnode_test.go:6176
    [Fact]
    public async Task Leaf_node_message_forwarding_works_alongside_JetStream()
    {
        await using var pair = await LeafPair.StartAsync(NewStoreDir("nats-js-combo"), spokeStoreDir: null);
        await pair.WaitForLeafConnectionAsync();

        // Regular pub/sub should still work alongside JS: subscribe on the leaf, publish from the hub.
        await AssertForwardedAsync(pair.Spoke, pair.Hub, "combo.test", "js-combo");
    }

    /// <summary>Builds a unique JetStream store directory path under the system temp folder.</summary>
    /// <param name="prefix">Directory name prefix; a dash and a fresh GUID are appended.</param>
    /// <returns>The full path. The directory itself is not created here.</returns>
    private static string NewStoreDir(string prefix) =>
        Path.Combine(Path.GetTempPath(), $"{prefix}-{Guid.NewGuid():N}");

    /// <summary>
    /// Polls <paramref name="condition"/> every <see cref="PollInterval"/> until it returns true or
    /// <see cref="WaitTimeout"/> elapses.
    /// </summary>
    /// <remarks>
    /// A timeout is deliberately not an error: the method simply returns and the assertions that
    /// follow in the calling test decide whether the test passes.
    /// </remarks>
    /// <param name="condition">Predicate evaluated on each poll.</param>
    private static async Task WaitUntilAsync(Func<bool> condition)
    {
        using var timeout = new CancellationTokenSource(WaitTimeout);
        while (!timeout.IsCancellationRequested && !condition())
        {
            try
            {
                await Task.Delay(PollInterval, timeout.Token);
            }
            catch (OperationCanceledException)
            {
                // Timed out mid-delay; the loop condition ends the wait.
            }
        }
    }

    /// <summary>
    /// Subscribes to <paramref name="subject"/> through <paramref name="subscriberServer"/>, waits
    /// (bounded) for <paramref name="publisherServer"/> to report remote interest, publishes
    /// <paramref name="payload"/> through <paramref name="publisherServer"/> and asserts that the
    /// subscriber receives exactly that payload within <see cref="WaitTimeout"/>.
    /// </summary>
    /// <param name="subscriberServer">Server the subscribing client connects to.</param>
    /// <param name="publisherServer">Server the publishing client connects to.</param>
    /// <param name="subject">Subject used for both the subscription and the publish.</param>
    /// <param name="payload">String payload expected on the subscriber side.</param>
    private static async Task AssertForwardedAsync(
        NatsServer subscriberServer,
        NatsServer publisherServer,
        string subject,
        string payload)
    {
        await using var subscriberConn = new NatsConnection(new NatsOpts
        {
            Url = $"nats://{Loopback}:{subscriberServer.Port}",
        });
        await subscriberConn.ConnectAsync();

        await using var sub = await subscriberConn.SubscribeCoreAsync<string>(subject);
        await subscriberConn.PingAsync();

        // Wait for interest propagation
        await WaitUntilAsync(() => publisherServer.HasRemoteInterest(subject));

        await using var publisherConn = new NatsConnection(new NatsOpts
        {
            Url = $"nats://{Loopback}:{publisherServer.Port}",
        });
        await publisherConn.ConnectAsync();
        await publisherConn.PublishAsync(subject, payload);

        using var msgTimeout = new CancellationTokenSource(WaitTimeout);
        var msg = await sub.Msgs.ReadAsync(msgTimeout.Token);
        msg.Data.ShouldBe(payload);
    }

    /// <summary>
    /// A hub server plus a spoke server whose leaf node remote is the hub's leaf listener.
    /// Disposing the pair cancels and disposes both servers and deletes any JetStream store
    /// directories it was given, so cleanup also runs when a test assertion throws.
    /// </summary>
    private sealed class LeafPair : IAsyncDisposable
    {
        // Servers in start order: index 0 is the hub, index 1 the spoke.
        private readonly List<(NatsServer Server, CancellationTokenSource Cts)> _running = [];
        private readonly List<string> _storeDirs = [];

        private LeafPair()
        {
        }

        /// <summary>The hub server (started first).</summary>
        public NatsServer Hub => _running[0].Server;

        /// <summary>The spoke server, configured with the hub's leaf listener as its remote.</summary>
        public NatsServer Spoke => _running[1].Server;

        /// <summary>Starts the hub, then the spoke, waiting for each to report ready.</summary>
        /// <param name="hubStoreDir">JetStream store directory for the hub; null leaves JetStream unconfigured.</param>
        /// <param name="spokeStoreDir">JetStream store directory for the spoke; null leaves JetStream unconfigured.</param>
        /// <returns>The started pair; the caller owns it and must dispose it.</returns>
        public static async Task<LeafPair> StartAsync(string? hubStoreDir, string? spokeStoreDir)
        {
            var pair = new LeafPair();
            try
            {
                var hub = await pair.LaunchAsync(
                    new LeafNodeOptions { Host = Loopback, Port = 0 },
                    hubStoreDir);

                await pair.LaunchAsync(
                    new LeafNodeOptions { Host = Loopback, Port = 0, Remotes = [hub.LeafListen!] },
                    spokeStoreDir);

                return pair;
            }
            catch
            {
                // Do not leak a hub that started when the spoke (or readiness wait) failed.
                await pair.DisposeAsync();
                throw;
            }
        }

        /// <summary>
        /// Bounded wait until both servers report a non-zero leaf count. Returns silently on
        /// timeout; callers assert on the state they need afterwards.
        /// </summary>
        public Task WaitForLeafConnectionAsync() =>
            WaitUntilAsync(() => Hub.Stats.Leafs != 0 && Spoke.Stats.Leafs != 0);

        /// <summary>
        /// Cancels both servers (spoke first), disposes them, then disposes the cancellation
        /// sources and removes the store directories even if shutdown threw.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            try
            {
                for (var i = _running.Count - 1; i >= 0; i--)
                {
                    await _running[i].Cts.CancelAsync();
                }

                for (var i = _running.Count - 1; i >= 0; i--)
                {
                    _running[i].Server.Dispose();
                }
            }
            finally
            {
                for (var i = _running.Count - 1; i >= 0; i--)
                {
                    _running[i].Cts.Dispose();
                }

                // Clean up store dirs
                foreach (var dir in _storeDirs)
                {
                    if (Directory.Exists(dir))
                    {
                        Directory.Delete(dir, true);
                    }
                }
            }
        }

        /// <summary>Creates, registers and starts one server, then waits for it to be ready.</summary>
        /// <param name="leaf">Leaf node options for the server.</param>
        /// <param name="storeDir">JetStream store directory, or null to leave JetStream unconfigured.</param>
        /// <returns>The started server.</returns>
        private async Task<NatsServer> LaunchAsync(LeafNodeOptions leaf, string? storeDir)
        {
            // Two initializers rather than assigning JetStream conditionally, so the JetStream
            // property is simply never touched for servers that should run without it.
            var options = storeDir is null
                ? new NatsOptions { Host = Loopback, Port = 0, LeafNode = leaf }
                : new NatsOptions
                {
                    Host = Loopback,
                    Port = 0,
                    LeafNode = leaf,
                    JetStream = new JetStreamOptions { StoreDir = storeDir },
                };

            if (storeDir is not null)
            {
                _storeDirs.Add(storeDir);
            }

            var server = new NatsServer(options, NullLoggerFactory.Instance);
            var cts = new CancellationTokenSource();

            // Register before starting so a failure below is still cleaned up by DisposeAsync.
            _running.Add((server, cts));

            _ = server.StartAsync(cts.Token);
            await server.WaitForReadyAsync();
            return server;
        }
    }
}