diff --git a/docs/plans/2026-03-12-e2e-full-gap-coverage-plan.md b/docs/plans/2026-03-12-e2e-full-gap-coverage-plan.md new file mode 100644 index 0000000..adde369 --- /dev/null +++ b/docs/plans/2026-03-12-e2e-full-gap-coverage-plan.md @@ -0,0 +1,1846 @@ +# E2E Full Gap Coverage Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task. + +**Goal:** Add ~50 E2E tests covering all gaps from `e2e_gaps.md` across monitoring, headers, shutdown/drain, clustering, leaf nodes, gateways, MQTT, WebSocket, JetStream extensions, and advanced features. + +**Architecture:** Each test area gets its own test file with a shared xUnit collection fixture. Multi-server tests (cluster, leaf, gateway) spawn multiple `NatsServerProcess` instances with route/leaf/gateway config. Single-server features (MQTT, WebSocket) use one process with the relevant port enabled. All tests use `NATS.Client.Core` NuGet for client connections except MQTT (MQTTnet) and WebSocket (raw `ClientWebSocket`). + +**Tech Stack:** .NET 10, xUnit 3, Shouldly, NATS.Client.Core 2.7.2, NATS.Client.JetStream 2.7.2, MQTTnet (new), System.Net.WebSockets (built-in) + +**Key conventions:** +- Use `NatsServerProcess.WithConfig(...)` for inline config +- Use `NatsServerProcess.AllocateFreePort()` for ephemeral ports +- Shouldly assertions only (never `Assert.*`) +- `using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(N))` for timeouts +- `await client.PingAsync()` after subscribe to flush + +--- + +### Task 1: Add MQTTnet NuGet Package + +**Files:** +- Modify: `Directory.Packages.props` +- Modify: `tests/NATS.E2E.Tests/NATS.E2E.Tests.csproj` + +**Step 1: Add MQTTnet to central package management** + +In `Directory.Packages.props`, add inside the `` after the NATS Client entries: + +```xml + + +``` + +**Step 2: Add MQTTnet reference to E2E test project** + +In `tests/NATS.E2E.Tests/NATS.E2E.Tests.csproj`, add inside the `` with other `` entries: + +```xml + +``` + +**Step 3: Verify build** + +Run: `dotnet build tests/NATS.E2E.Tests` +Expected: Build succeeded + +**Step 4: Commit** + +```bash +git add Directory.Packages.props tests/NATS.E2E.Tests/NATS.E2E.Tests.csproj +git commit -m "chore: add MQTTnet NuGet package for E2E MQTT tests" +``` + +--- + +### Task 2: Monitoring Endpoint Tests + +**Files:** +- Create: `tests/NATS.E2E.Tests/MonitoringTests.cs` +- Reference: `tests/NATS.E2E.Tests/Infrastructure/MonitorServerFixture.cs` (exists, unused) + +The `MonitorServerFixture` already exists with `MonitorClient` (HttpClient) and `MonitorPort`. The `MonitorCollection` ("E2E-Monitor") is defined. + +**Step 1: Write MonitoringTests.cs** + +```csharp +using System.Text.Json; +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +[Collection("E2E-Monitor")] +public class MonitoringTests(MonitorServerFixture fixture) +{ + [Fact] + public async Task Varz_ReturnsServerInfo() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var response = await fixture.MonitorClient.GetAsync("/varz", cts.Token); + response.StatusCode.ShouldBe(System.Net.HttpStatusCode.OK); + + var json = await response.Content.ReadAsStringAsync(cts.Token); + using var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.TryGetProperty("server_name", out _).ShouldBeTrue(); + root.TryGetProperty("version", out _).ShouldBeTrue(); + root.TryGetProperty("max_payload", out _).ShouldBeTrue(); + } + + [Fact] + public async Task Connz_ReflectsConnectedClients() + { + // Connect a client so connz shows at least 1 connection + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + await client.PingAsync(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var response = await fixture.MonitorClient.GetAsync("/connz", cts.Token); + response.StatusCode.ShouldBe(System.Net.HttpStatusCode.OK); + + var json = await response.Content.ReadAsStringAsync(cts.Token); + using var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + root.TryGetProperty("num_connections", out var numConns).ShouldBeTrue(); + numConns.GetInt32().ShouldBeGreaterThanOrEqualTo(1); + } + + [Fact] + public async Task Healthz_ReturnsOk() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var response = await fixture.MonitorClient.GetAsync("/healthz", cts.Token); + response.StatusCode.ShouldBe(System.Net.HttpStatusCode.OK); + } +} +``` + +**Step 2: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~MonitoringTests" -v normal` +Expected: 3 passed + +**Step 3: Commit** + +```bash +git add tests/NATS.E2E.Tests/MonitoringTests.cs +git commit -m "test: add E2E monitoring endpoint tests (varz, connz, healthz)" +``` + +--- + +### Task 3: Header Pub/Sub Tests + +**Files:** +- Create: `tests/NATS.E2E.Tests/HeaderTests.cs` + +Uses existing `NatsServerFixture` and `"E2E"` collection. + +**Step 1: Write HeaderTests.cs** + +```csharp +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +[Collection("E2E")] +public class HeaderTests(NatsServerFixture fixture) +{ + [Fact] + public async Task Headers_PublishWithHeaders_ReceivedIntact() + { + await using var pub = fixture.CreateClient(); + await using var sub = fixture.CreateClient(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.hdr.basic"); + await sub.PingAsync(); + + var headers = new NatsHeaders { { "X-Test-Key", "test-value" } }; + await pub.PublishAsync("e2e.hdr.basic", "with-headers", headers: headers); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + + msg.Data.ShouldBe("with-headers"); + msg.Headers.ShouldNotBeNull(); + msg.Headers!["X-Test-Key"].ToString().ShouldBe("test-value"); + } + + [Fact] + public async Task Headers_MultipleHeaders_AllPreserved() + { + await using var pub = fixture.CreateClient(); + await using var sub = fixture.CreateClient(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.hdr.multi"); + await sub.PingAsync(); + + var headers = new NatsHeaders + { + { "X-First", "one" }, + { "X-Second", "two" }, + { "X-Third", "three" }, + }; + await pub.PublishAsync("e2e.hdr.multi", "multi-hdr", headers: headers); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + + msg.Headers.ShouldNotBeNull(); + msg.Headers!["X-First"].ToString().ShouldBe("one"); + msg.Headers!["X-Second"].ToString().ShouldBe("two"); + msg.Headers!["X-Third"].ToString().ShouldBe("three"); + } + + [Fact] + public async Task Headers_EmptyValue_RoundTrips() + { + await using var pub = fixture.CreateClient(); + await using var sub = fixture.CreateClient(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.hdr.empty"); + await sub.PingAsync(); + + var headers = new NatsHeaders { { "X-Empty", "" } }; + await pub.PublishAsync("e2e.hdr.empty", "empty-val", headers: headers); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + + msg.Headers.ShouldNotBeNull(); + msg.Headers!.ContainsKey("X-Empty").ShouldBeTrue(); + msg.Headers!["X-Empty"].ToString().ShouldBe(""); + } +} +``` + +**Step 2: Run tests** + +Run: `dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~HeaderTests" -v normal` +Expected: 3 passed + +**Step 3: Commit** + +```bash +git add tests/NATS.E2E.Tests/HeaderTests.cs +git commit -m "test: add E2E header pub/sub tests (HPUB/HMSG)" +``` + +--- + +### Task 4: Shutdown and Drain Tests + +**Files:** +- Create: `tests/NATS.E2E.Tests/ShutdownDrainTests.cs` + +These tests create their own server per test (no shared fixture) since they kill the server. + +**Step 1: Write ShutdownDrainTests.cs** + +```csharp +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +public class ShutdownDrainTests +{ + [Fact] + public async Task ClientDrain_CompletesInFlightMessages() + { + await using var server = new NatsServerProcess(); + await server.StartAsync(); + + await using var pub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{server.Port}" }); + var sub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{server.Port}" }); + await using var _ = sub; + + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.drain.>"); + await sub.PingAsync(); + + // Publish some messages + for (var i = 0; i < 10; i++) + await pub.PublishAsync($"e2e.drain.{i}", $"msg{i}"); + await pub.PingAsync(); + + // Read messages before drain + var received = new List(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + // Collect what we can, then dispose (drain) the subscription + for (var i = 0; i < 10; i++) + { + var msg = await subscription.Msgs.ReadAsync(cts.Token); + received.Add(msg.Data); + } + + received.Count.ShouldBe(10); + + // Dispose the client (which drains) + await sub.DisposeAsync(); + } + + [Fact] + public async Task ServerShutdown_ClientDetectsDisconnection() + { + var server = new NatsServerProcess(); + await server.StartAsync(); + + await using var client = new NatsConnection(new NatsOpts + { + Url = $"nats://127.0.0.1:{server.Port}", + MaxReconnectRetry = 0, + }); + await client.ConnectAsync(); + await client.PingAsync(); + + client.ConnectionState.ShouldBe(NatsConnectionState.Open); + + // Kill the server + await server.DisposeAsync(); + + // Wait for client to detect disconnection + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + while (client.ConnectionState == NatsConnectionState.Open && !cts.IsCancellationRequested) + await Task.Delay(100, cts.Token); + + client.ConnectionState.ShouldNotBe(NatsConnectionState.Open); + } +} +``` + +**Step 2: Run tests** + +Run: `dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~ShutdownDrainTests" -v normal` +Expected: 2 passed + +**Step 3: Commit** + +```bash +git add tests/NATS.E2E.Tests/ShutdownDrainTests.cs +git commit -m "test: add E2E shutdown and drain tests" +``` + +--- + +### Task 5: JetStream Extended Tests + +**Files:** +- Modify: `tests/NATS.E2E.Tests/JetStreamTests.cs` + +Adds 8 new tests to the existing JetStream test file using the existing `JetStreamServerFixture`. + +**Step 1: Add push consumer test** + +Append to `JetStreamTests.cs` before the closing `}`: + +```csharp + // ------------------------------------------------------------------------- + // Test 11 — Push consumer delivers messages to a subject + // ------------------------------------------------------------------------- + [Fact] + public async Task Consumer_PushDelivery() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var streamName = $"E2E_PUSH_{Random.Shared.Next(100000)}"; + await js.CreateStreamAsync(new StreamConfig(streamName, [$"js.push.{streamName}.>"]), cts.Token); + + for (var i = 0; i < 5; i++) + await js.PublishAsync($"js.push.{streamName}.{i}", $"push{i}", cancellationToken: cts.Token); + + var deliverSubject = $"_deliver.{streamName}"; + await js.CreateOrUpdateConsumerAsync(streamName, + new ConsumerConfig + { + Name = "push-consumer", + AckPolicy = ConsumerConfigAckPolicy.Explicit, + DeliverSubject = deliverSubject, + }, + cts.Token); + + // Subscribe to the deliver subject to receive push messages + await using var subscription = await client.SubscribeCoreAsync(deliverSubject); + await client.PingAsync(); + + var received = new List(); + for (var i = 0; i < 5; i++) + { + var msg = await subscription.Msgs.ReadAsync(cts.Token); + received.Add(msg.Data); + } + + received.Count.ShouldBe(5); + } + + // ------------------------------------------------------------------------- + // Test 12 — AckNone policy: messages delivered without requiring ack + // ------------------------------------------------------------------------- + [Fact] + public async Task Consumer_AckNone() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var streamName = $"E2E_ACKNONE_{Random.Shared.Next(100000)}"; + await js.CreateStreamAsync(new StreamConfig(streamName, [$"js.acknone.{streamName}.>"]), cts.Token); + + for (var i = 0; i < 3; i++) + await js.PublishAsync($"js.acknone.{streamName}.{i}", $"data{i}", cancellationToken: cts.Token); + + await js.CreateOrUpdateConsumerAsync(streamName, + new ConsumerConfig { Name = "none-consumer", AckPolicy = ConsumerConfigAckPolicy.None }, + cts.Token); + + var consumer = await js.GetConsumerAsync(streamName, "none-consumer", cts.Token); + + var received = new List(); + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 3 }, cancellationToken: cts.Token)) + received.Add(msg.Data); + + received.Count.ShouldBe(3); + + // With AckNone, consumer should have no pending acks — re-fetch yields nothing + var second = new List(); + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 3, Expires = TimeSpan.FromSeconds(1) }, cancellationToken: cts.Token)) + second.Add(msg.Data); + + second.Count.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Test 13 — AckAll policy: acking last message acks all prior + // ------------------------------------------------------------------------- + [Fact] + public async Task Consumer_AckAll() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var streamName = $"E2E_ACKALL_{Random.Shared.Next(100000)}"; + await js.CreateStreamAsync(new StreamConfig(streamName, [$"js.ackall.{streamName}.>"]), cts.Token); + + for (var i = 0; i < 5; i++) + await js.PublishAsync($"js.ackall.{streamName}.{i}", $"data{i}", cancellationToken: cts.Token); + + await js.CreateOrUpdateConsumerAsync(streamName, + new ConsumerConfig { Name = "all-consumer", AckPolicy = ConsumerConfigAckPolicy.All }, + cts.Token); + + var consumer = await js.GetConsumerAsync(streamName, "all-consumer", cts.Token); + + // Fetch all 5 messages, only ack the last one + NatsJSMsg? lastMsg = null; + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 5 }, cancellationToken: cts.Token)) + lastMsg = msg; + + lastMsg.ShouldNotBeNull(); + await lastMsg.Value.AckAsync(cancellationToken: cts.Token); + + // Re-fetch should yield nothing since all were acked + var second = new List(); + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 5, Expires = TimeSpan.FromSeconds(1) }, cancellationToken: cts.Token)) + second.Add(msg.Data); + + second.Count.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Test 14 — Interest retention: messages removed when all consumers ack + // ------------------------------------------------------------------------- + [Fact] + public async Task Retention_Interest() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var streamName = $"E2E_INTEREST_{Random.Shared.Next(100000)}"; + await js.CreateStreamAsync( + new StreamConfig(streamName, [$"js.interest.{streamName}.>"]) + { + Retention = StreamConfigRetention.Interest, + }, + cts.Token); + + // Create a consumer BEFORE publishing (Interest retention requires active consumers) + await js.CreateOrUpdateConsumerAsync(streamName, + new ConsumerConfig { Name = "interest-c", AckPolicy = ConsumerConfigAckPolicy.Explicit }, + cts.Token); + + for (var i = 0; i < 3; i++) + await js.PublishAsync($"js.interest.{streamName}.{i}", $"val{i}", cancellationToken: cts.Token); + + var consumer = await js.GetConsumerAsync(streamName, "interest-c", cts.Token); + + // Fetch and ack all + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 3 }, cancellationToken: cts.Token)) + await msg.AckAsync(cancellationToken: cts.Token); + + // Allow server to process acks + await Task.Delay(500, cts.Token); + + // Stream should have 0 messages (interest satisfied) + var stream = await js.GetStreamAsync(streamName, cancellationToken: cts.Token); + stream.Info.State.Messages.ShouldBe(0L); + } + + // ------------------------------------------------------------------------- + // Test 15 — WorkQueue retention: message removed after single consumer ack + // ------------------------------------------------------------------------- + [Fact] + public async Task Retention_WorkQueue() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var streamName = $"E2E_WQ_{Random.Shared.Next(100000)}"; + await js.CreateStreamAsync( + new StreamConfig(streamName, [$"js.wq.{streamName}.>"]) + { + Retention = StreamConfigRetention.Workqueue, + }, + cts.Token); + + await js.CreateOrUpdateConsumerAsync(streamName, + new ConsumerConfig { Name = "wq-c", AckPolicy = ConsumerConfigAckPolicy.Explicit }, + cts.Token); + + for (var i = 0; i < 5; i++) + await js.PublishAsync($"js.wq.{streamName}.{i}", $"work{i}", cancellationToken: cts.Token); + + var consumer = await js.GetConsumerAsync(streamName, "wq-c", cts.Token); + + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 5 }, cancellationToken: cts.Token)) + await msg.AckAsync(cancellationToken: cts.Token); + + await Task.Delay(500, cts.Token); + + var stream = await js.GetStreamAsync(streamName, cancellationToken: cts.Token); + stream.Info.State.Messages.ShouldBe(0L); + } + + // ------------------------------------------------------------------------- + // Test 16 — Ordered consumer: messages arrive in sequence order + // ------------------------------------------------------------------------- + [Fact] + public async Task Consumer_Ordered() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var streamName = $"E2E_ORD_{Random.Shared.Next(100000)}"; + await js.CreateStreamAsync(new StreamConfig(streamName, [$"js.ord.{streamName}.>"]), cts.Token); + + for (var i = 0; i < 10; i++) + await js.PublishAsync($"js.ord.{streamName}.{i}", $"seq{i}", cancellationToken: cts.Token); + + var consumer = await js.CreateOrderedConsumerAsync(streamName, cancellationToken: cts.Token); + + var received = new List(); + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token)) + received.Add(msg.Data); + + received.Count.ShouldBe(10); + for (var i = 0; i < 10; i++) + received[i].ShouldBe($"seq{i}"); + } + + // ------------------------------------------------------------------------- + // Test 17 — Stream mirroring: mirror reflects source messages + // ------------------------------------------------------------------------- + [Fact] + public async Task Stream_Mirror() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var sourceName = $"E2E_MIRROR_SRC_{Random.Shared.Next(100000)}"; + var mirrorName = $"E2E_MIRROR_DST_{Random.Shared.Next(100000)}"; + + await js.CreateStreamAsync(new StreamConfig(sourceName, [$"js.mirror.{sourceName}.>"]), cts.Token); + + for (var i = 0; i < 5; i++) + await js.PublishAsync($"js.mirror.{sourceName}.{i}", $"mirrored{i}", cancellationToken: cts.Token); + + await js.CreateStreamAsync( + new StreamConfig(mirrorName, []) + { + Mirror = new StreamSource { Name = sourceName }, + }, + cts.Token); + + // Wait for mirror to catch up + await Task.Delay(2000, cts.Token); + + var mirror = await js.GetStreamAsync(mirrorName, cancellationToken: cts.Token); + mirror.Info.State.Messages.ShouldBe(5L); + } + + // ------------------------------------------------------------------------- + // Test 18 — Stream sourcing: sourced stream aggregates from source + // ------------------------------------------------------------------------- + [Fact] + public async Task Stream_Source() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var srcName = $"E2E_SRC_{Random.Shared.Next(100000)}"; + var aggName = $"E2E_AGG_{Random.Shared.Next(100000)}"; + + await js.CreateStreamAsync(new StreamConfig(srcName, [$"js.source.{srcName}.>"]), cts.Token); + + for (var i = 0; i < 5; i++) + await js.PublishAsync($"js.source.{srcName}.{i}", $"sourced{i}", cancellationToken: cts.Token); + + await js.CreateStreamAsync( + new StreamConfig(aggName, []) + { + Sources = [new StreamSource { Name = srcName }], + }, + cts.Token); + + // Wait for source to replicate + await Task.Delay(2000, cts.Token); + + var agg = await js.GetStreamAsync(aggName, cancellationToken: cts.Token); + agg.Info.State.Messages.ShouldBe(5L); + } +``` + +**Step 2: Run tests** + +Run: `dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~JetStreamTests" -v normal` +Expected: 18 passed (10 existing + 8 new) + +**Step 3: Commit** + +```bash +git add tests/NATS.E2E.Tests/JetStreamTests.cs +git commit -m "test: add E2E JetStream push consumers, ACK policies, retention modes, ordered, mirror, source" +``` + +--- + +### Task 6: Cluster Fixture and Tests + +**Files:** +- Create: `tests/NATS.E2E.Tests/Infrastructure/ClusterFixture.cs` +- Create: `tests/NATS.E2E.Tests/ClusterTests.cs` + +**Step 1: Write ClusterFixture.cs** + +The fixture starts 3 NATS server processes with route config pointing at each other. Each server gets an ephemeral client port and an ephemeral cluster port. + +```csharp +using NATS.Client.Core; + +namespace NATS.E2E.Tests.Infrastructure; + +public sealed class ClusterFixture : IAsyncLifetime +{ + private NatsServerProcess _server1 = null!; + private NatsServerProcess _server2 = null!; + private NatsServerProcess _server3 = null!; + + public int Port1 => _server1.Port; + public int Port2 => _server2.Port; + public int Port3 => _server3.Port; + + public async Task InitializeAsync() + { + var clusterPort1 = NatsServerProcess.AllocateFreePort(); + var clusterPort2 = NatsServerProcess.AllocateFreePort(); + var clusterPort3 = NatsServerProcess.AllocateFreePort(); + + var routes = $""" + nats-route://127.0.0.1:{clusterPort1} + nats-route://127.0.0.1:{clusterPort2} + nats-route://127.0.0.1:{clusterPort3} + """; + + string MakeConfig(string name, int clusterPort) => $$""" + server_name: {{name}} + cluster { + name: e2e-cluster + listen: 127.0.0.1:{{clusterPort}} + routes: [ + {{routes}} + ] + } + """; + + _server1 = NatsServerProcess.WithConfig(MakeConfig("node1", clusterPort1)); + _server2 = NatsServerProcess.WithConfig(MakeConfig("node2", clusterPort2)); + _server3 = NatsServerProcess.WithConfig(MakeConfig("node3", clusterPort3)); + + // Start all three in parallel + await Task.WhenAll( + _server1.StartAsync(), + _server2.StartAsync(), + _server3.StartAsync()); + + // Give routes time to form + await Task.Delay(2000); + } + + public async Task DisposeAsync() + { + await Task.WhenAll( + _server1.DisposeAsync().AsTask(), + _server2.DisposeAsync().AsTask(), + _server3.DisposeAsync().AsTask()); + } + + public NatsConnection CreateClient(int nodeIndex = 0) + { + var port = nodeIndex switch + { + 0 => Port1, + 1 => Port2, + 2 => Port3, + _ => throw new ArgumentOutOfRangeException(nameof(nodeIndex)), + }; + return new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{port}" }); + } +} + +[CollectionDefinition("E2E-Cluster")] +public class ClusterCollection : ICollectionFixture; +``` + +**Step 2: Write ClusterTests.cs** + +```csharp +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +[Collection("E2E-Cluster")] +public class ClusterTests(ClusterFixture fixture) +{ + [Fact] + public async Task Cluster_MessagePropagatesAcrossNodes() + { + await using var pub = fixture.CreateClient(0); + await using var sub = fixture.CreateClient(1); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.cluster.cross"); + await sub.PingAsync(); + + // Give route subscription propagation time + await Task.Delay(500); + + await pub.PublishAsync("e2e.cluster.cross", "across-nodes"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("across-nodes"); + } + + [Fact] + public async Task Cluster_LateSubscriberReceivesMessages() + { + await using var pub = fixture.CreateClient(0); + await pub.ConnectAsync(); + + // Subscribe on node 2 mid-stream + await using var sub = fixture.CreateClient(2); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.cluster.late"); + await sub.PingAsync(); + + await Task.Delay(500); + + await pub.PublishAsync("e2e.cluster.late", "late-join"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("late-join"); + } + + [Fact] + public async Task Cluster_QueueGroupAcrossNodes_DeliversOnce() + { + await using var pub = fixture.CreateClient(0); + await using var sub1 = fixture.CreateClient(1); + await using var sub2 = fixture.CreateClient(2); + await pub.ConnectAsync(); + await sub1.ConnectAsync(); + await sub2.ConnectAsync(); + + await using var s1 = await sub1.SubscribeCoreAsync("e2e.cluster.qg", queueGroup: "cq"); + await using var s2 = await sub2.SubscribeCoreAsync("e2e.cluster.qg", queueGroup: "cq"); + await sub1.PingAsync(); + await sub2.PingAsync(); + + await Task.Delay(500); + + using var collectionCts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + var count1 = 0; + var count2 = 0; + + async Task Collect(INatsSub sub, Action inc) + { + try + { + await foreach (var _ in sub.Msgs.ReadAllAsync(collectionCts.Token)) + inc(); + } + catch (OperationCanceledException) { } + } + + var tasks = new[] + { + Collect(s1, () => Interlocked.Increment(ref count1)), + Collect(s2, () => Interlocked.Increment(ref count2)), + }; + + for (var i = 0; i < 20; i++) + await pub.PublishAsync("e2e.cluster.qg", i); + await pub.PingAsync(); + + using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + while (count1 + count2 < 20 && !deadline.IsCancellationRequested) + await Task.Delay(50, deadline.Token).ContinueWith(_ => { }); + + collectionCts.Cancel(); + await Task.WhenAll(tasks); + + (count1 + count2).ShouldBe(20); + } +} +``` + +**Step 3: Run tests** + +Run: `dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~ClusterTests" -v normal` +Expected: 3 passed + +**Step 4: Commit** + +```bash +git add tests/NATS.E2E.Tests/Infrastructure/ClusterFixture.cs tests/NATS.E2E.Tests/ClusterTests.cs +git commit -m "test: add E2E cluster tests (cross-node messaging, late join, queue groups)" +``` + +--- + +### Task 7: Leaf Node Fixture and Tests + +**Files:** +- Create: `tests/NATS.E2E.Tests/Infrastructure/LeafNodeFixture.cs` +- Create: `tests/NATS.E2E.Tests/LeafNodeTests.cs` + +**Step 1: Write LeafNodeFixture.cs** + +```csharp +using NATS.Client.Core; + +namespace NATS.E2E.Tests.Infrastructure; + +public sealed class LeafNodeFixture : IAsyncLifetime +{ + private NatsServerProcess _hub = null!; + private NatsServerProcess _leaf = null!; + + public int HubPort => _hub.Port; + public int LeafPort => _leaf.Port; + + public async Task InitializeAsync() + { + var leafListenPort = NatsServerProcess.AllocateFreePort(); + + var hubConfig = $$""" + server_name: hub + leafnodes { + listen: 127.0.0.1:{{leafListenPort}} + } + """; + + _hub = NatsServerProcess.WithConfig(hubConfig); + await _hub.StartAsync(); + + var leafConfig = $$""" + server_name: leaf + leafnodes { + remotes [ + { url: "nats-leaf://127.0.0.1:{{leafListenPort}}" } + ] + } + """; + + _leaf = NatsServerProcess.WithConfig(leafConfig); + await _leaf.StartAsync(); + + // Give leaf time to connect to hub + await Task.Delay(2000); + } + + public async Task DisposeAsync() + { + await _leaf.DisposeAsync(); + await _hub.DisposeAsync(); + } + + public NatsConnection CreateHubClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{HubPort}" }); + + public NatsConnection CreateLeafClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{LeafPort}" }); +} + +[CollectionDefinition("E2E-LeafNode")] +public class LeafNodeCollection : ICollectionFixture; +``` + +**Step 2: Write LeafNodeTests.cs** + +```csharp +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +[Collection("E2E-LeafNode")] +public class LeafNodeTests(LeafNodeFixture fixture) +{ + [Fact] + public async Task LeafNode_HubToLeaf_MessageDelivered() + { + await using var pub = fixture.CreateHubClient(); + await using var sub = fixture.CreateLeafClient(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.leaf.h2l"); + await sub.PingAsync(); + + // Give subscription propagation time through leaf connection + await Task.Delay(500); + + await pub.PublishAsync("e2e.leaf.h2l", "hub-to-leaf"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("hub-to-leaf"); + } + + [Fact] + public async Task LeafNode_LeafToHub_MessageDelivered() + { + await using var pub = fixture.CreateLeafClient(); + await using var sub = fixture.CreateHubClient(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.leaf.l2h"); + await sub.PingAsync(); + + await Task.Delay(500); + + await pub.PublishAsync("e2e.leaf.l2h", "leaf-to-hub"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("leaf-to-hub"); + } + + [Fact] + public async Task LeafNode_OnlySubscribedSubjectsPropagate() + { + await using var pub = fixture.CreateHubClient(); + await using var sub = fixture.CreateLeafClient(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + // Subscribe to a specific subject on leaf + await using var subscription = await sub.SubscribeCoreAsync("e2e.leaf.specific"); + await sub.PingAsync(); + await Task.Delay(500); + + // Publish to a different subject on hub — should NOT arrive at leaf subscriber + await pub.PublishAsync("e2e.leaf.other", "wrong-subject"); + + // Publish to the correct subject — SHOULD arrive + await pub.PublishAsync("e2e.leaf.specific", "right-subject"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("right-subject"); + } +} +``` + +**Step 3: Run tests** + +Run: `dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~LeafNodeTests" -v normal` +Expected: 3 passed + +**Step 4: Commit** + +```bash +git add tests/NATS.E2E.Tests/Infrastructure/LeafNodeFixture.cs tests/NATS.E2E.Tests/LeafNodeTests.cs +git commit -m "test: add E2E leaf node tests (hub-to-leaf, leaf-to-hub, subject propagation)" +``` + +--- + +### Task 8: Gateway Fixture and Tests + +**Files:** +- Create: `tests/NATS.E2E.Tests/Infrastructure/GatewayFixture.cs` +- Create: `tests/NATS.E2E.Tests/GatewayTests.cs` + +**Step 1: Write GatewayFixture.cs** + +```csharp +using NATS.Client.Core; + +namespace NATS.E2E.Tests.Infrastructure; + +public sealed class GatewayFixture : IAsyncLifetime +{ + private NatsServerProcess _serverA = null!; + private NatsServerProcess _serverB = null!; + + public int PortA => _serverA.Port; + public int PortB => _serverB.Port; + + public async Task InitializeAsync() + { + var gwPortA = NatsServerProcess.AllocateFreePort(); + var gwPortB = NatsServerProcess.AllocateFreePort(); + + var configA = $$""" + server_name: gw-a + gateway { + name: cluster-a + listen: 127.0.0.1:{{gwPortA}} + gateways: [ + { name: cluster-b, url: nats://127.0.0.1:{{gwPortB}} } + ] + } + """; + + var configB = $$""" + server_name: gw-b + gateway { + name: cluster-b + listen: 127.0.0.1:{{gwPortB}} + gateways: [ + { name: cluster-a, url: nats://127.0.0.1:{{gwPortA}} } + ] + } + """; + + _serverA = NatsServerProcess.WithConfig(configA); + _serverB = NatsServerProcess.WithConfig(configB); + + await Task.WhenAll(_serverA.StartAsync(), _serverB.StartAsync()); + + // Give gateways time to connect + await Task.Delay(2000); + } + + public async Task DisposeAsync() + { + await Task.WhenAll( + _serverA.DisposeAsync().AsTask(), + _serverB.DisposeAsync().AsTask()); + } + + public NatsConnection CreateClientA() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{PortA}" }); + + public NatsConnection CreateClientB() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{PortB}" }); +} + +[CollectionDefinition("E2E-Gateway")] +public class GatewayCollection : ICollectionFixture; +``` + +**Step 2: Write GatewayTests.cs** + +```csharp +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +[Collection("E2E-Gateway")] +public class GatewayTests(GatewayFixture fixture) +{ + [Fact] + public async Task Gateway_MessageCrossesGateway() + { + await using var pub = fixture.CreateClientA(); + await using var sub = fixture.CreateClientB(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.gw.cross"); + await sub.PingAsync(); + + // Give gateway interest propagation time + await Task.Delay(1000); + + await pub.PublishAsync("e2e.gw.cross", "gateway-msg"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("gateway-msg"); + } + + [Fact] + public async Task Gateway_NoInterest_NoDelivery() + { + await using var pub = fixture.CreateClientA(); + await using var sub = fixture.CreateClientB(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + // Subscribe on B to a DIFFERENT subject than what A publishes + await using var subscription = await sub.SubscribeCoreAsync("e2e.gw.listen"); + await sub.PingAsync(); + await Task.Delay(1000); + + // Publish to a subject with no interest on B + await pub.PublishAsync("e2e.gw.nolisten", "should-not-arrive"); + // Publish to the subscribed subject to verify gateway works + await pub.PublishAsync("e2e.gw.listen", "should-arrive"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("should-arrive"); + } +} +``` + +**Step 3: Run tests** + +Run: `dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~GatewayTests" -v normal` +Expected: 2 passed + +**Step 4: Commit** + +```bash +git add tests/NATS.E2E.Tests/Infrastructure/GatewayFixture.cs tests/NATS.E2E.Tests/GatewayTests.cs +git commit -m "test: add E2E gateway tests (cross-gateway messaging, interest-only)" +``` + +--- + +### Task 9: MQTT Fixture and Tests + +**Files:** +- Create: `tests/NATS.E2E.Tests/Infrastructure/MqttServerFixture.cs` +- Create: `tests/NATS.E2E.Tests/MqttTests.cs` + +**Step 1: Write MqttServerFixture.cs** + +The MQTT fixture starts a server with JetStream enabled (required for MQTT persistence) and an MQTT port. + +```csharp +using NATS.Client.Core; + +namespace NATS.E2E.Tests.Infrastructure; + +public sealed class MqttServerFixture : IAsyncLifetime +{ + private NatsServerProcess _server = null!; + private string _storeDir = null!; + + public int Port => _server.Port; + public int MqttPort { get; private set; } + + public async Task InitializeAsync() + { + MqttPort = NatsServerProcess.AllocateFreePort(); + _storeDir = Path.Combine(Path.GetTempPath(), "nats-e2e-mqtt-" + Guid.NewGuid().ToString("N")[..8]); + Directory.CreateDirectory(_storeDir); + + var config = $$""" + jetstream { + store_dir: "{{_storeDir}}" + max_mem_store: 64mb + max_file_store: 256mb + } + mqtt { + listen: 127.0.0.1:{{MqttPort}} + } + """; + + _server = NatsServerProcess.WithConfig(config); + await _server.StartAsync(); + } + + public async Task DisposeAsync() + { + await _server.DisposeAsync(); + + if (_storeDir is not null && Directory.Exists(_storeDir)) + { + try { Directory.Delete(_storeDir, recursive: true); } + catch { /* best-effort cleanup */ } + } + } + + public NatsConnection CreateNatsClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{Port}" }); +} + +[CollectionDefinition("E2E-Mqtt")] +public class MqttCollection : ICollectionFixture; +``` + +**Step 2: Write MqttTests.cs** + +```csharp +using MQTTnet; +using MQTTnet.Client; +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +[Collection("E2E-Mqtt")] +public class MqttTests(MqttServerFixture fixture) +{ + [Fact] + public async Task Mqtt_NatsPublish_MqttReceives() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + // Connect MQTT subscriber + var mqttFactory = new MqttFactory(); + using var mqttClient = mqttFactory.CreateMqttClient(); + var mqttOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", fixture.MqttPort) + .WithClientId("e2e-mqtt-sub") + .Build(); + + await mqttClient.ConnectAsync(mqttOpts, cts.Token); + + string? receivedPayload = null; + var received = new TaskCompletionSource(); + + mqttClient.ApplicationMessageReceivedAsync += e => + { + var payload = System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment); + received.TrySetResult(payload); + return Task.CompletedTask; + }; + + await mqttClient.SubscribeAsync( + new MqttClientSubscribeOptionsBuilder() + .WithTopicFilter("e2e/mqtt/nats2mqtt") + .Build(), + cts.Token); + + // Publish via NATS (MQTT topics use / but NATS maps / to .) + await using var natsClient = fixture.CreateNatsClient(); + await natsClient.ConnectAsync(); + + // NATS subject maps: e2e.mqtt.nats2mqtt → MQTT topic e2e/mqtt/nats2mqtt + await natsClient.PublishAsync("e2e.mqtt.nats2mqtt", "from-nats"); + + var result = await received.Task.WaitAsync(cts.Token); + result.ShouldBe("from-nats"); + + await mqttClient.DisconnectAsync(cancellationToken: cts.Token); + } + + [Fact] + public async Task Mqtt_MqttPublish_NatsReceives() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + // Subscribe via NATS + await using var natsClient = fixture.CreateNatsClient(); + await natsClient.ConnectAsync(); + + await using var subscription = await natsClient.SubscribeCoreAsync("e2e.mqtt.mqtt2nats"); + await natsClient.PingAsync(); + + // Connect MQTT publisher + var mqttFactory = new MqttFactory(); + using var mqttClient = mqttFactory.CreateMqttClient(); + var mqttOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", fixture.MqttPort) + .WithClientId("e2e-mqtt-pub") + .Build(); + + await mqttClient.ConnectAsync(mqttOpts, cts.Token); + + var message = new MqttApplicationMessageBuilder() + .WithTopic("e2e/mqtt/mqtt2nats") + .WithPayload("from-mqtt"u8.ToArray()) + .Build(); + + await mqttClient.PublishAsync(message, cts.Token); + + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("from-mqtt"); + + await mqttClient.DisconnectAsync(cancellationToken: cts.Token); + } + + [Fact] + public async Task Mqtt_Qos1_Delivery() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var mqttFactory = new MqttFactory(); + using var mqttPub = mqttFactory.CreateMqttClient(); + using var mqttSub = mqttFactory.CreateMqttClient(); + + var pubOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", fixture.MqttPort) + .WithClientId("e2e-qos1-pub") + .Build(); + + var subOpts = new MqttClientOptionsBuilder() + .WithTcpServer("127.0.0.1", fixture.MqttPort) + .WithClientId("e2e-qos1-sub") + .WithCleanSession(true) + .Build(); + + await mqttSub.ConnectAsync(subOpts, cts.Token); + + var received = new TaskCompletionSource(); + mqttSub.ApplicationMessageReceivedAsync += e => + { + var payload = System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment); + received.TrySetResult(payload); + return Task.CompletedTask; + }; + + await mqttSub.SubscribeAsync( + new MqttClientSubscribeOptionsBuilder() + .WithTopicFilter("e2e/mqtt/qos1", MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce) + .Build(), + cts.Token); + + await mqttPub.ConnectAsync(pubOpts, cts.Token); + + var message = new MqttApplicationMessageBuilder() + .WithTopic("e2e/mqtt/qos1") + .WithPayload("qos1-msg"u8.ToArray()) + .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce) + .Build(); + + await mqttPub.PublishAsync(message, cts.Token); + + var result = await received.Task.WaitAsync(cts.Token); + result.ShouldBe("qos1-msg"); + + await mqttPub.DisconnectAsync(cancellationToken: cts.Token); + await mqttSub.DisconnectAsync(cancellationToken: cts.Token); + } +} +``` + +**Step 3: Run tests** + +Run: `dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~MqttTests" -v normal` +Expected: 3 passed + +**Step 4: Commit** + +```bash +git add tests/NATS.E2E.Tests/Infrastructure/MqttServerFixture.cs tests/NATS.E2E.Tests/MqttTests.cs +git commit -m "test: add E2E MQTT bridge tests (NATS-to-MQTT, MQTT-to-NATS, QoS 1)" +``` + +--- + +### Task 10: WebSocket Fixture and Tests + +**Files:** +- Create: `tests/NATS.E2E.Tests/Infrastructure/WebSocketServerFixture.cs` +- Create: `tests/NATS.E2E.Tests/WebSocketTests.cs` + +**Step 1: Write WebSocketServerFixture.cs** + +```csharp +using NATS.Client.Core; + +namespace NATS.E2E.Tests.Infrastructure; + +public sealed class WebSocketServerFixture : IAsyncLifetime +{ + private NatsServerProcess _server = null!; + + public int Port => _server.Port; + public int WsPort { get; private set; } + + public async Task InitializeAsync() + { + WsPort = NatsServerProcess.AllocateFreePort(); + + var config = $$""" + websocket { + listen: 127.0.0.1:{{WsPort}} + no_tls: true + } + """; + + _server = NatsServerProcess.WithConfig(config); + await _server.StartAsync(); + } + + public async Task DisposeAsync() + { + await _server.DisposeAsync(); + } + + public NatsConnection CreateNatsClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{Port}" }); +} + +[CollectionDefinition("E2E-WebSocket")] +public class WebSocketCollection : ICollectionFixture; +``` + +**Step 2: Write WebSocketTests.cs** + +Uses raw `ClientWebSocket` to connect and speak NATS protocol over WebSocket. + +```csharp +using System.Net.WebSockets; +using System.Text; +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +[Collection("E2E-WebSocket")] +public class WebSocketTests(WebSocketServerFixture fixture) +{ + [Fact] + public async Task WebSocket_ConnectAndReceiveInfo() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + using var ws = new ClientWebSocket(); + + await ws.ConnectAsync(new Uri($"ws://127.0.0.1:{fixture.WsPort}"), cts.Token); + ws.State.ShouldBe(WebSocketState.Open); + + // Read INFO line + var buffer = new byte[4096]; + var result = await ws.ReceiveAsync(buffer, cts.Token); + var info = Encoding.ASCII.GetString(buffer, 0, result.Count); + info.ShouldStartWith("INFO"); + + await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cts.Token); + } + + [Fact] + public async Task WebSocket_PubSub_RoundTrip() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + using var ws = new ClientWebSocket(); + + await ws.ConnectAsync(new Uri($"ws://127.0.0.1:{fixture.WsPort}"), cts.Token); + + // Read INFO + var buffer = new byte[4096]; + await ws.ReceiveAsync(buffer, cts.Token); + + // Send CONNECT + await WsSend(ws, "CONNECT {\"verbose\":false,\"protocol\":1}\r\n", cts.Token); + + // Subscribe + await WsSend(ws, "SUB e2e.ws.test 1\r\n", cts.Token); + + // PING/PONG to flush + await WsSend(ws, "PING\r\n", cts.Token); + var pong = await WsReadLine(ws, buffer, cts.Token); + pong.ShouldBe("PONG"); + + // Publish via regular NATS client + await using var natsClient = fixture.CreateNatsClient(); + await natsClient.ConnectAsync(); + await natsClient.PublishAsync("e2e.ws.test", "ws-hello"); + await natsClient.PingAsync(); + + // Read MSG from WebSocket + var msgLine = await WsReadLine(ws, buffer, cts.Token); + msgLine.ShouldStartWith("MSG e2e.ws.test 1"); + + // Read payload + var payload = await WsReadLine(ws, buffer, cts.Token); + payload.ShouldBe("ws-hello"); + + await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cts.Token); + } + + private static async Task WsSend(ClientWebSocket ws, string data, CancellationToken ct) + { + var bytes = Encoding.ASCII.GetBytes(data); + await ws.SendAsync(bytes, WebSocketMessageType.Text, true, ct); + } + + private static async Task WsReadLine(ClientWebSocket ws, byte[] buffer, CancellationToken ct) + { + var sb = new StringBuilder(); + while (true) + { + var result = await ws.ReceiveAsync(buffer, ct); + var chunk = Encoding.ASCII.GetString(buffer, 0, result.Count); + sb.Append(chunk); + var full = sb.ToString(); + if (full.Contains("\r\n")) + return full.TrimEnd('\r', '\n'); + if (result.EndOfMessage) + return full.TrimEnd('\r', '\n'); + } + } +} +``` + +**Step 3: Run tests** + +Run: `dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~WebSocketTests" -v normal` +Expected: 2 passed + +**Step 4: Commit** + +```bash +git add tests/NATS.E2E.Tests/Infrastructure/WebSocketServerFixture.cs tests/NATS.E2E.Tests/WebSocketTests.cs +git commit -m "test: add E2E WebSocket transport tests (connect, pub/sub round-trip)" +``` + +--- + +### Task 11: Advanced Tests (Config, Max Connections, System Events, Account Imports) + +**Files:** +- Create: `tests/NATS.E2E.Tests/AdvancedTests.cs` + +These tests each spin up their own server with specific config, so no shared fixture is needed. + +**Step 1: Write AdvancedTests.cs** + +```csharp +using System.Text.Json; +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +public class AdvancedTests +{ + // ------------------------------------------------------------------------- + // Config file loading: server started with full config, verify behavior + // ------------------------------------------------------------------------- + [Fact] + public async Task ConfigFile_FullConfig_ServerStartsAndAcceptsConnections() + { + var config = """ + server_name: e2e-config-test + max_payload: 2048 + max_connections: 100 + """; + + await using var server = NatsServerProcess.WithConfig(config); + await server.StartAsync(); + + await using var client = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{server.Port}" }); + await client.ConnectAsync(); + await client.PingAsync(); + + client.ConnectionState.ShouldBe(NatsConnectionState.Open); + client.ServerInfo.ShouldNotBeNull(); + client.ServerInfo!.MaxPayload.ShouldBe(2048); + } + + // ------------------------------------------------------------------------- + // Max connections enforcement + // ------------------------------------------------------------------------- + [Fact] + public async Task MaxConnections_ExceedsLimit_Rejected() + { + var config = """ + max_connections: 2 + """; + + await using var server = NatsServerProcess.WithConfig(config); + await server.StartAsync(); + + var url = $"nats://127.0.0.1:{server.Port}"; + + await using var c1 = new NatsConnection(new NatsOpts { Url = url }); + await using var c2 = new NatsConnection(new NatsOpts { Url = url }); + + await c1.ConnectAsync(); + await c1.PingAsync(); + + await c2.ConnectAsync(); + await c2.PingAsync(); + + // Third connection should be rejected + await using var c3 = new NatsConnection(new NatsOpts + { + Url = url, + MaxReconnectRetry = 0, + }); + + var ex = await Should.ThrowAsync(async () => + { + await c3.ConnectAsync(); + await c3.PingAsync(); + }); + + ex.ShouldNotBeNull(); + } + + // ------------------------------------------------------------------------- + // System events: subscribe to $SYS.>, detect client connect event + // ------------------------------------------------------------------------- + [Fact] + public async Task SystemEvents_ClientConnect_EventPublished() + { + var config = """ + accounts { + SYS { + users = [{ user: "sys", password: "sys" }] + } + APP { + users = [{ user: "app", password: "app" }] + } + } + system_account: SYS + """; + + await using var server = NatsServerProcess.WithConfig(config); + await server.StartAsync(); + + var url = $"nats://127.0.0.1:{server.Port}"; + + // Connect as system account user to see events + await using var sysClient = new NatsConnection(new NatsOpts + { + Url = url, + AuthOpts = new NatsAuthOpts { Username = "sys", Password = "sys" }, + }); + await sysClient.ConnectAsync(); + + await using var subscription = await sysClient.SubscribeCoreAsync("$SYS.ACCOUNT.*.CONNECT"); + await sysClient.PingAsync(); + + // Connect a new app client — should trigger a system event + await using var appClient = new NatsConnection(new NatsOpts + { + Url = url, + AuthOpts = new NatsAuthOpts { Username = "app", Password = "app" }, + }); + await appClient.ConnectAsync(); + await appClient.PingAsync(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Subject.ShouldContain("CONNECT"); + } + + // ------------------------------------------------------------------------- + // Account imports/exports: cross-account service call + // ------------------------------------------------------------------------- + [Fact] + public async Task AccountImportExport_CrossAccountServiceCall() + { + var config = """ + accounts { + PROVIDER { + users = [{ user: "provider", password: "prov" }] + exports = [ + { service: "svc.echo" } + ] + } + CONSUMER { + users = [{ user: "consumer", password: "cons" }] + imports = [ + { service: { account: PROVIDER, subject: "svc.echo" } } + ] + } + } + """; + + await using var server = NatsServerProcess.WithConfig(config); + await server.StartAsync(); + + var url = $"nats://127.0.0.1:{server.Port}"; + + // Provider subscribes to service subject + await using var provider = new NatsConnection(new NatsOpts + { + Url = url, + AuthOpts = new NatsAuthOpts { Username = "provider", Password = "prov" }, + }); + await provider.ConnectAsync(); + + await using var svcSub = await provider.SubscribeCoreAsync("svc.echo"); + await provider.PingAsync(); + + // Background task: provider replies to requests + var responderTask = Task.Run(async () => + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await svcSub.Msgs.ReadAsync(cts.Token); + await provider.PublishAsync(msg.ReplyTo!, $"echo: {msg.Data}"); + }); + + // Consumer calls the imported service + await using var consumer = new NatsConnection(new NatsOpts + { + Url = url, + AuthOpts = new NatsAuthOpts { Username = "consumer", Password = "cons" }, + }); + await consumer.ConnectAsync(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var reply = await consumer.RequestAsync("svc.echo", "hello", cancellationToken: cts.Token); + + reply.Data.ShouldBe("echo: hello"); + + await responderTask; + } + + // ------------------------------------------------------------------------- + // Subject transforms: publish on mapped subject, receive on target + // Note: Subject transforms are not yet parsed from config, so this test + // is skipped until the feature is implemented. + // ------------------------------------------------------------------------- + [Fact(Skip = "Subject transforms not yet implemented in config parsing")] + public async Task SubjectTransforms_MappedSubject_ReceivedOnTarget() + { + // Placeholder — will be enabled when SubjectMappings config parsing is added + await Task.CompletedTask; + } + + // ------------------------------------------------------------------------- + // JWT authentication + // Note: Full JWT operator mode is not yet implemented in config parsing. + // NKey auth (the foundation for JWT) is already tested in AuthTests. + // ------------------------------------------------------------------------- + [Fact(Skip = "JWT operator mode not yet implemented in config parsing")] + public async Task JwtAuth_ValidJwt_Connects() + { + // Placeholder — will be enabled when JWT resolver config parsing is added + await Task.CompletedTask; + } +} +``` + +**Step 2: Run tests** + +Run: `dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~AdvancedTests" -v normal` +Expected: 4 passed, 2 skipped (SubjectTransforms and JwtAuth) + +**Step 3: Commit** + +```bash +git add tests/NATS.E2E.Tests/AdvancedTests.cs +git commit -m "test: add E2E advanced tests (config, max connections, system events, account imports)" +``` + +--- + +### Task 12: Final Verification and Cleanup + +**Files:** +- No new files + +**Step 1: Build the entire solution** + +Run: `dotnet build` +Expected: Build succeeded + +**Step 2: Run ALL E2E tests** + +Run: `dotnet test tests/NATS.E2E.Tests -v normal` +Expected: ~90+ tests (42 existing + ~48 new), all passing (2 skipped) + +**Step 3: Verify test count** + +Run: `dotnet test tests/NATS.E2E.Tests --list-tests 2>/dev/null | wc -l` +Expected: ~90+ lines + +**Step 4: Commit any final fixes** + +If any tests fail, fix them and commit: + +```bash +git add -A +git commit -m "fix: resolve E2E test failures from final verification" +``` + +--- + +## Task Dependency Graph + +``` +Task 1 (MQTTnet NuGet) ──────────────────────────────┐ +Task 2 (Monitoring) ─────── no deps │ +Task 3 (Headers) ────────── no deps │ +Task 4 (Shutdown/Drain) ─── no deps │ +Task 5 (JetStream Ext) ──── no deps │ +Task 6 (Cluster) ────────── no deps │ +Task 7 (Leaf Node) ──────── no deps │ +Task 8 (Gateway) ────────── no deps │ +Task 9 (MQTT) ───────────── blocked by Task 1 │ +Task 10 (WebSocket) ─────── no deps │ +Task 11 (Advanced) ──────── no deps │ +Task 12 (Verification) ──── blocked by ALL above ─────┘ +``` + +**Parallelizable batches:** +- **Batch A** (all independent): Tasks 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 +- **Batch B** (after Task 1): Task 9 +- **Batch C** (after all): Task 12 diff --git a/docs/plans/2026-03-12-e2e-full-gap-coverage-plan.md.tasks.json b/docs/plans/2026-03-12-e2e-full-gap-coverage-plan.md.tasks.json new file mode 100644 index 0000000..7b064d0 --- /dev/null +++ b/docs/plans/2026-03-12-e2e-full-gap-coverage-plan.md.tasks.json @@ -0,0 +1,18 @@ +{ + "planPath": "docs/plans/2026-03-12-e2e-full-gap-coverage-plan.md", + "tasks": [ + {"id": 1, "subject": "Task 1: Add MQTTnet NuGet Package", "status": "pending"}, + {"id": 2, "subject": "Task 2: Monitoring Endpoint Tests", "status": "pending"}, + {"id": 3, "subject": "Task 3: Header Pub/Sub Tests", "status": "pending"}, + {"id": 4, "subject": "Task 4: Shutdown and Drain Tests", "status": "pending"}, + {"id": 5, "subject": "Task 5: JetStream Extended Tests", "status": "pending"}, + {"id": 6, "subject": "Task 6: Cluster Fixture and Tests", "status": "pending"}, + {"id": 7, "subject": "Task 7: Leaf Node Fixture and Tests", "status": "pending"}, + {"id": 8, "subject": "Task 8: Gateway Fixture and Tests", "status": "pending"}, + {"id": 9, "subject": "Task 9: MQTT Fixture and Tests", "status": "pending", "blockedBy": [1]}, + {"id": 10, "subject": "Task 10: WebSocket Fixture and Tests", "status": "pending"}, + {"id": 11, "subject": "Task 11: Advanced Tests", "status": "pending"}, + {"id": 12, "subject": "Task 12: Final Verification and Cleanup", "status": "pending", "blockedBy": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]} + ], + "lastUpdated": "2026-03-12T00:00:00Z" +}