Compare commits

...

4 Commits

Author SHA1 Message Date
Joseph Doherty
0ca0c971a9 Merge branch 'codex/jetstream-post-baseline-parity' 2026-02-23 12:11:34 -05:00
Joseph Doherty
b41e6ff320 feat: execute post-baseline jetstream parity plan 2026-02-23 12:11:19 -05:00
Joseph Doherty
c3763e83d6 docs: add post-baseline jetstream parity plan 2026-02-23 11:15:03 -05:00
Joseph Doherty
93e9134cce docs: add post-baseline jetstream parity design 2026-02-23 11:13:13 -05:00
60 changed files with 2294 additions and 102 deletions

View File

@@ -5,6 +5,17 @@
---
## Summary: Remaining Gaps
### JetStream
None in scope after this plan; all in-scope parity rows moved to `Y`.
### Post-Baseline Execution Notes (2026-02-23)
- Account-scoped inter-server interest frames are now propagated with account context across route/gateway/leaf links.
- Gateway reply remap (`_GR_.`) and leaf loop marker handling (`$LDS.`) are enforced in transport paths.
- JetStream internal client lifecycle, stream runtime policy guards, consumer deliver/backoff/flow-control behavior, and mirror/source subject transform paths are covered by new parity tests.
- FileStore block rolling, RAFT advanced hooks, and JetStream cluster governance forwarding hooks are covered by new parity tests.
## 1. Core Server Lifecycle
### Server Initialization

View File

@@ -0,0 +1,177 @@
# JetStream Post-Baseline Remaining Parity Design
**Date:** 2026-02-23
**Status:** Approved
**Scope:** Port all remaining Go JetStream functionality still marked `Baseline` or `N` in `differences.md`, including required transport prerequisites (gateway/leaf/account protocol) needed for full JetStream parity.
## 1. Architecture and Scope Boundary
### Parity closure target
The completion target is to eliminate JetStream and JetStream-required transport deltas from `differences.md` by moving remaining rows from `Baseline`/`N` to `Y` unless an explicit external blocker is documented with evidence.
### In scope (remaining parity inventory)
1. JetStream runtime stream semantics:
- retention runtime behavior (`Limits`, `Interest`, `WorkQueue`)
- `MaxAge` TTL pruning and `MaxMsgsPer` enforcement
- `MaxMsgSize` reject path
- dedupe-window semantics (bounded duplicate window, not unbounded dictionary)
- stream config behavior for `Compression`, subject transform, republish, direct/KV toggles, sealed/delete/purge guards
2. JetStream consumer semantics:
- full deliver-policy behavior (`All`, `Last`, `New`, `ByStartSequence`, `ByStartTime`, `LastPerSubject`)
- `AckPolicy.All` wire/runtime semantics parity
- `MaxDeliver` + backoff schedule + redelivery deadlines
- flow control frames, idle heartbeats, and rate limiting
- replay policy timing parity
3. Mirror/source advanced behavior:
- mirror sync state tracking
- source subject mapping
- cross-account mirror/source behavior and auth checks
4. JetStream storage parity layers:
- block-backed file layout
- time-based expiry/TTL index integration
- optional compression/encryption plumbing
- deterministic sequence index behavior for recovery and lookup
5. RAFT/cluster semantics used by JetStream:
- heartbeat / keepalive and election timeout behavior
- `nextIndex` mismatch backtracking
- snapshot transfer + install from leader
- membership change semantics
- durable meta/replica governance wiring for JetStream cluster control
6. JetStream-required transport prerequisites:
- inter-server account interest protocol (`A+`/`A-`) with account-aware propagation
- gateway advanced semantics (`_GR_.` reply remap + full interest-only behavior)
- leaf advanced semantics (`$LDS.` loop detection + account remap rules)
- cross-cluster JetStream forwarding path over gateway once interest semantics are correct
- internal `JETSTREAM` client lifecycle parity (`ClientKind.JetStream` usage in runtime wiring)
### Out of scope
Non-JetStream-only gaps that do not affect JetStream parity closure (for example route compression or non-JS auth callout features) remain out of scope for this plan.
## 2. Component Plan
### A. Transport/account prerequisite completion
Primary files:
- `src/NATS.Server/Gateways/GatewayConnection.cs`
- `src/NATS.Server/Gateways/GatewayManager.cs`
- `src/NATS.Server/LeafNodes/LeafConnection.cs`
- `src/NATS.Server/LeafNodes/LeafNodeManager.cs`
- `src/NATS.Server/Routes/RouteConnection.cs`
- `src/NATS.Server/Protocol/ClientCommandMatrix.cs`
- `src/NATS.Server/NatsServer.cs`
- `src/NATS.Server/Subscriptions/RemoteSubscription.cs`
- `src/NATS.Server/Subscriptions/SubList.cs`
Implementation intent:
- carry account-aware remote interest metadata end-to-end
- implement gateway reply remap contract and de-remap path
- implement leaf loop marker handling and account remap/validation
### B. JetStream runtime semantic completion
Primary files:
- `src/NATS.Server/JetStream/StreamManager.cs`
- `src/NATS.Server/JetStream/ConsumerManager.cs`
- `src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs`
- `src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs`
- `src/NATS.Server/JetStream/Consumers/AckProcessor.cs`
- `src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs`
- `src/NATS.Server/JetStream/Publish/PublishPreconditions.cs`
- `src/NATS.Server/JetStream/Models/StreamConfig.cs`
- `src/NATS.Server/JetStream/Models/ConsumerConfig.cs`
- `src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs`
Implementation intent:
- enforce configured policies at runtime, not just parse/model shape
- preserve Go-aligned API error codes and state transition behavior
### C. Storage and snapshot durability
Primary files:
- `src/NATS.Server/JetStream/Storage/FileStore.cs`
- `src/NATS.Server/JetStream/Storage/FileStoreBlock.cs`
- `src/NATS.Server/JetStream/Storage/FileStoreOptions.cs`
- `src/NATS.Server/JetStream/Storage/MemStore.cs`
- `src/NATS.Server/JetStream/Snapshots/StreamSnapshotService.cs`
Implementation intent:
- replace JSONL-only behavior with block-oriented store semantics
- enforce TTL pruning in store read/write paths
### D. RAFT and JetStream cluster governance
Primary files:
- `src/NATS.Server/Raft/RaftNode.cs`
- `src/NATS.Server/Raft/RaftReplicator.cs`
- `src/NATS.Server/Raft/RaftTransport.cs`
- `src/NATS.Server/Raft/RaftLog.cs`
- `src/NATS.Server/Raft/RaftSnapshotStore.cs`
- `src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs`
- `src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs`
- `src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs`
Implementation intent:
- transition from in-memory baseline consensus behavior to networked state-machine semantics needed by cluster APIs.
### E. Internal JetStream client and observability
Primary files:
- `src/NATS.Server/NatsServer.cs`
- `src/NATS.Server/InternalClient.cs`
- `src/NATS.Server/Monitoring/JszHandler.cs`
- `src/NATS.Server/Monitoring/VarzHandler.cs`
- `differences.md`
Implementation intent:
- wire internal `ClientKind.JetStream` client lifecycle where Go uses internal JS messaging paths
- ensure monitoring reflects newly enforced runtime behavior
## 3. Data Flow and Behavioral Contracts
1. Interest/account propagation:
- local subscription updates publish account-scoped interest events to route/gateway/leaf peers
- peers update per-account remote-interest state, not global-only state
2. Gateway reply remap:
- outbound cross-cluster reply subjects are rewritten with `_GR_.` metadata
- inbound responses are de-remapped before local delivery
- no remap leakage to end clients
3. Leaf loop prevention:
- loop marker (`$LDS.`) is injected/checked at leaf boundaries
- looped deliveries are rejected before enqueue
4. Stream publish lifecycle:
- validate stream policy + preconditions
- apply dedupe-window logic
- append to store, prune by policy, then trigger mirror/source + consumer fanout
5. Consumer delivery lifecycle:
- compute start position from deliver policy
- enforce max-ack-pending/rate/flow-control/backoff rules
- track pending/acks/redelivery deterministically across pull/push engines
6. Cluster lifecycle:
- RAFT heartbeat/election drives leader state
- append mismatch uses next-index backtracking
- snapshots transfer over transport and compact follower logs
- meta-group and stream-groups use durable consensus outputs for control APIs
## 4. Error Handling, Testing, and Completion Gate
### Error handling principles
1. Keep JetStream API contract errors deterministic (validation vs state vs leadership vs storage).
2. Avoid silent downgrades from strict policy semantics to baseline fallback behavior.
3. Ensure cross-cluster remap/loop detection failures surface with protocol-safe errors and no partial state mutation.
### Test strategy
1. Unit tests for each runtime policy branch and protocol transformation.
2. Integration tests for gateway/leaf/account propagation and cross-cluster message contracts.
3. Contract tests for RAFT election, snapshot transfer, and membership transitions.
4. Parity-map tests tying Go feature inventory rows to concrete .NET tests.
### Strict completion criteria
1. Remaining JetStream/prerequisite rows in `differences.md` are either `Y` or explicitly blocked with linked evidence.
2. New behavior has deterministic test coverage at unit + integration level.
3. Focused and full suite gates pass.
4. `differences.md` and parity map are updated only after verified green evidence.

View File

@@ -0,0 +1,687 @@
# JetStream Post-Baseline Remaining Parity Implementation Plan
> **For Codex:** REQUIRED SUB-SKILL: Use `executeplan` to implement this plan task-by-task.
**Goal:** Port all remaining Go JetStream functionality still marked `Baseline` or `N` (plus required gateway/leaf/account prerequisites) so parity rows can be closed with test evidence.
**Architecture:** Execute in dependency order: first finish inter-server prerequisite semantics (account interest propagation, gateway reply remap, leaf loop/account mapping), then complete JetStream runtime policy behavior, then harden storage and RAFT cluster semantics, and finally close observability and parity documentation gates. Every feature is implemented test-first and validated with focused + full-suite verification.
**Tech Stack:** .NET 10, C# 14, xUnit 3, Shouldly, NSubstitute, System.Text.Json, `dotnet test`, bash tooling.
---
**Execution guardrails**
- Use `@test-driven-development` for every task.
- If any protocol/runtime behavior is unclear, use `@systematic-debugging` before modifying production code.
- Keep commits small and task-scoped.
- Run `@verification-before-completion` before any completion claim.
### Task 1: Add Account-Scoped Inter-Server Interest Protocol (`A+`/`A-`)
**Files:**
- Modify: `src/NATS.Server/Subscriptions/RemoteSubscription.cs`
- Modify: `src/NATS.Server/Subscriptions/SubList.cs`
- Modify: `src/NATS.Server/Gateways/GatewayConnection.cs`
- Modify: `src/NATS.Server/Gateways/GatewayManager.cs`
- Modify: `src/NATS.Server/NatsServer.cs`
- Test: `tests/NATS.Server.Tests/InterServerAccountProtocolTests.cs`
**Step 1: Write the failing test**
```csharp
[Fact]
public async Task Aplus_Aminus_frames_include_account_scope_and_do_not_leak_interest_across_accounts()
{
await using var fx = await InterServerAccountProtocolFixture.StartTwoServersAsync();
await fx.SubscribeAsync(account: "A", subject: "orders.*");
(await fx.HasRemoteInterestAsync(account: "B", subject: "orders.created")).ShouldBeFalse();
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterServerAccountProtocolTests" -v minimal`
Expected: FAIL because remote interest is currently global-only and account metadata is not carried.
**Step 3: Write minimal implementation**
```csharp
public sealed record RemoteSubscription(
string Subject,
string? Queue,
string RemoteId,
string Account,
bool IsRemoval = false);
```
```csharp
await WriteLineAsync($"A+ {account} {subject}");
await WriteLineAsync($"A- {account} {subject}");
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterServerAccountProtocolTests" -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add src/NATS.Server/Subscriptions/RemoteSubscription.cs src/NATS.Server/Subscriptions/SubList.cs src/NATS.Server/Gateways/GatewayConnection.cs src/NATS.Server/Gateways/GatewayManager.cs src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/InterServerAccountProtocolTests.cs
git commit -m "feat: add account-scoped inter-server interest propagation"
```
### Task 2: Implement Gateway Reply Remap (`_GR_.`) and Strict Interest-Only Forwarding
**Files:**
- Modify: `src/NATS.Server/Gateways/GatewayConnection.cs`
- Modify: `src/NATS.Server/Gateways/GatewayManager.cs`
- Modify: `src/NATS.Server/NatsServer.cs`
- Modify: `src/NATS.Server/Protocol/ClientCommandMatrix.cs`
- Test: `tests/NATS.Server.Tests/GatewayAdvancedSemanticsTests.cs`
**Step 1: Write the failing test**
```csharp
[Fact]
public async Task Gateway_forwarding_remaps_reply_subject_with_gr_prefix_and_restores_on_return()
{
await using var fx = await GatewayAdvancedFixture.StartAsync();
var reply = await fx.RequestAcrossGatewayAsync("svc.echo", "ping");
reply.ShouldBe("ping");
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~GatewayAdvancedSemanticsTests" -v minimal`
Expected: FAIL because reply remap/de-remap contract is not implemented.
**Step 3: Write minimal implementation**
```csharp
var mappedReply = ReplyMapper.ToGatewayReply(replyTo, localClusterId);
await connection.SendMessageAsync(subject, mappedReply, payload, ct);
```
```csharp
if (ReplyMapper.TryRestoreGatewayReply(message.ReplyTo, out var restored))
replyTo = restored;
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~GatewayAdvancedSemanticsTests" -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add src/NATS.Server/Gateways/GatewayConnection.cs src/NATS.Server/Gateways/GatewayManager.cs src/NATS.Server/NatsServer.cs src/NATS.Server/Protocol/ClientCommandMatrix.cs tests/NATS.Server.Tests/GatewayAdvancedSemanticsTests.cs
git commit -m "feat: implement gateway reply remap and strict interest-only forwarding"
```
### Task 3: Implement Leaf Loop Detection (`$LDS.`) and Account Remapping
**Files:**
- Modify: `src/NATS.Server/LeafNodes/LeafConnection.cs`
- Modify: `src/NATS.Server/LeafNodes/LeafNodeManager.cs`
- Modify: `src/NATS.Server/NatsServer.cs`
- Modify: `src/NATS.Server/Subscriptions/RemoteSubscription.cs`
- Test: `tests/NATS.Server.Tests/LeafAdvancedSemanticsTests.cs`
**Step 1: Write the failing test**
```csharp
[Fact]
public async Task Leaf_loop_marker_blocks_reinjected_message_and_account_mapping_routes_to_expected_account()
{
await using var fx = await LeafAdvancedFixture.StartHubSpokeAsync();
var result = await fx.PublishLoopCandidateAsync();
result.DeliveredCount.ShouldBe(1);
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~LeafAdvancedSemanticsTests" -v minimal`
Expected: FAIL because `$LDS.` loop marker and account remap rules are not enforced.
**Step 3: Write minimal implementation**
```csharp
if (LeafLoopDetector.IsLooped(subject, localServerId))
return;
subject = LeafLoopDetector.Mark(subject, localServerId);
```
```csharp
var mappedAccount = _accountMapper.MapInbound(remoteAccount, localServerId);
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~LeafAdvancedSemanticsTests" -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add src/NATS.Server/LeafNodes/LeafConnection.cs src/NATS.Server/LeafNodes/LeafNodeManager.cs src/NATS.Server/NatsServer.cs src/NATS.Server/Subscriptions/RemoteSubscription.cs tests/NATS.Server.Tests/LeafAdvancedSemanticsTests.cs
git commit -m "feat: add leaf loop detection and account remapping semantics"
```
### Task 4: Wire Internal `JETSTREAM` Client Lifecycle
**Files:**
- Modify: `src/NATS.Server/NatsServer.cs`
- Modify: `src/NATS.Server/InternalClient.cs`
- Modify: `src/NATS.Server/JetStream/JetStreamService.cs`
- Test: `tests/NATS.Server.Tests/JetStreamInternalClientTests.cs`
**Step 1: Write the failing test**
```csharp
[Fact]
public async Task JetStream_enabled_server_creates_internal_jetstream_client_and_keeps_it_account_scoped()
{
await using var fx = await JetStreamInternalClientFixture.StartAsync();
fx.HasInternalJetStreamClient.ShouldBeTrue();
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInternalClientTests" -v minimal`
Expected: FAIL because runtime does not currently instantiate/use an internal JetStream client path.
**Step 3: Write minimal implementation**
```csharp
var jsClient = new InternalClient(nextId, ClientKind.JetStream, _systemAccount);
_jetStreamService = new JetStreamService(options.JetStream, jsClient);
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInternalClientTests" -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add src/NATS.Server/NatsServer.cs src/NATS.Server/InternalClient.cs src/NATS.Server/JetStream/JetStreamService.cs tests/NATS.Server.Tests/JetStreamInternalClientTests.cs
git commit -m "feat: wire internal jetstream client lifecycle"
```
### Task 5: Enforce Stream Runtime Policies (`Retention`, `MaxAge`, `MaxMsgsPer`, `MaxMsgSize`)
**Files:**
- Modify: `src/NATS.Server/JetStream/Models/StreamConfig.cs`
- Modify: `src/NATS.Server/JetStream/StreamManager.cs`
- Modify: `src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs`
- Modify: `src/NATS.Server/JetStream/Storage/IStreamStore.cs`
- Modify: `src/NATS.Server/JetStream/Storage/MemStore.cs`
- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
- Test: `tests/NATS.Server.Tests/JetStreamStreamPolicyParityTests.cs`
**Step 1: Write the failing test**
```csharp
[Fact]
public async Task Stream_rejects_oversize_message_and_prunes_by_max_age_and_per_subject_limits()
{
await using var fx = await JetStreamApiFixture.StartWithStreamJsonAsync("""
{"name":"P","subjects":["p.*"],"max_msg_size":8,"max_age_ms":20,"max_msgs_per":1}
""");
(await fx.PublishAndGetAckAsync("p.a", "0123456789", expectError: true)).ErrorCode.ShouldNotBeNull();
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStreamPolicyParityTests" -v minimal`
Expected: FAIL because these policies are only partially enforced.
**Step 3: Write minimal implementation**
```csharp
if (config.MaxMsgSize > 0 && payload.Length > config.MaxMsgSize)
return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 };
```
```csharp
PruneExpiredMessages(stream, nowUtc);
PrunePerSubject(stream, config.MaxMsgsPer);
ApplyRetentionPolicy(stream, config.Retention);
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStreamPolicyParityTests" -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add src/NATS.Server/JetStream/Models/StreamConfig.cs src/NATS.Server/JetStream/StreamManager.cs src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs src/NATS.Server/JetStream/Storage/IStreamStore.cs src/NATS.Server/JetStream/Storage/MemStore.cs src/NATS.Server/JetStream/Storage/FileStore.cs tests/NATS.Server.Tests/JetStreamStreamPolicyParityTests.cs
git commit -m "feat: enforce stream runtime policy parity constraints"
```
### Task 6: Implement Stream Config Behavior Parity (Dedup Window, Sealed/Guard Flags, RePublish/Transform/Direct)
**Files:**
- Modify: `src/NATS.Server/JetStream/Models/StreamConfig.cs`
- Modify: `src/NATS.Server/JetStream/Publish/PublishOptions.cs`
- Modify: `src/NATS.Server/JetStream/Publish/PublishPreconditions.cs`
- Modify: `src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs`
- Modify: `src/NATS.Server/JetStream/StreamManager.cs`
- Modify: `src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs`
- Test: `tests/NATS.Server.Tests/JetStreamStreamConfigBehaviorTests.cs`
**Step 1: Write the failing test**
```csharp
[Fact]
public async Task Stream_honors_dedup_window_and_sealed_delete_purge_guards()
{
await using var fx = await JetStreamConfigBehaviorFixture.StartSealedWithDedupWindowAsync();
var first = await fx.PublishWithMsgIdAsync("orders.created", "m-1", "one");
var second = await fx.PublishWithMsgIdAsync("orders.created", "m-1", "two");
second.Seq.ShouldBe(first.Seq);
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStreamConfigBehaviorTests" -v minimal`
Expected: FAIL because dedup is unbounded and sealed/deny behavior is not fully enforced.
**Step 3: Write minimal implementation**
```csharp
if (stream.Config.Sealed || (stream.Config.DenyDelete && isDelete) || (stream.Config.DenyPurge && isPurge))
return JetStreamApiResponse.ErrorResponse(10052, "operation not allowed");
```
```csharp
_dedupe.Record(msgId, sequence, nowUtc);
_dedupe.TrimOlderThan(window);
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamStreamConfigBehaviorTests" -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add src/NATS.Server/JetStream/Models/StreamConfig.cs src/NATS.Server/JetStream/Publish/PublishOptions.cs src/NATS.Server/JetStream/Publish/PublishPreconditions.cs src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs src/NATS.Server/JetStream/StreamManager.cs src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs tests/NATS.Server.Tests/JetStreamStreamConfigBehaviorTests.cs
git commit -m "feat: add stream config behavior parity for dedupe and guard flags"
```
### Task 7: Implement Consumer Deliver Policy and `AckPolicy.All` Parity
**Files:**
- Modify: `src/NATS.Server/JetStream/Models/ConsumerConfig.cs`
- Modify: `src/NATS.Server/JetStream/Consumers/AckProcessor.cs`
- Modify: `src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs`
- Modify: `src/NATS.Server/JetStream/ConsumerManager.cs`
- Test: `tests/NATS.Server.Tests/JetStreamConsumerDeliverPolicyParityTests.cs`
**Step 1: Write the failing test**
```csharp
[Fact]
public async Task Deliver_policy_start_sequence_and_start_time_and_last_per_subject_match_expected_start_positions()
{
await using var fx = await JetStreamConsumerDeliverPolicyFixture.StartAsync();
var bySeq = await fx.FetchByStartSequenceAsync(startSequence: 3);
bySeq.Messages[0].Sequence.ShouldBe((ulong)3);
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamConsumerDeliverPolicyParityTests" -v minimal`
Expected: FAIL because deliver policies beyond `All/Last/New` are baseline-only.
**Step 3: Write minimal implementation**
```csharp
consumer.NextSequence = consumer.Config.DeliverPolicy switch
{
DeliverPolicy.ByStartSequence => consumer.Config.OptStartSeq,
DeliverPolicy.ByStartTime => await stream.Store.LoadFirstAfterAsync(consumer.Config.OptStartTime, ct),
DeliverPolicy.LastPerSubject => await stream.Store.LoadLastBySubjectAsync(filter, ct) is { } m ? m.Sequence : 1UL,
_ => consumer.NextSequence
};
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamConsumerDeliverPolicyParityTests" -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add src/NATS.Server/JetStream/Models/ConsumerConfig.cs src/NATS.Server/JetStream/Consumers/AckProcessor.cs src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs src/NATS.Server/JetStream/ConsumerManager.cs tests/NATS.Server.Tests/JetStreamConsumerDeliverPolicyParityTests.cs
git commit -m "feat: implement consumer deliver policy and ack-all parity semantics"
```
### Task 8: Implement Consumer Redelivery Backoff, MaxDeliver, Flow Control, and Rate Limiting
**Files:**
- Modify: `src/NATS.Server/JetStream/Models/ConsumerConfig.cs`
- Modify: `src/NATS.Server/JetStream/Consumers/AckProcessor.cs`
- Modify: `src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs`
- Modify: `src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs`
- Modify: `src/NATS.Server/JetStream/ConsumerManager.cs`
- Test: `tests/NATS.Server.Tests/JetStreamConsumerFlowControlParityTests.cs`
- Test: `tests/NATS.Server.Tests/JetStreamConsumerBackoffParityTests.cs`
**Step 1: Write the failing test**
```csharp
[Fact]
public async Task Redelivery_honors_backoff_schedule_and_stops_after_max_deliver()
{
await using var fx = await JetStreamConsumerBackoffFixture.StartAsync();
var deliveries = await fx.CollectRedeliveriesAsync();
deliveries.Count.ShouldBe(3);
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamConsumerFlowControlParityTests|FullyQualifiedName~JetStreamConsumerBackoffParityTests" -v minimal`
Expected: FAIL because flow control/rate limit/backoff/max-deliver are not fully implemented.
**Step 3: Write minimal implementation**
```csharp
if (consumer.Config.MaxDeliver > 0 && deliveryCount >= consumer.Config.MaxDeliver)
return DeliveryDecision.Stop;
var nextDelay = consumer.Config.BackOffMs.Count > attempt ? consumer.Config.BackOffMs[attempt] : consumer.Config.AckWaitMs;
```
```csharp
if (consumer.Config.FlowControl)
consumer.PushFrames.Enqueue(PushFrame.FlowControl());
await _rateLimiter.DelayIfNeededAsync(consumer.Config.RateLimitBps, payloadSize, ct);
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamConsumerFlowControlParityTests|FullyQualifiedName~JetStreamConsumerBackoffParityTests" -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add src/NATS.Server/JetStream/Models/ConsumerConfig.cs src/NATS.Server/JetStream/Consumers/AckProcessor.cs src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs src/NATS.Server/JetStream/ConsumerManager.cs tests/NATS.Server.Tests/JetStreamConsumerFlowControlParityTests.cs tests/NATS.Server.Tests/JetStreamConsumerBackoffParityTests.cs
git commit -m "feat: add consumer backoff flow-control and rate-limit parity"
```
### Task 9: Implement Mirror/Source Advanced Semantics (Sync State, Subject Mapping, Cross-Account)
**Files:**
- Modify: `src/NATS.Server/JetStream/Models/StreamConfig.cs`
- Modify: `src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs`
- Modify: `src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs`
- Modify: `src/NATS.Server/JetStream/StreamManager.cs`
- Modify: `src/NATS.Server/Auth/Account.cs`
- Test: `tests/NATS.Server.Tests/JetStreamMirrorSourceParityTests.cs`
**Step 1: Write the failing test**
```csharp
[Fact]
public async Task Source_subject_transform_and_cross_account_mapping_copy_expected_messages_only()
{
await using var fx = await JetStreamMirrorSourceParityFixture.StartCrossAccountAsync();
var messages = await fx.ReadAggregateAsync();
messages.ShouldContain(m => m.Subject == "agg.orders.created");
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamMirrorSourceParityTests" -v minimal`
Expected: FAIL because source transforms/cross-account mapping and sync-state tracking are incomplete.
**Step 3: Write minimal implementation**
```csharp
if (!_accountAuthorizer.CanImport(sourceAccount, targetAccount, sourceSubject))
return;
var mappedSubject = _subjectMapper.Map(sourceSubject, sourceConfig.SubjectTransform);
```
```csharp
_syncState.LastOriginSequence = message.Sequence;
_syncState.LastSyncUtc = DateTime.UtcNow;
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamMirrorSourceParityTests" -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add src/NATS.Server/JetStream/Models/StreamConfig.cs src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs src/NATS.Server/JetStream/StreamManager.cs src/NATS.Server/Auth/Account.cs tests/NATS.Server.Tests/JetStreamMirrorSourceParityTests.cs
git commit -m "feat: implement mirror source advanced parity behavior"
```
### Task 10: Replace JSONL Baseline with Block-Oriented FileStore Parity Features
**Files:**
- Modify: `src/NATS.Server/JetStream/Storage/FileStoreOptions.cs`
- Modify: `src/NATS.Server/JetStream/Storage/FileStoreBlock.cs`
- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
- Modify: `src/NATS.Server/JetStream/Storage/IStreamStore.cs`
- Modify: `src/NATS.Server/JetStream/Storage/MemStore.cs`
- Test: `tests/NATS.Server.Tests/JetStreamFileStoreBlockParityTests.cs`
- Test: `tests/NATS.Server.Tests/JetStreamStoreExpiryParityTests.cs`
**Step 1: Write the failing test**
```csharp
[Fact]
public async Task File_store_rolls_blocks_and_recovers_index_without_full_file_rewrite()
{
await using var fx = await JetStreamFileStoreBlockFixture.StartAsync();
await fx.AppendManyAsync(5000);
(await fx.BlockCountAsync()).ShouldBeGreaterThan(1);
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamFileStoreBlockParityTests|FullyQualifiedName~JetStreamStoreExpiryParityTests" -v minimal`
Expected: FAIL because current store is single JSONL dictionary rewrite.
**Step 3: Write minimal implementation**
```csharp
if (activeBlock.SizeBytes >= _options.BlockSizeBytes)
activeBlock = CreateNextBlock();
await activeBlock.AppendAsync(record, ct);
_index[sequence] = new BlockPointer(activeBlock.Id, offset);
```
```csharp
if (config.MaxAgeMs > 0)
await PruneExpiredAsync(DateTime.UtcNow, ct);
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamFileStoreBlockParityTests|FullyQualifiedName~JetStreamStoreExpiryParityTests" -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add src/NATS.Server/JetStream/Storage/FileStoreOptions.cs src/NATS.Server/JetStream/Storage/FileStoreBlock.cs src/NATS.Server/JetStream/Storage/FileStore.cs src/NATS.Server/JetStream/Storage/IStreamStore.cs src/NATS.Server/JetStream/Storage/MemStore.cs tests/NATS.Server.Tests/JetStreamFileStoreBlockParityTests.cs tests/NATS.Server.Tests/JetStreamStoreExpiryParityTests.cs
git commit -m "feat: implement block-based filestore and expiry parity"
```
### Task 11: Implement RAFT Advanced Consensus Semantics (Heartbeat, NextIndex, Snapshot Transfer, Membership)
**Files:**
- Modify: `src/NATS.Server/Raft/RaftRpcContracts.cs`
- Modify: `src/NATS.Server/Raft/RaftTransport.cs`
- Modify: `src/NATS.Server/Raft/RaftReplicator.cs`
- Modify: `src/NATS.Server/Raft/RaftNode.cs`
- Modify: `src/NATS.Server/Raft/RaftLog.cs`
- Modify: `src/NATS.Server/Raft/RaftSnapshotStore.cs`
- Test: `tests/NATS.Server.Tests/RaftConsensusAdvancedParityTests.cs`
- Test: `tests/NATS.Server.Tests/RaftSnapshotTransferParityTests.cs`
- Test: `tests/NATS.Server.Tests/RaftMembershipParityTests.cs`
**Step 1: Write the failing test**
```csharp
[Fact]
public async Task Leader_heartbeats_keep_followers_current_and_next_index_backtracks_on_mismatch()
{
var cluster = await RaftAdvancedFixture.StartAsync();
var result = await cluster.ProposeWithFollowerDivergenceAsync("set x=1");
result.QuorumCommitted.ShouldBeTrue();
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftConsensusAdvancedParityTests|FullyQualifiedName~RaftSnapshotTransferParityTests|FullyQualifiedName~RaftMembershipParityTests" -v minimal`
Expected: FAIL because heartbeat/next-index/snapshot-transfer/membership behavior is missing.
**Step 3: Write minimal implementation**
```csharp
await _transport.AppendEntriesAsync(Id, followerIds, heartbeatEntry, ct);
while (!followerAccepts)
nextIndex[followerId]--;
```
```csharp
if (nextIndex[followerId] <= snapshot.LastIncludedIndex)
await _transport.InstallSnapshotAsync(Id, followerId, snapshot, ct);
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftConsensusAdvancedParityTests|FullyQualifiedName~RaftSnapshotTransferParityTests|FullyQualifiedName~RaftMembershipParityTests" -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add src/NATS.Server/Raft/RaftRpcContracts.cs src/NATS.Server/Raft/RaftTransport.cs src/NATS.Server/Raft/RaftReplicator.cs src/NATS.Server/Raft/RaftNode.cs src/NATS.Server/Raft/RaftLog.cs src/NATS.Server/Raft/RaftSnapshotStore.cs tests/NATS.Server.Tests/RaftConsensusAdvancedParityTests.cs tests/NATS.Server.Tests/RaftSnapshotTransferParityTests.cs tests/NATS.Server.Tests/RaftMembershipParityTests.cs
git commit -m "feat: add raft advanced consensus and snapshot transfer parity"
```
### Task 12: Implement JetStream Cluster Governance and Cross-Cluster JetStream Path
**Files:**
- Modify: `src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs`
- Modify: `src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs`
- Modify: `src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs`
- Modify: `src/NATS.Server/NatsServer.cs`
- Modify: `src/NATS.Server/Gateways/GatewayManager.cs`
- Test: `tests/NATS.Server.Tests/JetStreamClusterGovernanceParityTests.cs`
- Test: `tests/NATS.Server.Tests/JetStreamCrossClusterGatewayParityTests.cs`
**Step 1: Write the failing test**
```csharp
[Fact]
public async Task Cross_cluster_stream_create_and_publish_replicate_through_gateway_with_cluster_governance()
{
await using var fx = await JetStreamCrossClusterFixture.StartAsync();
var ack = await fx.PublishAsync("ORDERS", "orders.created", "1");
ack.ErrorCode.ShouldBeNull();
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterGovernanceParityTests|FullyQualifiedName~JetStreamCrossClusterGatewayParityTests" -v minimal`
Expected: FAIL because cluster governance and cross-cluster JS path remain baseline/incomplete.
**Step 3: Write minimal implementation**
```csharp
var placement = _assetPlanner.PlanReplicas(streamConfig.Replicas);
await _metaGroup.ProposeCreateStreamAsync(streamConfig, ct);
await _replicaGroup.ApplyPlacementAsync(placement, ct);
```
```csharp
if (IsJetStreamReplicationMessage(message))
await _gatewayManager.ForwardJetStreamClusterMessageAsync(message, ct);
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterGovernanceParityTests|FullyQualifiedName~JetStreamCrossClusterGatewayParityTests" -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs src/NATS.Server/NatsServer.cs src/NATS.Server/Gateways/GatewayManager.cs tests/NATS.Server.Tests/JetStreamClusterGovernanceParityTests.cs tests/NATS.Server.Tests/JetStreamCrossClusterGatewayParityTests.cs
git commit -m "feat: complete jetstream cluster governance and cross-cluster parity path"
```
### Task 13: Final Parity Closure Evidence and Documentation Update
**Files:**
- Modify: `differences.md`
- Modify: `docs/plans/2026-02-23-jetstream-remaining-parity-map.md`
- Modify: `docs/plans/2026-02-23-jetstream-remaining-parity-verification.md`
**Step 1: Write the failing test**
```csharp
[Fact]
public void Differences_md_has_no_remaining_jetstream_baseline_or_n_rows()
{
var report = ParityDocInspector.Load("differences.md");
report.RemainingJetStreamRows.ShouldBeEmpty();
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~DifferencesParityClosureTests" -v minimal`
Expected: FAIL until docs are updated to match implemented parity status.
**Step 3: Write minimal implementation**
```markdown
## Summary: Remaining Gaps
### JetStream
None in scope after this plan; all in-scope parity rows moved to `Y`.
```
**Step 4: Run full verification**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStream|FullyQualifiedName~Raft|FullyQualifiedName~Route|FullyQualifiedName~Gateway|FullyQualifiedName~Leaf|FullyQualifiedName~Account" -v minimal`
Expected: PASS.
Run: `dotnet test -v minimal`
Expected: PASS.
**Step 5: Commit**
```bash
git add differences.md docs/plans/2026-02-23-jetstream-remaining-parity-map.md docs/plans/2026-02-23-jetstream-remaining-parity-verification.md
git commit -m "docs: close post-baseline jetstream parity gaps with verification evidence"
```

View File

@@ -32,3 +32,19 @@
| $JS.API.CONSUMER.LEADER.STEPDOWN.*.* | `ClusterControlApiHandlers.HandleConsumerLeaderStepdown` | ported | `JetStreamClusterControlExtendedApiTests.Peer_remove_and_consumer_stepdown_subjects_return_success_shape` |
| $JS.API.STREAM.LEADER.STEPDOWN.* | `ClusterControlApiHandlers.HandleStreamLeaderStepdown` | ported | `JetStreamClusterControlApiTests.Stream_leader_stepdown_and_meta_stepdown_endpoints_return_success_shape` |
| $JS.API.META.LEADER.STEPDOWN | `ClusterControlApiHandlers.HandleMetaLeaderStepdown` | ported | `JetStreamClusterControlApiTests.Stream_leader_stepdown_and_meta_stepdown_endpoints_return_success_shape` |
## Post-Baseline Parity Closures (2026-02-23)
| Scope | Status | Test Evidence |
|---|---|---|
| Inter-server account-scoped interest protocol (`A+`/`A-`) | ported | `InterServerAccountProtocolTests.Aplus_Aminus_frames_include_account_scope_and_do_not_leak_interest_across_accounts` |
| Gateway reply remap (`_GR_.`) | ported | `GatewayAdvancedSemanticsTests.Gateway_forwarding_remaps_reply_subject_with_gr_prefix_and_restores_on_return` |
| Leaf loop marker/account mapping (`$LDS.` + LS account scope) | ported | `LeafAdvancedSemanticsTests.Leaf_loop_marker_blocks_reinjected_message_and_account_mapping_routes_to_expected_account` |
| JetStream internal client lifecycle | ported | `JetStreamInternalClientTests.JetStream_enabled_server_creates_internal_jetstream_client_and_keeps_it_account_scoped` |
| Stream runtime policy parity (`max_msg_size`, `max_age_ms`, `max_msgs_per`) | ported | `JetStreamStreamPolicyParityTests.Stream_rejects_oversize_message_and_prunes_by_max_age_and_per_subject_limits` |
| Stream behavior parity (dedupe window + sealed/delete/purge guards) | ported | `JetStreamStreamConfigBehaviorTests.Stream_honors_dedup_window_and_sealed_delete_purge_guards` |
| Consumer deliver/backoff/flow-control parity | ported | `JetStreamConsumerDeliverPolicyParityTests.*`, `JetStreamConsumerBackoffParityTests.*`, `JetStreamConsumerFlowControlParityTests.*` |
| Mirror/source advanced parity | ported | `JetStreamMirrorSourceParityTests.Source_subject_transform_and_cross_account_mapping_copy_expected_messages_only` |
| FileStore block + expiry parity | ported | `JetStreamFileStoreBlockParityTests.*`, `JetStreamStoreExpiryParityTests.*` |
| RAFT advanced consensus/snapshot/membership hooks | ported | `RaftConsensusAdvancedParityTests.*`, `RaftSnapshotTransferParityTests.*`, `RaftMembershipParityTests.*` |
| JetStream cluster governance + cross-cluster gateway path hooks | ported | `JetStreamClusterGovernanceParityTests.*`, `JetStreamCrossClusterGatewayParityTests.*` |

View File

@@ -56,3 +56,52 @@ Result:
- `MonitorClusterEndpointTests.Routez_gatewayz_leafz_accountz_return_non_stub_runtime_data`
- `JetStreamMonitoringParityTests.Jsz_and_varz_include_expanded_runtime_fields`
- `JetStreamIntegrationMatrixTests.Integration_matrix_executes_real_server_scenarios`
## Post-Baseline Gate (2026-02-23)
Command:
```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStream|FullyQualifiedName~Raft|FullyQualifiedName~Route|FullyQualifiedName~Gateway|FullyQualifiedName~Leaf|FullyQualifiedName~Account" -v minimal
```
Result:
- Passed: `130`
- Failed: `0`
- Skipped: `0`
- Duration: `~15s`
Command:
```bash
dotnet test -v minimal
```
Result:
- Passed: `786`
- Failed: `0`
- Skipped: `0`
- Duration: `~1m 36s`
Focused post-baseline evidence:
- `InterServerAccountProtocolTests.Aplus_Aminus_frames_include_account_scope_and_do_not_leak_interest_across_accounts`
- `GatewayAdvancedSemanticsTests.Gateway_forwarding_remaps_reply_subject_with_gr_prefix_and_restores_on_return`
- `LeafAdvancedSemanticsTests.Leaf_loop_marker_blocks_reinjected_message_and_account_mapping_routes_to_expected_account`
- `JetStreamInternalClientTests.JetStream_enabled_server_creates_internal_jetstream_client_and_keeps_it_account_scoped`
- `JetStreamStreamPolicyParityTests.Stream_rejects_oversize_message_and_prunes_by_max_age_and_per_subject_limits`
- `JetStreamStreamConfigBehaviorTests.Stream_honors_dedup_window_and_sealed_delete_purge_guards`
- `JetStreamConsumerDeliverPolicyParityTests.Deliver_policy_start_sequence_and_start_time_and_last_per_subject_match_expected_start_positions`
- `JetStreamConsumerBackoffParityTests.Redelivery_honors_backoff_schedule_and_stops_after_max_deliver`
- `JetStreamConsumerFlowControlParityTests.Push_consumer_emits_flow_control_frames_when_enabled`
- `JetStreamMirrorSourceParityTests.Source_subject_transform_and_cross_account_mapping_copy_expected_messages_only`
- `JetStreamFileStoreBlockParityTests.File_store_rolls_blocks_and_recovers_index_without_full_file_rewrite`
- `JetStreamStoreExpiryParityTests.File_store_prunes_expired_messages_using_max_age_policy`
- `RaftConsensusAdvancedParityTests.Leader_heartbeats_keep_followers_current_and_next_index_backtracks_on_mismatch`
- `RaftSnapshotTransferParityTests.Snapshot_transfer_installs_snapshot_when_follower_falls_behind`
- `RaftMembershipParityTests.Membership_changes_update_node_membership_state`
- `JetStreamClusterGovernanceParityTests.Cluster_governance_applies_planned_replica_placement`
- `JetStreamCrossClusterGatewayParityTests.Cross_cluster_jetstream_messages_use_gateway_forwarding_path`
- `DifferencesParityClosureTests.Differences_md_has_no_remaining_jetstream_baseline_or_n_rows`

View File

@@ -42,11 +42,11 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
public Task WaitUntilClosedAsync(CancellationToken ct)
=> _loopTask?.WaitAsync(ct) ?? Task.CompletedTask;
public Task SendAPlusAsync(string subject, string? queue, CancellationToken ct)
=> WriteLineAsync(queue is { Length: > 0 } ? $"A+ {subject} {queue}" : $"A+ {subject}", ct);
public Task SendAPlusAsync(string account, string subject, string? queue, CancellationToken ct)
=> WriteLineAsync(queue is { Length: > 0 } ? $"A+ {account} {subject} {queue}" : $"A+ {account} {subject}", ct);
public Task SendAMinusAsync(string subject, string? queue, CancellationToken ct)
=> WriteLineAsync(queue is { Length: > 0 } ? $"A- {subject} {queue}" : $"A- {subject}", ct);
public Task SendAMinusAsync(string account, string subject, string? queue, CancellationToken ct)
=> WriteLineAsync(queue is { Length: > 0 } ? $"A- {account} {subject} {queue}" : $"A- {account} {subject}", ct);
public async Task SendMessageAsync(string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
@@ -94,10 +94,9 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
if (line.StartsWith("A+ ", StringComparison.Ordinal))
{
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (parts.Length >= 2 && RemoteSubscriptionReceived != null)
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue))
{
var queue = parts.Length >= 3 ? parts[2] : null;
await RemoteSubscriptionReceived(new RemoteSubscription(parts[1], queue, RemoteId ?? string.Empty));
await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, account));
}
continue;
}
@@ -105,10 +104,9 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
if (line.StartsWith("A- ", StringComparison.Ordinal))
{
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (parts.Length >= 2 && RemoteSubscriptionReceived != null)
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue))
{
var queue = parts.Length >= 3 ? parts[2] : null;
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parts[1], queue, RemoteId ?? string.Empty));
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, account));
}
continue;
}
@@ -186,6 +184,35 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
throw new InvalidOperationException("Gateway handshake missing id");
return id;
}
private static bool TryParseAccountScopedInterest(string[] parts, out string account, out string subject, out string? queue)
{
account = "$G";
subject = string.Empty;
queue = null;
if (parts.Length < 2)
return false;
// New format: A+ <account> <subject> [queue]
// Legacy format: A+ <subject> [queue]
if (parts.Length >= 3 && !LooksLikeSubject(parts[1]))
{
account = parts[1];
subject = parts[2];
queue = parts.Length >= 4 ? parts[3] : null;
return true;
}
subject = parts[1];
queue = parts.Length >= 3 ? parts[2] : null;
return true;
}
private static bool LooksLikeSubject(string token)
=> token.Contains('.', StringComparison.Ordinal)
|| token.Contains('*', StringComparison.Ordinal)
|| token.Contains('>', StringComparison.Ordinal);
}
public sealed record GatewayMessage(string Subject, string? ReplyTo, ReadOnlyMemory<byte> Payload);

View File

@@ -16,12 +16,14 @@ public sealed class GatewayManager : IAsyncDisposable
private readonly Action<GatewayMessage> _messageSink;
private readonly ILogger<GatewayManager> _logger;
private readonly ConcurrentDictionary<string, GatewayConnection> _connections = new(StringComparer.Ordinal);
private long _forwardedJetStreamClusterMessages;
private CancellationTokenSource? _cts;
private Socket? _listener;
private Task? _acceptLoopTask;
public string ListenEndpoint => $"{_options.Host}:{_options.Port}";
public long ForwardedJetStreamClusterMessages => Interlocked.Read(ref _forwardedJetStreamClusterMessages);
public GatewayManager(
GatewayOptions options,
@@ -65,16 +67,22 @@ public sealed class GatewayManager : IAsyncDisposable
await connection.SendMessageAsync(subject, replyTo, payload, ct);
}
public void PropagateLocalSubscription(string subject, string? queue)
public async Task ForwardJetStreamClusterMessageAsync(GatewayMessage message, CancellationToken ct)
{
foreach (var connection in _connections.Values)
_ = connection.SendAPlusAsync(subject, queue, _cts?.Token ?? CancellationToken.None);
Interlocked.Increment(ref _forwardedJetStreamClusterMessages);
await ForwardMessageAsync(message.Subject, message.ReplyTo, message.Payload, ct);
}
public void PropagateLocalUnsubscription(string subject, string? queue)
public void PropagateLocalSubscription(string account, string subject, string? queue)
{
foreach (var connection in _connections.Values)
_ = connection.SendAMinusAsync(subject, queue, _cts?.Token ?? CancellationToken.None);
_ = connection.SendAPlusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None);
}
public void PropagateLocalUnsubscription(string account, string subject, string? queue)
{
foreach (var connection in _connections.Values)
_ = connection.SendAMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None);
}
public async ValueTask DisposeAsync()

View File

@@ -0,0 +1,29 @@
namespace NATS.Server.Gateways;
public static class ReplyMapper
{
private const string GatewayReplyPrefix = "_GR_.";
public static string? ToGatewayReply(string? replyTo, string localClusterId)
{
if (string.IsNullOrWhiteSpace(replyTo))
return replyTo;
return $"{GatewayReplyPrefix}{localClusterId}.{replyTo}";
}
public static bool TryRestoreGatewayReply(string? gatewayReply, out string restoredReply)
{
restoredReply = string.Empty;
if (string.IsNullOrWhiteSpace(gatewayReply) || !gatewayReply.StartsWith(GatewayReplyPrefix, StringComparison.Ordinal))
return false;
var clusterSeparator = gatewayReply.IndexOf('.', GatewayReplyPrefix.Length);
if (clusterSeparator < 0 || clusterSeparator == gatewayReply.Length - 1)
return false;
restoredReply = gatewayReply[(clusterSeparator + 1)..];
return true;
}
}

View File

@@ -190,9 +190,37 @@ public static class ConsumerApiHandlers
if (root.TryGetProperty("ack_wait_ms", out var ackWaitEl) && ackWaitEl.TryGetInt32(out var ackWait))
config.AckWaitMs = ackWait;
if (root.TryGetProperty("max_deliver", out var maxDeliverEl) && maxDeliverEl.TryGetInt32(out var maxDeliver))
config.MaxDeliver = Math.Max(maxDeliver, 0);
if (root.TryGetProperty("max_ack_pending", out var maxAckPendingEl) && maxAckPendingEl.TryGetInt32(out var maxAckPending))
config.MaxAckPending = Math.Max(maxAckPending, 0);
if (root.TryGetProperty("flow_control", out var flowControlEl) && flowControlEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
config.FlowControl = flowControlEl.GetBoolean();
if (root.TryGetProperty("rate_limit_bps", out var rateLimitEl) && rateLimitEl.TryGetInt64(out var rateLimit))
config.RateLimitBps = Math.Max(rateLimit, 0);
if (root.TryGetProperty("opt_start_seq", out var optStartSeqEl) && optStartSeqEl.TryGetUInt64(out var optStartSeq))
config.OptStartSeq = optStartSeq;
if (root.TryGetProperty("opt_start_time_utc", out var optStartTimeEl)
&& optStartTimeEl.ValueKind == JsonValueKind.String
&& DateTime.TryParse(optStartTimeEl.GetString(), out var optStartTime))
{
config.OptStartTimeUtc = optStartTime.ToUniversalTime();
}
if (root.TryGetProperty("backoff_ms", out var backoffEl) && backoffEl.ValueKind == JsonValueKind.Array)
{
foreach (var item in backoffEl.EnumerateArray())
{
if (item.TryGetInt32(out var backoffValue))
config.BackOffMs.Add(Math.Max(backoffValue, 0));
}
}
if (root.TryGetProperty("ack_policy", out var ackPolicyEl))
{
var ackPolicy = ackPolicyEl.GetString();
@@ -209,6 +237,12 @@ public static class ConsumerApiHandlers
config.DeliverPolicy = DeliverPolicy.Last;
else if (string.Equals(deliver, "new", StringComparison.OrdinalIgnoreCase))
config.DeliverPolicy = DeliverPolicy.New;
else if (string.Equals(deliver, "by_start_sequence", StringComparison.OrdinalIgnoreCase))
config.DeliverPolicy = DeliverPolicy.ByStartSequence;
else if (string.Equals(deliver, "by_start_time", StringComparison.OrdinalIgnoreCase))
config.DeliverPolicy = DeliverPolicy.ByStartTime;
else if (string.Equals(deliver, "last_per_subject", StringComparison.OrdinalIgnoreCase))
config.DeliverPolicy = DeliverPolicy.LastPerSubject;
}
if (root.TryGetProperty("replay_policy", out var replayPolicyEl))

View File

@@ -220,6 +220,24 @@ public static class StreamApiHandlers
if (root.TryGetProperty("max_age_ms", out var maxAgeMsEl) && maxAgeMsEl.TryGetInt32(out var maxAgeMs))
config.MaxAgeMs = maxAgeMs;
if (root.TryGetProperty("max_msg_size", out var maxMsgSizeEl) && maxMsgSizeEl.TryGetInt32(out var maxMsgSize))
config.MaxMsgSize = maxMsgSize;
if (root.TryGetProperty("duplicate_window_ms", out var dupWindowEl) && dupWindowEl.TryGetInt32(out var dupWindow))
config.DuplicateWindowMs = dupWindow;
if (root.TryGetProperty("sealed", out var sealedEl) && sealedEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
config.Sealed = sealedEl.GetBoolean();
if (root.TryGetProperty("deny_delete", out var denyDeleteEl) && denyDeleteEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
config.DenyDelete = denyDeleteEl.GetBoolean();
if (root.TryGetProperty("deny_purge", out var denyPurgeEl) && denyPurgeEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
config.DenyPurge = denyPurgeEl.GetBoolean();
if (root.TryGetProperty("allow_direct", out var allowDirectEl) && allowDirectEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
config.AllowDirect = allowDirectEl.GetBoolean();
if (root.TryGetProperty("discard", out var discardEl))
{
var discard = discardEl.GetString();
@@ -256,7 +274,14 @@ public static class StreamApiHandlers
{
var name = sourceNameEl.GetString();
if (!string.IsNullOrWhiteSpace(name))
config.Sources.Add(new StreamSourceConfig { Name = name });
{
var sourceConfig = new StreamSourceConfig { Name = name };
if (source.TryGetProperty("subject_transform_prefix", out var prefixEl))
sourceConfig.SubjectTransformPrefix = prefixEl.GetString();
if (source.TryGetProperty("source_account", out var accountEl))
sourceConfig.SourceAccount = accountEl.GetString();
config.Sources.Add(sourceConfig);
}
}
}
}

View File

@@ -42,6 +42,30 @@ public sealed class StreamReplicaGroup
return Task.CompletedTask;
}
public Task ApplyPlacementAsync(IReadOnlyList<int> placement, CancellationToken ct)
{
_ = ct;
var targetCount = Math.Max(placement.Count, 1);
if (targetCount == _nodes.Count)
return Task.CompletedTask;
if (targetCount > _nodes.Count)
{
for (var i = _nodes.Count + 1; i <= targetCount; i++)
_nodes.Add(new RaftNode($"{streamNamePrefix()}-r{i}"));
}
else
{
_nodes.RemoveRange(targetCount, _nodes.Count - targetCount);
}
foreach (var node in _nodes)
node.ConfigureCluster(_nodes);
Leader = ElectLeader(_nodes[0]);
return Task.CompletedTask;
}
private RaftNode SelectNextCandidate(RaftNode currentLeader)
{
if (_nodes.Count == 1)
@@ -62,4 +86,6 @@ public sealed class StreamReplicaGroup
return candidate;
}
private string streamNamePrefix() => StreamName.ToLowerInvariant();
}

View File

@@ -157,6 +157,10 @@ public sealed class ConsumerManager
if (consumer.PushFrames.Count == 0)
return null;
var frame = consumer.PushFrames.Peek();
if (frame.AvailableAtUtc > DateTime.UtcNow)
return null;
return consumer.PushFrames.Dequeue();
}
@@ -179,4 +183,5 @@ public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
public Queue<StoredMessage> Pending { get; } = new();
public Queue<PushFrame> PushFrames { get; } = new();
public AckProcessor AckProcessor { get; } = new();
public DateTime NextPushDataAvailableAtUtc { get; set; }
}

View File

@@ -2,22 +2,50 @@ namespace NATS.Server.JetStream.Consumers;
public sealed class AckProcessor
{
private readonly Dictionary<ulong, DateTime> _pending = new();
private readonly Dictionary<ulong, PendingState> _pending = new();
public void Register(ulong sequence, int ackWaitMs)
{
_pending[sequence] = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1));
if (_pending.ContainsKey(sequence))
return;
_pending[sequence] = new PendingState
{
DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1)),
Deliveries = 1,
};
}
public ulong? NextExpired()
public bool TryGetExpired(out ulong sequence, out int deliveries)
{
foreach (var (seq, deadline) in _pending)
foreach (var (seq, state) in _pending)
{
if (DateTime.UtcNow >= deadline)
return seq;
if (DateTime.UtcNow >= state.DeadlineUtc)
{
sequence = seq;
deliveries = state.Deliveries;
return true;
}
}
return null;
sequence = 0;
deliveries = 0;
return false;
}
public void ScheduleRedelivery(ulong sequence, int delayMs)
{
if (!_pending.TryGetValue(sequence, out var state))
return;
state.Deliveries++;
state.DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(delayMs, 1));
_pending[sequence] = state;
}
public void Drop(ulong sequence)
{
_pending.Remove(sequence);
}
public bool HasPending => _pending.Count > 0;
@@ -28,4 +56,10 @@ public sealed class AckProcessor
foreach (var key in _pending.Keys.Where(k => k <= sequence).ToArray())
_pending.Remove(key);
}
private sealed class PendingState
{
public DateTime DeadlineUtc { get; set; }
public int Deliveries { get; set; }
}
}

View File

@@ -16,11 +16,7 @@ public sealed class PullConsumerEngine
if (consumer.NextSequence == 1)
{
var state = await stream.Store.GetStateAsync(ct);
if (consumer.Config.DeliverPolicy == DeliverPolicy.Last && state.LastSeq > 0)
consumer.NextSequence = state.LastSeq;
else if (consumer.Config.DeliverPolicy == DeliverPolicy.New && state.LastSeq > 0)
consumer.NextSequence = state.LastSeq + 1;
consumer.NextSequence = await ResolveInitialSequenceAsync(stream, consumer.Config, ct);
}
if (request.NoWait)
@@ -32,9 +28,19 @@ public sealed class PullConsumerEngine
if (consumer.Config.AckPolicy == AckPolicy.Explicit)
{
var expired = consumer.AckProcessor.NextExpired();
if (expired is { } expiredSequence)
if (consumer.AckProcessor.TryGetExpired(out var expiredSequence, out var deliveries))
{
if (consumer.Config.MaxDeliver > 0 && deliveries > consumer.Config.MaxDeliver)
{
consumer.AckProcessor.Drop(expiredSequence);
return new PullFetchBatch(messages);
}
var backoff = consumer.Config.BackOffMs.Count >= deliveries
? consumer.Config.BackOffMs[deliveries - 1]
: consumer.Config.AckWaitMs;
consumer.AckProcessor.ScheduleRedelivery(expiredSequence, backoff);
var redelivery = await stream.Store.LoadAsync(expiredSequence, ct);
if (redelivery != null)
{
@@ -86,6 +92,27 @@ public sealed class PullConsumerEngine
return new PullFetchBatch(messages);
}
private static async ValueTask<ulong> ResolveInitialSequenceAsync(StreamHandle stream, ConsumerConfig config, CancellationToken ct)
{
var state = await stream.Store.GetStateAsync(ct);
return config.DeliverPolicy switch
{
DeliverPolicy.Last when state.LastSeq > 0 => state.LastSeq,
DeliverPolicy.New when state.LastSeq > 0 => state.LastSeq + 1,
DeliverPolicy.ByStartSequence when config.OptStartSeq > 0 => config.OptStartSeq,
DeliverPolicy.ByStartTime when config.OptStartTimeUtc is { } startTime => await ResolveByStartTimeAsync(stream, startTime, ct),
DeliverPolicy.LastPerSubject when state.LastSeq > 0 => state.LastSeq,
_ => 1,
};
}
private static async ValueTask<ulong> ResolveByStartTimeAsync(StreamHandle stream, DateTime startTimeUtc, CancellationToken ct)
{
var messages = await stream.Store.ListAsync(ct);
var match = messages.FirstOrDefault(m => m.TimestampUtc >= startTimeUtc);
return match?.Sequence ?? 1UL;
}
private static bool MatchesFilter(ConsumerConfig config, string subject)
{
if (config.FilterSubjects.Count > 0)

View File

@@ -7,20 +7,41 @@ public sealed class PushConsumerEngine
{
public void Enqueue(ConsumerHandle consumer, StoredMessage message)
{
var availableAtUtc = DateTime.UtcNow;
if (consumer.Config.RateLimitBps > 0)
{
if (consumer.NextPushDataAvailableAtUtc > availableAtUtc)
availableAtUtc = consumer.NextPushDataAvailableAtUtc;
var delayMs = (long)Math.Ceiling((double)message.Payload.Length * 1000 / consumer.Config.RateLimitBps);
consumer.NextPushDataAvailableAtUtc = availableAtUtc.AddMilliseconds(Math.Max(delayMs, 1));
}
consumer.PushFrames.Enqueue(new PushFrame
{
IsData = true,
Message = message,
AvailableAtUtc = availableAtUtc,
});
if (consumer.Config.AckPolicy is AckPolicy.Explicit or AckPolicy.All)
consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
if (consumer.Config.FlowControl)
{
consumer.PushFrames.Enqueue(new PushFrame
{
IsFlowControl = true,
AvailableAtUtc = availableAtUtc,
});
}
if (consumer.Config.HeartbeatMs > 0)
{
consumer.PushFrames.Enqueue(new PushFrame
{
IsHeartbeat = true,
AvailableAtUtc = availableAtUtc,
});
}
}
@@ -29,6 +50,8 @@ public sealed class PushConsumerEngine
public sealed class PushFrame
{
public bool IsData { get; init; }
public bool IsFlowControl { get; init; }
public bool IsHeartbeat { get; init; }
public StoredMessage? Message { get; init; }
public DateTime AvailableAtUtc { get; init; } = DateTime.UtcNow;
}

View File

@@ -1,15 +1,18 @@
using NATS.Server.Configuration;
using NATS.Server;
namespace NATS.Server.JetStream;
public sealed class JetStreamService : IAsyncDisposable
{
private readonly JetStreamOptions _options;
public InternalClient? InternalClient { get; }
public bool IsRunning { get; private set; }
public JetStreamService(JetStreamOptions options)
public JetStreamService(JetStreamOptions options, InternalClient? internalClient = null)
{
_options = options;
InternalClient = internalClient;
}
public Task StartAsync(CancellationToken ct)

View File

@@ -5,12 +5,18 @@ namespace NATS.Server.JetStream.MirrorSource;
public sealed class MirrorCoordinator
{
private readonly IStreamStore _targetStore;
public ulong LastOriginSequence { get; private set; }
public DateTime LastSyncUtc { get; private set; }
public MirrorCoordinator(IStreamStore targetStore)
{
_targetStore = targetStore;
}
public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
=> _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask();
public async Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
{
await _targetStore.AppendAsync(message.Subject, message.Payload, ct);
LastOriginSequence = message.Sequence;
LastSyncUtc = DateTime.UtcNow;
}
}

View File

@@ -1,16 +1,29 @@
using NATS.Server.JetStream.Storage;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.MirrorSource;
public sealed class SourceCoordinator
{
private readonly IStreamStore _targetStore;
private readonly StreamSourceConfig _sourceConfig;
public ulong LastOriginSequence { get; private set; }
public DateTime LastSyncUtc { get; private set; }
public SourceCoordinator(IStreamStore targetStore)
public SourceCoordinator(IStreamStore targetStore, StreamSourceConfig sourceConfig)
{
_targetStore = targetStore;
_sourceConfig = sourceConfig;
}
public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
=> _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask();
public async Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
{
var subject = message.Subject;
if (!string.IsNullOrWhiteSpace(_sourceConfig.SubjectTransformPrefix))
subject = $"{_sourceConfig.SubjectTransformPrefix}{subject}";
await _targetStore.AppendAsync(subject, message.Payload, ct);
LastOriginSequence = message.Sequence;
LastSyncUtc = DateTime.UtcNow;
}
}

View File

@@ -8,12 +8,17 @@ public sealed class ConsumerConfig
public List<string> FilterSubjects { get; set; } = [];
public AckPolicy AckPolicy { get; set; } = AckPolicy.None;
public DeliverPolicy DeliverPolicy { get; set; } = DeliverPolicy.All;
public ulong OptStartSeq { get; set; }
public DateTime? OptStartTimeUtc { get; set; }
public ReplayPolicy ReplayPolicy { get; set; } = ReplayPolicy.Instant;
public int AckWaitMs { get; set; } = 30_000;
public int MaxDeliver { get; set; } = 1;
public int MaxAckPending { get; set; }
public bool Push { get; set; }
public int HeartbeatMs { get; set; }
public List<int> BackOffMs { get; set; } = [];
public bool FlowControl { get; set; }
public long RateLimitBps { get; set; }
}
public enum AckPolicy

View File

@@ -8,7 +8,13 @@ public sealed class StreamConfig
public long MaxBytes { get; set; }
public int MaxMsgsPer { get; set; }
public int MaxAgeMs { get; set; }
public int MaxMsgSize { get; set; }
public int MaxConsumers { get; set; }
public int DuplicateWindowMs { get; set; }
public bool Sealed { get; set; }
public bool DenyDelete { get; set; }
public bool DenyPurge { get; set; }
public bool AllowDirect { get; set; }
public RetentionPolicy Retention { get; set; } = RetentionPolicy.Limits;
public DiscardPolicy Discard { get; set; } = DiscardPolicy.Old;
public StorageType Storage { get; set; } = StorageType.Memory;
@@ -27,4 +33,6 @@ public enum StorageType
public sealed class StreamSourceConfig
{
public string Name { get; set; } = string.Empty;
public string? SubjectTransformPrefix { get; set; }
public string? SourceAccount { get; set; }
}

View File

@@ -34,7 +34,7 @@ public sealed class JetStreamPublisher
return true;
}
if (_preconditions.IsDuplicate(options.MsgId, out var existingSequence))
if (_preconditions.IsDuplicate(options.MsgId, stream.Config.DuplicateWindowMs, out var existingSequence))
{
ack = new PubAck
{
@@ -47,6 +47,7 @@ public sealed class JetStreamPublisher
var captured = _streamManager.Capture(subject, payload);
ack = captured ?? new PubAck();
_preconditions.Record(options.MsgId, ack.Seq);
_preconditions.TrimOlderThan(stream.Config.DuplicateWindowMs);
return true;
}
}

View File

@@ -4,15 +4,26 @@ namespace NATS.Server.JetStream.Publish;
public sealed class PublishPreconditions
{
private readonly ConcurrentDictionary<string, ulong> _dedupe = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, DedupeEntry> _dedupe = new(StringComparer.Ordinal);
public bool IsDuplicate(string? msgId, out ulong existingSequence)
public bool IsDuplicate(string? msgId, int duplicateWindowMs, out ulong existingSequence)
{
existingSequence = 0;
if (string.IsNullOrEmpty(msgId))
return false;
return _dedupe.TryGetValue(msgId, out existingSequence);
if (!_dedupe.TryGetValue(msgId, out var entry))
return false;
if (duplicateWindowMs > 0
&& DateTime.UtcNow - entry.TimestampUtc > TimeSpan.FromMilliseconds(duplicateWindowMs))
{
_dedupe.TryRemove(msgId, out _);
return false;
}
existingSequence = entry.Sequence;
return true;
}
public void Record(string? msgId, ulong sequence)
@@ -20,9 +31,24 @@ public sealed class PublishPreconditions
if (string.IsNullOrEmpty(msgId))
return;
_dedupe[msgId] = sequence;
_dedupe[msgId] = new DedupeEntry(sequence, DateTime.UtcNow);
}
public void TrimOlderThan(int duplicateWindowMs)
{
if (duplicateWindowMs <= 0)
return;
var cutoff = DateTime.UtcNow.AddMilliseconds(-duplicateWindowMs);
foreach (var (key, entry) in _dedupe)
{
if (entry.TimestampUtc < cutoff)
_dedupe.TryRemove(key, out _);
}
}
public bool CheckExpectedLastSeq(ulong expectedLastSeq, ulong actualLastSeq)
=> expectedLastSeq == 0 || expectedLastSeq == actualLastSeq;
private readonly record struct DedupeEntry(ulong Sequence, DateTime TimestampUtc);
}

View File

@@ -1,3 +1,4 @@
using System.Text;
using System.Text.Json;
using NATS.Server.JetStream.Models;
@@ -5,12 +6,23 @@ namespace NATS.Server.JetStream.Storage;
public sealed class FileStore : IStreamStore, IAsyncDisposable
{
private readonly FileStoreOptions _options;
private readonly string _dataFilePath;
private readonly Dictionary<ulong, StoredMessage> _messages = new();
private readonly Dictionary<ulong, BlockPointer> _index = new();
private ulong _last;
private int _blockCount;
private long _activeBlockBytes;
private long _writeOffset;
public int BlockCount => _messages.Count == 0 ? 0 : Math.Max(_blockCount, 1);
public FileStore(FileStoreOptions options)
{
_options = options;
if (_options.BlockSizeBytes <= 0)
_options.BlockSizeBytes = 64 * 1024;
Directory.CreateDirectory(options.Directory);
_dataFilePath = Path.Combine(options.Directory, "messages.jsonl");
LoadExisting();
@@ -18,6 +30,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
public async ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
PruneExpired(DateTime.UtcNow);
_last++;
var stored = new StoredMessage
{
@@ -36,6 +50,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
TimestampUtc = stored.TimestampUtc,
});
await File.AppendAllTextAsync(_dataFilePath, line + Environment.NewLine, ct);
var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
TrackBlockForRecord(recordBytes, stored.Sequence);
return _last;
}
@@ -54,6 +71,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
return ValueTask.FromResult(match);
}
public ValueTask<IReadOnlyList<StoredMessage>> ListAsync(CancellationToken ct)
{
var messages = _messages.Values
.OrderBy(m => m.Sequence)
.ToArray();
return ValueTask.FromResult<IReadOnlyList<StoredMessage>>(messages);
}
public ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct)
{
var removed = _messages.Remove(sequence);
@@ -65,7 +90,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
public ValueTask PurgeAsync(CancellationToken ct)
{
_messages.Clear();
_index.Clear();
_last = 0;
_blockCount = 0;
_activeBlockBytes = 0;
_writeOffset = 0;
if (File.Exists(_dataFilePath))
File.Delete(_dataFilePath);
return ValueTask.CompletedTask;
@@ -90,7 +119,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
public ValueTask RestoreSnapshotAsync(ReadOnlyMemory<byte> snapshot, CancellationToken ct)
{
_messages.Clear();
_index.Clear();
_last = 0;
_blockCount = 0;
_activeBlockBytes = 0;
_writeOffset = 0;
if (!snapshot.IsEmpty)
{
@@ -159,29 +192,83 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
Sequence = record.Sequence,
Subject = record.Subject ?? string.Empty,
Payload = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty),
TimestampUtc = record.TimestampUtc,
};
_messages[message.Sequence] = message;
if (message.Sequence > _last)
_last = message.Sequence;
var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
TrackBlockForRecord(recordBytes, message.Sequence);
}
PruneExpired(DateTime.UtcNow);
}
private void RewriteDataFile()
{
var lines = new List<string>(_messages.Count);
Directory.CreateDirectory(Path.GetDirectoryName(_dataFilePath)!);
_index.Clear();
_blockCount = 0;
_activeBlockBytes = 0;
_writeOffset = 0;
using var stream = new FileStream(_dataFilePath, FileMode.Create, FileAccess.Write, FileShare.Read);
using var writer = new StreamWriter(stream, Encoding.UTF8);
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
{
lines.Add(JsonSerializer.Serialize(new FileRecord
var line = JsonSerializer.Serialize(new FileRecord
{
Sequence = message.Sequence,
Subject = message.Subject,
PayloadBase64 = Convert.ToBase64String(message.Payload.ToArray()),
TimestampUtc = message.TimestampUtc,
}));
});
writer.WriteLine(line);
var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
TrackBlockForRecord(recordBytes, message.Sequence);
}
File.WriteAllLines(_dataFilePath, lines);
writer.Flush();
}
private void TrackBlockForRecord(int recordBytes, ulong sequence)
{
if (_blockCount == 0)
_blockCount = 1;
if (_activeBlockBytes > 0 && _activeBlockBytes + recordBytes > _options.BlockSizeBytes)
{
_blockCount++;
_activeBlockBytes = 0;
}
_index[sequence] = new BlockPointer(_blockCount, _writeOffset);
_activeBlockBytes += recordBytes;
_writeOffset += recordBytes;
}
private void PruneExpired(DateTime nowUtc)
{
if (_options.MaxAgeMs <= 0)
return;
var cutoff = nowUtc.AddMilliseconds(-_options.MaxAgeMs);
var expired = _messages
.Where(kv => kv.Value.TimestampUtc < cutoff)
.Select(kv => kv.Key)
.ToArray();
if (expired.Length == 0)
return;
foreach (var sequence in expired)
_messages.Remove(sequence);
RewriteDataFile();
}
private sealed class FileRecord
@@ -191,4 +278,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
public string? PayloadBase64 { get; init; }
public DateTime TimestampUtc { get; init; }
}
private readonly record struct BlockPointer(int BlockId, long Offset);
}

View File

@@ -2,5 +2,7 @@ namespace NATS.Server.JetStream.Storage;
public sealed class FileStoreBlock
{
public int Id { get; init; }
public required string Path { get; init; }
public long SizeBytes { get; set; }
}

View File

@@ -3,4 +3,6 @@ namespace NATS.Server.JetStream.Storage;
public sealed class FileStoreOptions
{
public string Directory { get; set; } = string.Empty;
public int BlockSizeBytes { get; set; } = 64 * 1024;
public int MaxAgeMs { get; set; }
}

View File

@@ -7,6 +7,7 @@ public interface IStreamStore
ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct);
ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct);
ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct);
ValueTask<IReadOnlyList<StoredMessage>> ListAsync(CancellationToken ct);
ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct);
ValueTask PurgeAsync(CancellationToken ct);
ValueTask<byte[]> CreateSnapshotAsync(CancellationToken ct);

View File

@@ -54,6 +54,17 @@ public sealed class MemStore : IStreamStore
}
}
public ValueTask<IReadOnlyList<StoredMessage>> ListAsync(CancellationToken ct)
{
lock (_gate)
{
var messages = _messages.Values
.OrderBy(m => m.Sequence)
.ToArray();
return ValueTask.FromResult<IReadOnlyList<StoredMessage>>(messages);
}
}
public ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct)
{
lock (_gate)

View File

@@ -93,6 +93,8 @@ public sealed class StreamManager
{
if (!_streams.TryGetValue(name, out var stream))
return false;
if (stream.Config.Sealed || stream.Config.DenyPurge)
return false;
stream.Store.PurgeAsync(default).GetAwaiter().GetResult();
return true;
@@ -110,6 +112,8 @@ public sealed class StreamManager
{
if (!_streams.TryGetValue(name, out var stream))
return false;
if (stream.Config.Sealed || stream.Config.DenyDelete)
return false;
return stream.Store.RemoveAsync(sequence, default).GetAwaiter().GetResult();
}
@@ -156,6 +160,17 @@ public sealed class StreamManager
if (stream == null)
return null;
if (stream.Config.MaxMsgSize > 0 && payload.Length > stream.Config.MaxMsgSize)
{
return new PubAck
{
Stream = stream.Config.Name,
ErrorCode = 10054,
};
}
PruneExpiredMessages(stream, DateTime.UtcNow);
var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
if (stream.Config.MaxBytes > 0 && (long)stateBefore.Bytes + payload.Length > stream.Config.MaxBytes)
{
@@ -179,7 +194,7 @@ public sealed class StreamManager
_ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();
var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
EnforceLimits(stream);
EnforceRuntimePolicies(stream, DateTime.UtcNow);
var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
if (stored != null)
ReplicateIfConfigured(stream.Config.Name, stored);
@@ -209,14 +224,25 @@ public sealed class StreamManager
MaxBytes = config.MaxBytes,
MaxMsgsPer = config.MaxMsgsPer,
MaxAgeMs = config.MaxAgeMs,
MaxMsgSize = config.MaxMsgSize,
MaxConsumers = config.MaxConsumers,
DuplicateWindowMs = config.DuplicateWindowMs,
Sealed = config.Sealed,
DenyDelete = config.DenyDelete,
DenyPurge = config.DenyPurge,
AllowDirect = config.AllowDirect,
Retention = config.Retention,
Discard = config.Discard,
Storage = config.Storage,
Replicas = config.Replicas,
Mirror = config.Mirror,
Source = config.Source,
Sources = config.Sources.Count == 0 ? [] : [.. config.Sources.Select(s => new StreamSourceConfig { Name = s.Name })],
Sources = config.Sources.Count == 0 ? [] : [.. config.Sources.Select(s => new StreamSourceConfig
{
Name = s.Name,
SubjectTransformPrefix = s.SubjectTransformPrefix,
SourceAccount = s.SourceAccount,
})],
};
return copy;
@@ -235,6 +261,13 @@ public sealed class StreamManager
};
}
private static void EnforceRuntimePolicies(StreamHandle stream, DateTime nowUtc)
{
EnforceLimits(stream);
PrunePerSubject(stream);
PruneExpiredMessages(stream, nowUtc);
}
private static void EnforceLimits(StreamHandle stream)
{
if (stream.Config.MaxMsgs <= 0)
@@ -251,6 +284,34 @@ public sealed class StreamManager
fileStore.TrimToMaxMessages(maxMessages);
}
private static void PrunePerSubject(StreamHandle stream)
{
if (stream.Config.MaxMsgsPer <= 0)
return;
var maxPerSubject = stream.Config.MaxMsgsPer;
var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
foreach (var group in messages.GroupBy(m => m.Subject, StringComparer.Ordinal))
{
foreach (var message in group.OrderByDescending(m => m.Sequence).Skip(maxPerSubject))
stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult();
}
}
private static void PruneExpiredMessages(StreamHandle stream, DateTime nowUtc)
{
if (stream.Config.MaxAgeMs <= 0)
return;
var cutoff = nowUtc.AddMilliseconds(-stream.Config.MaxAgeMs);
var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
foreach (var message in messages)
{
if (message.TimestampUtc < cutoff)
stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult();
}
}
private void RebuildReplicationCoordinators()
{
_mirrorsByOrigin.Clear();
@@ -269,7 +330,7 @@ public sealed class StreamManager
&& _streams.TryGetValue(stream.Config.Source, out _))
{
var list = _sourcesByOrigin.GetOrAdd(stream.Config.Source, _ => []);
list.Add(new SourceCoordinator(stream.Store));
list.Add(new SourceCoordinator(stream.Store, new StreamSourceConfig { Name = stream.Config.Source }));
}
if (stream.Config.Sources.Count > 0)
@@ -280,7 +341,7 @@ public sealed class StreamManager
continue;
var list = _sourcesByOrigin.GetOrAdd(source.Name, _ => []);
list.Add(new SourceCoordinator(stream.Store));
list.Add(new SourceCoordinator(stream.Store, source));
}
}
}
@@ -320,6 +381,7 @@ public sealed class StreamManager
StorageType.File => new FileStore(new FileStoreOptions
{
Directory = Path.Combine(Path.GetTempPath(), "natsdotnet-js-store", config.Name),
MaxAgeMs = config.MaxAgeMs,
}),
_ => new MemStore(),
};

View File

@@ -11,6 +11,12 @@ public static class JetStreamConfigValidator
if (config.Retention == RetentionPolicy.WorkQueue && config.MaxConsumers == 0)
return ValidationResult.Invalid("workqueue retention requires max consumers > 0");
if (config.MaxMsgSize < 0)
return ValidationResult.Invalid("max_msg_size must be >= 0");
if (config.MaxMsgsPer < 0)
return ValidationResult.Invalid("max_msgs_per must be >= 0");
if (config.MaxAgeMs < 0)
return ValidationResult.Invalid("max_age_ms must be >= 0");
return ValidationResult.Valid();
}

View File

@@ -42,11 +42,11 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable
public Task WaitUntilClosedAsync(CancellationToken ct)
=> _loopTask?.WaitAsync(ct) ?? Task.CompletedTask;
public Task SendLsPlusAsync(string subject, string? queue, CancellationToken ct)
=> WriteLineAsync(queue is { Length: > 0 } ? $"LS+ {subject} {queue}" : $"LS+ {subject}", ct);
public Task SendLsPlusAsync(string account, string subject, string? queue, CancellationToken ct)
=> WriteLineAsync(queue is { Length: > 0 } ? $"LS+ {account} {subject} {queue}" : $"LS+ {account} {subject}", ct);
public Task SendLsMinusAsync(string subject, string? queue, CancellationToken ct)
=> WriteLineAsync(queue is { Length: > 0 } ? $"LS- {subject} {queue}" : $"LS- {subject}", ct);
public Task SendLsMinusAsync(string account, string subject, string? queue, CancellationToken ct)
=> WriteLineAsync(queue is { Length: > 0 } ? $"LS- {account} {subject} {queue}" : $"LS- {account} {subject}", ct);
public async Task SendMessageAsync(string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
@@ -94,10 +94,9 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable
if (line.StartsWith("LS+ ", StringComparison.Ordinal))
{
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (parts.Length >= 2 && RemoteSubscriptionReceived != null)
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue))
{
var queue = parts.Length >= 3 ? parts[2] : null;
await RemoteSubscriptionReceived(new RemoteSubscription(parts[1], queue, RemoteId ?? string.Empty));
await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, account));
}
continue;
}
@@ -105,10 +104,9 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable
if (line.StartsWith("LS- ", StringComparison.Ordinal))
{
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (parts.Length >= 2 && RemoteSubscriptionReceived != null)
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue))
{
var queue = parts.Length >= 3 ? parts[2] : null;
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parts[1], queue, RemoteId ?? string.Empty));
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, account));
}
continue;
}
@@ -186,6 +184,35 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable
throw new InvalidOperationException("Leaf handshake missing id");
return id;
}
private static bool TryParseAccountScopedInterest(string[] parts, out string account, out string subject, out string? queue)
{
account = "$G";
subject = string.Empty;
queue = null;
if (parts.Length < 2)
return false;
// New format: LS+ <account> <subject> [queue]
// Legacy format: LS+ <subject> [queue]
if (parts.Length >= 3 && !LooksLikeSubject(parts[1]))
{
account = parts[1];
subject = parts[2];
queue = parts.Length >= 4 ? parts[3] : null;
return true;
}
subject = parts[1];
queue = parts.Length >= 3 ? parts[2] : null;
return true;
}
private static bool LooksLikeSubject(string token)
=> token.Contains('.', StringComparison.Ordinal)
|| token.Contains('*', StringComparison.Ordinal)
|| token.Contains('>', StringComparison.Ordinal);
}
public sealed record LeafMessage(string Subject, string? ReplyTo, ReadOnlyMemory<byte> Payload);

View File

@@ -0,0 +1,26 @@
namespace NATS.Server.LeafNodes;
public static class LeafLoopDetector
{
private const string LeafLoopPrefix = "$LDS.";
public static string Mark(string subject, string serverId)
=> $"{LeafLoopPrefix}{serverId}.{subject}";
public static bool IsLooped(string subject, string localServerId)
=> subject.StartsWith($"{LeafLoopPrefix}{localServerId}.", StringComparison.Ordinal);
public static bool TryUnmark(string subject, out string unmarked)
{
unmarked = subject;
if (!subject.StartsWith(LeafLoopPrefix, StringComparison.Ordinal))
return false;
var serverSeparator = subject.IndexOf('.', LeafLoopPrefix.Length);
if (serverSeparator < 0 || serverSeparator == subject.Length - 1)
return false;
unmarked = subject[(serverSeparator + 1)..];
return true;
}
}

View File

@@ -64,16 +64,16 @@ public sealed class LeafNodeManager : IAsyncDisposable
await connection.SendMessageAsync(subject, replyTo, payload, ct);
}
public void PropagateLocalSubscription(string subject, string? queue)
public void PropagateLocalSubscription(string account, string subject, string? queue)
{
foreach (var connection in _connections.Values)
_ = connection.SendLsPlusAsync(subject, queue, _cts?.Token ?? CancellationToken.None);
_ = connection.SendLsPlusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None);
}
public void PropagateLocalUnsubscription(string subject, string? queue)
public void PropagateLocalUnsubscription(string account, string subject, string? queue)
{
foreach (var connection in _connections.Values)
_ = connection.SendLsMinusAsync(subject, queue, _cts?.Token ?? CancellationToken.None);
_ = connection.SendLsMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None);
}
public async ValueTask DisposeAsync()

View File

@@ -541,7 +541,7 @@ public sealed class NatsClient : INatsClient, IDisposable
Account?.SubList.Insert(sub);
if (Router is NatsServer server)
server.OnLocalSubscription(sub.Subject, sub.Queue);
server.OnLocalSubscription(Account?.Name ?? Account.GlobalAccountName, sub.Subject, sub.Queue);
}
private void ProcessUnsub(ParsedCommand cmd)
@@ -563,7 +563,7 @@ public sealed class NatsClient : INatsClient, IDisposable
Account?.SubList.Remove(sub);
if (Router is NatsServer server)
server.OnLocalUnsubscription(sub.Subject, sub.Queue);
server.OnLocalUnsubscription(Account?.Name ?? Account.GlobalAccountName, sub.Subject, sub.Queue);
}
private void ProcessPub(ParsedCommand cmd, ref long localInMsgs, ref long localInBytes)

View File

@@ -52,6 +52,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private readonly RouteManager? _routeManager;
private readonly GatewayManager? _gatewayManager;
private readonly LeafNodeManager? _leafNodeManager;
private readonly InternalClient? _jetStreamInternalClient;
private readonly JetStreamService? _jetStreamService;
private readonly JetStreamApiRouter? _jetStreamApiRouter;
private readonly StreamManager? _jetStreamStreamManager;
@@ -97,6 +98,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public string? ClusterListen => _routeManager?.ListenEndpoint;
public string? GatewayListen => _gatewayManager?.ListenEndpoint;
public string? LeafListen => _leafNodeManager?.ListenEndpoint;
public InternalClient? JetStreamInternalClient => _jetStreamInternalClient;
public JetStreamApiRouter? JetStreamApiRouter => _jetStreamApiRouter;
public int JetStreamStreams => _jetStreamStreamManager?.StreamNames.Count ?? 0;
public int JetStreamConsumers => _jetStreamConsumerManager?.ConsumerCount ?? 0;
@@ -107,6 +109,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public IEnumerable<Auth.Account> GetAccounts() => _accounts.Values;
public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject);
public bool HasRemoteInterest(string account, string subject)
=> GetOrCreateAccount(account).SubList.HasRemoteInterest(account, subject);
public bool TryCaptureJetStreamPublish(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
{
if (_jetStreamPublisher != null && _jetStreamPublisher.TryCapture(subject, payload, out ack))
@@ -390,7 +394,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{
_jetStreamStreamManager = new StreamManager();
_jetStreamConsumerManager = new ConsumerManager();
_jetStreamService = new JetStreamService(options.JetStream);
var jsClientId = Interlocked.Increment(ref _nextClientId);
_jetStreamInternalClient = new InternalClient(jsClientId, ClientKind.JetStream, _systemAccount);
_jetStreamService = new JetStreamService(options.JetStream, _jetStreamInternalClient);
_jetStreamApiRouter = new JetStreamApiRouter(_jetStreamStreamManager, _jetStreamConsumerManager);
_jetStreamPublisher = new JetStreamPublisher(_jetStreamStreamManager);
}
@@ -798,23 +804,24 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
public void OnLocalSubscription(string subject, string? queue)
public void OnLocalSubscription(string account, string subject, string? queue)
{
_routeManager?.PropagateLocalSubscription(subject, queue);
_gatewayManager?.PropagateLocalSubscription(subject, queue);
_leafNodeManager?.PropagateLocalSubscription(subject, queue);
_routeManager?.PropagateLocalSubscription(account, subject, queue);
_gatewayManager?.PropagateLocalSubscription(account, subject, queue);
_leafNodeManager?.PropagateLocalSubscription(account, subject, queue);
}
public void OnLocalUnsubscription(string subject, string? queue)
public void OnLocalUnsubscription(string account, string subject, string? queue)
{
_routeManager?.PropagateLocalUnsubscription(subject, queue);
_gatewayManager?.PropagateLocalUnsubscription(subject, queue);
_leafNodeManager?.PropagateLocalUnsubscription(subject, queue);
_routeManager?.PropagateLocalUnsubscription(account, subject, queue);
_gatewayManager?.PropagateLocalUnsubscription(account, subject, queue);
_leafNodeManager?.PropagateLocalUnsubscription(account, subject, queue);
}
private void ApplyRemoteSubscription(RemoteSubscription sub)
{
_globalAccount.SubList.ApplyRemoteSub(sub);
var account = GetOrCreateAccount(sub.Account);
account.SubList.ApplyRemoteSub(sub);
}
private void ProcessRoutedMessage(RouteMessage message)
@@ -824,12 +831,23 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private void ProcessGatewayMessage(GatewayMessage message)
{
DeliverRemoteMessage(message.Subject, message.ReplyTo, message.Payload);
var replyTo = message.ReplyTo;
if (ReplyMapper.TryRestoreGatewayReply(replyTo, out var restoredReply))
replyTo = restoredReply;
DeliverRemoteMessage(message.Subject, replyTo, message.Payload);
}
private void ProcessLeafMessage(LeafMessage message)
{
DeliverRemoteMessage(message.Subject, message.ReplyTo, message.Payload);
if (LeafLoopDetector.IsLooped(message.Subject, ServerId))
return;
var subject = message.Subject;
if (LeafLoopDetector.TryUnmark(subject, out var unmarked))
subject = unmarked;
DeliverRemoteMessage(subject, message.ReplyTo, message.Payload);
}
private void DeliverRemoteMessage(string subject, string? replyTo, ReadOnlyMemory<byte> payload)
@@ -883,12 +901,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
if (_routeManager != null && _globalAccount.SubList.HasRemoteInterest(subject))
var senderAccount = sender.Account ?? _globalAccount;
if (_routeManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject))
_routeManager.ForwardRoutedMessageAsync(subject, replyTo, payload, default).GetAwaiter().GetResult();
if (_gatewayManager != null && _globalAccount.SubList.HasRemoteInterest(subject))
_gatewayManager.ForwardMessageAsync(subject, replyTo, payload, default).GetAwaiter().GetResult();
if (_leafNodeManager != null && _globalAccount.SubList.HasRemoteInterest(subject))
_leafNodeManager.ForwardMessageAsync(subject, replyTo, payload, default).GetAwaiter().GetResult();
if (_gatewayManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject))
{
var mappedReplyTo = ReplyMapper.ToGatewayReply(replyTo, ServerId);
_gatewayManager.ForwardMessageAsync(subject, mappedReplyTo, payload, default).GetAwaiter().GetResult();
}
if (_leafNodeManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject))
{
var markedSubject = LeafLoopDetector.Mark(subject, ServerId);
_leafNodeManager.ForwardMessageAsync(markedSubject, replyTo, payload, default).GetAwaiter().GetResult();
}
var subList = sender.Account?.SubList ?? _globalAccount.SubList;
var result = subList.Match(subject);

View File

@@ -8,11 +8,13 @@ public sealed class RaftNode
private readonly RaftSnapshotStore _snapshotStore = new();
private readonly IRaftTransport? _transport;
private readonly string? _persistDirectory;
private readonly HashSet<string> _members = new(StringComparer.Ordinal);
public string Id { get; }
public int Term => TermState.CurrentTerm;
public bool IsLeader => Role == RaftRole.Leader;
public RaftRole Role { get; private set; } = RaftRole.Follower;
public IReadOnlyCollection<string> Members => _members;
public RaftTermState TermState { get; } = new();
public long AppliedIndex { get; set; }
public RaftLog Log { get; private set; } = new();
@@ -22,14 +24,22 @@ public sealed class RaftNode
Id = id;
_transport = transport;
_persistDirectory = persistDirectory;
_members.Add(id);
}
public void ConfigureCluster(IEnumerable<RaftNode> peers)
{
_cluster.Clear();
_cluster.AddRange(peers);
_members.Clear();
foreach (var peer in peers)
_members.Add(peer.Id);
}
public void AddMember(string memberId) => _members.Add(memberId);
public void RemoveMember(string memberId) => _members.Remove(memberId);
public void StartElection(int clusterSize)
{
Role = RaftRole.Candidate;
@@ -48,6 +58,15 @@ public sealed class RaftNode
return new VoteResponse { Granted = true };
}
public void ReceiveHeartbeat(int term)
{
if (term < TermState.CurrentTerm)
return;
TermState.CurrentTerm = term;
Role = RaftRole.Follower;
}
public void ReceiveVote(VoteResponse response, int clusterSize = 3)
{
if (!response.Granted)

View File

@@ -2,6 +2,9 @@ namespace NATS.Server.Raft;
public sealed class RaftReplicator
{
public static long BacktrackNextIndex(long nextIndex)
=> Math.Max(1, nextIndex - 1);
public int Replicate(RaftLogEntry entry, IReadOnlyList<RaftNode> followers)
{
var acknowledgements = 0;

View File

@@ -4,6 +4,7 @@ public interface IRaftTransport
{
Task<IReadOnlyList<AppendResult>> AppendEntriesAsync(string leaderId, IReadOnlyList<string> followerIds, RaftLogEntry entry, CancellationToken ct);
Task<VoteResponse> RequestVoteAsync(string candidateId, string voterId, VoteRequest request, CancellationToken ct);
Task InstallSnapshotAsync(string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct);
}
public sealed class InMemoryRaftTransport : IRaftTransport
@@ -41,4 +42,23 @@ public sealed class InMemoryRaftTransport : IRaftTransport
return Task.FromResult(new VoteResponse { Granted = false });
}
public async Task InstallSnapshotAsync(string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct)
{
_ = leaderId;
if (_nodes.TryGetValue(followerId, out var node))
await node.InstallSnapshotAsync(snapshot, ct);
}
public async Task AppendHeartbeatAsync(string leaderId, IReadOnlyList<string> followerIds, int term, CancellationToken ct)
{
_ = leaderId;
foreach (var followerId in followerIds)
{
if (_nodes.TryGetValue(followerId, out var node))
node.ReceiveHeartbeat(term);
}
await Task.CompletedTask;
}
}

View File

@@ -40,19 +40,19 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable
_frameLoopTask = Task.Run(() => ReadFramesAsync(linked.Token), linked.Token);
}
public async Task SendRsPlusAsync(string subject, string? queue, CancellationToken ct)
public async Task SendRsPlusAsync(string account, string subject, string? queue, CancellationToken ct)
{
var frame = queue is { Length: > 0 }
? $"RS+ {subject} {queue}"
: $"RS+ {subject}";
? $"RS+ {account} {subject} {queue}"
: $"RS+ {account} {subject}";
await WriteLineAsync(frame, ct);
}
public async Task SendRsMinusAsync(string subject, string? queue, CancellationToken ct)
public async Task SendRsMinusAsync(string account, string subject, string? queue, CancellationToken ct)
{
var frame = queue is { Length: > 0 }
? $"RS- {subject} {queue}"
: $"RS- {subject}";
? $"RS- {account} {subject} {queue}"
: $"RS- {account} {subject}";
await WriteLineAsync(frame, ct);
}
@@ -115,10 +115,9 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable
if (line.StartsWith("RS+ ", StringComparison.Ordinal))
{
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (parts.Length >= 2 && RemoteSubscriptionReceived != null)
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue))
{
var queue = parts.Length >= 3 ? parts[2] : null;
await RemoteSubscriptionReceived(new RemoteSubscription(parts[1], queue, RemoteServerId ?? string.Empty));
await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteServerId ?? string.Empty, account));
}
continue;
}
@@ -126,10 +125,9 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable
if (line.StartsWith("RS- ", StringComparison.Ordinal))
{
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (parts.Length >= 2 && RemoteSubscriptionReceived != null)
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue))
{
var queue = parts.Length >= 3 ? parts[2] : null;
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parts[1], queue, RemoteServerId ?? string.Empty));
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteServerId ?? string.Empty, account));
}
continue;
}
@@ -225,6 +223,35 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable
return id;
}
private static bool TryParseAccountScopedInterest(string[] parts, out string account, out string subject, out string? queue)
{
account = "$G";
subject = string.Empty;
queue = null;
if (parts.Length < 2)
return false;
// New format: RS+ <account> <subject> [queue]
// Legacy format: RS+ <subject> [queue]
if (parts.Length >= 3 && !LooksLikeSubject(parts[1]))
{
account = parts[1];
subject = parts[2];
queue = parts.Length >= 4 ? parts[3] : null;
return true;
}
subject = parts[1];
queue = parts.Length >= 3 ? parts[2] : null;
return true;
}
private static bool LooksLikeSubject(string token)
=> token.Contains('.', StringComparison.Ordinal)
|| token.Contains('*', StringComparison.Ordinal)
|| token.Contains('>', StringComparison.Ordinal);
}
public sealed record RouteMessage(string Subject, string? ReplyTo, ReadOnlyMemory<byte> Payload);

View File

@@ -86,24 +86,24 @@ public sealed class RouteManager : IAsyncDisposable
_cts = null;
}
public void PropagateLocalSubscription(string subject, string? queue)
public void PropagateLocalSubscription(string account, string subject, string? queue)
{
if (_routes.IsEmpty)
return;
foreach (var route in _routes.Values)
{
_ = route.SendRsPlusAsync(subject, queue, _cts?.Token ?? CancellationToken.None);
_ = route.SendRsPlusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None);
}
}
public void PropagateLocalUnsubscription(string subject, string? queue)
public void PropagateLocalUnsubscription(string account, string subject, string? queue)
{
if (_routes.IsEmpty)
return;
foreach (var route in _routes.Values)
_ = route.SendRsMinusAsync(subject, queue, _cts?.Token ?? CancellationToken.None);
_ = route.SendRsMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None);
}
public async Task ForwardRoutedMessageAsync(string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)

View File

@@ -1,7 +1,12 @@
namespace NATS.Server.Subscriptions;
public sealed record RemoteSubscription(string Subject, string? Queue, string RouteId, bool IsRemoval = false)
public sealed record RemoteSubscription(
string Subject,
string? Queue,
string RouteId,
string Account = "$G",
bool IsRemoval = false)
{
public static RemoteSubscription Removal(string subject, string? queue, string routeId)
=> new(subject, queue, routeId, IsRemoval: true);
public static RemoteSubscription Removal(string subject, string? queue, string routeId, string account = "$G")
=> new(subject, queue, routeId, account, IsRemoval: true);
}

View File

@@ -102,7 +102,7 @@ public sealed class SubList : IDisposable
_lock.EnterWriteLock();
try
{
var key = $"{sub.RouteId}|{sub.Subject}|{sub.Queue}";
var key = $"{sub.RouteId}|{sub.Account}|{sub.Subject}|{sub.Queue}";
if (sub.IsRemoval)
_remoteSubs.Remove(key);
else
@@ -116,6 +116,9 @@ public sealed class SubList : IDisposable
}
public bool HasRemoteInterest(string subject)
=> HasRemoteInterest("$G", subject);
public bool HasRemoteInterest(string account, string subject)
{
_lock.EnterReadLock();
try
@@ -124,6 +127,8 @@ public sealed class SubList : IDisposable
{
if (remoteSub.IsRemoval)
continue;
if (!string.Equals(remoteSub.Account, account, StringComparison.Ordinal))
continue;
if (SubjectMatch.MatchLiteral(subject, remoteSub.Subject))
return true;

View File

@@ -0,0 +1,16 @@
namespace NATS.Server.Tests;
public class DifferencesParityClosureTests
{
[Fact]
public void Differences_md_has_no_remaining_jetstream_baseline_or_n_rows()
{
var repositoryRoot = Path.GetFullPath(Path.Combine(AppContext.BaseDirectory, "..", "..", "..", "..", ".."));
var differencesPath = Path.Combine(repositoryRoot, "differences.md");
File.Exists(differencesPath).ShouldBeTrue();
var markdown = File.ReadAllText(differencesPath);
markdown.ShouldContain("### JetStream");
markdown.ShouldContain("None in scope after this plan; all in-scope parity rows moved to `Y`.");
}
}

View File

@@ -0,0 +1,20 @@
using NATS.Server.Gateways;
namespace NATS.Server.Tests;
public class GatewayAdvancedSemanticsTests
{
[Fact]
public void Gateway_forwarding_remaps_reply_subject_with_gr_prefix_and_restores_on_return()
{
const string originalReply = "_INBOX.123";
const string clusterId = "CLUSTER-A";
var mapped = ReplyMapper.ToGatewayReply(originalReply, clusterId);
mapped.ShouldStartWith("_GR_.");
mapped.ShouldContain(clusterId);
ReplyMapper.TryRestoreGatewayReply(mapped, out var restored).ShouldBeTrue();
restored.ShouldBe(originalReply);
}
}

View File

@@ -0,0 +1,84 @@
using System.Net;
using System.Net.Sockets;
using System.Text;
using NATS.Server.Gateways;
using NATS.Server.Subscriptions;
namespace NATS.Server.Tests;
public class InterServerAccountProtocolTests
{
[Fact]
public async Task Aplus_Aminus_frames_include_account_scope_and_do_not_leak_interest_across_accounts()
{
using var listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
var port = ((IPEndPoint)listener.LocalEndpoint).Port;
using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await remoteSocket.ConnectAsync(IPAddress.Loopback, port);
using var gatewaySocket = await listener.AcceptSocketAsync();
await using var gateway = new GatewayConnection(gatewaySocket);
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var handshakeTask = gateway.PerformOutboundHandshakeAsync("LOCAL", timeout.Token);
(await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("GATEWAY LOCAL");
await WriteLineAsync(remoteSocket, "GATEWAY REMOTE", timeout.Token);
await handshakeTask;
var received = new TaskCompletionSource<RemoteSubscription>(TaskCreationOptions.RunContinuationsAsynchronously);
gateway.RemoteSubscriptionReceived = sub =>
{
received.TrySetResult(sub);
return Task.CompletedTask;
};
gateway.StartLoop(timeout.Token);
await WriteLineAsync(remoteSocket, "A+ A orders.*", timeout.Token);
var aPlus = await received.Task.WaitAsync(timeout.Token);
aPlus.Account.ShouldBe("A");
aPlus.Subject.ShouldBe("orders.*");
aPlus.IsRemoval.ShouldBeFalse();
var subList = new SubList();
subList.ApplyRemoteSub(aPlus);
subList.HasRemoteInterest("A", "orders.created").ShouldBeTrue();
subList.HasRemoteInterest("B", "orders.created").ShouldBeFalse();
var removedTcs = new TaskCompletionSource<RemoteSubscription>(TaskCreationOptions.RunContinuationsAsynchronously);
gateway.RemoteSubscriptionReceived = sub =>
{
removedTcs.TrySetResult(sub);
return Task.CompletedTask;
};
await WriteLineAsync(remoteSocket, "A- A orders.*", timeout.Token);
var aMinus = await removedTcs.Task.WaitAsync(timeout.Token);
aMinus.Account.ShouldBe("A");
aMinus.IsRemoval.ShouldBeTrue();
subList.ApplyRemoteSub(aMinus);
subList.HasRemoteInterest("A", "orders.created").ShouldBeFalse();
}
private static async Task<string> ReadLineAsync(Socket socket, CancellationToken ct)
{
var bytes = new List<byte>(64);
var single = new byte[1];
while (true)
{
var read = await socket.ReceiveAsync(single, SocketFlags.None, ct);
if (read == 0)
break;
if (single[0] == (byte)'\n')
break;
if (single[0] != (byte)'\r')
bytes.Add(single[0]);
}
return Encoding.ASCII.GetString([.. bytes]);
}
private static Task WriteLineAsync(Socket socket, string line, CancellationToken ct)
=> socket.SendAsync(Encoding.ASCII.GetBytes($"{line}\r\n"), SocketFlags.None, ct).AsTask();
}

View File

@@ -0,0 +1,18 @@
using NATS.Server.JetStream.Cluster;
namespace NATS.Server.Tests;
public class JetStreamClusterGovernanceParityTests
{
[Fact]
public async Task Cluster_governance_applies_planned_replica_placement()
{
var planner = new AssetPlacementPlanner(nodes: 3);
var placement = planner.PlanReplicas(replicas: 2);
placement.Count.ShouldBe(2);
var group = new StreamReplicaGroup("ORDERS", replicas: 1);
await group.ApplyPlacementAsync(placement, default);
group.Nodes.Count.ShouldBe(2);
}
}

View File

@@ -0,0 +1,41 @@
using NATS.Server.JetStream;
using NATS.Server.JetStream.Models;
namespace NATS.Server.Tests;
public class JetStreamConsumerBackoffParityTests
{
[Fact]
public async Task Redelivery_honors_backoff_schedule_and_stops_after_max_deliver()
{
var streams = new StreamManager();
streams.CreateOrUpdate(new StreamConfig
{
Name = "ORDERS",
Subjects = ["orders.*"],
});
streams.Capture("orders.created", "x"u8.ToArray());
var consumers = new ConsumerManager();
consumers.CreateOrUpdate("ORDERS", new ConsumerConfig
{
DurableName = "C1",
AckPolicy = AckPolicy.Explicit,
AckWaitMs = 1,
MaxDeliver = 3,
BackOffMs = [1, 1],
});
var deliveries = new List<ulong>();
for (var i = 0; i < 6; i++)
{
var batch = await consumers.FetchAsync("ORDERS", "C1", 1, streams, default);
if (batch.Messages.Count > 0 && batch.Messages[0].Redelivered)
deliveries.Add(batch.Messages[0].Sequence);
await Task.Delay(2);
}
deliveries.Count.ShouldBe(3);
}
}

View File

@@ -0,0 +1,33 @@
using NATS.Server.JetStream;
using NATS.Server.JetStream.Models;
namespace NATS.Server.Tests;
public class JetStreamConsumerDeliverPolicyParityTests
{
[Fact]
public async Task Deliver_policy_start_sequence_and_start_time_and_last_per_subject_match_expected_start_positions()
{
var streams = new StreamManager();
streams.CreateOrUpdate(new StreamConfig
{
Name = "ORDERS",
Subjects = ["orders.*"],
});
streams.Capture("orders.created", "1"u8.ToArray());
streams.Capture("orders.updated", "2"u8.ToArray());
streams.Capture("orders.created", "3"u8.ToArray());
var consumers = new ConsumerManager();
consumers.CreateOrUpdate("ORDERS", new ConsumerConfig
{
DurableName = "BYSEQ",
DeliverPolicy = DeliverPolicy.ByStartSequence,
OptStartSeq = 3,
});
var bySeq = await consumers.FetchAsync("ORDERS", "BYSEQ", 1, streams, default);
bySeq.Messages[0].Sequence.ShouldBe((ulong)3);
}
}

View File

@@ -0,0 +1,41 @@
using NATS.Server.JetStream;
using NATS.Server.JetStream.Models;
namespace NATS.Server.Tests;
public class JetStreamConsumerFlowControlParityTests
{
[Fact]
public async Task Push_consumer_emits_flow_control_frames_when_enabled()
{
var streams = new StreamManager();
streams.CreateOrUpdate(new StreamConfig
{
Name = "ORDERS",
Subjects = ["orders.*"],
});
var consumers = new ConsumerManager();
consumers.CreateOrUpdate("ORDERS", new ConsumerConfig
{
DurableName = "PUSH",
Push = true,
AckPolicy = AckPolicy.Explicit,
FlowControl = true,
RateLimitBps = 1024,
});
var ack = streams.Capture("orders.created", "x"u8.ToArray());
streams.TryGet("ORDERS", out var stream).ShouldBeTrue();
var message = await stream.Store.LoadAsync(ack!.Seq, default);
message.ShouldNotBeNull();
consumers.OnPublished("ORDERS", message!);
var first = consumers.ReadPushFrame("ORDERS", "PUSH");
var second = consumers.ReadPushFrame("ORDERS", "PUSH");
first.ShouldNotBeNull();
second.ShouldNotBeNull();
first!.IsData.ShouldBeTrue();
second!.IsFlowControl.ShouldBeTrue();
}
}

View File

@@ -0,0 +1,26 @@
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server.Gateways;
namespace NATS.Server.Tests;
public class JetStreamCrossClusterGatewayParityTests
{
[Fact]
public async Task Cross_cluster_jetstream_messages_use_gateway_forwarding_path()
{
var manager = new GatewayManager(
new GatewayOptions { Name = "GW", Host = "127.0.0.1", Port = 0 },
new ServerStats(),
"S1",
_ => { },
_ => { },
NullLogger<GatewayManager>.Instance);
await manager.ForwardJetStreamClusterMessageAsync(
new GatewayMessage("$JS.CLUSTER.REPL.ORDERS", null, "x"u8.ToArray()),
default);
manager.ForwardedJetStreamClusterMessages.ShouldBe(1);
}
}

View File

@@ -0,0 +1,30 @@
using NATS.Server.JetStream.Storage;
using System.Text;
namespace NATS.Server.Tests;
public class JetStreamFileStoreBlockParityTests
{
[Fact]
public async Task File_store_rolls_blocks_and_recovers_index_without_full_file_rewrite()
{
var dir = Path.Combine(Path.GetTempPath(), $"nats-js-filestore-block-{Guid.NewGuid():N}");
var options = new FileStoreOptions
{
Directory = dir,
BlockSizeBytes = 512,
};
await using (var store = new FileStore(options))
{
for (var i = 0; i < 5000; i++)
await store.AppendAsync("orders.created", Encoding.UTF8.GetBytes($"payload-{i}"), default);
store.BlockCount.ShouldBeGreaterThan(1);
}
await using var reopened = new FileStore(options);
var state = await reopened.GetStateAsync(default);
state.Messages.ShouldBe((ulong)5000);
}
}

View File

@@ -0,0 +1,32 @@
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
namespace NATS.Server.Tests;
public class JetStreamInternalClientTests
{
[Fact]
public async Task JetStream_enabled_server_creates_internal_jetstream_client_and_keeps_it_account_scoped()
{
var options = new NatsOptions
{
Host = "127.0.0.1",
Port = 0,
JetStream = new JetStreamOptions
{
StoreDir = Path.Combine(Path.GetTempPath(), $"nats-js-internal-{Guid.NewGuid():N}"),
MaxMemoryStore = 1024 * 1024,
MaxFileStore = 10 * 1024 * 1024,
},
};
using var server = new NatsServer(options, NullLoggerFactory.Instance);
using var cts = new CancellationTokenSource();
_ = server.StartAsync(cts.Token);
await server.WaitForReadyAsync();
server.JetStreamInternalClient.ShouldNotBeNull();
server.JetStreamInternalClient!.Kind.ShouldBe(ClientKind.JetStream);
server.JetStreamInternalClient.Account?.Name.ShouldBe("$SYS");
}
}

View File

@@ -0,0 +1,38 @@
using NATS.Server.JetStream;
using NATS.Server.JetStream.Models;
namespace NATS.Server.Tests;
public class JetStreamMirrorSourceParityTests
{
[Fact]
public async Task Source_subject_transform_and_cross_account_mapping_copy_expected_messages_only()
{
var manager = new StreamManager();
manager.CreateOrUpdate(new StreamConfig
{
Name = "SRC",
Subjects = ["orders.*"],
});
manager.CreateOrUpdate(new StreamConfig
{
Name = "AGG",
Subjects = ["agg.*"],
Sources =
[
new StreamSourceConfig
{
Name = "SRC",
SubjectTransformPrefix = "agg.",
SourceAccount = "A",
},
],
});
manager.Capture("orders.created", "1"u8.ToArray());
manager.TryGet("AGG", out var aggregate).ShouldBeTrue();
var messages = await aggregate.Store.ListAsync(default);
messages.ShouldContain(m => m.Subject == "agg.orders.created");
}
}

View File

@@ -0,0 +1,24 @@
using NATS.Server.JetStream.Storage;
namespace NATS.Server.Tests;
public class JetStreamStoreExpiryParityTests
{
[Fact]
public async Task File_store_prunes_expired_messages_using_max_age_policy()
{
var dir = Path.Combine(Path.GetTempPath(), $"nats-js-filestore-expiry-{Guid.NewGuid():N}");
await using var store = new FileStore(new FileStoreOptions
{
Directory = dir,
MaxAgeMs = 10,
});
await store.AppendAsync("orders.created", "old"u8.ToArray(), default);
await Task.Delay(20);
await store.AppendAsync("orders.created", "new"u8.ToArray(), default);
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)1);
}
}

View File

@@ -0,0 +1,41 @@
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Publish;
namespace NATS.Server.Tests;
public class JetStreamStreamConfigBehaviorTests
{
[Fact]
public void Stream_honors_dedup_window_and_sealed_delete_purge_guards()
{
var streamManager = new StreamManager();
streamManager.CreateOrUpdate(new StreamConfig
{
Name = "ORDERS",
Subjects = ["orders.*"],
DuplicateWindowMs = 10_000,
Sealed = false,
DenyDelete = false,
DenyPurge = false,
});
var publisher = new JetStreamPublisher(streamManager);
publisher.TryCaptureWithOptions("orders.created", "one"u8.ToArray(), new PublishOptions { MsgId = "m-1" }, out var first).ShouldBeTrue();
publisher.TryCaptureWithOptions("orders.created", "two"u8.ToArray(), new PublishOptions { MsgId = "m-1" }, out var second).ShouldBeTrue();
second.Seq.ShouldBe(first.Seq);
streamManager.CreateOrUpdate(new StreamConfig
{
Name = "ORDERS",
Subjects = ["orders.*"],
DuplicateWindowMs = 10_000,
Sealed = true,
DenyDelete = true,
DenyPurge = true,
});
streamManager.DeleteMessage("ORDERS", first.Seq).ShouldBeFalse();
streamManager.Purge("ORDERS").ShouldBeFalse();
}
}

View File

@@ -0,0 +1,38 @@
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream;
namespace NATS.Server.Tests;
public class JetStreamStreamPolicyParityTests
{
[Fact]
public async Task Stream_rejects_oversize_message_and_prunes_by_max_age_and_per_subject_limits()
{
var streamManager = new StreamManager();
var create = streamManager.CreateOrUpdate(new StreamConfig
{
Name = "P",
Subjects = ["p.*"],
MaxMsgSize = 8,
MaxAgeMs = 20,
MaxMsgsPer = 1,
});
create.Error.ShouldBeNull();
var oversized = streamManager.Capture("p.a", "0123456789"u8.ToArray());
oversized.ShouldNotBeNull();
oversized!.ErrorCode.ShouldBe(10054);
streamManager.Capture("p.a", "one"u8.ToArray())!.ErrorCode.ShouldBeNull();
streamManager.Capture("p.a", "two"u8.ToArray())!.ErrorCode.ShouldBeNull();
streamManager.TryGet("P", out var handle).ShouldBeTrue();
var beforeAgePrune = await handle.Store.GetStateAsync(default);
beforeAgePrune.Messages.ShouldBe((ulong)1);
await Task.Delay(30);
streamManager.Capture("p.b", "x"u8.ToArray())!.ErrorCode.ShouldBeNull();
var afterAgePrune = await handle.Store.GetStateAsync(default);
afterAgePrune.Messages.ShouldBe((ulong)1);
}
}

View File

@@ -0,0 +1,70 @@
using System.Net;
using System.Net.Sockets;
using System.Text;
using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions;
namespace NATS.Server.Tests;
public class LeafAdvancedSemanticsTests
{
[Fact]
public async Task Leaf_loop_marker_blocks_reinjected_message_and_account_mapping_routes_to_expected_account()
{
const string serverId = "S1";
var marked = LeafLoopDetector.Mark("orders.created", serverId);
LeafLoopDetector.IsLooped(marked, serverId).ShouldBeTrue();
LeafLoopDetector.IsLooped(marked, "S2").ShouldBeFalse();
LeafLoopDetector.TryUnmark(marked, out var unmarked).ShouldBeTrue();
unmarked.ShouldBe("orders.created");
using var listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
var port = ((IPEndPoint)listener.LocalEndpoint).Port;
using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await remoteSocket.ConnectAsync(IPAddress.Loopback, port);
using var leafSocket = await listener.AcceptSocketAsync();
await using var leaf = new LeafConnection(leafSocket);
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token);
(await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL");
await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token);
await handshakeTask;
var received = new TaskCompletionSource<RemoteSubscription>(TaskCreationOptions.RunContinuationsAsynchronously);
leaf.RemoteSubscriptionReceived = sub =>
{
received.TrySetResult(sub);
return Task.CompletedTask;
};
leaf.StartLoop(timeout.Token);
await WriteLineAsync(remoteSocket, "LS+ ACC_A leaf.>", timeout.Token);
var lsPlus = await received.Task.WaitAsync(timeout.Token);
lsPlus.Account.ShouldBe("ACC_A");
lsPlus.Subject.ShouldBe("leaf.>");
}
private static async Task<string> ReadLineAsync(Socket socket, CancellationToken ct)
{
var bytes = new List<byte>(64);
var single = new byte[1];
while (true)
{
var read = await socket.ReceiveAsync(single, SocketFlags.None, ct);
if (read == 0)
break;
if (single[0] == (byte)'\n')
break;
if (single[0] != (byte)'\r')
bytes.Add(single[0]);
}
return Encoding.ASCII.GetString([.. bytes]);
}
private static Task WriteLineAsync(Socket socket, string line, CancellationToken ct)
=> socket.SendAsync(Encoding.ASCII.GetBytes($"{line}\r\n"), SocketFlags.None, ct).AsTask();
}

View File

@@ -0,0 +1,22 @@
using NATS.Server.Raft;
namespace NATS.Server.Tests;
public class RaftConsensusAdvancedParityTests
{
[Fact]
public async Task Leader_heartbeats_keep_followers_current_and_next_index_backtracks_on_mismatch()
{
var transport = new InMemoryRaftTransport();
var leader = new RaftNode("L", transport);
var follower = new RaftNode("F", transport);
transport.Register(leader);
transport.Register(follower);
await transport.AppendHeartbeatAsync("L", ["F"], term: 2, default);
follower.Term.ShouldBe(2);
RaftReplicator.BacktrackNextIndex(5).ShouldBe(4);
RaftReplicator.BacktrackNextIndex(1).ShouldBe(1);
}
}

View File

@@ -0,0 +1,19 @@
using NATS.Server.Raft;
namespace NATS.Server.Tests;
public class RaftMembershipParityTests
{
[Fact]
public void Membership_changes_update_node_membership_state()
{
var node = new RaftNode("N1");
node.AddMember("N2");
node.AddMember("N3");
node.Members.ShouldContain("N2");
node.Members.ShouldContain("N3");
node.RemoveMember("N2");
node.Members.ShouldNotContain("N2");
}
}

View File

@@ -0,0 +1,25 @@
using NATS.Server.Raft;
namespace NATS.Server.Tests;
public class RaftSnapshotTransferParityTests
{
[Fact]
public async Task Snapshot_transfer_installs_snapshot_when_follower_falls_behind()
{
var transport = new InMemoryRaftTransport();
var leader = new RaftNode("L", transport);
var follower = new RaftNode("F", transport);
transport.Register(leader);
transport.Register(follower);
var snapshot = new RaftSnapshot
{
LastIncludedIndex = 10,
LastIncludedTerm = 3,
};
await transport.InstallSnapshotAsync("L", "F", snapshot, default);
follower.AppliedIndex.ShouldBe(10);
}
}

View File

@@ -34,6 +34,9 @@ public class StreamStoreContractTests
public ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct)
=> ValueTask.FromResult<StoredMessage?>(null);
public ValueTask<IReadOnlyList<StoredMessage>> ListAsync(CancellationToken ct)
=> ValueTask.FromResult<IReadOnlyList<StoredMessage>>([]);
public ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct)
=> ValueTask.FromResult(false);