natsdotnet/docs/plans/2026-03-12-e2e-full-gap-coverage-plan.md
Joseph Doherty 0e252d6ccf docs: add E2E full gap coverage implementation plan
12 tasks covering all 3 priority tiers: monitoring, headers,
shutdown/drain, JetStream extensions, cluster, leaf node,
gateway, MQTT, WebSocket, and advanced feature tests.
2026-03-12 19:05:48 -04:00

E2E Full Gap Coverage Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.

Goal: Add ~50 E2E tests covering all gaps from e2e_gaps.md across monitoring, headers, shutdown/drain, clustering, leaf nodes, gateways, MQTT, WebSocket, JetStream extensions, and advanced features.

Architecture: Each test area gets its own test file with a shared xUnit collection fixture. Multi-server tests (cluster, leaf, gateway) spawn multiple NatsServerProcess instances with route/leaf/gateway config. Single-server features (MQTT, WebSocket) use one process with the relevant port enabled. All tests connect through the NATS.Client.Core NuGet package, except the MQTT tests (MQTTnet) and the WebSocket tests (raw ClientWebSocket).

Tech Stack: .NET 10, xUnit 3, Shouldly, NATS.Client.Core 2.7.2, NATS.Client.JetStream 2.7.2, MQTTnet (new), System.Net.WebSockets (built-in)

Key conventions:

  • Use NatsServerProcess.WithConfig(...) for inline config
  • Use NatsServerProcess.AllocateFreePort() for ephemeral ports
  • Shouldly assertions only (never Assert.*)
  • using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(N)) for timeouts
  • await client.PingAsync() after subscribing, so the server has processed the subscription before anything is published
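
The timeout convention above can be tried without a server. The following is a minimal sketch, not code from the plan: it uses an in-memory System.Threading.Channels channel as a stand-in for subscription.Msgs (an assumption made only so the snippet runs on its own) and shows that a token created from a CancellationTokenSource with a timeout bounds every read, so a missing message fails fast instead of hanging the test run.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Stand-in for subscription.Msgs (assumption: an in-memory channel, no NATS server involved).
var channel = Channel.CreateUnbounded<string>();
await channel.Writer.WriteAsync("hello");

// Convention: every wait is bounded by a CancellationTokenSource with a timeout.
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var first = await channel.Reader.ReadAsync(cts.Token);

// With no message pending, the read is cancelled when the timeout elapses
// instead of blocking forever.
using var shortCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
var timedOut = false;
try
{
    await channel.Reader.ReadAsync(shortCts.Token);
}
catch (OperationCanceledException)
{
    timedOut = true;
}

Console.WriteLine($"first={first}, timedOut={timedOut}");
```

In the real tests the cancelled read surfaces as a test failure, which is the intended behaviour when an expected message never arrives.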

Task 1: Add MQTTnet NuGet Package

Files:

  • Modify: Directory.Packages.props
  • Modify: tests/NATS.E2E.Tests/NATS.E2E.Tests.csproj

Step 1: Add MQTTnet to central package management

In Directory.Packages.props, add inside the <ItemGroup> after the NATS Client entries:

    <!-- MQTT Client (E2E tests) -->
    <PackageVersion Include="MQTTnet" Version="4.3.7.1207" />

Step 2: Add MQTTnet reference to E2E test project

In tests/NATS.E2E.Tests/NATS.E2E.Tests.csproj, add inside the <ItemGroup> with other <PackageReference> entries:

    <PackageReference Include="MQTTnet" />

Step 3: Verify build

Run: dotnet build tests/NATS.E2E.Tests
Expected: Build succeeded

Step 4: Commit

git add Directory.Packages.props tests/NATS.E2E.Tests/NATS.E2E.Tests.csproj
git commit -m "chore: add MQTTnet NuGet package for E2E MQTT tests"

Task 2: Monitoring Endpoint Tests

Files:

  • Create: tests/NATS.E2E.Tests/MonitoringTests.cs
  • Reference: tests/NATS.E2E.Tests/Infrastructure/MonitorServerFixture.cs (exists, unused)

MonitorServerFixture already exists and exposes MonitorClient (an HttpClient) and MonitorPort. Its collection definition, MonitorCollection ("E2E-Monitor"), is also already in place.

Step 1: Write MonitoringTests.cs

using System.Text.Json;
using NATS.Client.Core;
using NATS.E2E.Tests.Infrastructure;

namespace NATS.E2E.Tests;

[Collection("E2E-Monitor")]
public class MonitoringTests(MonitorServerFixture fixture)
{
    [Fact]
    public async Task Varz_ReturnsServerInfo()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var response = await fixture.MonitorClient.GetAsync("/varz", cts.Token);
        response.StatusCode.ShouldBe(System.Net.HttpStatusCode.OK);

        var json = await response.Content.ReadAsStringAsync(cts.Token);
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;

        root.TryGetProperty("server_name", out _).ShouldBeTrue();
        root.TryGetProperty("version", out _).ShouldBeTrue();
        root.TryGetProperty("max_payload", out _).ShouldBeTrue();
    }

    [Fact]
    public async Task Connz_ReflectsConnectedClients()
    {
        // Connect a client so connz shows at least 1 connection
        await using var client = fixture.CreateClient();
        await client.ConnectAsync();
        await client.PingAsync();

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var response = await fixture.MonitorClient.GetAsync("/connz", cts.Token);
        response.StatusCode.ShouldBe(System.Net.HttpStatusCode.OK);

        var json = await response.Content.ReadAsStringAsync(cts.Token);
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;

        root.TryGetProperty("num_connections", out var numConns).ShouldBeTrue();
        numConns.GetInt32().ShouldBeGreaterThanOrEqualTo(1);
    }

    [Fact]
    public async Task Healthz_ReturnsOk()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var response = await fixture.MonitorClient.GetAsync("/healthz", cts.Token);
        response.StatusCode.ShouldBe(System.Net.HttpStatusCode.OK);
    }
}

Step 2: Run tests to verify they pass

Run: dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~MonitoringTests" -v normal
Expected: 3 passed

Step 3: Commit

git add tests/NATS.E2E.Tests/MonitoringTests.cs
git commit -m "test: add E2E monitoring endpoint tests (varz, connz, healthz)"

Task 3: Header Pub/Sub Tests

Files:

  • Create: tests/NATS.E2E.Tests/HeaderTests.cs

Uses existing NatsServerFixture and "E2E" collection.

Step 1: Write HeaderTests.cs

using NATS.Client.Core;
using NATS.E2E.Tests.Infrastructure;

namespace NATS.E2E.Tests;

[Collection("E2E")]
public class HeaderTests(NatsServerFixture fixture)
{
    [Fact]
    public async Task Headers_PublishWithHeaders_ReceivedIntact()
    {
        await using var pub = fixture.CreateClient();
        await using var sub = fixture.CreateClient();
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.hdr.basic");
        await sub.PingAsync();

        var headers = new NatsHeaders { { "X-Test-Key", "test-value" } };
        await pub.PublishAsync("e2e.hdr.basic", "with-headers", headers: headers);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var msg = await subscription.Msgs.ReadAsync(cts.Token);

        msg.Data.ShouldBe("with-headers");
        msg.Headers.ShouldNotBeNull();
        msg.Headers!["X-Test-Key"].ToString().ShouldBe("test-value");
    }

    [Fact]
    public async Task Headers_MultipleHeaders_AllPreserved()
    {
        await using var pub = fixture.CreateClient();
        await using var sub = fixture.CreateClient();
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.hdr.multi");
        await sub.PingAsync();

        var headers = new NatsHeaders
        {
            { "X-First", "one" },
            { "X-Second", "two" },
            { "X-Third", "three" },
        };
        await pub.PublishAsync("e2e.hdr.multi", "multi-hdr", headers: headers);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var msg = await subscription.Msgs.ReadAsync(cts.Token);

        msg.Headers.ShouldNotBeNull();
        msg.Headers!["X-First"].ToString().ShouldBe("one");
        msg.Headers!["X-Second"].ToString().ShouldBe("two");
        msg.Headers!["X-Third"].ToString().ShouldBe("three");
    }

    [Fact]
    public async Task Headers_EmptyValue_RoundTrips()
    {
        await using var pub = fixture.CreateClient();
        await using var sub = fixture.CreateClient();
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.hdr.empty");
        await sub.PingAsync();

        var headers = new NatsHeaders { { "X-Empty", "" } };
        await pub.PublishAsync("e2e.hdr.empty", "empty-val", headers: headers);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var msg = await subscription.Msgs.ReadAsync(cts.Token);

        msg.Headers.ShouldNotBeNull();
        msg.Headers!.ContainsKey("X-Empty").ShouldBeTrue();
        msg.Headers!["X-Empty"].ToString().ShouldBe("");
    }
}

Step 2: Run tests

Run: dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~HeaderTests" -v normal
Expected: 3 passed

Step 3: Commit

git add tests/NATS.E2E.Tests/HeaderTests.cs
git commit -m "test: add E2E header pub/sub tests (HPUB/HMSG)"

Task 4: Shutdown and Drain Tests

Files:

  • Create: tests/NATS.E2E.Tests/ShutdownDrainTests.cs

These tests start their own server per test instead of using a shared fixture, because a test that kills the server would break every other test sharing it.

Step 1: Write ShutdownDrainTests.cs

using NATS.Client.Core;
using NATS.E2E.Tests.Infrastructure;

namespace NATS.E2E.Tests;

public class ShutdownDrainTests
{
    [Fact]
    public async Task ClientDrain_CompletesInFlightMessages()
    {
        await using var server = new NatsServerProcess();
        await server.StartAsync();

        await using var pub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{server.Port}" });
        var sub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{server.Port}" });
        await using var _ = sub;

        await pub.ConnectAsync();
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.drain.>");
        await sub.PingAsync();

        // Publish some messages
        for (var i = 0; i < 10; i++)
            await pub.PublishAsync($"e2e.drain.{i}", $"msg{i}");
        await pub.PingAsync();

        // Read messages before drain
        var received = new List<string?>();
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        // Read all 10 messages; the timeout token fails the test if any go missing
        for (var i = 0; i < 10; i++)
        {
            var msg = await subscription.Msgs.ReadAsync(cts.Token);
            received.Add(msg.Data);
        }

        received.Count.ShouldBe(10);

        // Dispose the client (which drains)
        await sub.DisposeAsync();
    }

    [Fact]
    public async Task ServerShutdown_ClientDetectsDisconnection()
    {
        var server = new NatsServerProcess();
        await server.StartAsync();

        await using var client = new NatsConnection(new NatsOpts
        {
            Url = $"nats://127.0.0.1:{server.Port}",
            MaxReconnectRetry = 0,
        });
        await client.ConnectAsync();
        await client.PingAsync();

        client.ConnectionState.ShouldBe(NatsConnectionState.Open);

        // Kill the server
        await server.DisposeAsync();

        // Wait for client to detect disconnection. The delay is not tied to the token,
        // so a timeout falls through to the assertion below instead of throwing.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        while (client.ConnectionState == NatsConnectionState.Open && !cts.IsCancellationRequested)
            await Task.Delay(100);

        client.ConnectionState.ShouldNotBe(NatsConnectionState.Open);
    }
}
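
The disconnection test above polls a condition until a deadline passes. If more tests need the same pattern, the loop could be factored into a small helper. The sketch below is one possible shape, not part of the plan: PollUntilAsync is a hypothetical name, and the counter stands in for client.ConnectionState so the snippet runs without a server.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the existing test infrastructure):
// polls `condition` until it returns true or `timeout` elapses.
static async Task<bool> PollUntilAsync(Func<bool> condition, TimeSpan timeout, TimeSpan? interval = null)
{
    var delay = interval ?? TimeSpan.FromMilliseconds(100);
    using var cts = new CancellationTokenSource(timeout);
    while (!condition())
    {
        try
        {
            await Task.Delay(delay, cts.Token);
        }
        catch (OperationCanceledException)
        {
            // Timeout reached: report the final state instead of throwing.
            return condition();
        }
    }
    return true;
}

// Stand-in condition: becomes true on the third check.
var checks = 0;
var reached = await PollUntilAsync(() => ++checks >= 3, TimeSpan.FromSeconds(2), TimeSpan.FromMilliseconds(10));

// A condition that never holds returns false once the timeout elapses.
var neverReached = await PollUntilAsync(() => false, TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(10));

Console.WriteLine($"reached={reached}, neverReached={neverReached}");
```

Returning a bool rather than throwing keeps the failure in the Shouldly assertion, for example (await PollUntilAsync(...)).ShouldBeTrue(), which matches the "Shouldly assertions only" convention.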

Step 2: Run tests

Run: dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~ShutdownDrainTests" -v normal
Expected: 2 passed

Step 3: Commit

git add tests/NATS.E2E.Tests/ShutdownDrainTests.cs
git commit -m "test: add E2E shutdown and drain tests"

Task 5: JetStream Extended Tests

Files:

  • Modify: tests/NATS.E2E.Tests/JetStreamTests.cs

Adds 8 new tests to the existing JetStream test file using the existing JetStreamServerFixture.

Step 1: Add the eight new tests (push consumer, ack policies, retention modes, ordered consumer, mirror, source)

Append to JetStreamTests.cs before the closing }:

    // -------------------------------------------------------------------------
    // Test 11 — Push consumer delivers messages to a subject
    // -------------------------------------------------------------------------
    [Fact]
    public async Task Consumer_PushDelivery()
    {
        await using var client = fixture.CreateClient();
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));

        var streamName = $"E2E_PUSH_{Random.Shared.Next(100000)}";
        await js.CreateStreamAsync(new StreamConfig(streamName, [$"js.push.{streamName}.>"]), cts.Token);

        for (var i = 0; i < 5; i++)
            await js.PublishAsync($"js.push.{streamName}.{i}", $"push{i}", cancellationToken: cts.Token);

        var deliverSubject = $"_deliver.{streamName}";
        await js.CreateOrUpdateConsumerAsync(streamName,
            new ConsumerConfig
            {
                Name = "push-consumer",
                AckPolicy = ConsumerConfigAckPolicy.Explicit,
                DeliverSubject = deliverSubject,
            },
            cts.Token);

        // Subscribe to the deliver subject to receive push messages
        await using var subscription = await client.SubscribeCoreAsync<string>(deliverSubject);
        await client.PingAsync();

        var received = new List<string?>();
        for (var i = 0; i < 5; i++)
        {
            var msg = await subscription.Msgs.ReadAsync(cts.Token);
            received.Add(msg.Data);
        }

        received.Count.ShouldBe(5);
    }

    // -------------------------------------------------------------------------
    // Test 12 — AckNone policy: messages delivered without requiring ack
    // -------------------------------------------------------------------------
    [Fact]
    public async Task Consumer_AckNone()
    {
        await using var client = fixture.CreateClient();
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

        var streamName = $"E2E_ACKNONE_{Random.Shared.Next(100000)}";
        await js.CreateStreamAsync(new StreamConfig(streamName, [$"js.acknone.{streamName}.>"]), cts.Token);

        for (var i = 0; i < 3; i++)
            await js.PublishAsync($"js.acknone.{streamName}.{i}", $"data{i}", cancellationToken: cts.Token);

        await js.CreateOrUpdateConsumerAsync(streamName,
            new ConsumerConfig { Name = "none-consumer", AckPolicy = ConsumerConfigAckPolicy.None },
            cts.Token);

        var consumer = await js.GetConsumerAsync(streamName, "none-consumer", cts.Token);

        var received = new List<string?>();
        await foreach (var msg in consumer.FetchAsync<string>(new NatsJSFetchOpts { MaxMsgs = 3 }, cancellationToken: cts.Token))
            received.Add(msg.Data);

        received.Count.ShouldBe(3);

        // With AckNone, consumer should have no pending acks — re-fetch yields nothing
        var second = new List<string?>();
        await foreach (var msg in consumer.FetchAsync<string>(new NatsJSFetchOpts { MaxMsgs = 3, Expires = TimeSpan.FromSeconds(1) }, cancellationToken: cts.Token))
            second.Add(msg.Data);

        second.Count.ShouldBe(0);
    }

    // -------------------------------------------------------------------------
    // Test 13 — AckAll policy: acking last message acks all prior
    // -------------------------------------------------------------------------
    [Fact]
    public async Task Consumer_AckAll()
    {
        await using var client = fixture.CreateClient();
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

        var streamName = $"E2E_ACKALL_{Random.Shared.Next(100000)}";
        await js.CreateStreamAsync(new StreamConfig(streamName, [$"js.ackall.{streamName}.>"]), cts.Token);

        for (var i = 0; i < 5; i++)
            await js.PublishAsync($"js.ackall.{streamName}.{i}", $"data{i}", cancellationToken: cts.Token);

        await js.CreateOrUpdateConsumerAsync(streamName,
            new ConsumerConfig { Name = "all-consumer", AckPolicy = ConsumerConfigAckPolicy.All },
            cts.Token);

        var consumer = await js.GetConsumerAsync(streamName, "all-consumer", cts.Token);

        // Fetch all 5 messages, only ack the last one
        NatsJSMsg<string>? lastMsg = null;
        await foreach (var msg in consumer.FetchAsync<string>(new NatsJSFetchOpts { MaxMsgs = 5 }, cancellationToken: cts.Token))
            lastMsg = msg;

        lastMsg.ShouldNotBeNull();
        await lastMsg.Value.AckAsync(cancellationToken: cts.Token);

        // Re-fetch should yield nothing since all were acked
        var second = new List<string?>();
        await foreach (var msg in consumer.FetchAsync<string>(new NatsJSFetchOpts { MaxMsgs = 5, Expires = TimeSpan.FromSeconds(1) }, cancellationToken: cts.Token))
            second.Add(msg.Data);

        second.Count.ShouldBe(0);
    }

    // -------------------------------------------------------------------------
    // Test 14 — Interest retention: messages removed when all consumers ack
    // -------------------------------------------------------------------------
    [Fact]
    public async Task Retention_Interest()
    {
        await using var client = fixture.CreateClient();
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

        var streamName = $"E2E_INTEREST_{Random.Shared.Next(100000)}";
        await js.CreateStreamAsync(
            new StreamConfig(streamName, [$"js.interest.{streamName}.>"])
            {
                Retention = StreamConfigRetention.Interest,
            },
            cts.Token);

        // Create a consumer BEFORE publishing (Interest retention requires active consumers)
        await js.CreateOrUpdateConsumerAsync(streamName,
            new ConsumerConfig { Name = "interest-c", AckPolicy = ConsumerConfigAckPolicy.Explicit },
            cts.Token);

        for (var i = 0; i < 3; i++)
            await js.PublishAsync($"js.interest.{streamName}.{i}", $"val{i}", cancellationToken: cts.Token);

        var consumer = await js.GetConsumerAsync(streamName, "interest-c", cts.Token);

        // Fetch and ack all
        await foreach (var msg in consumer.FetchAsync<string>(new NatsJSFetchOpts { MaxMsgs = 3 }, cancellationToken: cts.Token))
            await msg.AckAsync(cancellationToken: cts.Token);

        // Allow server to process acks
        await Task.Delay(500, cts.Token);

        // Stream should have 0 messages (interest satisfied)
        var stream = await js.GetStreamAsync(streamName, cancellationToken: cts.Token);
        stream.Info.State.Messages.ShouldBe(0L);
    }

    // -------------------------------------------------------------------------
    // Test 15 — WorkQueue retention: message removed after single consumer ack
    // -------------------------------------------------------------------------
    [Fact]
    public async Task Retention_WorkQueue()
    {
        await using var client = fixture.CreateClient();
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

        var streamName = $"E2E_WQ_{Random.Shared.Next(100000)}";
        await js.CreateStreamAsync(
            new StreamConfig(streamName, [$"js.wq.{streamName}.>"])
            {
                Retention = StreamConfigRetention.Workqueue,
            },
            cts.Token);

        await js.CreateOrUpdateConsumerAsync(streamName,
            new ConsumerConfig { Name = "wq-c", AckPolicy = ConsumerConfigAckPolicy.Explicit },
            cts.Token);

        for (var i = 0; i < 5; i++)
            await js.PublishAsync($"js.wq.{streamName}.{i}", $"work{i}", cancellationToken: cts.Token);

        var consumer = await js.GetConsumerAsync(streamName, "wq-c", cts.Token);

        await foreach (var msg in consumer.FetchAsync<string>(new NatsJSFetchOpts { MaxMsgs = 5 }, cancellationToken: cts.Token))
            await msg.AckAsync(cancellationToken: cts.Token);

        await Task.Delay(500, cts.Token);

        var stream = await js.GetStreamAsync(streamName, cancellationToken: cts.Token);
        stream.Info.State.Messages.ShouldBe(0L);
    }

    // -------------------------------------------------------------------------
    // Test 16 — Ordered consumer: messages arrive in sequence order
    // -------------------------------------------------------------------------
    [Fact]
    public async Task Consumer_Ordered()
    {
        await using var client = fixture.CreateClient();
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

        var streamName = $"E2E_ORD_{Random.Shared.Next(100000)}";
        await js.CreateStreamAsync(new StreamConfig(streamName, [$"js.ord.{streamName}.>"]), cts.Token);

        for (var i = 0; i < 10; i++)
            await js.PublishAsync($"js.ord.{streamName}.{i}", $"seq{i}", cancellationToken: cts.Token);

        var consumer = await js.CreateOrderedConsumerAsync(streamName, cancellationToken: cts.Token);

        var received = new List<string?>();
        await foreach (var msg in consumer.FetchAsync<string>(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token))
            received.Add(msg.Data);

        received.Count.ShouldBe(10);
        for (var i = 0; i < 10; i++)
            received[i].ShouldBe($"seq{i}");
    }

    // -------------------------------------------------------------------------
    // Test 17 — Stream mirroring: mirror reflects source messages
    // -------------------------------------------------------------------------
    [Fact]
    public async Task Stream_Mirror()
    {
        await using var client = fixture.CreateClient();
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));

        var sourceName = $"E2E_MIRROR_SRC_{Random.Shared.Next(100000)}";
        var mirrorName = $"E2E_MIRROR_DST_{Random.Shared.Next(100000)}";

        await js.CreateStreamAsync(new StreamConfig(sourceName, [$"js.mirror.{sourceName}.>"]), cts.Token);

        for (var i = 0; i < 5; i++)
            await js.PublishAsync($"js.mirror.{sourceName}.{i}", $"mirrored{i}", cancellationToken: cts.Token);

        await js.CreateStreamAsync(
            new StreamConfig(mirrorName, [])
            {
                Mirror = new StreamSource { Name = sourceName },
            },
            cts.Token);

        // Wait for mirror to catch up
        await Task.Delay(2000, cts.Token);

        var mirror = await js.GetStreamAsync(mirrorName, cancellationToken: cts.Token);
        mirror.Info.State.Messages.ShouldBe(5L);
    }

    // -------------------------------------------------------------------------
    // Test 18 — Stream sourcing: sourced stream aggregates from source
    // -------------------------------------------------------------------------
    [Fact]
    public async Task Stream_Source()
    {
        await using var client = fixture.CreateClient();
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));

        var srcName = $"E2E_SRC_{Random.Shared.Next(100000)}";
        var aggName = $"E2E_AGG_{Random.Shared.Next(100000)}";

        await js.CreateStreamAsync(new StreamConfig(srcName, [$"js.source.{srcName}.>"]), cts.Token);

        for (var i = 0; i < 5; i++)
            await js.PublishAsync($"js.source.{srcName}.{i}", $"sourced{i}", cancellationToken: cts.Token);

        await js.CreateStreamAsync(
            new StreamConfig(aggName, [])
            {
                Sources = [new StreamSource { Name = srcName }],
            },
            cts.Token);

        // Wait for source to replicate
        await Task.Delay(2000, cts.Token);

        var agg = await js.GetStreamAsync(aggName, cancellationToken: cts.Token);
        agg.Info.State.Messages.ShouldBe(5L);
    }

Step 2: Run tests

Run: dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~JetStreamTests" -v normal
Expected: 18 passed (10 existing + 8 new)

Step 3: Commit

git add tests/NATS.E2E.Tests/JetStreamTests.cs
git commit -m "test: add E2E JetStream push consumers, ACK policies, retention modes, ordered, mirror, source"

Task 6: Cluster Fixture and Tests

Files:

  • Create: tests/NATS.E2E.Tests/Infrastructure/ClusterFixture.cs
  • Create: tests/NATS.E2E.Tests/ClusterTests.cs

Step 1: Write ClusterFixture.cs

The fixture starts 3 NATS server processes with route config pointing at each other. Each server gets an ephemeral client port and an ephemeral cluster port.

using NATS.Client.Core;

namespace NATS.E2E.Tests.Infrastructure;

public sealed class ClusterFixture : IAsyncLifetime
{
    private NatsServerProcess _server1 = null!;
    private NatsServerProcess _server2 = null!;
    private NatsServerProcess _server3 = null!;

    public int Port1 => _server1.Port;
    public int Port2 => _server2.Port;
    public int Port3 => _server3.Port;

    public async Task InitializeAsync()
    {
        var clusterPort1 = NatsServerProcess.AllocateFreePort();
        var clusterPort2 = NatsServerProcess.AllocateFreePort();
        var clusterPort3 = NatsServerProcess.AllocateFreePort();

        var routes = $"""
            nats-route://127.0.0.1:{clusterPort1}
            nats-route://127.0.0.1:{clusterPort2}
            nats-route://127.0.0.1:{clusterPort3}
            """;

        string MakeConfig(string name, int clusterPort) => $$"""
            server_name: {{name}}
            cluster {
                name: e2e-cluster
                listen: 127.0.0.1:{{clusterPort}}
                routes: [
                    {{routes}}
                ]
            }
            """;

        _server1 = NatsServerProcess.WithConfig(MakeConfig("node1", clusterPort1));
        _server2 = NatsServerProcess.WithConfig(MakeConfig("node2", clusterPort2));
        _server3 = NatsServerProcess.WithConfig(MakeConfig("node3", clusterPort3));

        // Start all three in parallel
        await Task.WhenAll(
            _server1.StartAsync(),
            _server2.StartAsync(),
            _server3.StartAsync());

        // Give routes time to form
        await Task.Delay(2000);
    }

    public async Task DisposeAsync()
    {
        await Task.WhenAll(
            _server1.DisposeAsync().AsTask(),
            _server2.DisposeAsync().AsTask(),
            _server3.DisposeAsync().AsTask());
    }

    public NatsConnection CreateClient(int nodeIndex = 0)
    {
        var port = nodeIndex switch
        {
            0 => Port1,
            1 => Port2,
            2 => Port3,
            _ => throw new ArgumentOutOfRangeException(nameof(nodeIndex)),
        };
        return new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{port}" });
    }
}

[CollectionDefinition("E2E-Cluster")]
public class ClusterCollection : ICollectionFixture<ClusterFixture>;
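
The fixture depends on two things the plan does not spell out: how an ephemeral port is obtained, and how the $$""" raw string mixes literal braces with interpolation. The sketch below illustrates both under stated assumptions. The local AllocateFreePort is a hypothetical stand-in that shows one common approach (bind to port 0 and read back the assigned port); the actual NatsServerProcess.AllocateFreePort() may work differently. MakeClusterConfig is a trimmed-down version of MakeConfig for illustration only.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

// Hypothetical stand-in for NatsServerProcess.AllocateFreePort():
// bind to port 0 so the OS picks a free port, read it back, release it.
static int AllocateFreePort()
{
    var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    try
    {
        return ((IPEndPoint)listener.LocalEndpoint).Port;
    }
    finally
    {
        listener.Stop();
    }
}

// With $$""" a single brace is literal text and {{expr}} is an interpolation hole,
// so the braces of the NATS config block need no escaping.
static string MakeClusterConfig(string name, int clusterPort) => $$"""
    server_name: {{name}}
    cluster {
        name: e2e-cluster
        listen: 127.0.0.1:{{clusterPort}}
    }
    """;

var clusterPort = AllocateFreePort();
var config = MakeClusterConfig("node1", clusterPort);
Console.WriteLine(config);
```

A port found this way is only reserved until the listener stops, so another process could claim it before the server binds. For short-lived test runs that risk is usually accepted.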

Step 2: Write ClusterTests.cs

using NATS.Client.Core;
using NATS.E2E.Tests.Infrastructure;

namespace NATS.E2E.Tests;

[Collection("E2E-Cluster")]
public class ClusterTests(ClusterFixture fixture)
{
    [Fact]
    public async Task Cluster_MessagePropagatesAcrossNodes()
    {
        await using var pub = fixture.CreateClient(0);
        await using var sub = fixture.CreateClient(1);
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.cluster.cross");
        await sub.PingAsync();

        // Give route subscription propagation time
        await Task.Delay(500);

        await pub.PublishAsync("e2e.cluster.cross", "across-nodes");

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("across-nodes");
    }

    [Fact]
    public async Task Cluster_LateSubscriberReceivesMessages()
    {
        await using var pub = fixture.CreateClient(0);
        await pub.ConnectAsync();

        // Subscribe on the third node (index 2) after the publisher has connected
        await using var sub = fixture.CreateClient(2);
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.cluster.late");
        await sub.PingAsync();

        await Task.Delay(500);

        await pub.PublishAsync("e2e.cluster.late", "late-join");

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("late-join");
    }

    [Fact]
    public async Task Cluster_QueueGroupAcrossNodes_DeliversOnce()
    {
        await using var pub = fixture.CreateClient(0);
        await using var sub1 = fixture.CreateClient(1);
        await using var sub2 = fixture.CreateClient(2);
        await pub.ConnectAsync();
        await sub1.ConnectAsync();
        await sub2.ConnectAsync();

        await using var s1 = await sub1.SubscribeCoreAsync<int>("e2e.cluster.qg", queueGroup: "cq");
        await using var s2 = await sub2.SubscribeCoreAsync<int>("e2e.cluster.qg", queueGroup: "cq");
        await sub1.PingAsync();
        await sub2.PingAsync();

        await Task.Delay(500);

        using var collectionCts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
        var count1 = 0;
        var count2 = 0;

        async Task Collect(INatsSub<int> sub, Action inc)
        {
            try
            {
                await foreach (var _ in sub.Msgs.ReadAllAsync(collectionCts.Token))
                    inc();
            }
            catch (OperationCanceledException) { }
        }

        var tasks = new[]
        {
            Collect(s1, () => Interlocked.Increment(ref count1)),
            Collect(s2, () => Interlocked.Increment(ref count2)),
        };

        for (var i = 0; i < 20; i++)
            await pub.PublishAsync("e2e.cluster.qg", i);
        await pub.PingAsync();

        using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        while (count1 + count2 < 20 && !deadline.IsCancellationRequested)
            await Task.Delay(50, deadline.Token).ContinueWith(_ => { });

        collectionCts.Cancel();
        await Task.WhenAll(tasks);

        (count1 + count2).ShouldBe(20);
    }
}

Step 3: Run tests

Run: dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~ClusterTests" -v normal
Expected: 3 passed

Step 4: Commit

git add tests/NATS.E2E.Tests/Infrastructure/ClusterFixture.cs tests/NATS.E2E.Tests/ClusterTests.cs
git commit -m "test: add E2E cluster tests (cross-node messaging, late join, queue groups)"

Task 7: Leaf Node Fixture and Tests

Files:

  • Create: tests/NATS.E2E.Tests/Infrastructure/LeafNodeFixture.cs
  • Create: tests/NATS.E2E.Tests/LeafNodeTests.cs

Step 1: Write LeafNodeFixture.cs

using NATS.Client.Core;

namespace NATS.E2E.Tests.Infrastructure;

public sealed class LeafNodeFixture : IAsyncLifetime
{
    private NatsServerProcess _hub = null!;
    private NatsServerProcess _leaf = null!;

    public int HubPort => _hub.Port;
    public int LeafPort => _leaf.Port;

    public async Task InitializeAsync()
    {
        var leafListenPort = NatsServerProcess.AllocateFreePort();

        var hubConfig = $$"""
            server_name: hub
            leafnodes {
                listen: 127.0.0.1:{{leafListenPort}}
            }
            """;

        _hub = NatsServerProcess.WithConfig(hubConfig);
        await _hub.StartAsync();

        var leafConfig = $$"""
            server_name: leaf
            leafnodes {
                remotes [
                    { url: "nats-leaf://127.0.0.1:{{leafListenPort}}" }
                ]
            }
            """;

        _leaf = NatsServerProcess.WithConfig(leafConfig);
        await _leaf.StartAsync();

        // Give leaf time to connect to hub
        await Task.Delay(2000);
    }

    public async Task DisposeAsync()
    {
        await _leaf.DisposeAsync();
        await _hub.DisposeAsync();
    }

    public NatsConnection CreateHubClient()
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{HubPort}" });

    public NatsConnection CreateLeafClient()
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{LeafPort}" });
}

[CollectionDefinition("E2E-LeafNode")]
public class LeafNodeCollection : ICollectionFixture<LeafNodeFixture>;
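
The fixed `Task.Delay(2000)` in the fixture trades speed for simplicity: it is too long on a fast machine and may be too short on a loaded CI agent. A minimal sketch of a polling alternative follows. `Poll.UntilAsync` is a hypothetical helper that does not exist in the current test infrastructure; it uses only BCL types, and the readiness condition passed to it is left to the caller.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the existing test infrastructure.
public static class Poll
{
    // Re-evaluates `condition` every `interval` until it returns true.
    // Throws TimeoutException if `timeout` elapses first.
    public static async Task UntilAsync(
        Func<Task<bool>> condition,
        TimeSpan timeout,
        TimeSpan? interval = null,
        CancellationToken ct = default)
    {
        var delay = interval ?? TimeSpan.FromMilliseconds(50);
        var stopwatch = Stopwatch.StartNew();

        while (true)
        {
            ct.ThrowIfCancellationRequested();

            if (await condition())
                return;

            if (stopwatch.Elapsed >= timeout)
                throw new TimeoutException($"Condition not met within {timeout}.");

            await Task.Delay(delay, ct);
        }
    }
}
```

One possible condition for this fixture is a round-trip probe: subscribe on the leaf, publish on the hub, and return true once the message arrives. Whether that is worth the extra code over a fixed delay is a judgment call for the implementer.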

Step 2: Write LeafNodeTests.cs

using NATS.Client.Core;
using NATS.E2E.Tests.Infrastructure;

namespace NATS.E2E.Tests;

[Collection("E2E-LeafNode")]
public class LeafNodeTests(LeafNodeFixture fixture)
{
    [Fact]
    public async Task LeafNode_HubToLeaf_MessageDelivered()
    {
        await using var pub = fixture.CreateHubClient();
        await using var sub = fixture.CreateLeafClient();
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.leaf.h2l");
        await sub.PingAsync();

        // Give subscription propagation time through leaf connection
        await Task.Delay(500);

        await pub.PublishAsync("e2e.leaf.h2l", "hub-to-leaf");

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("hub-to-leaf");
    }

    [Fact]
    public async Task LeafNode_LeafToHub_MessageDelivered()
    {
        await using var pub = fixture.CreateLeafClient();
        await using var sub = fixture.CreateHubClient();
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.leaf.l2h");
        await sub.PingAsync();

        await Task.Delay(500);

        await pub.PublishAsync("e2e.leaf.l2h", "leaf-to-hub");

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("leaf-to-hub");
    }

    [Fact]
    public async Task LeafNode_OnlySubscribedSubjectsPropagate()
    {
        await using var pub = fixture.CreateHubClient();
        await using var sub = fixture.CreateLeafClient();
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        // Subscribe to a specific subject on leaf
        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.leaf.specific");
        await sub.PingAsync();
        await Task.Delay(500);

        // Publish to a different subject on hub — should NOT arrive at leaf subscriber
        await pub.PublishAsync("e2e.leaf.other", "wrong-subject");

        // Publish to the correct subject — SHOULD arrive
        await pub.PublishAsync("e2e.leaf.specific", "right-subject");

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("right-subject");
    }
}

Step 3: Run tests

Run: dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~LeafNodeTests" -v normal
Expected: 3 passed

Step 4: Commit

git add tests/NATS.E2E.Tests/Infrastructure/LeafNodeFixture.cs tests/NATS.E2E.Tests/LeafNodeTests.cs
git commit -m "test: add E2E leaf node tests (hub-to-leaf, leaf-to-hub, subject propagation)"

Task 8: Gateway Fixture and Tests

Files:

  • Create: tests/NATS.E2E.Tests/Infrastructure/GatewayFixture.cs
  • Create: tests/NATS.E2E.Tests/GatewayTests.cs

Step 1: Write GatewayFixture.cs

using NATS.Client.Core;

namespace NATS.E2E.Tests.Infrastructure;

public sealed class GatewayFixture : IAsyncLifetime
{
    private NatsServerProcess _serverA = null!;
    private NatsServerProcess _serverB = null!;

    public int PortA => _serverA.Port;
    public int PortB => _serverB.Port;

    public async Task InitializeAsync()
    {
        var gwPortA = NatsServerProcess.AllocateFreePort();
        var gwPortB = NatsServerProcess.AllocateFreePort();

        var configA = $$"""
            server_name: gw-a
            gateway {
                name: cluster-a
                listen: 127.0.0.1:{{gwPortA}}
                gateways: [
                    { name: cluster-b, url: nats://127.0.0.1:{{gwPortB}} }
                ]
            }
            """;

        var configB = $$"""
            server_name: gw-b
            gateway {
                name: cluster-b
                listen: 127.0.0.1:{{gwPortB}}
                gateways: [
                    { name: cluster-a, url: nats://127.0.0.1:{{gwPortA}} }
                ]
            }
            """;

        _serverA = NatsServerProcess.WithConfig(configA);
        _serverB = NatsServerProcess.WithConfig(configB);

        await Task.WhenAll(_serverA.StartAsync(), _serverB.StartAsync());

        // Give gateways time to connect
        await Task.Delay(2000);
    }

    public async Task DisposeAsync()
    {
        await Task.WhenAll(
            _serverA.DisposeAsync().AsTask(),
            _serverB.DisposeAsync().AsTask());
    }

    public NatsConnection CreateClientA()
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{PortA}" });

    public NatsConnection CreateClientB()
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{PortB}" });
}

[CollectionDefinition("E2E-Gateway")]
public class GatewayCollection : ICollectionFixture<GatewayFixture>;

Step 2: Write GatewayTests.cs

using NATS.Client.Core;
using NATS.E2E.Tests.Infrastructure;

namespace NATS.E2E.Tests;

[Collection("E2E-Gateway")]
public class GatewayTests(GatewayFixture fixture)
{
    [Fact]
    public async Task Gateway_MessageCrossesGateway()
    {
        await using var pub = fixture.CreateClientA();
        await using var sub = fixture.CreateClientB();
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.gw.cross");
        await sub.PingAsync();

        // Give gateway interest propagation time
        await Task.Delay(1000);

        await pub.PublishAsync("e2e.gw.cross", "gateway-msg");

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("gateway-msg");
    }

    [Fact]
    public async Task Gateway_NoInterest_NoDelivery()
    {
        await using var pub = fixture.CreateClientA();
        await using var sub = fixture.CreateClientB();
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        // Subscribe on B to a DIFFERENT subject than what A publishes
        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.gw.listen");
        await sub.PingAsync();
        await Task.Delay(1000);

        // Publish to a subject with no interest on B
        await pub.PublishAsync("e2e.gw.nolisten", "should-not-arrive");
        // Publish to the subscribed subject to verify gateway works
        await pub.PublishAsync("e2e.gw.listen", "should-arrive");

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("should-arrive");
    }
}

Step 3: Run tests

Run: dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~GatewayTests" -v normal
Expected: 2 passed

Step 4: Commit

git add tests/NATS.E2E.Tests/Infrastructure/GatewayFixture.cs tests/NATS.E2E.Tests/GatewayTests.cs
git commit -m "test: add E2E gateway tests (cross-gateway messaging, interest-only)"

Task 9: MQTT Fixture and Tests

Files:

  • Create: tests/NATS.E2E.Tests/Infrastructure/MqttServerFixture.cs
  • Create: tests/NATS.E2E.Tests/MqttTests.cs

Step 1: Write MqttServerFixture.cs

The MQTT fixture starts a server with JetStream enabled (required for MQTT persistence) and an MQTT port.

using NATS.Client.Core;

namespace NATS.E2E.Tests.Infrastructure;

public sealed class MqttServerFixture : IAsyncLifetime
{
    private NatsServerProcess _server = null!;
    private string _storeDir = null!;

    public int Port => _server.Port;
    public int MqttPort { get; private set; }

    public async Task InitializeAsync()
    {
        MqttPort = NatsServerProcess.AllocateFreePort();
        _storeDir = Path.Combine(Path.GetTempPath(), "nats-e2e-mqtt-" + Guid.NewGuid().ToString("N")[..8]);
        Directory.CreateDirectory(_storeDir);

        var config = $$"""
            jetstream {
                store_dir: "{{_storeDir}}"
                max_mem_store: 64mb
                max_file_store: 256mb
            }
            mqtt {
                listen: 127.0.0.1:{{MqttPort}}
            }
            """;

        _server = NatsServerProcess.WithConfig(config);
        await _server.StartAsync();
    }

    public async Task DisposeAsync()
    {
        await _server.DisposeAsync();

        if (_storeDir is not null && Directory.Exists(_storeDir))
        {
            try { Directory.Delete(_storeDir, recursive: true); }
            catch { /* best-effort cleanup */ }
        }
    }

    public NatsConnection CreateNatsClient()
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{Port}" });
}

[CollectionDefinition("E2E-Mqtt")]
public class MqttCollection : ICollectionFixture<MqttServerFixture>;

Step 2: Write MqttTests.cs

using MQTTnet;
using MQTTnet.Client;
using NATS.Client.Core;
using NATS.E2E.Tests.Infrastructure;

namespace NATS.E2E.Tests;

[Collection("E2E-Mqtt")]
public class MqttTests(MqttServerFixture fixture)
{
    [Fact]
    public async Task Mqtt_NatsPublish_MqttReceives()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));

        // Connect MQTT subscriber
        var mqttFactory = new MqttFactory();
        using var mqttClient = mqttFactory.CreateMqttClient();
        var mqttOpts = new MqttClientOptionsBuilder()
            .WithTcpServer("127.0.0.1", fixture.MqttPort)
            .WithClientId("e2e-mqtt-sub")
            .Build();

        await mqttClient.ConnectAsync(mqttOpts, cts.Token);

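        // RunContinuationsAsynchronously keeps the awaiting test code from running
        // inline on the thread that calls TrySetResult (the MQTT receive handler).
        var received = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);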

        mqttClient.ApplicationMessageReceivedAsync += e =>
        {
            var payload = System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
            received.TrySetResult(payload);
            return Task.CompletedTask;
        };

        await mqttClient.SubscribeAsync(
            new MqttClientSubscribeOptionsBuilder()
                .WithTopicFilter("e2e/mqtt/nats2mqtt")
                .Build(),
            cts.Token);

        // Publish via NATS (MQTT topics use / but NATS maps / to .)
        await using var natsClient = fixture.CreateNatsClient();
        await natsClient.ConnectAsync();

        // NATS subject maps: e2e.mqtt.nats2mqtt → MQTT topic e2e/mqtt/nats2mqtt
        await natsClient.PublishAsync("e2e.mqtt.nats2mqtt", "from-nats");

        var result = await received.Task.WaitAsync(cts.Token);
        result.ShouldBe("from-nats");

        await mqttClient.DisconnectAsync(cancellationToken: cts.Token);
    }

    [Fact]
    public async Task Mqtt_MqttPublish_NatsReceives()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));

        // Subscribe via NATS
        await using var natsClient = fixture.CreateNatsClient();
        await natsClient.ConnectAsync();

        await using var subscription = await natsClient.SubscribeCoreAsync<string>("e2e.mqtt.mqtt2nats");
        await natsClient.PingAsync();

        // Connect MQTT publisher
        var mqttFactory = new MqttFactory();
        using var mqttClient = mqttFactory.CreateMqttClient();
        var mqttOpts = new MqttClientOptionsBuilder()
            .WithTcpServer("127.0.0.1", fixture.MqttPort)
            .WithClientId("e2e-mqtt-pub")
            .Build();

        await mqttClient.ConnectAsync(mqttOpts, cts.Token);

        var message = new MqttApplicationMessageBuilder()
            .WithTopic("e2e/mqtt/mqtt2nats")
            .WithPayload("from-mqtt"u8.ToArray())
            .Build();

        await mqttClient.PublishAsync(message, cts.Token);

        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("from-mqtt");

        await mqttClient.DisconnectAsync(cancellationToken: cts.Token);
    }

    [Fact]
    public async Task Mqtt_Qos1_Delivery()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));

        var mqttFactory = new MqttFactory();
        using var mqttPub = mqttFactory.CreateMqttClient();
        using var mqttSub = mqttFactory.CreateMqttClient();

        var pubOpts = new MqttClientOptionsBuilder()
            .WithTcpServer("127.0.0.1", fixture.MqttPort)
            .WithClientId("e2e-qos1-pub")
            .Build();

        var subOpts = new MqttClientOptionsBuilder()
            .WithTcpServer("127.0.0.1", fixture.MqttPort)
            .WithClientId("e2e-qos1-sub")
            .WithCleanSession(true)
            .Build();

        await mqttSub.ConnectAsync(subOpts, cts.Token);

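        // RunContinuationsAsynchronously keeps the awaiting test code from running
        // inline on the thread that calls TrySetResult (the MQTT receive handler).
        var received = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);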
        mqttSub.ApplicationMessageReceivedAsync += e =>
        {
            var payload = System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
            received.TrySetResult(payload);
            return Task.CompletedTask;
        };

        await mqttSub.SubscribeAsync(
            new MqttClientSubscribeOptionsBuilder()
                .WithTopicFilter("e2e/mqtt/qos1", MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
                .Build(),
            cts.Token);

        await mqttPub.ConnectAsync(pubOpts, cts.Token);

        var message = new MqttApplicationMessageBuilder()
            .WithTopic("e2e/mqtt/qos1")
            .WithPayload("qos1-msg"u8.ToArray())
            .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
            .Build();

        await mqttPub.PublishAsync(message, cts.Token);

        var result = await received.Task.WaitAsync(cts.Token);
        result.ShouldBe("qos1-msg");

        await mqttPub.DisconnectAsync(cancellationToken: cts.Token);
        await mqttSub.DisconnectAsync(cancellationToken: cts.Token);
    }
}
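
The tests above rely on the topic mapping noted in the comments: MQTT topic `e2e/mqtt/nats2mqtt` corresponds to NATS subject `e2e.mqtt.nats2mqtt`. A minimal sketch of that mapping follows. `MqttTopicSketch` is a hypothetical name used only for illustration. The wildcard handling (`+` to `*`, `#` to `>`) follows the upstream NATS MQTT documentation and is an assumption about this server; edge cases such as leading slashes, empty levels, or dots inside a topic level are deliberately not handled.

```csharp
using System;

// Hypothetical illustration of the separator and wildcard mapping only.
// It is not the server's implementation and ignores edge cases.
public static class MqttTopicSketch
{
    public static string ToNatsSubject(string mqttTopicFilter)
    {
        // MQTT separates levels with '/', NATS separates tokens with '.'.
        var levels = mqttTopicFilter.Split('/');

        for (var i = 0; i < levels.Length; i++)
        {
            levels[i] = levels[i] switch
            {
                "+" => "*",   // single-level wildcard
                "#" => ">",   // multi-level wildcard
                var level => level,
            };
        }

        return string.Join(".", levels);
    }
}
```

For the subjects used in these tests only the separator rule matters, since none of the topics contain wildcards.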

Step 3: Run tests

Run: dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~MqttTests" -v normal
Expected: 3 passed

Step 4: Commit

git add tests/NATS.E2E.Tests/Infrastructure/MqttServerFixture.cs tests/NATS.E2E.Tests/MqttTests.cs
git commit -m "test: add E2E MQTT bridge tests (NATS-to-MQTT, MQTT-to-NATS, QoS 1)"

Task 10: WebSocket Fixture and Tests

Files:

  • Create: tests/NATS.E2E.Tests/Infrastructure/WebSocketServerFixture.cs
  • Create: tests/NATS.E2E.Tests/WebSocketTests.cs

Step 1: Write WebSocketServerFixture.cs

using NATS.Client.Core;

namespace NATS.E2E.Tests.Infrastructure;

public sealed class WebSocketServerFixture : IAsyncLifetime
{
    private NatsServerProcess _server = null!;

    public int Port => _server.Port;
    public int WsPort { get; private set; }

    public async Task InitializeAsync()
    {
        WsPort = NatsServerProcess.AllocateFreePort();

        var config = $$"""
            websocket {
                listen: 127.0.0.1:{{WsPort}}
                no_tls: true
            }
            """;

        _server = NatsServerProcess.WithConfig(config);
        await _server.StartAsync();
    }

    public async Task DisposeAsync()
    {
        await _server.DisposeAsync();
    }

    public NatsConnection CreateNatsClient()
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{Port}" });
}

[CollectionDefinition("E2E-WebSocket")]
public class WebSocketCollection : ICollectionFixture<WebSocketServerFixture>;

Step 2: Write WebSocketTests.cs

Uses raw ClientWebSocket to connect and speak NATS protocol over WebSocket.

using System.Net.WebSockets;
using System.Text;
using NATS.Client.Core;
using NATS.E2E.Tests.Infrastructure;

namespace NATS.E2E.Tests;

[Collection("E2E-WebSocket")]
public class WebSocketTests(WebSocketServerFixture fixture)
{
    [Fact]
    public async Task WebSocket_ConnectAndReceiveInfo()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        using var ws = new ClientWebSocket();

        await ws.ConnectAsync(new Uri($"ws://127.0.0.1:{fixture.WsPort}"), cts.Token);
        ws.State.ShouldBe(WebSocketState.Open);

        // Read INFO line
        var buffer = new byte[4096];
        var result = await ws.ReceiveAsync(buffer, cts.Token);
        var info = Encoding.ASCII.GetString(buffer, 0, result.Count);
        info.ShouldStartWith("INFO");

        await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cts.Token);
    }

    [Fact]
    public async Task WebSocket_PubSub_RoundTrip()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        using var ws = new ClientWebSocket();

        await ws.ConnectAsync(new Uri($"ws://127.0.0.1:{fixture.WsPort}"), cts.Token);

        // Read INFO
        var buffer = new byte[4096];
        await ws.ReceiveAsync(buffer, cts.Token);

        // Send CONNECT
        await WsSend(ws, "CONNECT {\"verbose\":false,\"protocol\":1}\r\n", cts.Token);

        // Subscribe
        await WsSend(ws, "SUB e2e.ws.test 1\r\n", cts.Token);

        // PING/PONG to flush
        await WsSend(ws, "PING\r\n", cts.Token);
        var pong = await WsReadLine(ws, buffer, cts.Token);
        pong.ShouldBe("PONG");

        // Publish via regular NATS client
        await using var natsClient = fixture.CreateNatsClient();
        await natsClient.ConnectAsync();
        await natsClient.PublishAsync("e2e.ws.test", "ws-hello");
        await natsClient.PingAsync();

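        // Read MSG from WebSocket. The control line and the payload may arrive in
        // one frame or in two, so read a second frame only if the payload is missing.
        var received = await WsReadLine(ws, buffer, cts.Token);
        received.ShouldStartWith("MSG e2e.ws.test 1");

        if (!received.Contains("ws-hello"))
            received = await WsReadLine(ws, buffer, cts.Token);

        received.ShouldEndWith("ws-hello");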

        await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cts.Token);
    }

    private static async Task WsSend(ClientWebSocket ws, string data, CancellationToken ct)
    {
        var bytes = Encoding.ASCII.GetBytes(data);
        await ws.SendAsync(bytes, WebSocketMessageType.Text, true, ct);
    }

    private static async Task<string> WsReadLine(ClientWebSocket ws, byte[] buffer, CancellationToken ct)
    {
        var sb = new StringBuilder();
        while (true)
        {
            var result = await ws.ReceiveAsync(buffer, ct);
            var chunk = Encoding.ASCII.GetString(buffer, 0, result.Count);
            sb.Append(chunk);
            var full = sb.ToString();
            if (full.Contains("\r\n"))
                return full.TrimEnd('\r', '\n');
            if (result.EndOfMessage)
                return full.TrimEnd('\r', '\n');
        }
    }
}
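
`WsReadLine` above returns whatever a receive call yields once it contains a CRLF, so a frame that carries two protocol lines comes back as a single string. A minimal sketch of a stricter approach follows: a small buffer that accumulates decoded chunks and hands back one CRLF-terminated line at a time. `CrlfLineBuffer` is a hypothetical helper, not part of the plan's files, and it assumes ASCII protocol text as in the tests above.

```csharp
using System;
using System.Text;

// Hypothetical helper: splits a stream of decoded chunks into CRLF-terminated lines.
public sealed class CrlfLineBuffer
{
    private readonly StringBuilder _pending = new();

    // Adds the text decoded from one received frame (or frame fragment).
    public void Append(string chunk) => _pending.Append(chunk);

    // Returns true and the next complete line (without CRLF) if one is buffered;
    // otherwise returns false and leaves the partial data in place.
    public bool TryReadLine(out string line)
    {
        var text = _pending.ToString();
        var end = text.IndexOf("\r\n", StringComparison.Ordinal);

        if (end < 0)
        {
            line = string.Empty;
            return false;
        }

        line = text.Substring(0, end);
        _pending.Remove(0, end + 2);
        return true;
    }
}
```

A test would call `Append` after each `ReceiveAsync` and loop on `TryReadLine` until it returns true, which behaves the same whether the server sends the `MSG` line and payload together or separately.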

Step 3: Run tests

Run: dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~WebSocketTests" -v normal
Expected: 2 passed

Step 4: Commit

git add tests/NATS.E2E.Tests/Infrastructure/WebSocketServerFixture.cs tests/NATS.E2E.Tests/WebSocketTests.cs
git commit -m "test: add E2E WebSocket transport tests (connect, pub/sub round-trip)"

Task 11: Advanced Tests (Config, Max Connections, System Events, Account Imports)

Files:

  • Create: tests/NATS.E2E.Tests/AdvancedTests.cs

These tests each spin up their own server with specific config, so no shared fixture is needed.

Step 1: Write AdvancedTests.cs

using NATS.Client.Core;
using NATS.E2E.Tests.Infrastructure;

namespace NATS.E2E.Tests;

public class AdvancedTests
{
    // -------------------------------------------------------------------------
    // Config file loading: server started with full config, verify behavior
    // -------------------------------------------------------------------------
    [Fact]
    public async Task ConfigFile_FullConfig_ServerStartsAndAcceptsConnections()
    {
        var config = """
            server_name: e2e-config-test
            max_payload: 2048
            max_connections: 100
            """;

        await using var server = NatsServerProcess.WithConfig(config);
        await server.StartAsync();

        await using var client = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{server.Port}" });
        await client.ConnectAsync();
        await client.PingAsync();

        client.ConnectionState.ShouldBe(NatsConnectionState.Open);
        client.ServerInfo.ShouldNotBeNull();
        client.ServerInfo!.MaxPayload.ShouldBe(2048);
    }

    // -------------------------------------------------------------------------
    // Max connections enforcement
    // -------------------------------------------------------------------------
    [Fact]
    public async Task MaxConnections_ExceedsLimit_Rejected()
    {
        var config = """
            max_connections: 2
            """;

        await using var server = NatsServerProcess.WithConfig(config);
        await server.StartAsync();

        var url = $"nats://127.0.0.1:{server.Port}";

        await using var c1 = new NatsConnection(new NatsOpts { Url = url });
        await using var c2 = new NatsConnection(new NatsOpts { Url = url });

        await c1.ConnectAsync();
        await c1.PingAsync();

        await c2.ConnectAsync();
        await c2.PingAsync();

        // Third connection should be rejected
        await using var c3 = new NatsConnection(new NatsOpts
        {
            Url = url,
            MaxReconnectRetry = 0,
        });

        var ex = await Should.ThrowAsync<Exception>(async () =>
        {
            await c3.ConnectAsync();
            await c3.PingAsync();
        });

        ex.ShouldNotBeNull();
    }

    // -------------------------------------------------------------------------
    // System events: subscribe to $SYS.>, detect client connect event
    // -------------------------------------------------------------------------
    [Fact]
    public async Task SystemEvents_ClientConnect_EventPublished()
    {
        var config = """
            accounts {
                SYS {
                    users = [{ user: "sys", password: "sys" }]
                }
                APP {
                    users = [{ user: "app", password: "app" }]
                }
            }
            system_account: SYS
            """;

        await using var server = NatsServerProcess.WithConfig(config);
        await server.StartAsync();

        var url = $"nats://127.0.0.1:{server.Port}";

        // Connect as system account user to see events
        await using var sysClient = new NatsConnection(new NatsOpts
        {
            Url = url,
            AuthOpts = new NatsAuthOpts { Username = "sys", Password = "sys" },
        });
        await sysClient.ConnectAsync();

        await using var subscription = await sysClient.SubscribeCoreAsync<string>("$SYS.ACCOUNT.*.CONNECT");
        await sysClient.PingAsync();

        // Connect a new app client — should trigger a system event
        await using var appClient = new NatsConnection(new NatsOpts
        {
            Url = url,
            AuthOpts = new NatsAuthOpts { Username = "app", Password = "app" },
        });
        await appClient.ConnectAsync();
        await appClient.PingAsync();

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Subject.ShouldContain("CONNECT");
    }

    // -------------------------------------------------------------------------
    // Account imports/exports: cross-account service call
    // -------------------------------------------------------------------------
    [Fact]
    public async Task AccountImportExport_CrossAccountServiceCall()
    {
        var config = """
            accounts {
                PROVIDER {
                    users = [{ user: "provider", password: "prov" }]
                    exports = [
                        { service: "svc.echo" }
                    ]
                }
                CONSUMER {
                    users = [{ user: "consumer", password: "cons" }]
                    imports = [
                        { service: { account: PROVIDER, subject: "svc.echo" } }
                    ]
                }
            }
            """;

        await using var server = NatsServerProcess.WithConfig(config);
        await server.StartAsync();

        var url = $"nats://127.0.0.1:{server.Port}";

        // Provider subscribes to service subject
        await using var provider = new NatsConnection(new NatsOpts
        {
            Url = url,
            AuthOpts = new NatsAuthOpts { Username = "provider", Password = "prov" },
        });
        await provider.ConnectAsync();

        await using var svcSub = await provider.SubscribeCoreAsync<string>("svc.echo");
        await provider.PingAsync();

        // Background task: provider replies to requests
        var responderTask = Task.Run(async () =>
        {
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
            var msg = await svcSub.Msgs.ReadAsync(cts.Token);
            await provider.PublishAsync(msg.ReplyTo!, $"echo: {msg.Data}");
        });

        // Consumer calls the imported service
        await using var consumer = new NatsConnection(new NatsOpts
        {
            Url = url,
            AuthOpts = new NatsAuthOpts { Username = "consumer", Password = "cons" },
        });
        await consumer.ConnectAsync();

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var reply = await consumer.RequestAsync<string, string>("svc.echo", "hello", cancellationToken: cts.Token);

        reply.Data.ShouldBe("echo: hello");

        await responderTask;
    }

    // -------------------------------------------------------------------------
    // Subject transforms: publish on mapped subject, receive on target
    // Note: Subject transforms are not yet parsed from config, so this test
    // is skipped until the feature is implemented.
    // -------------------------------------------------------------------------
    [Fact(Skip = "Subject transforms not yet implemented in config parsing")]
    public async Task SubjectTransforms_MappedSubject_ReceivedOnTarget()
    {
        // Placeholder — will be enabled when SubjectMappings config parsing is added
        await Task.CompletedTask;
    }

    // -------------------------------------------------------------------------
    // JWT authentication
    // Note: Full JWT operator mode is not yet implemented in config parsing.
    // NKey auth (the foundation for JWT) is already tested in AuthTests.
    // -------------------------------------------------------------------------
    [Fact(Skip = "JWT operator mode not yet implemented in config parsing")]
    public async Task JwtAuth_ValidJwt_Connects()
    {
        // Placeholder — will be enabled when JWT resolver config parsing is added
        await Task.CompletedTask;
    }
}

Step 2: Run tests

Run: dotnet test tests/NATS.E2E.Tests --filter "FullyQualifiedName~AdvancedTests" -v normal
Expected: 4 passed, 2 skipped (SubjectTransforms and JwtAuth)

Step 3: Commit

git add tests/NATS.E2E.Tests/AdvancedTests.cs
git commit -m "test: add E2E advanced tests (config, max connections, system events, account imports)"

Task 12: Final Verification and Cleanup

Files:

  • No new files

Step 1: Build the entire solution

Run: dotnet build
Expected: Build succeeded

Step 2: Run ALL E2E tests

Run: dotnet test tests/NATS.E2E.Tests -v normal
Expected: ~90+ tests (42 existing + ~48 new), all passing (2 skipped)

Step 3: Verify test count

Run: dotnet test tests/NATS.E2E.Tests --list-tests 2>/dev/null | wc -l
Expected: ~90+ lines

Step 4: Commit any final fixes

If any tests fail, fix them and commit:

git add -A
git commit -m "fix: resolve E2E test failures from final verification"

Task Dependency Graph

Task 1 (MQTTnet NuGet) ──────────────────────────────┐
Task 2 (Monitoring) ─────── no deps                   │
Task 3 (Headers) ────────── no deps                   │
Task 4 (Shutdown/Drain) ─── no deps                   │
Task 5 (JetStream Ext) ──── no deps                   │
Task 6 (Cluster) ────────── no deps                   │
Task 7 (Leaf Node) ──────── no deps                   │
Task 8 (Gateway) ────────── no deps                   │
Task 9 (MQTT) ───────────── blocked by Task 1         │
Task 10 (WebSocket) ─────── no deps                   │
Task 11 (Advanced) ──────── no deps                   │
Task 12 (Verification) ──── blocked by ALL above ─────┘

Parallelizable batches:

  • Batch A (all independent): Tasks 1, 2, 3, 4, 5, 6, 7, 8, 10, 11
  • Batch B (after Task 1): Task 9
  • Batch C (after all): Task 12