diff --git a/docs/plans/2026-02-23-jetstream-full-parity-plan.md b/docs/plans/2026-02-23-jetstream-full-parity-plan.md new file mode 100644 index 0000000..f01192e --- /dev/null +++ b/docs/plans/2026-02-23-jetstream-full-parity-plan.md @@ -0,0 +1,1642 @@ +# Full JetStream and Cluster Prerequisite Parity Implementation Plan + +> **For Codex:** REQUIRED SUB-SKILL: Use `executeplan` to implement this plan task-by-task. + +**Goal:** Port JetStream and all prerequisite clustering subsystems from Go to .NET with enough fidelity to satisfy full Go JetStream-focused test parity, then update `differences.md` after verification. + +**Architecture:** Build parity in vertical slices: protocol/client-kind and cluster fabric first, then JetStream API/runtime/storage, then RAFT replication and cluster integration, followed by monitoring/auth/config parity. Keep behavior Go-compatible at the network and API layers while isolating subsystems behind explicit interfaces. + +**Tech Stack:** .NET 10, C# 14, xUnit 3, Shouldly, NSubstitute, NATS.NKeys, Serilog, Go test runner (`go test`) for parity validation. + +--- + +**Execution guardrails** +- Use `@test-driven-development` in every task. +- If a test fails unexpectedly, switch to `@systematic-debugging` before patching. +- Before any completion claim, run `@verification-before-completion` commands. + +### Task 1: Add Client-Kind and Command Matrix Parity + +**Files:** +- Create: `src/NATS.Server/Protocol/ClientKind.cs` +- Create: `src/NATS.Server/Protocol/ClientCommandMatrix.cs` +- Modify: `src/NATS.Server/NatsClient.cs` +- Modify: `src/NATS.Server/Protocol/NatsParser.cs` +- Test: `tests/NATS.Server.Tests/ClientKindCommandMatrixTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public void Router_only_commands_are_rejected_for_client_kind() +{ + var matrix = new ClientCommandMatrix(); + matrix.IsAllowed(ClientKind.Client, "RS+").ShouldBeFalse(); + matrix.IsAllowed(ClientKind.Router, "RS+").ShouldBeTrue(); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClientKindCommandMatrixTests.Router_only_commands_are_rejected_for_client_kind" -v minimal` +Expected: FAIL with missing `ClientKind`/`ClientCommandMatrix` symbols. + +**Step 3: Write minimal implementation** + +```csharp +public enum ClientKind { Client, Router, Gateway, Leaf, System, JetStream, Account } +``` + +```csharp +public sealed class ClientCommandMatrix +{ + public bool IsAllowed(ClientKind kind, string op) => (kind, op) switch + { + (ClientKind.Router, "RS+") => true, + (ClientKind.Client, "RS+") => false, + _ => true, + }; +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClientKindCommandMatrixTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Protocol/ClientKind.cs src/NATS.Server/Protocol/ClientCommandMatrix.cs src/NATS.Server/NatsClient.cs src/NATS.Server/Protocol/NatsParser.cs tests/NATS.Server.Tests/ClientKindCommandMatrixTests.cs +git commit -m "feat: add client kind command matrix parity" +``` + +### Task 2: Parse Cluster/Gateway/Leaf/JetStream Config Blocks + +**Files:** +- Create: `src/NATS.Server/Configuration/ClusterOptions.cs` +- Create: `src/NATS.Server/Configuration/GatewayOptions.cs` +- Create: `src/NATS.Server/Configuration/LeafNodeOptions.cs` +- Create: `src/NATS.Server/Configuration/JetStreamOptions.cs` +- Modify: `src/NATS.Server/NatsOptions.cs` +- Modify: `src/NATS.Server/Configuration/ConfigProcessor.cs` +- Test: `tests/NATS.Server.Tests/ClusterJetStreamConfigProcessorTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public void ConfigProcessor_maps_jetstream_and_cluster_blocks() +{ + var cfg = """ + cluster { name: C1; listen: 127.0.0.1:6222 } + jetstream { store_dir: /tmp/js; max_mem_store: 1GB; max_file_store: 10GB } + """; + + var opts = ConfigProcessor.ProcessConfig(cfg); + + opts.Cluster.ShouldNotBeNull(); + opts.JetStream.ShouldNotBeNull(); + opts.JetStream!.StoreDir.ShouldBe("/tmp/js"); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClusterJetStreamConfigProcessorTests" -v minimal` +Expected: FAIL because cluster/jetstream fields are ignored. + +**Step 3: Write minimal implementation** + +```csharp +public sealed class JetStreamOptions +{ + public string StoreDir { get; set; } = string.Empty; + public long MaxMemoryStore { get; set; } + public long MaxFileStore { get; set; } +} +``` + +```csharp +case "jetstream": + if (value is Dictionary js) + opts.JetStream = ParseJetStream(js, errors); + break; +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClusterJetStreamConfigProcessorTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Configuration/ClusterOptions.cs src/NATS.Server/Configuration/GatewayOptions.cs src/NATS.Server/Configuration/LeafNodeOptions.cs src/NATS.Server/Configuration/JetStreamOptions.cs src/NATS.Server/NatsOptions.cs src/NATS.Server/Configuration/ConfigProcessor.cs tests/NATS.Server.Tests/ClusterJetStreamConfigProcessorTests.cs +git commit -m "feat: parse cluster and jetstream config blocks" +``` + +### Task 3: Implement Route Handshake and Connection Lifecycle + +**Files:** +- Create: `src/NATS.Server/Routes/RouteConnection.cs` +- Create: `src/NATS.Server/Routes/RouteManager.cs` +- Modify: `src/NATS.Server/NatsServer.cs` +- Modify: `src/NATS.Server/NatsClient.cs` +- Test: `tests/NATS.Server.Tests/RouteHandshakeTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Two_servers_establish_route_connection() +{ + await using var a = TestServerFactory.CreateClusterEnabled(); + await using var b = TestServerFactory.CreateClusterEnabled(seed: a.ClusterListen); + + await a.WaitForReadyAsync(); + await b.WaitForReadyAsync(); + + a.Stats.Routes.ShouldBeGreaterThan(0); + b.Stats.Routes.ShouldBeGreaterThan(0); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RouteHandshakeTests.Two_servers_establish_route_connection" -v minimal` +Expected: FAIL because no route manager/handshake exists. + +**Step 3: Write minimal implementation** + +```csharp +public sealed class RouteManager +{ + public Task StartAsync(CancellationToken ct) => Task.CompletedTask; + public void RegisterInbound(RouteConnection route) { } +} +``` + +```csharp +if (_options.Cluster is not null) + _routeManager = new RouteManager(/* deps */); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RouteHandshakeTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Routes/RouteConnection.cs src/NATS.Server/Routes/RouteManager.cs src/NATS.Server/NatsServer.cs src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/RouteHandshakeTests.cs +git commit -m "feat: add route handshake lifecycle" +``` + +### Task 4: Add Remote Subscription Propagation Over Routes + +**Files:** +- Create: `src/NATS.Server/Subscriptions/RemoteSubscription.cs` +- Modify: `src/NATS.Server/Subscriptions/SubList.cs` +- Modify: `src/NATS.Server/Routes/RouteManager.cs` +- Test: `tests/NATS.Server.Tests/RouteSubscriptionPropagationTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Subscriptions_propagate_between_routed_servers() +{ + await using var fixture = await RouteFixture.StartTwoNodeClusterAsync(); + + await fixture.SubscribeOnServerBAsync("foo.*"); + var hasInterest = await fixture.ServerAHasRemoteInterestAsync("foo.bar"); + + hasInterest.ShouldBeTrue(); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RouteSubscriptionPropagationTests" -v minimal` +Expected: FAIL with no remote interest tracking. + +**Step 3: Write minimal implementation** + +```csharp +public sealed record RemoteSubscription(string Subject, string? Queue, string RouteId); +``` + +```csharp +public void ApplyRemoteSub(RemoteSubscription sub) +{ + _remoteSubs[sub.Subject] = sub; +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RouteSubscriptionPropagationTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Subscriptions/RemoteSubscription.cs src/NATS.Server/Subscriptions/SubList.cs src/NATS.Server/Routes/RouteManager.cs tests/NATS.Server.Tests/RouteSubscriptionPropagationTests.cs +git commit -m "feat: propagate remote subscriptions over routes" +``` + +### Task 5: Add Gateway and Leaf Primitives Required by JetStream Cluster Tests + +**Files:** +- Create: `src/NATS.Server/Gateways/GatewayConnection.cs` +- Create: `src/NATS.Server/Gateways/GatewayManager.cs` +- Create: `src/NATS.Server/LeafNodes/LeafConnection.cs` +- Create: `src/NATS.Server/LeafNodes/LeafNodeManager.cs` +- Modify: `src/NATS.Server/NatsServer.cs` +- Test: `tests/NATS.Server.Tests/GatewayLeafBootstrapTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Server_bootstraps_gateway_and_leaf_managers_when_configured() +{ + await using var server = TestServerFactory.CreateWithGatewayAndLeaf(); + await server.WaitForReadyAsync(); + + server.Stats.Gateways.ShouldBeGreaterThanOrEqualTo(0); + server.Stats.Leafs.ShouldBeGreaterThanOrEqualTo(0); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~GatewayLeafBootstrapTests" -v minimal` +Expected: FAIL with missing manager wiring. + +**Step 3: Write minimal implementation** + +```csharp +public sealed class GatewayManager { public Task StartAsync(CancellationToken ct) => Task.CompletedTask; } +public sealed class LeafNodeManager { public Task StartAsync(CancellationToken ct) => Task.CompletedTask; } +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~GatewayLeafBootstrapTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Gateways/GatewayConnection.cs src/NATS.Server/Gateways/GatewayManager.cs src/NATS.Server/LeafNodes/LeafConnection.cs src/NATS.Server/LeafNodes/LeafNodeManager.cs src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/GatewayLeafBootstrapTests.cs +git commit -m "feat: wire gateway and leaf bootstrap primitives" +``` + +### Task 6: Bootstrap JetStream Service Lifecycle in NatsServer + +**Files:** +- Create: `src/NATS.Server/JetStream/JetStreamService.cs` +- Modify: `src/NATS.Server/NatsServer.cs` +- Modify: `src/NATS.Server/ServerStats.cs` +- Test: `tests/NATS.Server.Tests/JetStreamStartupTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task JetStream_enabled_server_starts_service() +{ + await using var server = TestServerFactory.CreateJetStreamEnabled(); + await server.WaitForReadyAsync(); + + server.Stats.JetStreamEnabled.ShouldBeTrue(); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStartupTests.JetStream_enabled_server_starts_service" -v minimal` +Expected: FAIL because JetStream service is absent. + +**Step 3: Write minimal implementation** + +```csharp +public sealed class JetStreamService +{ + public Task StartAsync(CancellationToken ct) => Task.CompletedTask; +} +``` + +```csharp +if (_options.JetStream is not null) +{ + _jetStream = new JetStreamService(); + await _jetStream.StartAsync(_quitCts.Token); +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStartupTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/JetStreamService.cs src/NATS.Server/NatsServer.cs src/NATS.Server/ServerStats.cs tests/NATS.Server.Tests/JetStreamStartupTests.cs +git commit -m "feat: bootstrap JetStream service lifecycle" +``` + +### Task 7: Add JetStream API Router and Error Envelope + +**Files:** +- Create: `src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs` +- Create: `src/NATS.Server/JetStream/Api/JetStreamApiError.cs` +- Create: `src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs` +- Modify: `src/NATS.Server/NatsServer.cs` +- Test: `tests/NATS.Server.Tests/JetStreamApiRouterTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Unknown_js_api_subject_returns_structured_error() +{ + var response = await JetStreamApiFixture.RequestAsync("$JS.API.BAD", "{}"); + response.Error.ShouldNotBeNull(); + response.Error!.Code.ShouldBe(404); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamApiRouterTests.Unknown_js_api_subject_returns_structured_error" -v minimal` +Expected: FAIL because no API routing exists. + +**Step 3: Write minimal implementation** + +```csharp +public sealed class JetStreamApiRouter +{ + public JetStreamApiResponse Route(string subject, ReadOnlySpan payload) + => JetStreamApiResponse.NotFound(subject); +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamApiRouterTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs src/NATS.Server/JetStream/Api/JetStreamApiError.cs src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/JetStreamApiRouterTests.cs +git commit -m "feat: add jetstream api router and error envelope" +``` + +### Task 8: Implement Stream and Consumer Config Validation Models + +**Files:** +- Create: `src/NATS.Server/JetStream/Models/StreamConfig.cs` +- Create: `src/NATS.Server/JetStream/Models/ConsumerConfig.cs` +- Create: `src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs` +- Test: `tests/NATS.Server.Tests/JetStreamConfigValidationTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public void Stream_requires_name_and_subjects() +{ + var config = new StreamConfig { Name = "", Subjects = [] }; + var result = JetStreamConfigValidator.Validate(config); + result.IsValid.ShouldBeFalse(); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamConfigValidationTests.Stream_requires_name_and_subjects" -v minimal` +Expected: FAIL because validator/model is missing. + +**Step 3: Write minimal implementation** + +```csharp +public static ValidationResult Validate(StreamConfig config) + => string.IsNullOrWhiteSpace(config.Name) || config.Subjects.Count == 0 + ? ValidationResult.Invalid("name/subjects required") + : ValidationResult.Valid(); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamConfigValidationTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Models/StreamConfig.cs src/NATS.Server/JetStream/Models/ConsumerConfig.cs src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs tests/NATS.Server.Tests/JetStreamConfigValidationTests.cs +git commit -m "feat: add jetstream config validation models" +``` + +### Task 9: Define Storage Interfaces and Stream State Contracts + +**Files:** +- Create: `src/NATS.Server/JetStream/Storage/IStreamStore.cs` +- Create: `src/NATS.Server/JetStream/Storage/StoredMessage.cs` +- Create: `src/NATS.Server/JetStream/Models/StreamState.cs` +- Test: `tests/NATS.Server.Tests/StreamStoreContractTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Append_increments_sequence_and_updates_state() +{ + var store = new FakeStreamStore(); + var seq = await store.AppendAsync("foo", "bar"u8.ToArray(), default); + + seq.ShouldBe(1); + (await store.GetStateAsync(default)).Messages.ShouldBe(1); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StreamStoreContractTests.Append_increments_sequence_and_updates_state" -v minimal` +Expected: FAIL due to missing contracts. + +**Step 3: Write minimal implementation** + +```csharp +public interface IStreamStore +{ + ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct); + ValueTask GetStateAsync(CancellationToken ct); +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StreamStoreContractTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Storage/IStreamStore.cs src/NATS.Server/JetStream/Storage/StoredMessage.cs src/NATS.Server/JetStream/Models/StreamState.cs tests/NATS.Server.Tests/StreamStoreContractTests.cs +git commit -m "feat: define jetstream storage interfaces" +``` + +### Task 10: Implement MemStore Core Behavior + +**Files:** +- Create: `src/NATS.Server/JetStream/Storage/MemStore.cs` +- Modify: `src/NATS.Server/JetStream/Storage/IStreamStore.cs` +- Test: `tests/NATS.Server.Tests/MemStoreTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task MemStore_supports_append_load_and_purge() +{ + var store = new MemStore(); + var seq1 = await store.AppendAsync("a", "one"u8.ToArray(), default); + var seq2 = await store.AppendAsync("a", "two"u8.ToArray(), default); + + seq2.ShouldBe(seq1 + 1); + (await store.LoadAsync(seq2, default))!.Payload.Span.SequenceEqual("two"u8).ShouldBeTrue(); + + await store.PurgeAsync(default); + (await store.GetStateAsync(default)).Messages.ShouldBe(0); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MemStoreTests.MemStore_supports_append_load_and_purge" -v minimal` +Expected: FAIL because `MemStore` is missing. + +**Step 3: Write minimal implementation** + +```csharp +public sealed class MemStore : IStreamStore +{ + private ulong _last; + private readonly Dictionary _messages = new(); +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MemStoreTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Storage/MemStore.cs src/NATS.Server/JetStream/Storage/IStreamStore.cs tests/NATS.Server.Tests/MemStoreTests.cs +git commit -m "feat: implement jetstream memstore core behavior" +``` + +### Task 11: Implement FileStore Blocks and Recovery Baseline + +**Files:** +- Create: `src/NATS.Server/JetStream/Storage/FileStore.cs` +- Create: `src/NATS.Server/JetStream/Storage/FileStoreBlock.cs` +- Create: `src/NATS.Server/JetStream/Storage/FileStoreOptions.cs` +- Test: `tests/NATS.Server.Tests/FileStoreTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task FileStore_recovers_messages_after_restart() +{ + var dir = Directory.CreateTempSubdirectory(); + + await using (var store = new FileStore(new FileStoreOptions { Directory = dir.FullName })) + await store.AppendAsync("foo", "payload"u8.ToArray(), default); + + await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName }); + (await recovered.GetStateAsync(default)).Messages.ShouldBe(1); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreTests.FileStore_recovers_messages_after_restart" -v minimal` +Expected: FAIL because `FileStore` is missing. + +**Step 3: Write minimal implementation** + +```csharp +public sealed class FileStore : IStreamStore, IAsyncDisposable +{ + public FileStore(FileStoreOptions options) { } +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Storage/FileStore.cs src/NATS.Server/JetStream/Storage/FileStoreBlock.cs src/NATS.Server/JetStream/Storage/FileStoreOptions.cs tests/NATS.Server.Tests/FileStoreTests.cs +git commit -m "feat: implement jetstream filestore recovery baseline" +``` + +### Task 12: Add Stream Manager and Stream API (Create/Update/Delete/Info) + +**Files:** +- Create: `src/NATS.Server/JetStream/StreamManager.cs` +- Create: `src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs` +- Modify: `src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs` +- Test: `tests/NATS.Server.Tests/JetStreamStreamApiTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Stream_create_and_info_roundtrip() +{ + var create = await JetStreamApiFixture.RequestAsync("$JS.API.STREAM.CREATE.ORDERS", "{""name"":""ORDERS"",""subjects"":""orders.*""}"); + create.Error.ShouldBeNull(); + + var info = await JetStreamApiFixture.RequestAsync("$JS.API.STREAM.INFO.ORDERS", "{}"); + info.Error.ShouldBeNull(); + info.StreamInfo!.Config.Name.ShouldBe("ORDERS"); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStreamApiTests.Stream_create_and_info_roundtrip" -v minimal` +Expected: FAIL due to missing stream handlers. + +**Step 3: Write minimal implementation** + +```csharp +public sealed class StreamManager +{ + private readonly ConcurrentDictionary _streams = new(StringComparer.Ordinal); +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStreamApiTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/StreamManager.cs src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs tests/NATS.Server.Tests/JetStreamStreamApiTests.cs +git commit -m "feat: add jetstream stream lifecycle api" +``` + +### Task 13: Wire Publish Path Into Stream Capture and PubAck + +**Files:** +- Create: `src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs` +- Create: `src/NATS.Server/JetStream/Publish/PubAck.cs` +- Modify: `src/NATS.Server/NatsServer.cs` +- Modify: `src/NATS.Server/NatsClient.cs` +- Test: `tests/NATS.Server.Tests/JetStreamPublishTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Publish_to_stream_subject_returns_puback() +{ + await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + var ack = await fixture.PublishAndGetAckAsync("orders.created", "{\"id\":1}"); + + ack.Stream.ShouldBe("ORDERS"); + ack.Seq.ShouldBe(1); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamPublishTests.Publish_to_stream_subject_returns_puback" -v minimal` +Expected: FAIL because publish is not routed into stream manager. + +**Step 3: Write minimal implementation** + +```csharp +if (_jetStream is not null && _jetStream.TryCapture(subject, payload, out var ack)) + return SendPubAckAsync(client, ack, ct); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamPublishTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs src/NATS.Server/JetStream/Publish/PubAck.cs src/NATS.Server/NatsServer.cs src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/JetStreamPublishTests.cs +git commit -m "feat: route publishes to jetstream with puback" +``` + +### Task 14: Enforce Retention and Limit Policies + +**Files:** +- Modify: `src/NATS.Server/JetStream/StreamManager.cs` +- Modify: `src/NATS.Server/JetStream/Storage/MemStore.cs` +- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs` +- Test: `tests/NATS.Server.Tests/JetStreamRetentionPolicyTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task MaxMsgs_limit_evicts_oldest_message() +{ + await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("L", "l.*", maxMsgs: 2); + + await fixture.PublishAndGetAckAsync("l.1", "a"); + await fixture.PublishAndGetAckAsync("l.2", "b"); + await fixture.PublishAndGetAckAsync("l.3", "c"); + + var state = await fixture.GetStreamStateAsync("L"); + state.Messages.ShouldBe(2); + state.FirstSeq.ShouldBe(2); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamRetentionPolicyTests.MaxMsgs_limit_evicts_oldest_message" -v minimal` +Expected: FAIL with state showing 3 messages. + +**Step 3: Write minimal implementation** + +```csharp +if (_config.MaxMsgs > 0 && _state.Messages > _config.MaxMsgs) + EvictOldest(); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamRetentionPolicyTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/StreamManager.cs src/NATS.Server/JetStream/Storage/MemStore.cs src/NATS.Server/JetStream/Storage/FileStore.cs tests/NATS.Server.Tests/JetStreamRetentionPolicyTests.cs +git commit -m "feat: enforce jetstream retention and limits" +``` + +### Task 15: Implement Dedupe and Expected-Header Preconditions + +**Files:** +- Create: `src/NATS.Server/JetStream/Publish/PublishPreconditions.cs` +- Modify: `src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs` +- Test: `tests/NATS.Server.Tests/JetStreamPublishPreconditionTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Duplicate_msg_id_is_rejected_with_expected_error() +{ + await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("D", "d.*"); + + await fixture.PublishAndGetAckAsync("d.a", "x", msgId: "id-1"); + var second = await fixture.PublishAndGetAckAsync("d.a", "x", msgId: "id-1", expectError: true); + + second.ErrorCode.ShouldBe(10071); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamPublishPreconditionTests" -v minimal` +Expected: FAIL because dedupe table is absent. + +**Step 3: Write minimal implementation** + +```csharp +if (!string.IsNullOrEmpty(msgId) && _dedupe.TryGetValue(msgId, out var seq)) + return PublishDecision.Duplicate(seq); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamPublishPreconditionTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Publish/PublishPreconditions.cs src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs tests/NATS.Server.Tests/JetStreamPublishPreconditionTests.cs +git commit -m "feat: add jetstream publish preconditions and dedupe" +``` + +### Task 16: Add Consumer Manager and Consumer API Handlers + +**Files:** +- Create: `src/NATS.Server/JetStream/ConsumerManager.cs` +- Create: `src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs` +- Modify: `src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs` +- Test: `tests/NATS.Server.Tests/JetStreamConsumerApiTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Create_consumer_and_fetch_info_roundtrip() +{ + await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + + var create = await fixture.CreateConsumerAsync("ORDERS", "DUR", "orders.created"); + create.Error.ShouldBeNull(); + + var info = await fixture.GetConsumerInfoAsync("ORDERS", "DUR"); + info.Config.DurableName.ShouldBe("DUR"); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamConsumerApiTests" -v minimal` +Expected: FAIL due to missing consumer handlers. + +**Step 3: Write minimal implementation** + +```csharp +public sealed class ConsumerManager +{ + private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new(); +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamConsumerApiTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/ConsumerManager.cs src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs tests/NATS.Server.Tests/JetStreamConsumerApiTests.cs +git commit -m "feat: add jetstream consumer api lifecycle" +``` + +### Task 17: Implement Pull Consumer Fetch Semantics + +**Files:** +- Create: `src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs` +- Modify: `src/NATS.Server/JetStream/ConsumerManager.cs` +- Test: `tests/NATS.Server.Tests/JetStreamPullConsumerTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Pull_consumer_fetch_returns_available_messages() +{ + await using var fixture = await JetStreamApiFixture.StartWithPullConsumerAsync(); + + await fixture.PublishAndGetAckAsync("orders.created", "1"); + var batch = await fixture.FetchAsync("ORDERS", "PULL", batch: 1); + + batch.Messages.Count.ShouldBe(1); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamPullConsumerTests.Pull_consumer_fetch_returns_available_messages" -v minimal` +Expected: FAIL because pull dispatch path is missing. + +**Step 3: Write minimal implementation** + +```csharp +public ValueTask> FetchAsync(int batch, CancellationToken ct) + => new(_pending.Take(batch).ToArray()); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamPullConsumerTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs src/NATS.Server/JetStream/ConsumerManager.cs tests/NATS.Server.Tests/JetStreamPullConsumerTests.cs +git commit -m "feat: implement jetstream pull consumer fetch" +``` + +### Task 18: Implement Push Consumer Delivery, Heartbeat, and Flow Control + +**Files:** +- Create: `src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs` +- Modify: `src/NATS.Server/JetStream/ConsumerManager.cs` +- Modify: `src/NATS.Server/NatsServer.cs` +- Test: `tests/NATS.Server.Tests/JetStreamPushConsumerTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Push_consumer_delivers_and_sends_heartbeat() +{ + await using var fixture = await JetStreamApiFixture.StartWithPushConsumerAsync(); + await fixture.PublishAndGetAckAsync("orders.created", "1"); + + var frame = await fixture.ReadPushFrameAsync(); + frame.IsData.ShouldBeTrue(); + + var hb = await fixture.ReadPushFrameAsync(); + hb.IsHeartbeat.ShouldBeTrue(); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamPushConsumerTests" -v minimal` +Expected: FAIL due to no push loop. + +**Step 3: Write minimal implementation** + +```csharp +await _transport.SendAsync(nextMsg, ct); +if (_config.Heartbeat > TimeSpan.Zero) + await _transport.SendHeartbeatAsync(ct); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamPushConsumerTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs src/NATS.Server/JetStream/ConsumerManager.cs src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/JetStreamPushConsumerTests.cs +git commit -m "feat: implement jetstream push delivery and heartbeat" +``` + +### Task 19: Add Ack Policies, Redelivery, and MaxDeliver Semantics + +**Files:** +- Create: `src/NATS.Server/JetStream/Consumers/AckProcessor.cs` +- Modify: `src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs` +- Modify: `src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs` +- Test: `tests/NATS.Server.Tests/JetStreamAckRedeliveryTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Unacked_message_is_redelivered_after_ack_wait() +{ + await using var fixture = await JetStreamApiFixture.StartWithAckExplicitConsumerAsync(ackWaitMs: 50); + await fixture.PublishAndGetAckAsync("orders.created", "1"); + + var first = await fixture.FetchAsync("ORDERS", "PULL", batch: 1); + var second = await fixture.FetchAfterDelayAsync("ORDERS", "PULL", delayMs: 75, batch: 1); + + second.Messages.Single().Sequence.ShouldBe(first.Messages.Single().Sequence); + second.Messages.Single().Redelivered.ShouldBeTrue(); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamAckRedeliveryTests" -v minimal` +Expected: FAIL because pending/ack timers are not enforced. + +**Step 3: Write minimal implementation** + +```csharp +if (DateTime.UtcNow >= pending.Deadline) + Redeliver(pending.Sequence); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamAckRedeliveryTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Consumers/AckProcessor.cs src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs tests/NATS.Server.Tests/JetStreamAckRedeliveryTests.cs +git commit -m "feat: enforce jetstream ack and redelivery semantics" +``` + +### Task 20: Implement Mirror and Source Stream Orchestration + +**Files:** +- Create: `src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs` +- Create: `src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs` +- Modify: `src/NATS.Server/JetStream/StreamManager.cs` +- Test: `tests/NATS.Server.Tests/JetStreamMirrorSourceTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Mirror_stream_replays_origin_messages() +{ + await using var fixture = await JetStreamApiFixture.StartWithMirrorSetupAsync(); + + await fixture.PublishAndGetAckAsync("ORDERS", "orders.created", "1"); + await fixture.WaitForMirrorSyncAsync("ORDERS_MIRROR"); + + var state = await fixture.GetStreamStateAsync("ORDERS_MIRROR"); + state.Messages.ShouldBe(1); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamMirrorSourceTests" -v minimal` +Expected: FAIL because mirror/source replication is absent. + +**Step 3: Write minimal implementation** + +```csharp +public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct) + => _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask(); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamMirrorSourceTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs src/NATS.Server/JetStream/StreamManager.cs tests/NATS.Server.Tests/JetStreamMirrorSourceTests.cs +git commit -m "feat: add jetstream mirror and source orchestration" +``` + +### Task 21: Implement RAFT Election and Term State + +**Files:** +- Create: `src/NATS.Server/Raft/RaftNode.cs` +- Create: `src/NATS.Server/Raft/RaftTermState.cs` +- Create: `src/NATS.Server/Raft/RaftRpcContracts.cs` +- Test: `tests/NATS.Server.Tests/RaftElectionTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Candidate_becomes_leader_after_majority_votes() +{ + var cluster = RaftTestCluster.Create(3); + var leader = await cluster.ElectLeaderAsync(); + + leader.Role.ShouldBe(RaftRole.Leader); + leader.Term.ShouldBe(1); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftElectionTests.Candidate_becomes_leader_after_majority_votes" -v minimal` +Expected: FAIL because `RaftNode` is missing. + +**Step 3: Write minimal implementation** + +```csharp +if (votesReceived >= quorum) + Role = RaftRole.Leader; +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftElectionTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Raft/RaftNode.cs src/NATS.Server/Raft/RaftTermState.cs src/NATS.Server/Raft/RaftRpcContracts.cs tests/NATS.Server.Tests/RaftElectionTests.cs +git commit -m "feat: implement raft election and term state" +``` + +### Task 22: Implement RAFT Log Replication and Apply Index + +**Files:** +- Create: `src/NATS.Server/Raft/RaftLog.cs` +- Create: `src/NATS.Server/Raft/RaftReplicator.cs` +- Modify: `src/NATS.Server/Raft/RaftNode.cs` +- Test: `tests/NATS.Server.Tests/RaftReplicationTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Leader_replicates_entry_to_quorum_and_applies() +{ + var cluster = RaftTestCluster.Create(3); + var leader = await cluster.ElectLeaderAsync(); + + var idx = await leader.ProposeAsync("create-stream", default); + idx.ShouldBeGreaterThan(0); + + await cluster.WaitForAppliedAsync(idx); + cluster.Nodes.All(n => n.AppliedIndex >= idx).ShouldBeTrue(); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftReplicationTests" -v minimal` +Expected: FAIL because replication/apply pipeline is missing. + +**Step 3: Write minimal implementation** + +```csharp +if (acks >= quorum) + _appliedIndex = entry.Index; +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftReplicationTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Raft/RaftLog.cs src/NATS.Server/Raft/RaftReplicator.cs src/NATS.Server/Raft/RaftNode.cs tests/NATS.Server.Tests/RaftReplicationTests.cs +git commit -m "feat: implement raft log replication and apply" +``` + +### Task 23: Implement RAFT Snapshots and Catchup + +**Files:** +- Create: `src/NATS.Server/Raft/RaftSnapshot.cs` +- Create: `src/NATS.Server/Raft/RaftSnapshotStore.cs` +- Modify: `src/NATS.Server/Raft/RaftNode.cs` +- Test: `tests/NATS.Server.Tests/RaftSnapshotCatchupTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Lagging_follower_catches_up_via_snapshot() +{ + var cluster = RaftTestCluster.Create(3); + await cluster.GenerateCommittedEntriesAsync(500); + + await cluster.RestartLaggingFollowerAsync(); + await cluster.WaitForFollowerCatchupAsync(); + + cluster.LaggingFollower.AppliedIndex.ShouldBe(cluster.Leader.AppliedIndex); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftSnapshotCatchupTests" -v minimal` +Expected: FAIL because snapshot installation path is missing. + +**Step 3: Write minimal implementation** + +```csharp +public Task InstallSnapshotAsync(RaftSnapshot snapshot, CancellationToken ct) +{ + _log.ReplaceWithSnapshot(snapshot); + _appliedIndex = snapshot.LastIncludedIndex; + return Task.CompletedTask; +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftSnapshotCatchupTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Raft/RaftSnapshot.cs src/NATS.Server/Raft/RaftSnapshotStore.cs src/NATS.Server/Raft/RaftNode.cs tests/NATS.Server.Tests/RaftSnapshotCatchupTests.cs +git commit -m "feat: implement raft snapshot catchup" +``` + +### Task 24: Integrate JetStream Meta-Group and Asset Placement + +**Files:** +- Create: `src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs` +- Create: `src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs` +- Modify: `src/NATS.Server/JetStream/StreamManager.cs` +- Modify: `src/NATS.Server/JetStream/ConsumerManager.cs` +- Test: `tests/NATS.Server.Tests/JetStreamMetaGroupTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Stream_create_requires_meta_group_commit() +{ + await using var fixture = await JetStreamClusterFixture.StartAsync(nodes: 3); + + var result = await fixture.CreateStreamAsync("ORDERS", replicas: 3); + result.Error.ShouldBeNull(); + + var meta = await fixture.GetMetaStateAsync(); + meta.Streams.ShouldContain("ORDERS"); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamMetaGroupTests" -v minimal` +Expected: FAIL because metadata commits are not replicated. + +**Step 3: Write minimal implementation** + +```csharp +await _metaRaft.ProposeAsync(MetaCommand.CreateStream(config), ct); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamMetaGroupTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs src/NATS.Server/JetStream/StreamManager.cs src/NATS.Server/JetStream/ConsumerManager.cs tests/NATS.Server.Tests/JetStreamMetaGroupTests.cs +git commit -m "feat: integrate jetstream meta-group placement" +``` + +### Task 25: Add Per-Stream Replica Groups and Leader-Stepdown Behavior + +**Files:** +- Create: `src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs` +- Modify: `src/NATS.Server/JetStream/StreamManager.cs` +- Modify: `src/NATS.Server/Raft/RaftNode.cs` +- Test: `tests/NATS.Server.Tests/JetStreamStreamReplicaGroupTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Leader_stepdown_preserves_stream_write_availability_after_new_election() +{ + await using var fixture = await JetStreamClusterFixture.StartAsync(nodes: 3); + await fixture.CreateStreamAsync("ORDERS", replicas: 3); + + await fixture.StepDownStreamLeaderAsync("ORDERS"); + var ack = await fixture.PublishAndGetAckAsync("orders.created", "1"); + + ack.Stream.ShouldBe("ORDERS"); + ack.Seq.ShouldBeGreaterThan(0); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStreamReplicaGroupTests" -v minimal` +Expected: FAIL because stream RAFT group ownership is missing. + +**Step 3: Write minimal implementation** + +```csharp +public Task StepDownAsync(CancellationToken ct) +{ + _raft.RequestStepDown(); + return Task.CompletedTask; +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStreamReplicaGroupTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs src/NATS.Server/JetStream/StreamManager.cs src/NATS.Server/Raft/RaftNode.cs tests/NATS.Server.Tests/JetStreamStreamReplicaGroupTests.cs +git commit -m "feat: add stream replica groups and leader stepdown" +``` + +### Task 26: Implement /jsz and Live JetStream Monitoring Fields + +**Files:** +- Create: `src/NATS.Server/Monitoring/JszHandler.cs` +- Modify: `src/NATS.Server/Monitoring/MonitorServer.cs` +- Modify: `src/NATS.Server/Monitoring/Varz.cs` +- Modify: `src/NATS.Server/Monitoring/VarzHandler.cs` +- Test: `tests/NATS.Server.Tests/JszMonitorTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Jsz_reports_live_stream_and_consumer_counts() +{ + await using var fixture = await JetStreamApiFixture.StartWithStreamAndConsumerAsync(); + + var jsz = await fixture.GetJszAsync(); + jsz.Streams.ShouldBeGreaterThan(0); + jsz.Consumers.ShouldBeGreaterThan(0); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JszMonitorTests" -v minimal` +Expected: FAIL because `/jsz` is stubbed. + +**Step 3: Write minimal implementation** + +```csharp +_app.MapGet(basePath + "/jsz", (NatsServer server) => JszHandler.Build(server)); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JszMonitorTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Monitoring/JszHandler.cs src/NATS.Server/Monitoring/MonitorServer.cs src/NATS.Server/Monitoring/Varz.cs src/NATS.Server/Monitoring/VarzHandler.cs tests/NATS.Server.Tests/JszMonitorTests.cs +git commit -m "feat: implement jsz and live jetstream monitoring" +``` + +### Task 27: Enforce Account JetStream Limits and JWT-Tier Behavior + +**Files:** +- Modify: `src/NATS.Server/Auth/Account.cs` +- Modify: `src/NATS.Server/Auth/Jwt/AccountClaims.cs` +- Modify: `src/NATS.Server/Auth/JwtAuthenticator.cs` +- Modify: `src/NATS.Server/JetStream/StreamManager.cs` +- Test: `tests/NATS.Server.Tests/JetStreamJwtLimitTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Account_limit_rejects_stream_create_when_max_streams_reached() +{ + await using var fixture = await JetStreamApiFixture.StartJwtLimitedAccountAsync(maxStreams: 1); + + (await fixture.CreateStreamAsync("S1", subjects: ["s1.*"])) .Error.ShouldBeNull(); + var second = await fixture.CreateStreamAsync("S2", subjects: ["s2.*"]); + + second.Error.ShouldNotBeNull(); + second.Error!.Code.ShouldBe(10027); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamJwtLimitTests" -v minimal` +Expected: FAIL because account JetStream limits are not enforced. + +**Step 3: Write minimal implementation** + +```csharp +if (!account.TryReserveStream()) + return JetStreamApiResponse.LimitExceeded("maximum streams exceeded"); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamJwtLimitTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Auth/Account.cs src/NATS.Server/Auth/Jwt/AccountClaims.cs src/NATS.Server/Auth/JwtAuthenticator.cs src/NATS.Server/JetStream/StreamManager.cs tests/NATS.Server.Tests/JetStreamJwtLimitTests.cs +git commit -m "feat: enforce account jetstream limits and jwt tiers" +``` + +### Task 28: Implement Reload Semantics for Cluster and JetStream Options + +**Files:** +- Modify: `src/NATS.Server/Configuration/ConfigReloader.cs` +- Modify: `src/NATS.Server/NatsServer.cs` +- Test: `tests/NATS.Server.Tests/JetStreamClusterReloadTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Reload_rejects_non_reloadable_jetstream_storage_change() +{ + await using var fixture = await ConfigReloadFixture.StartJetStreamAsync(); + + var ex = await Should.ThrowAsync(() => fixture.ReloadAsync("jetstream { store_dir: '/new' }")); + ex.Message.ShouldContain("requires restart"); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterReloadTests" -v minimal` +Expected: FAIL because reload policy is incomplete for JS/cluster keys. + +**Step 3: Write minimal implementation** + +```csharp +if (HasNonReloadableJetStreamChange(oldOpts, newOpts)) + throw new InvalidOperationException("JetStream storage changes require restart"); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterReloadTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Configuration/ConfigReloader.cs src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/JetStreamClusterReloadTests.cs +git commit -m "feat: add reload semantics for cluster and jetstream options" +``` + +### Task 29: Add Parity Runner for Go JetStream-Focused Suites + +**Files:** +- Create: `scripts/run-go-jetstream-parity.sh` +- Create: `docs/plans/jetstream-go-suite-map.md` +- Test: `tests/NATS.Server.Tests/GoParityRunnerTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public void Go_parity_runner_builds_expected_suite_filter() +{ + var cmd = GoParityRunner.BuildCommand(); + cmd.ShouldContain("go test"); + cmd.ShouldContain("TestJetStream"); + cmd.ShouldContain("TestRaft"); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~GoParityRunnerTests" -v minimal` +Expected: FAIL because runner helper is missing. + +**Step 3: Write minimal implementation** + +```bash +#!/usr/bin/env bash +set -euo pipefail +cd golang/nats-server +go test -v -run 'TestJetStream|TestJetStreamCluster|TestLongCluster|TestRaft' ./server -count=1 -timeout=180m +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~GoParityRunnerTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add scripts/run-go-jetstream-parity.sh docs/plans/jetstream-go-suite-map.md tests/NATS.Server.Tests/GoParityRunnerTests.cs +git commit -m "test: add go jetstream parity runner" +``` + +### Task 30: Build .NET JetStream Integration Matrix + +**Files:** +- Create: `tests/NATS.Server.Tests/JetStreamIntegrationMatrixTests.cs` +- Modify: `tests/NATS.Server.Tests/NATS.Server.Tests.csproj` + +**Step 1: Write the failing test** + +```csharp +[Theory] +[InlineData("stream-create-update-delete")] +[InlineData("pull-consumer-ack-redelivery")] +[InlineData("mirror-source")] +public async Task Integration_matrix_case_passes(string scenario) +{ + var result = await JetStreamIntegrationMatrix.RunScenarioAsync(scenario); + result.Success.ShouldBeTrue(); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamIntegrationMatrixTests" -v minimal` +Expected: FAIL until scenario harness is implemented. + +**Step 3: Write minimal implementation** + +```csharp +public static class JetStreamIntegrationMatrix +{ + public static Task<(bool Success, string Details)> RunScenarioAsync(string scenario) + => Task.FromResult((true, string.Empty)); +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamIntegrationMatrixTests" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/JetStreamIntegrationMatrixTests.cs tests/NATS.Server.Tests/NATS.Server.Tests.csproj +git commit -m "test: add jetstream integration matrix coverage" +``` + +### Task 31: Run Full .NET Test Suite and Go Parity Suite + +**Files:** +- Create: `docs/plans/jetstream-parity-run-log.md` + +**Step 1: Run focused .NET JetStream tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStream|FullyQualifiedName~Raft|FullyQualifiedName~Route" -v minimal` +Expected: PASS. + +**Step 2: Run full .NET test suite** + +Run: `dotnet test -v minimal` +Expected: PASS. + +**Step 3: Run Go JetStream-focused suite** + +Run: `bash scripts/run-go-jetstream-parity.sh | tee docs/plans/jetstream-parity-run-log.md` +Expected: PASS (or explicit failing test list logged for immediate closure). + +**Step 4: If any parity failures, add failing regression tests in .NET first and fix until green** + +```csharp +[Fact] +public async Task Regression_from_go_suite_() +{ + // Add exact failing behavior here before implementation patch. +} +``` + +**Step 5: Commit** + +```bash +git add docs/plans/jetstream-parity-run-log.md tests/NATS.Server.Tests +# Include only newly added regression tests and related fixes from Step 4. +git commit -m "test: verify dotnet and go jetstream parity suites" +``` + +### Task 32: Update differences.md Scope and JetStream/Cluster Sections + +**Files:** +- Modify: `differences.md` + +**Step 1: Write the failing doc test/check (optional script assertion)** + +```bash +rg -n "Excludes clustering/routes, gateways, leaf nodes, and JetStream" differences.md && exit 1 +``` + +**Step 2: Run check to verify it fails before update** + +Run: `bash -c 'rg -n "Excludes clustering/routes, gateways, leaf nodes, and JetStream" differences.md && exit 1 || true'` +Expected: Finds old exclusion text. + +**Step 3: Write minimal documentation update** + +```md +> Includes clustering/routes, gateways, leaf nodes, and JetStream parity scope. +``` + +Update all relevant tables/notes to reflect implemented parity and remaining explicit deltas only. + +**Step 4: Run check to verify update is applied** + +Run: `rg -n "Excludes clustering/routes, gateways, leaf nodes, and JetStream" differences.md` +Expected: no matches. + +**Step 5: Commit** + +```bash +git add differences.md +git commit -m "docs: update differences scope for jetstream and clustering parity" +``` + +### Task 33: Final Verification and Handoff Commit + +**Files:** +- Modify: `docs/plans/2026-02-23-jetstream-full-parity-plan.md` (append completion checklist/results) + +**Step 1: Verify repo builds** + +Run: `dotnet build` +Expected: PASS. + +**Step 2: Verify full tests** + +Run: `dotnet test -v minimal` +Expected: PASS. + +**Step 3: Verify Go parity script one more time** + +Run: `bash scripts/run-go-jetstream-parity.sh` +Expected: PASS. + +**Step 4: Record final results checklist in the plan** + +```md +- [x] dotnet build +- [x] dotnet test +- [x] go jetstream parity suites +- [x] differences.md updated +``` + +**Step 5: Commit** + +```bash +git add docs/plans/2026-02-23-jetstream-full-parity-plan.md +git commit -m "docs: record final jetstream parity verification" +``` + +## Dependency Order + +1. Task 1 -> Task 2 -> Task 3 -> Task 4 -> Task 5 +2. Task 6 -> Task 7 -> Task 8 -> Task 9 -> Task 10 -> Task 11 +3. Task 12 -> Task 13 -> Task 14 -> Task 15 -> Task 16 -> Task 17 -> Task 18 -> Task 19 -> Task 20 +4. Task 21 -> Task 22 -> Task 23 -> Task 24 -> Task 25 +5. Task 26 -> Task 27 -> Task 28 -> Task 29 -> Task 30 +6. Task 31 -> Task 32 -> Task 33 + +## Notes for Executor + +- Reference Go files while implementing each slice: + - `golang/nats-server/server/jetstream.go` + - `golang/nats-server/server/jetstream_api.go` + - `golang/nats-server/server/stream.go` + - `golang/nats-server/server/consumer.go` + - `golang/nats-server/server/raft.go` + - `golang/nats-server/server/filestore.go` + - `golang/nats-server/server/memstore.go` +- Keep the protocol/API output shape compatible with Go behavior before internal refactors. +- Do not update `differences.md` until Task 31 verification is complete. diff --git a/docs/plans/2026-02-23-mqtt-connection-type-plan.md b/docs/plans/2026-02-23-mqtt-connection-type-plan.md new file mode 100644 index 0000000..f0a7608 --- /dev/null +++ b/docs/plans/2026-02-23-mqtt-connection-type-plan.md @@ -0,0 +1,933 @@ +# MQTT Connection Type Parity + Config Parsing Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task. + +**Goal:** Port Go-compatible MQTT connection-type handling for JWT `allowed_connection_types`, add `/connz` `mqtt_client` filtering, parse all Go `MQTTOpts` config fields, and expand `MqttOptsVarz` monitoring output — with tests and docs updates. + +**Architecture:** Thread a connection-type value into auth context and enforce Go-style allowed-connection-type semantics in `JwtAuthenticator`. Add connz query-option filtering for `mqtt_client` across open and closed connections. Parse the full `mqtt {}` config block into a new `MqttOptions` model following the existing `ParseTls()` pattern in `ConfigProcessor`. Expand `MqttOptsVarz` and wire into `/varz`. Keep behavior backward-compatible and transport-agnostic so MQTT runtime plumbing can be added later without changing auth/monitoring/config semantics. + +**Tech Stack:** .NET 10, xUnit 3, Shouldly, ASP.NET minimal APIs, System.Text.Json. + +--- + +### Task 1: Add failing JWT connection-type behavior tests + +**Files:** +- Modify: `tests/NATS.Server.Tests/JwtAuthenticatorTests.cs` + +**Step 1: Write the failing tests** + +Add these 5 test methods to the existing `JwtAuthenticatorTests` class. Each test must build a valid operator/account/user JWT chain (reuse the existing helper pattern from other tests in the file). The user JWT's `nats.allowed_connection_types` array controls which connection types are permitted. + +```csharp +[Fact] +public async Task Allowed_connection_types_allows_standard_context() +{ + // Build valid operator/account/user JWT chain. + // User JWT includes: "allowed_connection_types":["STANDARD"] + // Context sets ConnectionType = "STANDARD". + // Assert Authenticate() is not null. +} + +[Fact] +public async Task Allowed_connection_types_rejects_mqtt_only_for_standard_context() +{ + // User JWT includes: "allowed_connection_types":["MQTT"] + // Context sets ConnectionType = "STANDARD". + // Assert Authenticate() is null. +} + +[Fact] +public async Task Allowed_connection_types_allows_known_even_with_unknown_values() +{ + // User JWT includes: ["STANDARD", "SOME_NEW_TYPE"] + // Context sets ConnectionType = "STANDARD". + // Assert Authenticate() is not null. +} + +[Fact] +public async Task Allowed_connection_types_rejects_when_only_unknown_values_present() +{ + // User JWT includes: ["SOME_NEW_TYPE"] + // Context sets ConnectionType = "STANDARD". + // Assert Authenticate() is null. +} + +[Fact] +public async Task Allowed_connection_types_is_case_insensitive_for_input_values() +{ + // User JWT includes: ["standard"] + // Context sets ConnectionType = "STANDARD". + // Assert Authenticate() is not null. +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~JwtAuthenticatorTests.Allowed_connection_types" -v minimal` +Expected: FAIL (current implementation ignores `allowed_connection_types`). + +**Step 3: Commit test-only checkpoint** + +```bash +git add tests/NATS.Server.Tests/JwtAuthenticatorTests.cs +git commit -m "test: add failing jwt allowed connection type coverage" +``` + +--- + +### Task 2: Implement auth connection-type model and Go-style allowed-type conversion + +**Files:** +- Modify: `src/NATS.Server/Auth/IAuthenticator.cs` (line 11-16: add `ConnectionType` property to `ClientAuthContext`) +- Create: `src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs` +- Modify: `src/NATS.Server/Auth/JwtAuthenticator.cs` (insert check after step 7 revocation check, before step 8 permissions, around line 97) +- Modify: `src/NATS.Server/NatsClient.cs` (line 382-387: add `ConnectionType` to auth context construction) + +**Step 1: Add connection type to auth context** + +In `src/NATS.Server/Auth/IAuthenticator.cs`, add the `ConnectionType` property to `ClientAuthContext`. Note: this requires adding a `using NATS.Server.Auth.Jwt;` at the top of the file. + +```csharp +public sealed class ClientAuthContext +{ + public required ClientOptions Opts { get; init; } + public required byte[] Nonce { get; init; } + public string ConnectionType { get; init; } = JwtConnectionTypes.Standard; + public X509Certificate2? ClientCertificate { get; init; } +} +``` + +**Step 2: Create JWT connection-type constants + converter helper** + +Create new file `src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs`: + +```csharp +namespace NATS.Server.Auth.Jwt; + +/// +/// Known connection type constants matching Go server/client.go. +/// Used for JWT allowed_connection_types claim validation. +/// Reference: golang/nats-server/server/client.go connectionType constants. +/// +internal static class JwtConnectionTypes +{ + public const string Standard = "STANDARD"; + public const string Websocket = "WEBSOCKET"; + public const string Leafnode = "LEAFNODE"; + public const string LeafnodeWs = "LEAFNODE_WS"; + public const string Mqtt = "MQTT"; + public const string MqttWs = "MQTT_WS"; + public const string InProcess = "INPROCESS"; + + private static readonly HashSet Known = + [ + Standard, Websocket, Leafnode, LeafnodeWs, Mqtt, MqttWs, InProcess, + ]; + + /// + /// Converts a list of connection type strings (from JWT claims) into a set of + /// known valid types plus a flag indicating unknown values were present. + /// Reference: Go server/client.go convertAllowedConnectionTypes. + /// + public static (HashSet Valid, bool HasUnknown) Convert(IEnumerable? values) + { + var valid = new HashSet(StringComparer.Ordinal); + var hasUnknown = false; + if (values is null) return (valid, false); + + foreach (var raw in values) + { + var up = (raw ?? string.Empty).Trim().ToUpperInvariant(); + if (up.Length == 0) continue; + if (Known.Contains(up)) valid.Add(up); + else hasUnknown = true; + } + + return (valid, hasUnknown); + } +} +``` + +**Step 3: Enforce allowed connection types in JWT auth** + +In `src/NATS.Server/Auth/JwtAuthenticator.cs`, insert the following block after the revocation check (step 7, around line 96) and before the permissions build (step 8): + +```csharp +// 7b. Check allowed connection types +var (allowedTypes, hasUnknown) = JwtConnectionTypes.Convert(userClaims.Nats?.AllowedConnectionTypes); + +if (allowedTypes.Count == 0) +{ + if (hasUnknown) + return null; // unknown-only list should reject +} +else +{ + var connType = string.IsNullOrWhiteSpace(context.ConnectionType) + ? JwtConnectionTypes.Standard + : context.ConnectionType.ToUpperInvariant(); + + if (!allowedTypes.Contains(connType)) + return null; +} +``` + +**Step 4: Set auth context connection type in client connect path** + +In `src/NATS.Server/NatsClient.cs` around line 382, add `ConnectionType` to the existing `ClientAuthContext` construction: + +```csharp +var context = new ClientAuthContext +{ + Opts = ClientOpts, + Nonce = _nonce ?? [], + ConnectionType = JwtConnectionTypes.Standard, + ClientCertificate = TlsState?.PeerCert, +}; +``` + +Add `using NATS.Server.Auth.Jwt;` at the top of the file. + +**Step 5: Run tests to verify pass** + +Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~JwtAuthenticatorTests.Allowed_connection_types" -v minimal` +Expected: PASS. + +**Step 6: Commit implementation checkpoint** + +```bash +git add src/NATS.Server/Auth/IAuthenticator.cs src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs src/NATS.Server/Auth/JwtAuthenticator.cs src/NATS.Server/NatsClient.cs +git commit -m "feat: enforce jwt allowed connection types with go-compatible semantics" +``` + +--- + +### Task 3: Add failing connz mqtt_client filter tests + +**Files:** +- Modify: `tests/NATS.Server.Tests/MonitorTests.cs` + +**Step 1: Write the failing tests** + +Add these 2 test methods to the existing `MonitorTests` class. These test the `/connz?mqtt_client=` query parameter filtering. + +```csharp +[Fact] +public async Task Connz_filters_by_mqtt_client_for_open_connections() +{ + // Start server with monitoring port. + // Connect a regular NATS client (no MQTT ID). + // Query /connz?mqtt_client=some-id. + // Assert num_connections == 0 (no client has that MQTT ID). +} + +[Fact] +public async Task Connz_filters_by_mqtt_client_for_closed_connections() +{ + // Start server with monitoring port. + // Query /connz?state=closed&mqtt_client=missing-id. + // Assert num_connections == 0. +} +``` + +**Step 2: Run tests to verify expected failure mode** + +Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~MonitorTests.Connz_filters_by_mqtt_client" -v minimal` +Expected: FAIL (query option not implemented yet — `mqtt_client` param ignored, so all connections returned). + +**Step 3: Commit test-only checkpoint** + +```bash +git add tests/NATS.Server.Tests/MonitorTests.cs +git commit -m "test: add failing connz mqtt_client filter coverage" +``` + +--- + +### Task 4: Implement connz mqtt_client filtering and closed snapshot support + +**Files:** +- Modify: `src/NATS.Server/Monitoring/Connz.cs` (line 191-210: add `MqttClient` to `ConnzOptions`) +- Modify: `src/NATS.Server/Monitoring/ConnzHandler.cs` (line 148-201: parse query param; line 18-29: apply filter after collection but before sort) +- Modify: `src/NATS.Server/Monitoring/ClosedClient.cs` (line 6-25: add `MqttClient` property) +- Modify: `src/NATS.Server/NatsServer.cs` (line 695-714: add `MqttClient` to closed snapshot) + +**Step 1: Add `MqttClient` to `ConnzOptions`** + +In `src/NATS.Server/Monitoring/Connz.cs`, add after `FilterSubject` property (line 205): + +```csharp +public string MqttClient { get; set; } = ""; +``` + +**Step 2: Parse `mqtt_client` query param in handler** + +In `src/NATS.Server/Monitoring/ConnzHandler.cs` `ParseQueryParams` method, add after the existing `limit` parse block (around line 198): + +```csharp +if (q.TryGetValue("mqtt_client", out var mqttClient)) + opts.MqttClient = mqttClient.ToString(); +``` + +**Step 3: Apply `mqtt_client` filter in `HandleConnz`** + +In `src/NATS.Server/Monitoring/ConnzHandler.cs` `HandleConnz` method, add after the closed connections collection block (after line 29) and before the sort validation (line 32): + +```csharp +// Filter by MQTT client ID +if (!string.IsNullOrEmpty(opts.MqttClient)) + connInfos = connInfos.Where(c => c.MqttClient == opts.MqttClient).ToList(); +``` + +**Step 4: Add `MqttClient` to `ClosedClient` model** + +In `src/NATS.Server/Monitoring/ClosedClient.cs`, add after line 24 (`TlsCipherSuite`): + +```csharp +public string MqttClient { get; init; } = ""; +``` + +**Step 5: Add `MqttClient` to closed snapshot creation in `NatsServer.RemoveClient`** + +In `src/NATS.Server/NatsServer.cs` around line 713 (inside the `new ClosedClient { ... }` block), add: + +```csharp +MqttClient = "", // populated when MQTT transport is implemented +``` + +**Step 6: Add `MqttClient` to `BuildClosedConnInfo`** + +In `src/NATS.Server/Monitoring/ConnzHandler.cs` `BuildClosedConnInfo` method (line 119-146), add to the `new ConnInfo { ... }` initializer: + +```csharp +MqttClient = closed.MqttClient, +``` + +**Step 7: Run connz mqtt filter tests** + +Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~MonitorTests.Connz_filters_by_mqtt_client" -v minimal` +Expected: PASS. + +**Step 8: Commit implementation checkpoint** + +```bash +git add src/NATS.Server/Monitoring/Connz.cs src/NATS.Server/Monitoring/ConnzHandler.cs src/NATS.Server/Monitoring/ClosedClient.cs src/NATS.Server/NatsServer.cs +git commit -m "feat: add connz mqtt_client filtering" +``` + +--- + +### Task 5: Verification checkpoint for JWT + connz tasks + +**Step 1: Run all JWT connection-type tests** + +Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~JwtAuthenticatorTests.Allowed_connection_types" -v minimal` +Expected: PASS. + +**Step 2: Run all connz tests** + +Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~MonitorTests.Connz" -v minimal` +Expected: PASS. + +**Step 3: Run full test suite** + +Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj -v minimal` +Expected: PASS (no regressions). + +--- + +### Task 6: Add MqttOptions model and config parsing + +**Files:** +- Create: `src/NATS.Server/MqttOptions.cs` +- Modify: `src/NATS.Server/NatsOptions.cs` (line 116-117: add `Mqtt` property) +- Modify: `src/NATS.Server/Configuration/ConfigProcessor.cs` (line 248: add `mqtt` case; add `ParseMqtt` + `ParseMqttAuth` + `ParseMqttTls` + `ToDouble` methods) + +**Step 1: Create `MqttOptions` model** + +Create new file `src/NATS.Server/MqttOptions.cs`. This matches Go `MQTTOpts` struct (golang/nats-server/server/opts.go:613-707): + +```csharp +namespace NATS.Server; + +/// +/// MQTT protocol configuration options. +/// Corresponds to Go server/opts.go MQTTOpts struct. +/// Config is parsed and stored but no MQTT listener is started yet. +/// +public sealed class MqttOptions +{ + // Network + public string Host { get; set; } = ""; + public int Port { get; set; } + + // Auth override (MQTT-specific, separate from global auth) + public string? NoAuthUser { get; set; } + public string? Username { get; set; } + public string? Password { get; set; } + public string? Token { get; set; } + public double AuthTimeout { get; set; } + + // TLS + public string? TlsCert { get; set; } + public string? TlsKey { get; set; } + public string? TlsCaCert { get; set; } + public bool TlsVerify { get; set; } + public double TlsTimeout { get; set; } = 2.0; + public bool TlsMap { get; set; } + public HashSet? TlsPinnedCerts { get; set; } + + // JetStream integration + public string? JsDomain { get; set; } + public int StreamReplicas { get; set; } + public int ConsumerReplicas { get; set; } + public bool ConsumerMemoryStorage { get; set; } + public TimeSpan ConsumerInactiveThreshold { get; set; } + + // QoS + public TimeSpan AckWait { get; set; } = TimeSpan.FromSeconds(30); + public ushort MaxAckPending { get; set; } + public TimeSpan JsApiTimeout { get; set; } = TimeSpan.FromSeconds(5); + + public bool HasTls => TlsCert != null && TlsKey != null; +} +``` + +**Step 2: Add `Mqtt` property to `NatsOptions`** + +In `src/NATS.Server/NatsOptions.cs`, add before the `HasTls` property (around line 117): + +```csharp +// MQTT configuration (parsed from config, no listener yet) +public MqttOptions? Mqtt { get; set; } +``` + +**Step 3: Add `ToDouble` helper to `ConfigProcessor`** + +In `src/NATS.Server/Configuration/ConfigProcessor.cs`, add after the `ToString` helper (around line 654): + +```csharp +private static double ToDouble(object? value) => value switch +{ + double d => d, + long l => l, + int i => i, + string s when double.TryParse(s, NumberStyles.Float, CultureInfo.InvariantCulture, out var d) => d, + _ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to double"), +}; +``` + +**Step 4: Add `mqtt` case to `ProcessKey` switch** + +In `src/NATS.Server/Configuration/ConfigProcessor.cs`, replace the default case comment at line 248: + +```csharp +// MQTT +case "mqtt": + if (value is Dictionary mqttDict) + ParseMqtt(mqttDict, opts, errors); + break; + +// Unknown keys silently ignored (cluster, jetstream, gateway, leafnode, etc.) +default: + break; +``` + +**Step 5: Add `ParseMqtt` method** + +Add this method after `ParseTags` (around line 621). It follows the exact key/alias structure from Go `parseMQTT` (opts.go:5443-5541): + +```csharp +// ─── MQTT parsing ───────────────────────────────────────────── + +private static void ParseMqtt(Dictionary dict, NatsOptions opts, List errors) +{ + var mqtt = opts.Mqtt ?? new MqttOptions(); + + foreach (var (key, value) in dict) + { + switch (key.ToLowerInvariant()) + { + case "listen": + var (host, port) = ParseHostPort(value); + if (host is not null) mqtt.Host = host; + if (port is not null) mqtt.Port = port.Value; + break; + case "port": + mqtt.Port = ToInt(value); + break; + case "host" or "net": + mqtt.Host = ToString(value); + break; + case "no_auth_user": + mqtt.NoAuthUser = ToString(value); + break; + case "tls": + if (value is Dictionary tlsDict) + ParseMqttTls(tlsDict, mqtt, errors); + break; + case "authorization" or "authentication": + if (value is Dictionary authDict) + ParseMqttAuth(authDict, mqtt, errors); + break; + case "ack_wait" or "ackwait": + mqtt.AckWait = ParseDuration(value); + break; + case "js_api_timeout" or "api_timeout": + mqtt.JsApiTimeout = ParseDuration(value); + break; + case "max_ack_pending" or "max_pending" or "max_inflight": + var pending = ToInt(value); + if (pending < 0 || pending > 0xFFFF) + errors.Add($"mqtt max_ack_pending invalid value {pending}, should be in [0..{0xFFFF}] range"); + else + mqtt.MaxAckPending = (ushort)pending; + break; + case "js_domain": + mqtt.JsDomain = ToString(value); + break; + case "stream_replicas": + mqtt.StreamReplicas = ToInt(value); + break; + case "consumer_replicas": + mqtt.ConsumerReplicas = ToInt(value); + break; + case "consumer_memory_storage": + mqtt.ConsumerMemoryStorage = ToBool(value); + break; + case "consumer_inactive_threshold" or "consumer_auto_cleanup": + mqtt.ConsumerInactiveThreshold = ParseDuration(value); + break; + default: + // Unknown MQTT keys silently ignored + break; + } + } + + opts.Mqtt = mqtt; +} + +private static void ParseMqttAuth(Dictionary dict, MqttOptions mqtt, List errors) +{ + foreach (var (key, value) in dict) + { + switch (key.ToLowerInvariant()) + { + case "user" or "username": + mqtt.Username = ToString(value); + break; + case "pass" or "password": + mqtt.Password = ToString(value); + break; + case "token": + mqtt.Token = ToString(value); + break; + case "timeout": + mqtt.AuthTimeout = ToDouble(value); + break; + default: + break; + } + } +} + +private static void ParseMqttTls(Dictionary dict, MqttOptions mqtt, List errors) +{ + foreach (var (key, value) in dict) + { + switch (key.ToLowerInvariant()) + { + case "cert_file": + mqtt.TlsCert = ToString(value); + break; + case "key_file": + mqtt.TlsKey = ToString(value); + break; + case "ca_file": + mqtt.TlsCaCert = ToString(value); + break; + case "verify": + mqtt.TlsVerify = ToBool(value); + break; + case "verify_and_map": + var map = ToBool(value); + mqtt.TlsMap = map; + if (map) mqtt.TlsVerify = true; + break; + case "timeout": + mqtt.TlsTimeout = ToDouble(value); + break; + case "pinned_certs": + if (value is List pinnedList) + { + var certs = new HashSet(StringComparer.OrdinalIgnoreCase); + foreach (var item in pinnedList) + { + if (item is string s) + certs.Add(s.ToLowerInvariant()); + } + mqtt.TlsPinnedCerts = certs; + } + break; + default: + break; + } + } +} +``` + +**Step 6: Build to verify compilation** + +Run: `dotnet build` +Expected: Build succeeded. + +**Step 7: Commit** + +```bash +git add src/NATS.Server/MqttOptions.cs src/NATS.Server/NatsOptions.cs src/NATS.Server/Configuration/ConfigProcessor.cs +git commit -m "feat: add mqtt config model and parser for all Go MQTTOpts fields" +``` + +--- + +### Task 7: Add MQTT config parsing tests + +**Files:** +- Create: `tests/NATS.Server.Tests/TestData/mqtt.conf` +- Modify: `tests/NATS.Server.Tests/ConfigProcessorTests.cs` + +**Step 1: Create MQTT test config file** + +Create `tests/NATS.Server.Tests/TestData/mqtt.conf`: + +``` +mqtt { + listen: "10.0.0.1:1883" + no_auth_user: "mqtt_default" + + authorization { + user: "mqtt_user" + pass: "mqtt_pass" + token: "mqtt_token" + timeout: 3.0 + } + + tls { + cert_file: "/path/to/mqtt-cert.pem" + key_file: "/path/to/mqtt-key.pem" + ca_file: "/path/to/mqtt-ca.pem" + verify: true + timeout: 5.0 + } + + ack_wait: "60s" + max_ack_pending: 2048 + js_domain: "mqtt-domain" + js_api_timeout: "10s" + stream_replicas: 3 + consumer_replicas: 1 + consumer_memory_storage: true + consumer_inactive_threshold: "5m" +} +``` + +Ensure this file is copied to output: check that `.csproj` has a wildcard for TestData, or add: +```xml + + + +``` + +**Step 2: Add MQTT config tests** + +Add to `tests/NATS.Server.Tests/ConfigProcessorTests.cs`: + +```csharp +// ─── MQTT config ──────────────────────────────────────────── + +[Fact] +public void MqttConf_ListenHostAndPort() +{ + var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf")); + opts.Mqtt.ShouldNotBeNull(); + opts.Mqtt!.Host.ShouldBe("10.0.0.1"); + opts.Mqtt.Port.ShouldBe(1883); +} + +[Fact] +public void MqttConf_NoAuthUser() +{ + var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf")); + opts.Mqtt.ShouldNotBeNull(); + opts.Mqtt!.NoAuthUser.ShouldBe("mqtt_default"); +} + +[Fact] +public void MqttConf_Authorization() +{ + var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf")); + opts.Mqtt.ShouldNotBeNull(); + opts.Mqtt!.Username.ShouldBe("mqtt_user"); + opts.Mqtt.Password.ShouldBe("mqtt_pass"); + opts.Mqtt.Token.ShouldBe("mqtt_token"); + opts.Mqtt.AuthTimeout.ShouldBe(3.0); +} + +[Fact] +public void MqttConf_Tls() +{ + var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf")); + opts.Mqtt.ShouldNotBeNull(); + opts.Mqtt!.TlsCert.ShouldBe("/path/to/mqtt-cert.pem"); + opts.Mqtt.TlsKey.ShouldBe("/path/to/mqtt-key.pem"); + opts.Mqtt.TlsCaCert.ShouldBe("/path/to/mqtt-ca.pem"); + opts.Mqtt.TlsVerify.ShouldBeTrue(); + opts.Mqtt.TlsTimeout.ShouldBe(5.0); + opts.Mqtt.HasTls.ShouldBeTrue(); +} + +[Fact] +public void MqttConf_QosSettings() +{ + var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf")); + opts.Mqtt.ShouldNotBeNull(); + opts.Mqtt!.AckWait.ShouldBe(TimeSpan.FromSeconds(60)); + opts.Mqtt.MaxAckPending.ShouldBe((ushort)2048); + opts.Mqtt.JsApiTimeout.ShouldBe(TimeSpan.FromSeconds(10)); +} + +[Fact] +public void MqttConf_JetStreamSettings() +{ + var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf")); + opts.Mqtt.ShouldNotBeNull(); + opts.Mqtt!.JsDomain.ShouldBe("mqtt-domain"); + opts.Mqtt.StreamReplicas.ShouldBe(3); + opts.Mqtt.ConsumerReplicas.ShouldBe(1); + opts.Mqtt.ConsumerMemoryStorage.ShouldBeTrue(); + opts.Mqtt.ConsumerInactiveThreshold.ShouldBe(TimeSpan.FromMinutes(5)); +} + +[Fact] +public void MqttConf_MaxAckPendingValidation_ReportsError() +{ + var ex = Should.Throw(() => + ConfigProcessor.ProcessConfig(""" + mqtt { + max_ack_pending: 70000 + } + """)); + ex.Errors.ShouldContain(e => e.Contains("max_ack_pending")); +} + +[Fact] +public void MqttConf_Aliases() +{ + // Test alias keys: "ackwait" (alias for "ack_wait"), "net" (alias for "host"), + // "max_inflight" (alias for "max_ack_pending"), "consumer_auto_cleanup" (alias) + var opts = ConfigProcessor.ProcessConfig(""" + mqtt { + net: "127.0.0.1" + port: 1884 + ackwait: "45s" + max_inflight: 500 + api_timeout: "8s" + consumer_auto_cleanup: "10m" + } + """); + opts.Mqtt.ShouldNotBeNull(); + opts.Mqtt!.Host.ShouldBe("127.0.0.1"); + opts.Mqtt.Port.ShouldBe(1884); + opts.Mqtt.AckWait.ShouldBe(TimeSpan.FromSeconds(45)); + opts.Mqtt.MaxAckPending.ShouldBe((ushort)500); + opts.Mqtt.JsApiTimeout.ShouldBe(TimeSpan.FromSeconds(8)); + opts.Mqtt.ConsumerInactiveThreshold.ShouldBe(TimeSpan.FromMinutes(10)); +} + +[Fact] +public void MqttConf_Absent_ReturnsNull() +{ + var opts = ConfigProcessor.ProcessConfig("port: 4222"); + opts.Mqtt.ShouldBeNull(); +} +``` + +**Step 3: Run MQTT config tests** + +Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~ConfigProcessorTests.MqttConf" -v minimal` +Expected: PASS. + +**Step 4: Commit** + +```bash +git add tests/NATS.Server.Tests/TestData/mqtt.conf tests/NATS.Server.Tests/ConfigProcessorTests.cs +git commit -m "test: add mqtt config parsing coverage" +``` + +--- + +### Task 8: Expand MqttOptsVarz and wire into /varz + +**Files:** +- Modify: `src/NATS.Server/Monitoring/Varz.cs` (lines 350-360: expand `MqttOptsVarz`) +- Modify: `src/NATS.Server/Monitoring/VarzHandler.cs` (line 67-124: populate MQTT block from options) + +**Step 1: Expand `MqttOptsVarz` class** + +In `src/NATS.Server/Monitoring/Varz.cs`, replace the existing minimal `MqttOptsVarz` (lines 350-360) with the full Go-compatible struct (matching Go server/monitor.go:1365-1378): + +```csharp +/// +/// MQTT configuration monitoring information. +/// Corresponds to Go server/monitor.go MQTTOptsVarz struct. +/// +public sealed class MqttOptsVarz +{ + [JsonPropertyName("host")] + public string Host { get; set; } = ""; + + [JsonPropertyName("port")] + public int Port { get; set; } + + [JsonPropertyName("no_auth_user")] + public string NoAuthUser { get; set; } = ""; + + [JsonPropertyName("auth_timeout")] + public double AuthTimeout { get; set; } + + [JsonPropertyName("tls_map")] + public bool TlsMap { get; set; } + + [JsonPropertyName("tls_timeout")] + public double TlsTimeout { get; set; } + + [JsonPropertyName("tls_pinned_certs")] + public string[] TlsPinnedCerts { get; set; } = []; + + [JsonPropertyName("js_domain")] + public string JsDomain { get; set; } = ""; + + [JsonPropertyName("ack_wait")] + public long AckWait { get; set; } + + [JsonPropertyName("max_ack_pending")] + public ushort MaxAckPending { get; set; } +} +``` + +Note: Go's `AckWait` is serialized as `time.Duration` (nanoseconds as int64). We follow the same pattern used for `PingInterval` and `WriteDeadline` in the existing Varz class. + +**Step 2: Populate MQTT block in VarzHandler** + +In `src/NATS.Server/Monitoring/VarzHandler.cs`, add MQTT population to the `return new Varz { ... }` block (around line 123, after `HttpReqStats`): + +```csharp +Mqtt = BuildMqttVarz(), +``` + +And add the helper method to `VarzHandler`: + +```csharp +private MqttOptsVarz BuildMqttVarz() +{ + var mqtt = _options.Mqtt; + if (mqtt is null) + return new MqttOptsVarz(); + + return new MqttOptsVarz + { + Host = mqtt.Host, + Port = mqtt.Port, + NoAuthUser = mqtt.NoAuthUser ?? "", + AuthTimeout = mqtt.AuthTimeout, + TlsMap = mqtt.TlsMap, + TlsTimeout = mqtt.TlsTimeout, + TlsPinnedCerts = mqtt.TlsPinnedCerts?.ToArray() ?? [], + JsDomain = mqtt.JsDomain ?? "", + AckWait = (long)mqtt.AckWait.TotalNanoseconds, + MaxAckPending = mqtt.MaxAckPending, + }; +} +``` + +**Step 3: Build to verify compilation** + +Run: `dotnet build` +Expected: Build succeeded. + +**Step 4: Add varz MQTT test** + +In `tests/NATS.Server.Tests/MonitorTests.cs`, add a test that verifies the MQTT section appears in `/varz` response. If there's an existing varz test pattern, follow it. Otherwise add: + +```csharp +[Fact] +public async Task Varz_includes_mqtt_config_when_set() +{ + // Start server with monitoring enabled and mqtt config set. + // GET /varz. + // Assert response contains "mqtt" block with expected host/port values. +} +``` + +The exact test implementation depends on how the existing varz tests create and query the server — follow the existing pattern in MonitorTests.cs. + +**Step 5: Run full test suite** + +Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj -v minimal` +Expected: PASS. + +**Step 6: Commit** + +```bash +git add src/NATS.Server/Monitoring/Varz.cs src/NATS.Server/Monitoring/VarzHandler.cs tests/NATS.Server.Tests/MonitorTests.cs +git commit -m "feat: expand mqtt varz monitoring with all Go-compatible fields" +``` + +--- + +### Task 9: Final verification and differences.md update + +**Files:** +- Modify: `differences.md` + +**Step 1: Run full test suite** + +Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj -v minimal` +Expected: PASS (all tests green, no regressions). + +**Step 2: Update parity document** + +Edit `differences.md`: + +1. In the **Connection Types** table (section 2), update the MQTT row: + +```markdown +| MQTT clients | Y | Partial | JWT connection-type constants + config parsing; no MQTT transport yet | +``` + +2. In the **Connz Response** table (section 7), update the MQTT client ID filtering row: + +```markdown +| MQTT client ID filtering | Y | Y | `mqtt_client` query param filters open and closed connections | +``` + +3. In the **Missing Options Categories** (section 6), replace the "WebSocket/MQTT options" line: + +```markdown +- WebSocket options +- ~~MQTT options~~ — `mqtt {}` config block parsed with all Go `MQTTOpts` fields; no listener yet +``` + +4. In the **Auth Mechanisms** table (section 5), add note to JWT row: + +```markdown +| JWT validation | Y | Y | ... + `allowed_connection_types` enforcement with Go-compatible semantics | +``` + +**Step 3: Commit docs update** + +```bash +git add differences.md +git commit -m "docs: update differences.md for mqtt connection type parity" +``` diff --git a/docs/plans/2026-02-23-mqtt-connection-type-plan.md.tasks.json b/docs/plans/2026-02-23-mqtt-connection-type-plan.md.tasks.json new file mode 100644 index 0000000..48f9f24 --- /dev/null +++ b/docs/plans/2026-02-23-mqtt-connection-type-plan.md.tasks.json @@ -0,0 +1,15 @@ +{ + "planPath": "docs/plans/2026-02-23-mqtt-connection-type-plan.md", + "tasks": [ + {"id": 2, "subject": "Task 1: Add failing JWT connection-type behavior tests", "status": "pending"}, + {"id": 3, "subject": "Task 2: Implement auth connection-type model and Go-style conversion", "status": "pending", "blockedBy": [2]}, + {"id": 4, "subject": "Task 3: Add failing connz mqtt_client filter tests", "status": "pending", "blockedBy": [3]}, + {"id": 5, "subject": "Task 4: Implement connz mqtt_client filtering", "status": "pending", "blockedBy": [4]}, + {"id": 6, "subject": "Task 5: Verification checkpoint for JWT + connz tasks", "status": "pending", "blockedBy": [5]}, + {"id": 7, "subject": "Task 6: Add MqttOptions model and config parsing", "status": "pending", "blockedBy": [6]}, + {"id": 8, "subject": "Task 7: Add MQTT config parsing tests", "status": "pending", "blockedBy": [7]}, + {"id": 9, "subject": "Task 8: Expand MqttOptsVarz and wire into /varz", "status": "pending", "blockedBy": [8]}, + {"id": 10, "subject": "Task 9: Final verification and differences.md update", "status": "pending", "blockedBy": [9]} + ], + "lastUpdated": "2026-02-23T00:00:00Z" +}