docs: add post-baseline jetstream parity plan
This commit is contained in:
687
docs/plans/2026-02-23-jetstream-post-baseline-parity-plan.md
Normal file
687
docs/plans/2026-02-23-jetstream-post-baseline-parity-plan.md
Normal file
@@ -0,0 +1,687 @@
|
||||
# JetStream Post-Baseline Remaining Parity Implementation Plan
|
||||
|
||||
> **For Codex:** REQUIRED SUB-SKILL: Use `executeplan` to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Port all remaining Go JetStream functionality still marked `Baseline` or `N` (plus required gateway/leaf/account prerequisites) so parity rows can be closed with test evidence.
|
||||
|
||||
**Architecture:** Execute in dependency order: first finish inter-server prerequisite semantics (account interest propagation, gateway reply remap, leaf loop/account mapping), then complete JetStream runtime policy behavior, then harden storage and RAFT cluster semantics, and finally close observability and parity documentation gates. Every feature is implemented test-first and validated with focused + full-suite verification.
|
||||
|
||||
**Tech Stack:** .NET 10, C# 14, xUnit 3, Shouldly, NSubstitute, System.Text.Json, `dotnet test`, bash tooling.
|
||||
|
||||
---
|
||||
|
||||
**Execution guardrails**
|
||||
- Use `@test-driven-development` for every task.
|
||||
- If any protocol/runtime behavior is unclear, use `@systematic-debugging` before modifying production code.
|
||||
- Keep commits small and task-scoped.
|
||||
- Run `@verification-before-completion` before any completion claim.
|
||||
|
||||
### Task 1: Add Account-Scoped Inter-Server Interest Protocol (`A+`/`A-`)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/Subscriptions/RemoteSubscription.cs`
|
||||
- Modify: `src/NATS.Server/Subscriptions/SubList.cs`
|
||||
- Modify: `src/NATS.Server/Gateways/GatewayConnection.cs`
|
||||
- Modify: `src/NATS.Server/Gateways/GatewayManager.cs`
|
||||
- Modify: `src/NATS.Server/NatsServer.cs`
|
||||
- Test: `tests/NATS.Server.Tests/InterServerAccountProtocolTests.cs`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Aplus_Aminus_frames_include_account_scope_and_do_not_leak_interest_across_accounts()
|
||||
{
|
||||
await using var fx = await InterServerAccountProtocolFixture.StartTwoServersAsync();
|
||||
await fx.SubscribeAsync(account: "A", subject: "orders.*");
|
||||
(await fx.HasRemoteInterestAsync(account: "B", subject: "orders.created")).ShouldBeFalse();
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterServerAccountProtocolTests" -v minimal`
|
||||
Expected: FAIL because remote interest is currently global-only and account metadata is not carried.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```csharp
|
||||
public sealed record RemoteSubscription(
|
||||
string Subject,
|
||||
string? Queue,
|
||||
string RemoteId,
|
||||
string Account,
|
||||
bool IsRemoval = false);
|
||||
```
|
||||
|
||||
```csharp
|
||||
await WriteLineAsync($"A+ {account} {subject}");
|
||||
await WriteLineAsync($"A- {account} {subject}");
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterServerAccountProtocolTests" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/Subscriptions/RemoteSubscription.cs src/NATS.Server/Subscriptions/SubList.cs src/NATS.Server/Gateways/GatewayConnection.cs src/NATS.Server/Gateways/GatewayManager.cs src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/InterServerAccountProtocolTests.cs
|
||||
git commit -m "feat: add account-scoped inter-server interest propagation"
|
||||
```
|
||||
|
||||
### Task 2: Implement Gateway Reply Remap (`_GR_.`) and Strict Interest-Only Forwarding
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/Gateways/GatewayConnection.cs`
|
||||
- Modify: `src/NATS.Server/Gateways/GatewayManager.cs`
|
||||
- Modify: `src/NATS.Server/NatsServer.cs`
|
||||
- Modify: `src/NATS.Server/Protocol/ClientCommandMatrix.cs`
|
||||
- Test: `tests/NATS.Server.Tests/GatewayAdvancedSemanticsTests.cs`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Gateway_forwarding_remaps_reply_subject_with_gr_prefix_and_restores_on_return()
|
||||
{
|
||||
await using var fx = await GatewayAdvancedFixture.StartAsync();
|
||||
var reply = await fx.RequestAcrossGatewayAsync("svc.echo", "ping");
|
||||
reply.ShouldBe("ping");
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~GatewayAdvancedSemanticsTests" -v minimal`
|
||||
Expected: FAIL because reply remap/de-remap contract is not implemented.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```csharp
|
||||
var mappedReply = ReplyMapper.ToGatewayReply(replyTo, localClusterId);
|
||||
await connection.SendMessageAsync(subject, mappedReply, payload, ct);
|
||||
```
|
||||
|
||||
```csharp
|
||||
if (ReplyMapper.TryRestoreGatewayReply(message.ReplyTo, out var restored))
|
||||
replyTo = restored;
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~GatewayAdvancedSemanticsTests" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/Gateways/GatewayConnection.cs src/NATS.Server/Gateways/GatewayManager.cs src/NATS.Server/NatsServer.cs src/NATS.Server/Protocol/ClientCommandMatrix.cs tests/NATS.Server.Tests/GatewayAdvancedSemanticsTests.cs
|
||||
git commit -m "feat: implement gateway reply remap and strict interest-only forwarding"
|
||||
```
|
||||
|
||||
### Task 3: Implement Leaf Loop Detection (`$LDS.`) and Account Remapping
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/LeafNodes/LeafConnection.cs`
|
||||
- Modify: `src/NATS.Server/LeafNodes/LeafNodeManager.cs`
|
||||
- Modify: `src/NATS.Server/NatsServer.cs`
|
||||
- Modify: `src/NATS.Server/Subscriptions/RemoteSubscription.cs`
|
||||
- Test: `tests/NATS.Server.Tests/LeafAdvancedSemanticsTests.cs`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Leaf_loop_marker_blocks_reinjected_message_and_account_mapping_routes_to_expected_account()
|
||||
{
|
||||
await using var fx = await LeafAdvancedFixture.StartHubSpokeAsync();
|
||||
var result = await fx.PublishLoopCandidateAsync();
|
||||
result.DeliveredCount.ShouldBe(1);
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~LeafAdvancedSemanticsTests" -v minimal`
|
||||
Expected: FAIL because `$LDS.` loop marker and account remap rules are not enforced.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```csharp
|
||||
if (LeafLoopDetector.IsLooped(subject, localServerId))
|
||||
return;
|
||||
subject = LeafLoopDetector.Mark(subject, localServerId);
|
||||
```
|
||||
|
||||
```csharp
|
||||
var mappedAccount = _accountMapper.MapInbound(remoteAccount, localServerId);
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~LeafAdvancedSemanticsTests" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/LeafNodes/LeafConnection.cs src/NATS.Server/LeafNodes/LeafNodeManager.cs src/NATS.Server/NatsServer.cs src/NATS.Server/Subscriptions/RemoteSubscription.cs tests/NATS.Server.Tests/LeafAdvancedSemanticsTests.cs
|
||||
git commit -m "feat: add leaf loop detection and account remapping semantics"
|
||||
```
|
||||
|
||||
### Task 4: Wire Internal `JETSTREAM` Client Lifecycle
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/NatsServer.cs`
|
||||
- Modify: `src/NATS.Server/InternalClient.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/JetStreamService.cs`
|
||||
- Test: `tests/NATS.Server.Tests/JetStreamInternalClientTests.cs`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task JetStream_enabled_server_creates_internal_jetstream_client_and_keeps_it_account_scoped()
|
||||
{
|
||||
await using var fx = await JetStreamInternalClientFixture.StartAsync();
|
||||
fx.HasInternalJetStreamClient.ShouldBeTrue();
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInternalClientTests" -v minimal`
|
||||
Expected: FAIL because runtime does not currently instantiate/use an internal JetStream client path.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```csharp
|
||||
var jsClient = new InternalClient(nextId, ClientKind.JetStream, _systemAccount);
|
||||
_jetStreamService = new JetStreamService(options.JetStream, jsClient);
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInternalClientTests" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/NatsServer.cs src/NATS.Server/InternalClient.cs src/NATS.Server/JetStream/JetStreamService.cs tests/NATS.Server.Tests/JetStreamInternalClientTests.cs
|
||||
git commit -m "feat: wire internal jetstream client lifecycle"
|
||||
```
|
||||
|
||||
### Task 5: Enforce Stream Runtime Policies (`Retention`, `MaxAge`, `MaxMsgsPer`, `MaxMsgSize`)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/JetStream/Models/StreamConfig.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/StreamManager.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Storage/IStreamStore.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Storage/MemStore.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
|
||||
- Test: `tests/NATS.Server.Tests/JetStreamStreamPolicyParityTests.cs`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Stream_rejects_oversize_message_and_prunes_by_max_age_and_per_subject_limits()
|
||||
{
|
||||
await using var fx = await JetStreamApiFixture.StartWithStreamJsonAsync("""
|
||||
{"name":"P","subjects":["p.*"],"max_msg_size":8,"max_age_ms":20,"max_msgs_per":1}
|
||||
""");
|
||||
(await fx.PublishAndGetAckAsync("p.a", "0123456789", expectError: true)).ErrorCode.ShouldNotBeNull();
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStreamPolicyParityTests" -v minimal`
|
||||
Expected: FAIL because these policies are only partially enforced.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```csharp
|
||||
if (config.MaxMsgSize > 0 && payload.Length > config.MaxMsgSize)
|
||||
return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 };
|
||||
```
|
||||
|
||||
```csharp
|
||||
PruneExpiredMessages(stream, nowUtc);
|
||||
PrunePerSubject(stream, config.MaxMsgsPer);
|
||||
ApplyRetentionPolicy(stream, config.Retention);
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStreamPolicyParityTests" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/JetStream/Models/StreamConfig.cs src/NATS.Server/JetStream/StreamManager.cs src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs src/NATS.Server/JetStream/Storage/IStreamStore.cs src/NATS.Server/JetStream/Storage/MemStore.cs src/NATS.Server/JetStream/Storage/FileStore.cs tests/NATS.Server.Tests/JetStreamStreamPolicyParityTests.cs
|
||||
git commit -m "feat: enforce stream runtime policy parity constraints"
|
||||
```
|
||||
|
||||
### Task 6: Implement Stream Config Behavior Parity (Dedup Window, Sealed/Guard Flags, RePublish/Transform/Direct)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/JetStream/Models/StreamConfig.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Publish/PublishOptions.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Publish/PublishPreconditions.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/StreamManager.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs`
|
||||
- Test: `tests/NATS.Server.Tests/JetStreamStreamConfigBehaviorTests.cs`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Stream_honors_dedup_window_and_sealed_delete_purge_guards()
|
||||
{
|
||||
await using var fx = await JetStreamConfigBehaviorFixture.StartSealedWithDedupWindowAsync();
|
||||
var first = await fx.PublishWithMsgIdAsync("orders.created", "m-1", "one");
|
||||
var second = await fx.PublishWithMsgIdAsync("orders.created", "m-1", "two");
|
||||
second.Seq.ShouldBe(first.Seq);
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStreamConfigBehaviorTests" -v minimal`
|
||||
Expected: FAIL because dedup is unbounded and sealed/deny behavior is not fully enforced.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```csharp
|
||||
if (stream.Config.Sealed || (stream.Config.DenyDelete && isDelete) || (stream.Config.DenyPurge && isPurge))
|
||||
return JetStreamApiResponse.ErrorResponse(10052, "operation not allowed");
|
||||
```
|
||||
|
||||
```csharp
|
||||
_dedupe.Record(msgId, sequence, nowUtc);
|
||||
_dedupe.TrimOlderThan(window);
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStreamConfigBehaviorTests" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/JetStream/Models/StreamConfig.cs src/NATS.Server/JetStream/Publish/PublishOptions.cs src/NATS.Server/JetStream/Publish/PublishPreconditions.cs src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs src/NATS.Server/JetStream/StreamManager.cs src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs tests/NATS.Server.Tests/JetStreamStreamConfigBehaviorTests.cs
|
||||
git commit -m "feat: add stream config behavior parity for dedupe and guard flags"
|
||||
```
|
||||
|
||||
### Task 7: Implement Consumer Deliver Policy and `AckPolicy.All` Parity
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/JetStream/Models/ConsumerConfig.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Consumers/AckProcessor.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/ConsumerManager.cs`
|
||||
- Test: `tests/NATS.Server.Tests/JetStreamConsumerDeliverPolicyParityTests.cs`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Deliver_policy_start_sequence_and_start_time_and_last_per_subject_match_expected_start_positions()
|
||||
{
|
||||
await using var fx = await JetStreamConsumerDeliverPolicyFixture.StartAsync();
|
||||
var bySeq = await fx.FetchByStartSequenceAsync(startSequence: 3);
|
||||
bySeq.Messages[0].Sequence.ShouldBe((ulong)3);
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamConsumerDeliverPolicyParityTests" -v minimal`
|
||||
Expected: FAIL because deliver policies beyond `All/Last/New` are baseline-only.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```csharp
|
||||
consumer.NextSequence = consumer.Config.DeliverPolicy switch
|
||||
{
|
||||
DeliverPolicy.ByStartSequence => consumer.Config.OptStartSeq,
|
||||
DeliverPolicy.ByStartTime => await stream.Store.LoadFirstAfterAsync(consumer.Config.OptStartTime, ct),
|
||||
DeliverPolicy.LastPerSubject => await stream.Store.LoadLastBySubjectAsync(filter, ct) is { } m ? m.Sequence : 1UL,
|
||||
_ => consumer.NextSequence
|
||||
};
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamConsumerDeliverPolicyParityTests" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/JetStream/Models/ConsumerConfig.cs src/NATS.Server/JetStream/Consumers/AckProcessor.cs src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs src/NATS.Server/JetStream/ConsumerManager.cs tests/NATS.Server.Tests/JetStreamConsumerDeliverPolicyParityTests.cs
|
||||
git commit -m "feat: implement consumer deliver policy and ack-all parity semantics"
|
||||
```
|
||||
|
||||
### Task 8: Implement Consumer Redelivery Backoff, MaxDeliver, Flow Control, and Rate Limiting
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/JetStream/Models/ConsumerConfig.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Consumers/AckProcessor.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/ConsumerManager.cs`
|
||||
- Test: `tests/NATS.Server.Tests/JetStreamConsumerFlowControlParityTests.cs`
|
||||
- Test: `tests/NATS.Server.Tests/JetStreamConsumerBackoffParityTests.cs`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Redelivery_honors_backoff_schedule_and_stops_after_max_deliver()
|
||||
{
|
||||
await using var fx = await JetStreamConsumerBackoffFixture.StartAsync();
|
||||
var deliveries = await fx.CollectRedeliveriesAsync();
|
||||
deliveries.Count.ShouldBe(3);
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamConsumerFlowControlParityTests|FullyQualifiedName~JetStreamConsumerBackoffParityTests" -v minimal`
|
||||
Expected: FAIL because flow control/rate limit/backoff/max-deliver are not fully implemented.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```csharp
|
||||
if (consumer.Config.MaxDeliver > 0 && deliveryCount >= consumer.Config.MaxDeliver)
|
||||
return DeliveryDecision.Stop;
|
||||
var nextDelay = consumer.Config.BackOffMs.Count > attempt ? consumer.Config.BackOffMs[attempt] : consumer.Config.AckWaitMs;
|
||||
```
|
||||
|
||||
```csharp
|
||||
if (consumer.Config.FlowControl)
|
||||
consumer.PushFrames.Enqueue(PushFrame.FlowControl());
|
||||
await _rateLimiter.DelayIfNeededAsync(consumer.Config.RateLimitBps, payloadSize, ct);
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamConsumerFlowControlParityTests|FullyQualifiedName~JetStreamConsumerBackoffParityTests" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/JetStream/Models/ConsumerConfig.cs src/NATS.Server/JetStream/Consumers/AckProcessor.cs src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs src/NATS.Server/JetStream/ConsumerManager.cs tests/NATS.Server.Tests/JetStreamConsumerFlowControlParityTests.cs tests/NATS.Server.Tests/JetStreamConsumerBackoffParityTests.cs
|
||||
git commit -m "feat: add consumer backoff flow-control and rate-limit parity"
|
||||
```
|
||||
|
||||
### Task 9: Implement Mirror/Source Advanced Semantics (Sync State, Subject Mapping, Cross-Account)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/JetStream/Models/StreamConfig.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/StreamManager.cs`
|
||||
- Modify: `src/NATS.Server/Auth/Account.cs`
|
||||
- Test: `tests/NATS.Server.Tests/JetStreamMirrorSourceParityTests.cs`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Source_subject_transform_and_cross_account_mapping_copy_expected_messages_only()
|
||||
{
|
||||
await using var fx = await JetStreamMirrorSourceParityFixture.StartCrossAccountAsync();
|
||||
var messages = await fx.ReadAggregateAsync();
|
||||
messages.ShouldContain(m => m.Subject == "agg.orders.created");
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamMirrorSourceParityTests" -v minimal`
|
||||
Expected: FAIL because source transforms/cross-account mapping and sync-state tracking are incomplete.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```csharp
|
||||
if (!_accountAuthorizer.CanImport(sourceAccount, targetAccount, sourceSubject))
|
||||
return;
|
||||
var mappedSubject = _subjectMapper.Map(sourceSubject, sourceConfig.SubjectTransform);
|
||||
```
|
||||
|
||||
```csharp
|
||||
_syncState.LastOriginSequence = message.Sequence;
|
||||
_syncState.LastSyncUtc = DateTime.UtcNow;
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamMirrorSourceParityTests" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/JetStream/Models/StreamConfig.cs src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs src/NATS.Server/JetStream/StreamManager.cs src/NATS.Server/Auth/Account.cs tests/NATS.Server.Tests/JetStreamMirrorSourceParityTests.cs
|
||||
git commit -m "feat: implement mirror source advanced parity behavior"
|
||||
```
|
||||
|
||||
### Task 10: Replace JSONL Baseline with Block-Oriented FileStore Parity Features
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/JetStream/Storage/FileStoreOptions.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Storage/FileStoreBlock.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Storage/IStreamStore.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Storage/MemStore.cs`
|
||||
- Test: `tests/NATS.Server.Tests/JetStreamFileStoreBlockParityTests.cs`
|
||||
- Test: `tests/NATS.Server.Tests/JetStreamStoreExpiryParityTests.cs`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task File_store_rolls_blocks_and_recovers_index_without_full_file_rewrite()
|
||||
{
|
||||
await using var fx = await JetStreamFileStoreBlockFixture.StartAsync();
|
||||
await fx.AppendManyAsync(5000);
|
||||
(await fx.BlockCountAsync()).ShouldBeGreaterThan(1);
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamFileStoreBlockParityTests|FullyQualifiedName~JetStreamStoreExpiryParityTests" -v minimal`
|
||||
Expected: FAIL because current store is single JSONL dictionary rewrite.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```csharp
|
||||
if (activeBlock.SizeBytes >= _options.BlockSizeBytes)
|
||||
activeBlock = CreateNextBlock();
|
||||
await activeBlock.AppendAsync(record, ct);
|
||||
_index[sequence] = new BlockPointer(activeBlock.Id, offset);
|
||||
```
|
||||
|
||||
```csharp
|
||||
if (config.MaxAgeMs > 0)
|
||||
await PruneExpiredAsync(DateTime.UtcNow, ct);
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamFileStoreBlockParityTests|FullyQualifiedName~JetStreamStoreExpiryParityTests" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/JetStream/Storage/FileStoreOptions.cs src/NATS.Server/JetStream/Storage/FileStoreBlock.cs src/NATS.Server/JetStream/Storage/FileStore.cs src/NATS.Server/JetStream/Storage/IStreamStore.cs src/NATS.Server/JetStream/Storage/MemStore.cs tests/NATS.Server.Tests/JetStreamFileStoreBlockParityTests.cs tests/NATS.Server.Tests/JetStreamStoreExpiryParityTests.cs
|
||||
git commit -m "feat: implement block-based filestore and expiry parity"
|
||||
```
|
||||
|
||||
### Task 11: Implement RAFT Advanced Consensus Semantics (Heartbeat, NextIndex, Snapshot Transfer, Membership)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/Raft/RaftRpcContracts.cs`
|
||||
- Modify: `src/NATS.Server/Raft/RaftTransport.cs`
|
||||
- Modify: `src/NATS.Server/Raft/RaftReplicator.cs`
|
||||
- Modify: `src/NATS.Server/Raft/RaftNode.cs`
|
||||
- Modify: `src/NATS.Server/Raft/RaftLog.cs`
|
||||
- Modify: `src/NATS.Server/Raft/RaftSnapshotStore.cs`
|
||||
- Test: `tests/NATS.Server.Tests/RaftConsensusAdvancedParityTests.cs`
|
||||
- Test: `tests/NATS.Server.Tests/RaftSnapshotTransferParityTests.cs`
|
||||
- Test: `tests/NATS.Server.Tests/RaftMembershipParityTests.cs`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Leader_heartbeats_keep_followers_current_and_next_index_backtracks_on_mismatch()
|
||||
{
|
||||
var cluster = await RaftAdvancedFixture.StartAsync();
|
||||
var result = await cluster.ProposeWithFollowerDivergenceAsync("set x=1");
|
||||
result.QuorumCommitted.ShouldBeTrue();
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftConsensusAdvancedParityTests|FullyQualifiedName~RaftSnapshotTransferParityTests|FullyQualifiedName~RaftMembershipParityTests" -v minimal`
|
||||
Expected: FAIL because heartbeat/next-index/snapshot-transfer/membership behavior is missing.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```csharp
|
||||
await _transport.AppendEntriesAsync(Id, followerIds, heartbeatEntry, ct);
|
||||
while (!followerAccepts)
|
||||
nextIndex[followerId]--;
|
||||
```
|
||||
|
||||
```csharp
|
||||
if (nextIndex[followerId] <= snapshot.LastIncludedIndex)
|
||||
await _transport.InstallSnapshotAsync(Id, followerId, snapshot, ct);
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftConsensusAdvancedParityTests|FullyQualifiedName~RaftSnapshotTransferParityTests|FullyQualifiedName~RaftMembershipParityTests" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/Raft/RaftRpcContracts.cs src/NATS.Server/Raft/RaftTransport.cs src/NATS.Server/Raft/RaftReplicator.cs src/NATS.Server/Raft/RaftNode.cs src/NATS.Server/Raft/RaftLog.cs src/NATS.Server/Raft/RaftSnapshotStore.cs tests/NATS.Server.Tests/RaftConsensusAdvancedParityTests.cs tests/NATS.Server.Tests/RaftSnapshotTransferParityTests.cs tests/NATS.Server.Tests/RaftMembershipParityTests.cs
|
||||
git commit -m "feat: add raft advanced consensus and snapshot transfer parity"
|
||||
```
|
||||
|
||||
### Task 12: Implement JetStream Cluster Governance and Cross-Cluster JetStream Path
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs`
|
||||
- Modify: `src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs`
|
||||
- Modify: `src/NATS.Server/NatsServer.cs`
|
||||
- Modify: `src/NATS.Server/Gateways/GatewayManager.cs`
|
||||
- Test: `tests/NATS.Server.Tests/JetStreamClusterGovernanceParityTests.cs`
|
||||
- Test: `tests/NATS.Server.Tests/JetStreamCrossClusterGatewayParityTests.cs`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Cross_cluster_stream_create_and_publish_replicate_through_gateway_with_cluster_governance()
|
||||
{
|
||||
await using var fx = await JetStreamCrossClusterFixture.StartAsync();
|
||||
var ack = await fx.PublishAsync("ORDERS", "orders.created", "1");
|
||||
ack.ErrorCode.ShouldBeNull();
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterGovernanceParityTests|FullyQualifiedName~JetStreamCrossClusterGatewayParityTests" -v minimal`
|
||||
Expected: FAIL because cluster governance and cross-cluster JS path remain baseline/incomplete.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```csharp
|
||||
var placement = _assetPlanner.PlanReplicas(streamConfig.Replicas);
|
||||
await _metaGroup.ProposeCreateStreamAsync(streamConfig, ct);
|
||||
await _replicaGroup.ApplyPlacementAsync(placement, ct);
|
||||
```
|
||||
|
||||
```csharp
|
||||
if (IsJetStreamReplicationMessage(message))
|
||||
await _gatewayManager.ForwardJetStreamClusterMessageAsync(message, ct);
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterGovernanceParityTests|FullyQualifiedName~JetStreamCrossClusterGatewayParityTests" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs src/NATS.Server/NatsServer.cs src/NATS.Server/Gateways/GatewayManager.cs tests/NATS.Server.Tests/JetStreamClusterGovernanceParityTests.cs tests/NATS.Server.Tests/JetStreamCrossClusterGatewayParityTests.cs
|
||||
git commit -m "feat: complete jetstream cluster governance and cross-cluster parity path"
|
||||
```
|
||||
|
||||
### Task 13: Final Parity Closure Evidence and Documentation Update
|
||||
|
||||
**Files:**
|
||||
- Modify: `differences.md`
|
||||
- Modify: `docs/plans/2026-02-23-jetstream-remaining-parity-map.md`
|
||||
- Modify: `docs/plans/2026-02-23-jetstream-remaining-parity-verification.md`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public void Differences_md_has_no_remaining_jetstream_baseline_or_n_rows()
|
||||
{
|
||||
var report = ParityDocInspector.Load("differences.md");
|
||||
report.RemainingJetStreamRows.ShouldBeEmpty();
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~DifferencesParityClosureTests" -v minimal`
|
||||
Expected: FAIL until docs are updated to match implemented parity status.
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```markdown
|
||||
## Summary: Remaining Gaps
|
||||
|
||||
### JetStream
|
||||
None in scope after this plan; all in-scope parity rows moved to `Y`.
|
||||
```
|
||||
|
||||
**Step 4: Run full verification**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStream|FullyQualifiedName~Raft|FullyQualifiedName~Route|FullyQualifiedName~Gateway|FullyQualifiedName~Leaf|FullyQualifiedName~Account" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
Run: `dotnet test -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add differences.md docs/plans/2026-02-23-jetstream-remaining-parity-map.md docs/plans/2026-02-23-jetstream-remaining-parity-verification.md
|
||||
git commit -m "docs: close post-baseline jetstream parity gaps with verification evidence"
|
||||
```
|
||||
Reference in New Issue
Block a user