diff --git a/docs/plans/2026-02-25-production-gaps-plan.md b/docs/plans/2026-02-25-production-gaps-plan.md new file mode 100644 index 0000000..21e1890 --- /dev/null +++ b/docs/plans/2026-02-25-production-gaps-plan.md @@ -0,0 +1,3735 @@ +# Production Gap Closure Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task. + +**Goal:** Port 15 CRITICAL/HIGH gaps from Go NATS server to .NET, covering FileStore, RAFT, JetStream Cluster, Consumer/Stream engines, Client protocol, MQTT persistence, config reload, and networking discovery. + +**Architecture:** Bottom-up dependency approach — Phase 1 builds storage and consensus foundations, Phase 2 adds cluster coordination on top, Phase 3 adds consumer/stream engines, Phase 4 adds client performance, MQTT, config, and networking. Each phase has an exit gate with measurable criteria. + +**Tech Stack:** .NET 10 / C# 14, xUnit 3, Shouldly, NSubstitute, System.IO.Pipelines, IronSnappy (S2), ChaCha20-Poly1305/AES-GCM, SQLite (test parity DB) + +**Codex review:** Verified 2026-02-25 — dependency gaps fixed, path errors corrected, feedback incorporated. + +**Test strategy:** Only run targeted unit tests during implementation (`dotnet test --filter`). Run full suite only after all 4 phases complete. Update `docs/test_parity.db` when porting Go tests. + +**Codex feedback incorporated:** +1. Added missing dependencies: Task 3←{1,2}, Task 4←{1,2,3}, Task 8←7, Task 13←{10,12} +2. Fixed paths: RouteManager.cs is in `Routes/`, GatewayManager.cs is in `Gateways/` (not `Clustering/`) +3. Phase 4A-C (client perf), 4E (SIGHUP), 4F (discovery) are independent lanes — can start parallel with Phase 3 +4. Phase 4D (MQTT) correctly gated on storage + consumer dependencies +5. WAL format should include per-record checksum and record type — implement during Task 7 +6. 
Encryption: add explicit nonce/AAD/key-version rules during Task 1 implementation + +**Parity DB update pattern:** +```bash +sqlite3 docs/test_parity.db "UPDATE go_tests SET status='mapped', dotnet_test='DotNetTestName', dotnet_file='TestFile.cs' WHERE go_test='GoTestName';" +``` + +--- + +## Phase 1: FileStore + RAFT Foundation + +**Dependencies:** None (pure infrastructure) +**Exit gate:** All IStreamStore methods implemented, FileStore round-trips encrypted+compressed blocks, RAFT persists and recovers state across restarts, joint consensus passes membership change tests. + +--- + +### Task 1: MsgBlock Encryption Integration + +Wire `AeadEncryptor` into `MsgBlock` so blocks are encrypted at rest. + +**Files:** +- Modify: `src/NATS.Server/JetStream/Storage/MsgBlock.cs` +- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs` +- Modify: `src/NATS.Server/JetStream/Storage/FileStoreOptions.cs` +- Test: `tests/NATS.Server.Tests/FileStoreEncryptionTests.cs` +- Reference: `golang/nats-server/server/filestore.go:816-907` (genEncryptionKeys, recoverAEK, setupAEK) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/FileStoreEncryptionTests.cs`: + +```csharp +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests; + +public class FileStoreEncryptionTests +{ + [Fact] + public async Task Encrypted_block_round_trips_message() + { + // Go: TestFileStoreEncryption server/filestore_test.go + var dir = Directory.CreateTempSubdirectory(); + var key = new byte[32]; + RandomNumberGenerator.Fill(key); + + await using (var store = new FileStore(new FileStoreOptions + { + Directory = dir.FullName, + Cipher = StoreCipher.ChaCha, + EncryptionKey = key, + })) + { + await store.AppendAsync("test.subj", "hello encrypted"u8.ToArray(), default); + } + + // Raw block file should NOT contain plaintext + var blkFiles = Directory.GetFiles(dir.FullName, "*.blk"); + blkFiles.ShouldNotBeEmpty(); + var raw = File.ReadAllBytes(blkFiles[0]); + 
System.Text.Encoding.UTF8.GetString(raw).ShouldNotContain("hello encrypted");

        // Recover with same key should return plaintext
        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key,
        });
        var msg = await recovered.LoadAsync(1, default);
        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg.Payload).ShouldBe("hello encrypted");
    }

    [Fact]
    public async Task Encrypted_block_with_aes_round_trips()
    {
        var dir = Directory.CreateTempSubdirectory();
        var key = new byte[32];
        RandomNumberGenerator.Fill(key);

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.AES,
            EncryptionKey = key,
        }))
        {
            await store.AppendAsync("aes.subj", "aes payload"u8.ToArray(), default);
        }

        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.AES,
            EncryptionKey = key,
        });
        var msg = await recovered.LoadAsync(1, default);
        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg.Payload).ShouldBe("aes payload");
    }

    [Fact]
    public async Task Wrong_key_fails_to_decrypt()
    {
        var dir = Directory.CreateTempSubdirectory();
        var key1 = new byte[32];
        var key2 = new byte[32];
        RandomNumberGenerator.Fill(key1);
        RandomNumberGenerator.Fill(key2);

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key1,
        }))
        {
            await store.AppendAsync("secret", "data"u8.ToArray(), default);
        }

        // Recovery with wrong key should fail: AEAD tag mismatch surfaces as a
        // CryptographicException (pin the exact store exception type during implementation).
        var act = () => new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key2,
        });
        Should.Throw<System.Security.Cryptography.CryptographicException>(act);
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter 
"FullyQualifiedName~FileStoreEncryptionTests" -v normal +``` +Expected: FAIL — encryption not wired into block I/O + +**Step 3: Implement encryption in MsgBlock** + +In `MsgBlock.cs`, add: +- New fields: `byte[]? _aek` (per-block AEAD key), `StoreCipher _cipher` +- New constructor parameter for encryption key and cipher +- Modify `Write`/`WriteAt` to encrypt payload via `AeadEncryptor.Encrypt()` before disk write +- Modify `Read`/`ReadRecord` to decrypt payload via `AeadEncryptor.Decrypt()` after disk read +- Add `static MsgBlock CreateEncrypted(string path, int blockId, long maxBytes, ulong firstSeq, byte[] aek, StoreCipher cipher)` +- Add `static MsgBlock RecoverEncrypted(string path, int blockId, long maxBytes, byte[] aek, StoreCipher cipher)` + +In `FileStore.cs`, modify: +- `EnsureActiveBlock()` — pass encryption key to `MsgBlock.CreateEncrypted()` when `_useAead` +- `RecoverBlocks()` — pass encryption key to `MsgBlock.RecoverEncrypted()` when `_useAead` +- Add `GenPerBlockKey(int blockId)` — derives per-block key from stream key using HKDF +- Add `SaveBlockKey(int blockId, byte[] aek)` — persists per-block key to `.key` metadata file +- Add `LoadBlockKey(int blockId)` — loads per-block key from `.key` metadata file + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreEncryptionTests" -v normal +``` +Expected: PASS + +**Step 5: Update parity DB** + +```bash +sqlite3 docs/test_parity.db "UPDATE go_tests SET status='mapped', dotnet_test='Encrypted_block_round_trips_message', dotnet_file='FileStoreEncryptionTests.cs' WHERE go_test='TestFileStoreEncryption';" +``` + +**Step 6: Commit** + +```bash +git add tests/NATS.Server.Tests/FileStoreEncryptionTests.cs src/NATS.Server/JetStream/Storage/MsgBlock.cs src/NATS.Server/JetStream/Storage/FileStore.cs +git commit -m "feat(filestore): wire AeadEncryptor into MsgBlock for at-rest encryption" +``` + +--- + +### Task 2: MsgBlock Compression 
Integration + +Wire `S2Codec` into `MsgBlock` for per-message compression. + +**Files:** +- Modify: `src/NATS.Server/JetStream/Storage/MsgBlock.cs` +- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs` +- Test: `tests/NATS.Server.Tests/FileStoreCompressionTests.cs` +- Reference: `golang/nats-server/server/filestore.go:4443` (setupWriteCache with compression) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/FileStoreCompressionTests.cs`: + +```csharp +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests; + +public class FileStoreCompressionTests +{ + [Fact] + public async Task Compressed_block_round_trips_message() + { + // Go: TestFileStoreCompression server/filestore_test.go + var dir = Directory.CreateTempSubdirectory(); + + await using (var store = new FileStore(new FileStoreOptions + { + Directory = dir.FullName, + Compression = StoreCompression.S2Compression, + })) + { + var payload = new byte[1024]; + Array.Fill(payload, (byte)'A'); // highly compressible + await store.AppendAsync("comp.subj", payload, default); + } + + // Block file should be smaller than uncompressed payload + var blkFiles = Directory.GetFiles(dir.FullName, "*.blk"); + blkFiles.ShouldNotBeEmpty(); + new FileInfo(blkFiles[0]).Length.ShouldBeLessThan(1024); + + // Recover should return original payload + await using var recovered = new FileStore(new FileStoreOptions + { + Directory = dir.FullName, + Compression = StoreCompression.S2Compression, + }); + var msg = await recovered.LoadAsync(1, default); + msg.ShouldNotBeNull(); + msg.Payload.Length.ShouldBe(1024); + msg.Payload.All(b => b == (byte)'A').ShouldBeTrue(); + } + + [Fact] + public async Task Compressed_and_encrypted_round_trips() + { + var dir = Directory.CreateTempSubdirectory(); + var key = new byte[32]; + RandomNumberGenerator.Fill(key); + + await using (var store = new FileStore(new FileStoreOptions + { + Directory = dir.FullName, + Compression = StoreCompression.S2Compression, + Cipher 
= StoreCipher.ChaCha, + EncryptionKey = key, + })) + { + await store.AppendAsync("both.subj", "compress+encrypt"u8.ToArray(), default); + } + + await using var recovered = new FileStore(new FileStoreOptions + { + Directory = dir.FullName, + Compression = StoreCompression.S2Compression, + Cipher = StoreCipher.ChaCha, + EncryptionKey = key, + }); + var msg = await recovered.LoadAsync(1, default); + msg.ShouldNotBeNull(); + System.Text.Encoding.UTF8.GetString(msg.Payload).ShouldBe("compress+encrypt"); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreCompressionTests" -v normal +``` +Expected: FAIL — compression not wired into block path + +**Step 3: Implement compression in MsgBlock** + +In `MsgBlock.cs`: +- Add field: `bool _compressed` +- Modify `Write`/`WriteAt`: if `_compressed`, call `S2Codec.Compress(payload)` before writing record +- Modify `Read`/`ReadRecord`: if `_compressed`, call `S2Codec.Decompress(data)` after reading record +- Order: compress first, then encrypt (on write); decrypt first, then decompress (on read) + +In `FileStore.cs`: +- Pass `_useS2` flag to MsgBlock factory methods +- Persist compression flag in block metadata so recovery knows to decompress + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreCompressionTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/FileStoreCompressionTests.cs src/NATS.Server/JetStream/Storage/MsgBlock.cs src/NATS.Server/JetStream/Storage/FileStore.cs +git commit -m "feat(filestore): wire S2Codec into MsgBlock for per-message compression" +``` + +--- + +### Task 3: Block Rotation & Lifecycle + +Add automatic block rotation when active block exceeds size threshold, block sealing, and background flusher. 
+ +**Files:** +- Modify: `src/NATS.Server/JetStream/Storage/MsgBlock.cs` +- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs` +- Test: `tests/NATS.Server.Tests/FileStoreBlockRotationTests.cs` +- Reference: `golang/nats-server/server/filestore.go:4485` (newMsgBlockForWrite), `5783-5842` (flushLoop) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/FileStoreBlockRotationTests.cs`: + +```csharp +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests; + +public class FileStoreBlockRotationTests +{ + [Fact] + public async Task Block_rotates_when_size_exceeded() + { + // Go: TestFileStoreBlockRotation server/filestore_test.go + var dir = Directory.CreateTempSubdirectory(); + var opts = new FileStoreOptions + { + Directory = dir.FullName, + BlockSizeBytes = 512, // small block for testing + }; + + await using var store = new FileStore(opts); + var payload = new byte[200]; + + // Write enough to trigger rotation (3 x 200 > 512) + await store.AppendAsync("rot.1", payload, default); + await store.AppendAsync("rot.2", payload, default); + await store.AppendAsync("rot.3", payload, default); + + store.BlockCount.ShouldBeGreaterThan(1); + } + + [Fact] + public async Task Sealed_block_is_read_only() + { + var dir = Directory.CreateTempSubdirectory(); + var opts = new FileStoreOptions + { + Directory = dir.FullName, + BlockSizeBytes = 256, + }; + + await using var store = new FileStore(opts); + var payload = new byte[200]; + + await store.AppendAsync("s.1", payload, default); + await store.AppendAsync("s.2", payload, default); // triggers rotation + + // First block should be sealed + var blkFiles = Directory.GetFiles(dir.FullName, "*.blk").OrderBy(f => f).ToArray(); + blkFiles.Length.ShouldBeGreaterThan(1); + } + + [Fact] + public async Task Messages_across_blocks_recover_correctly() + { + var dir = Directory.CreateTempSubdirectory(); + var opts = new FileStoreOptions + { + Directory = dir.FullName, + BlockSizeBytes = 256, + }; + + await 
using (var store = new FileStore(opts))
        {
            for (int i = 1; i <= 10; i++)
                await store.AppendAsync($"msg.{i}", System.Text.Encoding.UTF8.GetBytes($"payload-{i}"), default);
        }

        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            BlockSizeBytes = 256,
        });
        var state = await recovered.GetStateAsync(default);
        state.Messages.ShouldBe(10UL);

        for (ulong seq = 1; seq <= 10; seq++)
        {
            var msg = await recovered.LoadAsync(seq, default);
            msg.ShouldNotBeNull();
            msg.Subject.ShouldBe($"msg.{seq}");
        }
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreBlockRotationTests" -v normal
```
Expected: FAIL — no automatic rotation

**Step 3: Implement block rotation**

In `MsgBlock.cs`:
- Add `bool IsSealed` property (set by `Seal()`)
- Add `void Seal()` — marks block read-only, flushes pending writes
- Add `long BytesWritten` property — current write offset

In `FileStore.cs`:
- Modify `EnsureActiveBlock()` — check `_activeBlock.BytesWritten >= _options.BlockSizeBytes`, call `RotateBlock()` if exceeded
- Add `RotateBlock()` — seals active block, creates new block with `_nextBlockId++`, adds to `_blocks`
- Modify `RecoverBlocks()` — handle multiple `.blk` files, recover each in order

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreBlockRotationTests" -v normal
```
Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/FileStoreBlockRotationTests.cs src/NATS.Server/JetStream/Storage/MsgBlock.cs src/NATS.Server/JetStream/Storage/FileStore.cs
git commit -m "feat(filestore): add block rotation on size threshold and sealing"
```

---

### Task 4: Crash Recovery Enhancement

Enhance recovery to rebuild TTL wheel, validate checksums, and handle corrupt/truncated blocks. 
+ +**Files:** +- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs` +- Modify: `src/NATS.Server/JetStream/Storage/MsgBlock.cs` +- Modify: `src/NATS.Server/JetStream/Storage/MessageRecord.cs` +- Test: `tests/NATS.Server.Tests/FileStoreCrashRecoveryTests.cs` +- Reference: `golang/nats-server/server/filestore.go:1754-2401` (recoverFullState, recoverTTLState) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/FileStoreCrashRecoveryTests.cs`: + +```csharp +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests; + +public class FileStoreCrashRecoveryTests +{ + [Fact] + public async Task Recovery_rebuilds_ttl_wheel_and_expires_old() + { + var dir = Directory.CreateTempSubdirectory(); + + await using (var store = new FileStore(new FileStoreOptions + { + Directory = dir.FullName, + MaxAgeMs = 100, // 100ms TTL + })) + { + await store.AppendAsync("ttl.1", "old"u8.ToArray(), default); + } + + // Wait for messages to expire + await Task.Delay(200); + + // Recovery should expire old messages + await using var recovered = new FileStore(new FileStoreOptions + { + Directory = dir.FullName, + MaxAgeMs = 100, + }); + var state = await recovered.GetStateAsync(default); + state.Messages.ShouldBe(0UL); + } + + [Fact] + public async Task Recovery_handles_truncated_block() + { + var dir = Directory.CreateTempSubdirectory(); + + await using (var store = new FileStore(new FileStoreOptions { Directory = dir.FullName })) + { + await store.AppendAsync("t.1", "data1"u8.ToArray(), default); + await store.AppendAsync("t.2", "data2"u8.ToArray(), default); + } + + // Truncate the block file to simulate crash mid-write + var blkFile = Directory.GetFiles(dir.FullName, "*.blk")[0]; + var info = new FileInfo(blkFile); + using (var fs = File.OpenWrite(blkFile)) + fs.SetLength(info.Length - 5); // chop last 5 bytes + + // Recovery should salvage valid messages + await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName }); + var 
state = await recovered.GetStateAsync(default); + state.Messages.ShouldBeGreaterThanOrEqualTo(1UL); + } + + [Fact] + public async Task Atomic_state_write_survives_crash() + { + var dir = Directory.CreateTempSubdirectory(); + + await using var store = new FileStore(new FileStoreOptions { Directory = dir.FullName }); + await store.AppendAsync("a.1", "payload"u8.ToArray(), default); + + // Force state write + store.FlushAllPending(); + + // State file should exist + var stateFile = Path.Combine(dir.FullName, "stream.state"); + File.Exists(stateFile).ShouldBeTrue(); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreCrashRecoveryTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement crash recovery enhancements** + +In `FileStore.cs`: +- Add `RecoverTtlState()` — scan all blocks, re-register unexpired messages in `_ttlWheel` +- Add `WriteStateAtomically(path, data)` — write to temp file, then `File.Move(temp, target, overwrite: true)` +- Enhance `RecoverBlocks()` — handle truncated records gracefully (catch decode errors, skip corrupt tail) +- Implement `FlushAllPending()` — flush active block, write stream state atomically + +In `MsgBlock.cs`: +- Add `RecoverWithValidation()` — validate each record during recovery, skip corrupt entries +- Add `TryReadRecord(offset)` — returns null on corrupt data instead of throwing + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreCrashRecoveryTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/FileStoreCrashRecoveryTests.cs src/NATS.Server/JetStream/Storage/FileStore.cs src/NATS.Server/JetStream/Storage/MsgBlock.cs +git commit -m "feat(filestore): enhance crash recovery with TTL rebuild and truncation handling" +``` + +--- + +### Task 5: IStreamStore Methods — Batch 1 (Core Operations) + +Implement the core 
sync IStreamStore methods: `StoreMsg`, `StoreRawMsg`, `LoadMsg`, `LoadNextMsg`, `LoadLastMsg`, `LoadPrevMsg`, `RemoveMsg`, `EraseMsg`, `State`, `FastState`, `Type`, `Stop`, `Delete`. + +**Files:** +- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs` +- Test: `tests/NATS.Server.Tests/FileStoreStreamStoreTests.cs` +- Reference: `golang/nats-server/server/filestore.go` (various) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/FileStoreStreamStoreTests.cs`: + +```csharp +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests; + +public class FileStoreStreamStoreTests +{ + private FileStore CreateStore() + { + var dir = Directory.CreateTempSubdirectory(); + return new FileStore(new FileStoreOptions { Directory = dir.FullName }); + } + + [Fact] + public void StoreMsg_appends_and_returns_seq_ts() + { + using var store = CreateStore(); + var (seq, ts) = store.StoreMsg("foo.bar", null, "hello"u8.ToArray(), 0); + seq.ShouldBe(1UL); + ts.ShouldBeGreaterThan(0L); + } + + [Fact] + public void LoadMsg_returns_stored_message() + { + using var store = CreateStore(); + store.StoreMsg("load.test", null, "payload"u8.ToArray(), 0); + var sm = store.LoadMsg(1, null); + sm.Seq.ShouldBe(1UL); + sm.Subj.ShouldBe("load.test"); + System.Text.Encoding.UTF8.GetString(sm.Msg).ShouldBe("payload"); + } + + [Fact] + public void LoadNextMsg_finds_next_matching() + { + using var store = CreateStore(); + store.StoreMsg("a.1", null, "m1"u8.ToArray(), 0); + store.StoreMsg("b.1", null, "m2"u8.ToArray(), 0); + store.StoreMsg("a.2", null, "m3"u8.ToArray(), 0); + + var (msg, skip) = store.LoadNextMsg("a.*", true, 1, null); + msg.Seq.ShouldBe(1UL); + msg.Subj.ShouldBe("a.1"); + } + + [Fact] + public void LoadLastMsg_returns_most_recent_on_subject() + { + using var store = CreateStore(); + store.StoreMsg("last.subj", null, "first"u8.ToArray(), 0); + store.StoreMsg("last.subj", null, "second"u8.ToArray(), 0); + store.StoreMsg("other", null, "other"u8.ToArray(), 
0); + + var sm = store.LoadLastMsg("last.subj", null); + sm.Seq.ShouldBe(2UL); + System.Text.Encoding.UTF8.GetString(sm.Msg).ShouldBe("second"); + } + + [Fact] + public void LoadPrevMsg_returns_message_before_seq() + { + using var store = CreateStore(); + store.StoreMsg("p.1", null, "m1"u8.ToArray(), 0); + store.StoreMsg("p.2", null, "m2"u8.ToArray(), 0); + store.StoreMsg("p.3", null, "m3"u8.ToArray(), 0); + + var sm = store.LoadPrevMsg(3, null); + sm.Seq.ShouldBe(2UL); + } + + [Fact] + public void RemoveMsg_soft_deletes() + { + using var store = CreateStore(); + store.StoreMsg("rm.1", null, "data"u8.ToArray(), 0); + store.RemoveMsg(1).ShouldBeTrue(); + + var state = store.State(); + state.Msgs.ShouldBe(0UL); + } + + [Fact] + public void State_returns_full_stream_state() + { + using var store = CreateStore(); + store.StoreMsg("s.1", null, "a"u8.ToArray(), 0); + store.StoreMsg("s.2", null, "b"u8.ToArray(), 0); + + var state = store.State(); + state.Msgs.ShouldBe(2UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(2UL); + } + + [Fact] + public void Type_returns_file() + { + using var store = CreateStore(); + store.Type().ShouldBe(StorageType.File); + } + + [Fact] + public void Stop_flushes_and_closes() + { + using var store = CreateStore(); + store.StoreMsg("stop.1", null, "data"u8.ToArray(), 0); + store.Stop(); + // After stop, store should not accept writes + Should.Throw(() => store.StoreMsg("stop.2", null, "more"u8.ToArray(), 0)); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreStreamStoreTests" -v normal +``` +Expected: FAIL — all methods throw `NotSupportedException` + +**Step 3: Implement IStreamStore core methods in FileStore.cs** + +Implement each method: +- `StoreMsg` — synchronous wrapper around `AppendAsync` logic, return `(seq, timestamp)` +- `StoreRawMsg` — append with caller-specified seq/ts, bypass auto-increment +- `LoadMsg` — lookup by sequence in 
`_messages` dict +- `LoadNextMsg` — scan from `start` seq forward, match `filter` with `SubjectMatch.IsMatch()` +- `LoadLastMsg` — reverse scan `_messages` for subject match +- `LoadPrevMsg` — scan backward from `start-1` +- `RemoveMsg` — mark deleted in `_messages` and `MsgBlock._deleted` +- `EraseMsg` — remove + overwrite bytes on disk with random data +- `State()` — return `StreamState` with first/last/msgs/bytes/deleted +- `FastState()` — populate ref param with minimal fields +- `Type()` — return `StorageType.File` +- `Stop()` — flush active block, close all FDs, mark store as stopped +- `Delete(inline)` — Stop + delete all `.blk` files and directory + +Add `StoreMsg` record type if not already defined: +```csharp +public record struct StoreMsg(ulong Seq, string Subj, byte[]? Hdr, byte[] Msg, long Ts); +``` + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreStreamStoreTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/FileStoreStreamStoreTests.cs src/NATS.Server/JetStream/Storage/FileStore.cs +git commit -m "feat(filestore): implement core IStreamStore sync methods" +``` + +--- + +### Task 6: IStreamStore Methods — Batch 2 (Query & State) + +Implement remaining methods: `Purge`, `PurgeEx`, `Compact`, `Truncate`, `GetSeqFromTime`, `FilteredState`, `SubjectsState`, `SubjectsTotals`, `NumPending`, `EncodedStreamState`, `UpdateConfig`, `ResetState`, `FlushAllPending`, `SkipMsg`, `SkipMsgs`. 
**Files:**
- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
- Test: `tests/NATS.Server.Tests/FileStoreQueryTests.cs`
- Reference: `golang/nats-server/server/filestore.go` (various)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/FileStoreQueryTests.cs`:

```csharp
using NATS.Server.JetStream.Storage;

namespace NATS.Server.Tests;

public class FileStoreQueryTests
{
    private FileStore CreateStore()
    {
        var dir = Directory.CreateTempSubdirectory();
        return new FileStore(new FileStoreOptions { Directory = dir.FullName });
    }

    [Fact]
    public void PurgeEx_with_subject_filter()
    {
        using var store = CreateStore();
        store.StoreMsg("a.1", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("b.1", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("a.2", null, "m3"u8.ToArray(), 0);

        var purged = store.PurgeEx("a.*", 0, 0);
        purged.ShouldBe(2UL); // only a.1 and a.2

        var state = store.State();
        state.Msgs.ShouldBe(1UL); // b.1 remains
    }

    [Fact]
    public void Compact_removes_below_seq()
    {
        using var store = CreateStore();
        for (int i = 0; i < 5; i++)
            store.StoreMsg("c.x", null, System.Text.Encoding.UTF8.GetBytes($"m{i}"), 0);

        store.Compact(3); // remove seq 1, 2
        var state = store.State();
        state.FirstSeq.ShouldBe(3UL);
    }

    [Fact]
    public void FilteredState_returns_matching_counts()
    {
        using var store = CreateStore();
        store.StoreMsg("f.a", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("f.b", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("f.a", null, "m3"u8.ToArray(), 0);

        var ss = store.FilteredState(1, "f.a");
        ss.Msgs.ShouldBe(2UL);
    }

    [Fact]
    public void SubjectsTotals_returns_per_subject_counts()
    {
        using var store = CreateStore();
        store.StoreMsg("t.a", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("t.b", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("t.a", null, "m3"u8.ToArray(), 0);

        var totals = store.SubjectsTotals("t.*");
        totals["t.a"].ShouldBe(2UL);
        totals["t.b"].ShouldBe(1UL);
    }

    
[Fact] + public void NumPending_counts_from_start_seq() + { + using var store = CreateStore(); + store.StoreMsg("np.x", null, "m1"u8.ToArray(), 0); + store.StoreMsg("np.x", null, "m2"u8.ToArray(), 0); + store.StoreMsg("np.y", null, "m3"u8.ToArray(), 0); + + var (total, _) = store.NumPending(2, "np.x", false); + total.ShouldBe(1UL); // only seq 2 matches + } + + [Fact] + public void GetSeqFromTime_returns_first_at_or_after() + { + using var store = CreateStore(); + store.StoreMsg("time.1", null, "m1"u8.ToArray(), 0); + var beforeSecond = DateTime.UtcNow; + store.StoreMsg("time.2", null, "m2"u8.ToArray(), 0); + + var seq = store.GetSeqFromTime(beforeSecond); + seq.ShouldBe(2UL); + } + + [Fact] + public void SkipMsg_reserves_sequence() + { + using var store = CreateStore(); + store.StoreMsg("sk.1", null, "m1"u8.ToArray(), 0); + store.SkipMsg(2); + store.StoreMsg("sk.3", null, "m3"u8.ToArray(), 0); + + var state = store.State(); + state.LastSeq.ShouldBe(3UL); + state.Msgs.ShouldBe(2UL); // seq 2 is skipped, not a message + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreQueryTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement query and state methods in FileStore.cs** + +Implement each remaining method using the in-memory `_messages` dictionary and block structures. 
Key approaches: +- `PurgeEx` — iterate `_messages`, match subject with wildcard support, remove matching +- `Compact` — remove all entries with seq < given +- `Truncate` — remove all entries with seq > given +- `GetSeqFromTime` — binary search or linear scan by timestamp +- `FilteredState` — count messages matching subject filter from given seq +- `SubjectsState` — group by subject, compute per-subject `SimpleState` +- `SubjectsTotals` — group by subject, count +- `NumPending` — count from start seq matching filter +- `EncodedStreamState` — binary serialize stream state (versioned format) +- `UpdateConfig` — apply new block size, retention, limits +- `ResetState` — clear caches +- `SkipMsg` / `SkipMsgs` — reserve sequence(s) without storing data + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreQueryTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/FileStoreQueryTests.cs src/NATS.Server/JetStream/Storage/FileStore.cs +git commit -m "feat(filestore): implement query and state IStreamStore methods" +``` + +--- + +### Task 7: RAFT Binary WAL + +Replace in-memory JSON persistence with binary WAL for production durability. 
+ +**Files:** +- Modify: `src/NATS.Server/Raft/RaftLog.cs` +- Modify: `src/NATS.Server/Raft/RaftNode.cs` +- Create: `src/NATS.Server/Raft/RaftWal.cs` +- Test: `tests/NATS.Server.Tests/RaftWalTests.cs` +- Reference: `golang/nats-server/server/raft.go:1000-1100` + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/RaftWalTests.cs`: + +```csharp +using NATS.Server.Raft; + +namespace NATS.Server.Tests; + +public class RaftWalTests +{ + [Fact] + public async Task Wal_persists_and_recovers_entries() + { + var dir = Directory.CreateTempSubdirectory(); + var walPath = Path.Combine(dir.FullName, "raft.wal"); + + // Write entries + { + var wal = new RaftWal(walPath); + await wal.AppendAsync(new RaftLogEntry(1, 1, "cmd-1")); + await wal.AppendAsync(new RaftLogEntry(2, 1, "cmd-2")); + await wal.AppendAsync(new RaftLogEntry(3, 2, "cmd-3")); + await wal.SyncAsync(); + wal.Dispose(); + } + + // Recover + var recovered = RaftWal.Load(walPath); + var entries = recovered.Entries.ToList(); + entries.Count.ShouldBe(3); + entries[0].Index.ShouldBe(1); + entries[0].Term.ShouldBe(1); + entries[0].Command.ShouldBe("cmd-1"); + entries[2].Index.ShouldBe(3); + entries[2].Term.ShouldBe(2); + recovered.Dispose(); + } + + [Fact] + public async Task Wal_compact_removes_old_entries() + { + var dir = Directory.CreateTempSubdirectory(); + var walPath = Path.Combine(dir.FullName, "raft.wal"); + + var wal = new RaftWal(walPath); + for (int i = 1; i <= 10; i++) + await wal.AppendAsync(new RaftLogEntry(i, 1, $"cmd-{i}")); + await wal.SyncAsync(); + + await wal.CompactAsync(5); // remove entries 1-5 + + var recovered = RaftWal.Load(walPath); + recovered.Entries.Count().ShouldBe(5); + recovered.Entries.First().Index.ShouldBe(6); + wal.Dispose(); + recovered.Dispose(); + } + + [Fact] + public async Task Wal_handles_truncated_file() + { + var dir = Directory.CreateTempSubdirectory(); + var walPath = Path.Combine(dir.FullName, "raft.wal"); + + { + var wal = new RaftWal(walPath); + await 
wal.AppendAsync(new RaftLogEntry(1, 1, "good-entry")); + await wal.AppendAsync(new RaftLogEntry(2, 1, "will-be-truncated")); + await wal.SyncAsync(); + wal.Dispose(); + } + + // Truncate last few bytes + using (var fs = File.OpenWrite(walPath)) + fs.SetLength(fs.Length - 3); + + var recovered = RaftWal.Load(walPath); + recovered.Entries.Count().ShouldBeGreaterThanOrEqualTo(1); + recovered.Entries.First().Command.ShouldBe("good-entry"); + recovered.Dispose(); + } + + [Fact] + public async Task RaftNode_persists_term_and_vote() + { + var dir = Directory.CreateTempSubdirectory(); + var node = new RaftNode("n1", persistDirectory: dir.FullName); + node.TermState.CurrentTerm = 5; + node.TermState.VotedFor = "n2"; + + await node.PersistAsync(); + + var recovered = new RaftNode("n1", persistDirectory: dir.FullName); + await recovered.LoadPersistedStateAsync(); + recovered.Term.ShouldBe(5); + recovered.TermState.VotedFor.ShouldBe("n2"); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftWalTests" -v normal +``` +Expected: FAIL — `RaftWal` class doesn't exist + +**Step 3: Implement RaftWal** + +Create `src/NATS.Server/Raft/RaftWal.cs`: +```csharp +namespace NATS.Server.Raft; + +/// +/// Binary write-ahead log for RAFT entries. +/// Format: [4-byte version header]([4-byte length][8-byte index][4-byte term][N-byte command])* +/// Go reference: raft.go WAL patterns. +/// +public sealed class RaftWal : IDisposable +{ + private const int Version = 1; + private FileStream _file; + private readonly List _entries = []; + + // Constructor, AppendAsync, SyncAsync, CompactAsync, Load, Dispose... 
+} +``` + +Key implementation: +- Binary record format: `[4:length_le][8:index_le][4:term_le][N:utf8_command]` +- Version header: `[4:magic][4:version]` at file start +- `AppendAsync` — write length-prefixed record, add to in-memory list +- `SyncAsync` — `FileStream.Flush(flushToDisk: true)` for a true fsync (`FlushAsync` flushes managed buffers only and does not guarantee data reaches the disk) +- `CompactAsync(upToIndex)` — rewrite WAL from `upToIndex+1` onward using temp file + rename +- `Load(path)` — scan WAL, validate records, skip corrupt tail + +Modify `RaftLog.cs`: +- Add optional `RaftWal` backing — when `_wal` is set, delegate `Append`/`Compact` to it +- Keep in-memory list for fast access, WAL for durability + +Modify `RaftNode.cs`: +- Wire `_persistDirectory` — create WAL at `{dir}/raft.wal` +- `PersistAsync()` — write `meta.json` with term/votedFor +- `LoadPersistedStateAsync()` — load `meta.json` + WAL entries + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftWalTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Raft/RaftWal.cs tests/NATS.Server.Tests/RaftWalTests.cs src/NATS.Server/Raft/RaftLog.cs src/NATS.Server/Raft/RaftNode.cs +git commit -m "feat(raft): add binary WAL for persistent log storage" +``` + +--- + +### Task 8: RAFT Joint Consensus + +Implement safe two-phase membership changes per Raft paper Section 4.
+ +**Files:** +- Modify: `src/NATS.Server/Raft/RaftNode.cs` +- Test: `tests/NATS.Server.Tests/RaftJointConsensusTests.cs` +- Reference: `golang/nats-server/server/raft.go` Section 4 + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/RaftJointConsensusTests.cs`: + +```csharp +using NATS.Server.Raft; + +namespace NATS.Server.Tests; + +public class RaftJointConsensusTests +{ + [Fact] + public async Task AddPeer_uses_joint_consensus() + { + var cluster = RaftTestCluster.Create(3); + var leader = await cluster.ElectLeaderAsync(); + + await leader.ProposeAddPeerAsync("n4"); + + // During joint phase, quorum requires majority of both old and new + leader.Members.ShouldContain("n4"); + leader.Members.Count.ShouldBe(4); + } + + [Fact] + public async Task RemovePeer_uses_joint_consensus() + { + var cluster = RaftTestCluster.Create(3); + var leader = await cluster.ElectLeaderAsync(); + + await leader.ProposeRemovePeerAsync("n3"); + + leader.Members.ShouldNotContain("n3"); + leader.Members.Count.ShouldBe(2); + } + + [Fact] + public async Task Joint_quorum_requires_both_old_and_new_majority() + { + var cluster = RaftTestCluster.Create(3); + var leader = await cluster.ElectLeaderAsync(); + + // Start add — enters joint config + leader.BeginJointConsensus(["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]); + + // Joint quorum: majority of {n1,n2,n3} AND majority of {n1,n2,n3,n4} + leader.CalculateJointQuorum(["n1", "n2"], ["n1", "n2", "n3"]).ShouldBeTrue(); // 2/3 and 3/4 + leader.CalculateJointQuorum(["n1"], ["n1", "n2"]).ShouldBeFalse(); // 1/3 fails old quorum + } + + [Fact] + public async Task Only_one_membership_change_at_a_time() + { + var cluster = RaftTestCluster.Create(3); + var leader = await cluster.ElectLeaderAsync(); + + await leader.ProposeAddPeerAsync("n4"); + + // Second concurrent change should be rejected + var ex = await Should.ThrowAsync( + () => leader.ProposeAddPeerAsync("n5")); + } +} +``` + +**Step 2: Run test to verify it fails** + 
+```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftJointConsensusTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement joint consensus in RaftNode.cs** + +Add to `RaftNode.cs`: +- New field: `HashSet? _jointNewMembers` — Cnew during transition +- `BeginJointConsensus(cold, cnew)` — set `_jointNewMembers = cnew`, propose joint entry +- `CalculateJointQuorum(coldVotes, cnewVotes)` — `majority(cold) AND majority(cnew)` +- Modify `ProposeAddPeerAsync`: + 1. Check `MembershipChangeInProgress` — reject if true + 2. Propose joint entry (`Cold ∪ Cnew`) to log + 3. Wait for joint entry commit + 4. Propose final Cnew entry to log + 5. Wait for final entry commit + 6. Clear `_jointNewMembers` +- Modify `ProposeRemovePeerAsync` — same two-phase pattern +- Modify `ApplyMembershipChange(entry)`: + - If joint entry: set `_jointNewMembers` + - If final entry: replace `_members` with Cnew, clear `_jointNewMembers` + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftJointConsensusTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/RaftJointConsensusTests.cs src/NATS.Server/Raft/RaftNode.cs +git commit -m "feat(raft): implement joint consensus for safe membership changes" +``` + +--- + +**Phase 1 Exit Gate Verification:** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStore" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftWal" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftJointConsensus" -v normal +``` + +All Phase 1 tests must pass before proceeding to Phase 2. 
+ +--- + +## Phase 2: JetStream Cluster Coordination + +**Dependencies:** Phase 1 (RAFT persistence, FileStore) +**Exit gate:** Meta-group processes stream/consumer assignments via RAFT, snapshots encode/decode round-trip, leadership transitions handled correctly, orphan detection works. + +--- + +### Task 9: Meta Snapshot Encoding/Decoding + +Implement binary snapshot codec for meta-group state with S2 compression. + +**Files:** +- Create: `src/NATS.Server/JetStream/Cluster/MetaSnapshotCodec.cs` +- Test: `tests/NATS.Server.Tests/MetaSnapshotCodecTests.cs` +- Reference: `golang/nats-server/server/jetstream_cluster.go:2031-2145` (encodeMetaSnapshot, decodeMetaSnapshot) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/MetaSnapshotCodecTests.cs`: + +```csharp +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests; + +public class MetaSnapshotCodecTests +{ + [Fact] + public void Encode_decode_round_trips() + { + var assignments = new Dictionary + { + ["stream-A"] = new StreamAssignment + { + StreamName = "stream-A", + Group = new RaftGroup { Name = "rg-a", Peers = ["n1", "n2", "n3"] }, + ConfigJson = """{"subjects":["foo.>"]}""", + }, + ["stream-B"] = new StreamAssignment + { + StreamName = "stream-B", + Group = new RaftGroup { Name = "rg-b", Peers = ["n1", "n2"] }, + ConfigJson = """{"subjects":["bar.>"]}""", + Consumers = + { + ["con-1"] = new ConsumerAssignment + { + ConsumerName = "con-1", + StreamName = "stream-B", + Group = new RaftGroup { Name = "rg-c1", Peers = ["n1", "n2"] }, + }, + }, + }, + }; + + var encoded = MetaSnapshotCodec.Encode(assignments); + encoded.ShouldNotBeEmpty(); + + var decoded = MetaSnapshotCodec.Decode(encoded); + decoded.Count.ShouldBe(2); + decoded["stream-A"].StreamName.ShouldBe("stream-A"); + decoded["stream-A"].Group.Peers.Count.ShouldBe(3); + decoded["stream-B"].Consumers.Count.ShouldBe(1); + decoded["stream-B"].Consumers["con-1"].ConsumerName.ShouldBe("con-1"); + } + + [Fact] + public void 
Encoded_snapshot_is_compressed() + { + var assignments = new Dictionary<string, StreamAssignment>(); + for (int i = 0; i < 100; i++) + { + assignments[$"stream-{i}"] = new StreamAssignment + { + StreamName = $"stream-{i}", + Group = new RaftGroup { Name = $"rg-{i}", Peers = ["n1", "n2", "n3"] }, + ConfigJson = """{"subjects":["test.>"]}""", + }; + } + + var encoded = MetaSnapshotCodec.Encode(assignments); + var json = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(assignments); + + // S2 compressed should be smaller than raw JSON + encoded.Length.ShouldBeLessThan(json.Length); + } + + [Fact] + public void Empty_snapshot_round_trips() + { + var empty = new Dictionary<string, StreamAssignment>(); + var encoded = MetaSnapshotCodec.Encode(empty); + var decoded = MetaSnapshotCodec.Decode(encoded); + decoded.ShouldBeEmpty(); + } + + [Fact] + public void Versioned_format_rejects_unknown_version() + { + var bad = new byte[] { 0xFF, 0xFF, 0, 0 }; // invalid version + Should.Throw<InvalidOperationException>(() => MetaSnapshotCodec.Decode(bad)); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MetaSnapshotCodecTests" -v normal +``` +Expected: FAIL — `MetaSnapshotCodec` doesn't exist + +**Step 3: Implement MetaSnapshotCodec** + +Create `src/NATS.Server/JetStream/Cluster/MetaSnapshotCodec.cs`: + +```csharp +using System.Buffers.Binary; +using System.Text.Json; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.JetStream.Cluster; + +/// +/// Binary codec for meta-group snapshots. Format: +/// [2:version][N:S2-compressed JSON of assignment map] +/// Go reference: jetstream_cluster.go:2075-2145 (encodeMetaSnapshot/decodeMetaSnapshot) +/// +public static class MetaSnapshotCodec +{ + private const ushort CurrentVersion = 1; + + public static byte[] Encode(Dictionary<string, StreamAssignment> assignments) { ... } + public static Dictionary<string, StreamAssignment> Decode(byte[] data) { ... 
} +} +``` + +Implementation: +- `Encode`: JSON serialize → S2 compress → prepend 2-byte version header +- `Decode`: read version → strip header → S2 decompress → JSON deserialize +- Reject unknown versions with `InvalidOperationException` + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MetaSnapshotCodecTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Cluster/MetaSnapshotCodec.cs tests/NATS.Server.Tests/MetaSnapshotCodecTests.cs +git commit -m "feat(cluster): add MetaSnapshotCodec with S2 compression and versioned format" +``` + +--- + +### Task 10: Cluster Monitoring Loop + +Background loop that processes meta RAFT entries and drives cluster state transitions. + +**Files:** +- Create: `src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs` +- Test: `tests/NATS.Server.Tests/JetStreamClusterMonitorTests.cs` +- Reference: `golang/nats-server/server/jetstream_cluster.go:1455-1825` (monitorCluster) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/JetStreamClusterMonitorTests.cs`: + +```csharp +using System.Threading.Channels; +using NATS.Server.JetStream.Cluster; +using NATS.Server.Raft; + +namespace NATS.Server.Tests; + +public class JetStreamClusterMonitorTests +{ + [Fact] + public async Task Monitor_processes_stream_assignment_entry() + { + var meta = new JetStreamMetaGroup(3); + var channel = Channel.CreateUnbounded(); + var monitor = new JetStreamClusterMonitor(meta, channel.Reader); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + var monitorTask = monitor.StartAsync(cts.Token); + + // Write a stream assignment entry + var assignJson = System.Text.Json.JsonSerializer.Serialize(new + { + Op = "assignStream", + StreamName = "test-stream", + Peers = new[] { "n1", "n2", "n3" }, + Config = """{"subjects":["test.>"]}""", + }); + await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, 
assignJson)); + + // Give the monitor time to process + await Task.Delay(100); + + meta.StreamCount.ShouldBe(1); + meta.GetStreamAssignment("test-stream").ShouldNotBeNull(); + + cts.Cancel(); + await monitorTask; + } + + [Fact] + public async Task Monitor_processes_consumer_assignment_entry() + { + var meta = new JetStreamMetaGroup(3); + var channel = Channel.CreateUnbounded(); + var monitor = new JetStreamClusterMonitor(meta, channel.Reader); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + var monitorTask = monitor.StartAsync(cts.Token); + + // First assign stream + var streamJson = System.Text.Json.JsonSerializer.Serialize(new + { + Op = "assignStream", + StreamName = "s1", + Peers = new[] { "n1", "n2", "n3" }, + Config = """{"subjects":["x.>"]}""", + }); + await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, streamJson)); + + // Then assign consumer + var consumerJson = System.Text.Json.JsonSerializer.Serialize(new + { + Op = "assignConsumer", + StreamName = "s1", + ConsumerName = "c1", + Peers = new[] { "n1", "n2", "n3" }, + }); + await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, consumerJson)); + + await Task.Delay(100); + + meta.ConsumerCount.ShouldBe(1); + + cts.Cancel(); + await monitorTask; + } + + [Fact] + public async Task Monitor_processes_stream_removal() + { + var meta = new JetStreamMetaGroup(3); + var channel = Channel.CreateUnbounded(); + var monitor = new JetStreamClusterMonitor(meta, channel.Reader); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + var monitorTask = monitor.StartAsync(cts.Token); + + // Assign then remove + var assignJson = System.Text.Json.JsonSerializer.Serialize(new + { + Op = "assignStream", + StreamName = "to-remove", + Peers = new[] { "n1", "n2", "n3" }, + Config = """{"subjects":["rm.>"]}""", + }); + await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, assignJson)); + await Task.Delay(50); + + var removeJson = System.Text.Json.JsonSerializer.Serialize(new + { 
+ Op = "removeStream", + StreamName = "to-remove", + }); + await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, removeJson)); + await Task.Delay(100); + + meta.StreamCount.ShouldBe(0); + + cts.Cancel(); + await monitorTask; + } + + [Fact] + public async Task Monitor_applies_meta_snapshot() + { + var meta = new JetStreamMetaGroup(3); + var channel = Channel.CreateUnbounded(); + var monitor = new JetStreamClusterMonitor(meta, channel.Reader); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + var monitorTask = monitor.StartAsync(cts.Token); + + // Create a snapshot with known state + var assignments = new Dictionary + { + ["snap-stream"] = new StreamAssignment + { + StreamName = "snap-stream", + Group = new RaftGroup { Name = "rg-snap", Peers = ["n1", "n2", "n3"] }, + }, + }; + var snapshot = MetaSnapshotCodec.Encode(assignments); + var snapshotB64 = Convert.ToBase64String(snapshot); + + var snapshotJson = System.Text.Json.JsonSerializer.Serialize(new + { + Op = "snapshot", + Data = snapshotB64, + }); + await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, snapshotJson)); + await Task.Delay(100); + + meta.StreamCount.ShouldBe(1); + meta.GetStreamAssignment("snap-stream").ShouldNotBeNull(); + + cts.Cancel(); + await monitorTask; + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterMonitorTests" -v normal +``` +Expected: FAIL — `JetStreamClusterMonitor` doesn't exist + +**Step 3: Implement JetStreamClusterMonitor** + +Create `src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs`: + +```csharp +using System.Text.Json; +using System.Threading.Channels; +using NATS.Server.Raft; + +namespace NATS.Server.JetStream.Cluster; + +/// +/// Background loop consuming meta RAFT entries and dispatching cluster state changes. +/// Go reference: jetstream_cluster.go:1455-1825 (monitorCluster). 
+/// +public sealed class JetStreamClusterMonitor +{ + private readonly JetStreamMetaGroup _meta; + private readonly ChannelReader<RaftLogEntry> _entries; + + public JetStreamClusterMonitor(JetStreamMetaGroup meta, ChannelReader<RaftLogEntry> entries) { ... } + + public async Task StartAsync(CancellationToken ct) { ... } + private void ApplyMetaEntry(RaftLogEntry entry) { ... } + private void ProcessStreamAssignment(JsonElement data) { ... } + private void ProcessConsumerAssignment(JsonElement data) { ... } + private void ProcessStreamRemoval(JsonElement data) { ... } + private void ProcessConsumerRemoval(JsonElement data) { ... } + private void ApplyMetaSnapshot(JsonElement data) { ... } +} +``` + +The main loop: +1. Read entries from channel +2. Parse JSON, dispatch by `Op` field +3. Call corresponding handler on `JetStreamMetaGroup` + +Add to `JetStreamMetaGroup.cs`: +- `AddStreamAssignment(StreamAssignment sa)` — adds to `_assignments` +- `RemoveStreamAssignment(string streamName)` — removes from `_assignments` +- `AddConsumerAssignment(string streamName, ConsumerAssignment ca)` — adds to stream's consumers +- `RemoveConsumerAssignment(string streamName, string consumerName)` — removes consumer +- `ReplaceAllAssignments(Dictionary<string, StreamAssignment> newState)` — for snapshot apply + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterMonitorTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs tests/NATS.Server.Tests/JetStreamClusterMonitorTests.cs src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +git commit -m "feat(cluster): add JetStreamClusterMonitor for meta RAFT entry processing" +``` + +--- + +### Task 11: Stream/Consumer Assignment Processing + +Enhance `JetStreamMetaGroup` to validate and process assignments with proper error handling.
+ +**Files:** +- Modify: `src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs` +- Test: `tests/NATS.Server.Tests/JetStreamAssignmentProcessingTests.cs` +- Reference: `golang/nats-server/server/jetstream_cluster.go:4541-5925` + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/JetStreamAssignmentProcessingTests.cs`: + +```csharp +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests; + +public class JetStreamAssignmentProcessingTests +{ + [Fact] + public void ProcessStreamAssignment_validates_config() + { + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "valid-stream", + Group = new RaftGroup { Name = "rg-1", Peers = ["n1", "n2", "n3"] }, + ConfigJson = """{"subjects":["test.>"]}""", + }; + + meta.ProcessStreamAssignment(sa).ShouldBeTrue(); + meta.StreamCount.ShouldBe(1); + } + + [Fact] + public void ProcessStreamAssignment_rejects_empty_name() + { + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "", + Group = new RaftGroup { Name = "rg-1", Peers = ["n1", "n2", "n3"] }, + }; + + meta.ProcessStreamAssignment(sa).ShouldBeFalse(); + meta.StreamCount.ShouldBe(0); + } + + [Fact] + public void ProcessUpdateStreamAssignment_applies_config_change() + { + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "updatable", + Group = new RaftGroup { Name = "rg-u", Peers = ["n1", "n2", "n3"] }, + ConfigJson = """{"subjects":["old.>"]}""", + }; + meta.ProcessStreamAssignment(sa); + + var updated = new StreamAssignment + { + StreamName = "updatable", + Group = new RaftGroup { Name = "rg-u", Peers = ["n1", "n2", "n3"] }, + ConfigJson = """{"subjects":["new.>"]}""", + }; + meta.ProcessUpdateStreamAssignment(updated).ShouldBeTrue(); + + var assignment = meta.GetStreamAssignment("updatable"); + assignment!.ConfigJson.ShouldContain("new.>"); + } + + [Fact] + public void ProcessConsumerAssignment_requires_existing_stream() + { + 
var meta = new JetStreamMetaGroup(3); + var ca = new ConsumerAssignment + { + ConsumerName = "orphan-consumer", + StreamName = "nonexistent-stream", + Group = new RaftGroup { Name = "rg-c", Peers = ["n1", "n2", "n3"] }, + }; + + meta.ProcessConsumerAssignment(ca).ShouldBeFalse(); + } + + [Fact] + public void ProcessStreamRemoval_cascades_to_consumers() + { + var meta = new JetStreamMetaGroup(3); + meta.ProcessStreamAssignment(new StreamAssignment + { + StreamName = "cascade", + Group = new RaftGroup { Name = "rg-cas", Peers = ["n1", "n2", "n3"] }, + }); + meta.ProcessConsumerAssignment(new ConsumerAssignment + { + ConsumerName = "c1", + StreamName = "cascade", + Group = new RaftGroup { Name = "rg-c1", Peers = ["n1", "n2", "n3"] }, + }); + + meta.ProcessStreamRemoval("cascade").ShouldBeTrue(); + meta.StreamCount.ShouldBe(0); + meta.ConsumerCount.ShouldBe(0); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamAssignmentProcessingTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement assignment processing in JetStreamMetaGroup.cs** + +Add methods: +- `bool ProcessStreamAssignment(StreamAssignment sa)` — validate name, check duplicates, add +- `bool ProcessUpdateStreamAssignment(StreamAssignment sa)` — find existing, update config +- `bool ProcessStreamRemoval(string streamName)` — remove stream + cascade consumers +- `bool ProcessConsumerAssignment(ConsumerAssignment ca)` — validate stream exists, add consumer +- `bool ProcessConsumerRemoval(string streamName, string consumerName)` — remove consumer + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamAssignmentProcessingTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/JetStreamAssignmentProcessingTests.cs src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +git commit -m "feat(cluster): 
add stream/consumer assignment processing with validation" +``` + +--- + +### Task 12: Inflight Tracking Enhancement + +Replace simple string-based inflight maps with structured tracking. + +**Files:** +- Modify: `src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs` +- Test: `tests/NATS.Server.Tests/JetStreamInflightTrackingTests.cs` +- Reference: `golang/nats-server/server/jetstream_cluster.go:1193-1278` + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/JetStreamInflightTrackingTests.cs`: + +```csharp +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests; + +public class JetStreamInflightTrackingTests +{ + [Fact] + public void TrackInflightStreamProposal_increments_ops() + { + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "inflight-1", + Group = new RaftGroup { Name = "rg-inf", Peers = ["n1", "n2", "n3"] }, + }; + + meta.TrackInflightStreamProposal("ACC", sa); + meta.InflightStreamCount.ShouldBe(1); + meta.IsStreamInflight("ACC", "inflight-1").ShouldBeTrue(); + } + + [Fact] + public void RemoveInflightStreamProposal_clears_when_zero() + { + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "inflight-2", + Group = new RaftGroup { Name = "rg-inf2", Peers = ["n1", "n2", "n3"] }, + }; + + meta.TrackInflightStreamProposal("ACC", sa); + meta.RemoveInflightStreamProposal("ACC", "inflight-2"); + meta.IsStreamInflight("ACC", "inflight-2").ShouldBeFalse(); + } + + [Fact] + public void Duplicate_proposal_increments_ops_count() + { + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "dup-stream", + Group = new RaftGroup { Name = "rg-dup", Peers = ["n1", "n2", "n3"] }, + }; + + meta.TrackInflightStreamProposal("ACC", sa); + meta.TrackInflightStreamProposal("ACC", sa); + meta.InflightStreamCount.ShouldBe(1); // still one unique stream + + // Need two removes to fully clear + meta.RemoveInflightStreamProposal("ACC", 
"dup-stream"); + meta.IsStreamInflight("ACC", "dup-stream").ShouldBeTrue(); // ops > 0 + meta.RemoveInflightStreamProposal("ACC", "dup-stream"); + meta.IsStreamInflight("ACC", "dup-stream").ShouldBeFalse(); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInflightTrackingTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement enhanced inflight tracking** + +In `JetStreamMetaGroup.cs`: +- Add `record InflightInfo(int OpsCount, bool Deleted, StreamAssignment? Assignment)` +- Replace the flat `_inflightStreams` map with `ConcurrentDictionary<string, ConcurrentDictionary<string, InflightInfo>>` (account → stream name → info) +- `TrackInflightStreamProposal(account, sa)` — increment ops, store assignment +- `RemoveInflightStreamProposal(account, streamName)` — decrement ops, remove when zero +- `IsStreamInflight(account, streamName)` — check if ops > 0 +- Same pattern for consumer inflight tracking + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInflightTrackingTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/JetStreamInflightTrackingTests.cs src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +git commit -m "feat(cluster): add structured inflight proposal tracking with ops counting" +``` + +--- + +### Task 13: Leadership Transitions + +Handle meta-group, stream, and consumer leadership changes.
+ +**Files:** +- Modify: `src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs` +- Modify: `src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs` +- Test: `tests/NATS.Server.Tests/JetStreamLeadershipTests.cs` +- Reference: `golang/nats-server/server/jetstream_cluster.go:7001-7074` (processLeaderChange) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/JetStreamLeadershipTests.cs`: + +```csharp +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests; + +public class JetStreamLeadershipTests +{ + [Fact] + public void ProcessLeaderChange_clears_inflight_on_step_down() + { + var meta = new JetStreamMetaGroup(3); + meta.TrackInflightStreamProposal("ACC", new StreamAssignment + { + StreamName = "s1", + Group = new RaftGroup { Name = "rg", Peers = ["n1", "n2", "n3"] }, + }); + + meta.ProcessLeaderChange(isLeader: false); + + meta.InflightStreamCount.ShouldBe(0); + } + + [Fact] + public void ProcessLeaderChange_becoming_leader_resets_state() + { + var meta = new JetStreamMetaGroup(3); + var leaderChanged = false; + meta.OnLeaderChange += (isLeader) => leaderChanged = true; + + meta.ProcessLeaderChange(isLeader: true); + + leaderChanged.ShouldBeTrue(); + } + + [Fact] + public void StepDown_triggers_leader_change() + { + var meta = new JetStreamMetaGroup(3); + meta.BecomeLeader(); + meta.IsLeader().ShouldBeTrue(); + + meta.StepDown(); + + meta.IsLeader().ShouldBeFalse(); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamLeadershipTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement leadership transition** + +In `JetStreamMetaGroup.cs`: +- Add `event Action? 
OnLeaderChange` +- `ProcessLeaderChange(isLeader)`: + - If stepping down: clear all inflight maps + - If becoming leader: fire `OnLeaderChange`, initialize update subscriptions +- Modify `StepDown()` to call `ProcessLeaderChange(false)` + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamLeadershipTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/JetStreamLeadershipTests.cs src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs +git commit -m "feat(cluster): implement leadership transition with inflight cleanup" +``` + +--- + +**Phase 2 Exit Gate Verification:** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MetaSnapshotCodec" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterMonitor" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamAssignment" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInflight" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamLeadership" -v normal +``` + +All Phase 2 tests must pass before proceeding to Phase 3. + +--- + +## Phase 3: Consumer + Stream Engines + +**Dependencies:** Phase 1 (storage), Phase 2 (cluster coordination) +**Exit gate:** Consumer delivery loop delivers messages with redelivery, pull requests expire and respect batch/maxBytes, purge with subject filtering works, mirror/source retry recovers from failures. + +--- + +### Task 14: RedeliveryTracker with PriorityQueue + +Replace Dictionary-based redelivery tracking with min-heap for efficient deadline-based scheduling. 
+ +**Files:** +- Modify: `src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs` +- Test: `tests/NATS.Server.Tests/RedeliveryTrackerTests.cs` +- Reference: `golang/nats-server/server/consumer.go` (rdq redelivery queue) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/RedeliveryTrackerTests.cs`: + +```csharp +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests; + +public class RedeliveryTrackerTests +{ + [Fact] + public void Schedule_and_get_due_returns_expired() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000); + var past = DateTimeOffset.UtcNow.AddMilliseconds(-100); + + tracker.Schedule(1, past); + tracker.Schedule(2, DateTimeOffset.UtcNow.AddSeconds(60)); // future + + var due = tracker.GetDue(DateTimeOffset.UtcNow).ToList(); + due.Count.ShouldBe(1); + due[0].ShouldBe(1UL); + } + + [Fact] + public void Acknowledge_removes_from_queue() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000); + tracker.Schedule(1, DateTimeOffset.UtcNow.AddMilliseconds(-100)); + + tracker.Acknowledge(1); + + var due = tracker.GetDue(DateTimeOffset.UtcNow).ToList(); + due.ShouldBeEmpty(); + } + + [Fact] + public void IsMaxDeliveries_returns_true_at_threshold() + { + var tracker = new RedeliveryTracker(maxDeliveries: 3, ackWaitMs: 1000); + + tracker.IncrementDeliveryCount(1); + tracker.IncrementDeliveryCount(1); + tracker.IsMaxDeliveries(1).ShouldBeFalse(); + + tracker.IncrementDeliveryCount(1); + tracker.IsMaxDeliveries(1).ShouldBeTrue(); + } + + [Fact] + public void Backoff_schedule_uses_delivery_count() + { + var backoff = new long[] { 100, 500, 2000 }; + var tracker = new RedeliveryTracker(maxDeliveries: 10, ackWaitMs: 1000, backoffMs: backoff); + + // First redeliver: 100ms + var delay1 = tracker.GetBackoffDelay(deliveryCount: 1); + delay1.ShouldBe(100L); + + // Second: 500ms + var delay2 = tracker.GetBackoffDelay(deliveryCount: 2); + delay2.ShouldBe(500L); + + // Beyond schedule: use 
last value + var delay4 = tracker.GetBackoffDelay(deliveryCount: 4); + delay4.ShouldBe(2000L); + } + + [Fact] + public void GetDue_returns_in_deadline_order() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000); + var now = DateTimeOffset.UtcNow; + + tracker.Schedule(3, now.AddMilliseconds(-300)); + tracker.Schedule(1, now.AddMilliseconds(-100)); + tracker.Schedule(2, now.AddMilliseconds(-200)); + + var due = tracker.GetDue(now).ToList(); + due.Count.ShouldBe(3); + due[0].ShouldBe(3UL); // earliest deadline first + due[1].ShouldBe(2UL); + due[2].ShouldBe(1UL); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RedeliveryTrackerTests" -v normal +``` +Expected: FAIL + +**Step 3: Rewrite RedeliveryTracker internals** + +In `RedeliveryTracker.cs`: +- Replace the `Dictionary`-based deadline map with `PriorityQueue<ulong, DateTimeOffset>` (element = sequence, priority = deadline) +- Add `Dictionary<ulong, int> _deliveryCounts` for per-sequence delivery count +- `Schedule(seq, deadline)` — enqueue with deadline priority +- `GetDue(now)` — dequeue all entries past deadline, return in order +- `Acknowledge(seq)` — remove from queue and counts +- `IncrementDeliveryCount(seq)` — increment counter +- `IsMaxDeliveries(seq)` — check against `_maxDeliveries` +- `GetBackoffDelay(deliveryCount)` — index into `_backoffMs` array, clamp to last +- Constructor: `(int maxDeliveries, long ackWaitMs, long[]? backoffMs = null)` + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RedeliveryTrackerTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/RedeliveryTrackerTests.cs src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs +git commit -m "feat(consumer): rewrite RedeliveryTracker with PriorityQueue min-heap" +``` + +--- + +### Task 15: Ack/NAK Processing Enhancement + +Add background ack subscription processing with NAK delay, TERM, and WPI support.
+ +**Files:** +- Modify: `src/NATS.Server/JetStream/Consumers/AckProcessor.cs` +- Test: `tests/NATS.Server.Tests/AckProcessorTests.cs` +- Reference: `golang/nats-server/server/consumer.go:4854` (processInboundAcks) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/AckProcessorTests.cs`: + +```csharp +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests; + +public class AckProcessorTests +{ + [Fact] + public void ProcessAck_removes_from_pending() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000); + var processor = new AckProcessor(tracker); + + processor.Register(1, "deliver.subj"); + processor.PendingCount.ShouldBe(1); + + processor.ProcessAck(1); + processor.PendingCount.ShouldBe(0); + } + + [Fact] + public void ProcessNak_schedules_redelivery() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000); + var processor = new AckProcessor(tracker); + + processor.Register(1, "deliver.subj"); + processor.ProcessNak(1, delayMs: 500); + + processor.PendingCount.ShouldBe(1); // still pending until redelivered + } + + [Fact] + public void ProcessNak_with_backoff_uses_schedule() + { + var backoff = new long[] { 100, 500, 2000 }; + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000, backoffMs: backoff); + var processor = new AckProcessor(tracker); + + processor.Register(1, "deliver.subj"); + processor.ProcessNak(1, delayMs: -1); // -1 means use backoff schedule + + // Should use first backoff value (100ms) + processor.PendingCount.ShouldBe(1); + } + + [Fact] + public void ProcessTerm_removes_permanently() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000); + var processor = new AckProcessor(tracker); + + processor.Register(1, "deliver.subj"); + processor.ProcessTerm(1); + + processor.PendingCount.ShouldBe(0); + processor.TerminatedCount.ShouldBe(1); + } + + [Fact] + public void ProcessProgress_extends_ack_deadline() + { + var tracker = new 
RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
+        var processor = new AckProcessor(tracker);
+
+        processor.Register(1, "deliver.subj");
+        var originalDeadline = processor.GetDeadline(1);
+
+        processor.ProcessProgress(1);
+        var newDeadline = processor.GetDeadline(1);
+
+        newDeadline.ShouldBeGreaterThan(originalDeadline);
+    }
+
+    [Fact]
+    public void MaxAckPending_blocks_new_registrations()
+    {
+        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
+        var processor = new AckProcessor(tracker, maxAckPending: 2);
+
+        processor.Register(1, "d.1");
+        processor.Register(2, "d.2");
+        processor.CanRegister().ShouldBeFalse();
+    }
+
+    [Fact]
+    public void ParseAckType_identifies_all_types()
+    {
+        AckProcessor.ParseAckType("+ACK"u8).ShouldBe(AckType.Ack);
+        AckProcessor.ParseAckType("-NAK"u8).ShouldBe(AckType.Nak);
+        AckProcessor.ParseAckType("+TERM"u8).ShouldBe(AckType.Term);
+        AckProcessor.ParseAckType("+WPI"u8).ShouldBe(AckType.Progress);
+    }
+}
+```
+
+**Step 2: Run test to verify it fails**
+
+```bash
+dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~AckProcessorTests" -v normal
+```
+Expected: FAIL
+
+**Step 3: Enhance AckProcessor**
+
+In `AckProcessor.cs`:
+- Add `enum AckType { Ack, Nak, Term, Progress }`
+- Add `static AckType ParseAckType(ReadOnlySpan<byte> data)` — parse `+ACK`, `-NAK`, `+TERM`, `+WPI`
+- Add `int PendingCount` property
+- Add `int TerminatedCount` property (via `_terminated.Count`)
+- Add `bool CanRegister()` — checks `PendingCount < _maxAckPending`
+- Add `DateTimeOffset GetDeadline(ulong seq)` — returns ack deadline for pending message
+- Enhance `ProcessNak(seq, delayMs)` — if `delayMs == -1`, use backoff schedule from tracker
+- Add `ProcessProgress(seq)` — extend deadline by `_ackWaitMs`
+- Constructor: add optional `maxAckPending` parameter
+
+**Step 4: Run test to verify it passes**
+
+```bash
+dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~AckProcessorTests" -v normal
+```
+Expected: 
PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/AckProcessorTests.cs src/NATS.Server/JetStream/Consumers/AckProcessor.cs +git commit -m "feat(consumer): enhance AckProcessor with NAK delay, TERM, WPI, and maxAckPending" +``` + +--- + +### Task 16: Pull Request Pipeline + +Implement waiting request queue with expiry, batch/maxBytes enforcement. + +**Files:** +- Create: `src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs` +- Modify: `src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs` +- Test: `tests/NATS.Server.Tests/WaitingRequestQueueTests.cs` +- Reference: `golang/nats-server/server/consumer.go:4276-4450` (processNextMsgRequest) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/WaitingRequestQueueTests.cs`: + +```csharp +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests; + +public class WaitingRequestQueueTests +{ + [Fact] + public void Enqueue_and_dequeue_fifo() + { + var queue = new WaitingRequestQueue(); + queue.Enqueue(new PullRequest("reply.1", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false)); + queue.Enqueue(new PullRequest("reply.2", Batch: 5, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false)); + + queue.Count.ShouldBe(2); + + var first = queue.TryDequeue(); + first.ShouldNotBeNull(); + first.ReplyTo.ShouldBe("reply.1"); + } + + [Fact] + public void Expired_requests_are_removed() + { + var queue = new WaitingRequestQueue(); + queue.Enqueue(new PullRequest("expired", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMilliseconds(-100), NoWait: false)); + queue.Enqueue(new PullRequest("valid", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false)); + + queue.RemoveExpired(DateTimeOffset.UtcNow); + queue.Count.ShouldBe(1); + + var next = queue.TryDequeue(); + next!.ReplyTo.ShouldBe("valid"); + } + + [Fact] + public void NoWait_request_returns_immediately_when_empty() + { + var 
queue = new WaitingRequestQueue(); + queue.Enqueue(new PullRequest("nowait", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: true)); + + var req = queue.TryDequeue(); + req.ShouldNotBeNull(); + req.NoWait.ShouldBeTrue(); + } + + [Fact] + public void MaxBytes_tracks_accumulation() + { + var queue = new WaitingRequestQueue(); + var req = new PullRequest("mb", Batch: 100, MaxBytes: 1024, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false); + queue.Enqueue(req); + + var dequeued = queue.TryDequeue()!; + dequeued.MaxBytes.ShouldBe(1024L); + dequeued.RemainingBytes.ShouldBe(1024L); + + dequeued.ConsumeBytes(256); + dequeued.RemainingBytes.ShouldBe(768L); + dequeued.IsExhausted.ShouldBeFalse(); + + dequeued.ConsumeBytes(800); + dequeued.IsExhausted.ShouldBeTrue(); + } + + [Fact] + public void Batch_decrements_on_delivery() + { + var queue = new WaitingRequestQueue(); + var req = new PullRequest("batch", Batch: 3, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false); + queue.Enqueue(req); + + var dequeued = queue.TryDequeue()!; + dequeued.RemainingBatch.ShouldBe(3); + + dequeued.ConsumeBatch(); + dequeued.RemainingBatch.ShouldBe(2); + + dequeued.ConsumeBatch(); + dequeued.ConsumeBatch(); + dequeued.IsExhausted.ShouldBeTrue(); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WaitingRequestQueueTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement WaitingRequestQueue** + +Create `src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs`: + +```csharp +namespace NATS.Server.JetStream.Consumers; + +public sealed record PullRequest( + string ReplyTo, + int Batch, + long MaxBytes, + DateTimeOffset Expires, + bool NoWait, + string? 
PinId = null)
+{
+    public int RemainingBatch { get; private set; } = Batch;
+    public long RemainingBytes { get; private set; } = MaxBytes;
+    public bool IsExhausted => RemainingBatch <= 0 || (MaxBytes > 0 && RemainingBytes <= 0);
+    public void ConsumeBatch() => RemainingBatch--;
+    public void ConsumeBytes(long bytes) => RemainingBytes -= bytes;
+}
+
+public sealed class WaitingRequestQueue
+{
+    private readonly LinkedList<PullRequest> _queue = new();
+
+    public int Count => _queue.Count;
+    public bool IsEmpty => _queue.Count == 0;
+
+    public void Enqueue(PullRequest request) => _queue.AddLast(request);
+
+    public PullRequest? TryDequeue()
+    {
+        if (_queue.Count == 0) return null;
+        var first = _queue.First!.Value;
+        _queue.RemoveFirst();
+        return first;
+    }
+
+    public void RemoveExpired(DateTimeOffset now)
+    {
+        var node = _queue.First;
+        while (node != null)
+        {
+            var next = node.Next;
+            if (node.Value.Expires <= now)
+                _queue.Remove(node);
+            node = next;
+        }
+    }
+}
+```
+
+**Step 4: Run test to verify it passes**
+
+```bash
+dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WaitingRequestQueueTests" -v normal
+```
+Expected: PASS
+
+**Step 5: Commit**
+
+```bash
+git add src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs tests/NATS.Server.Tests/WaitingRequestQueueTests.cs
+git commit -m "feat(consumer): add WaitingRequestQueue with expiry and batch/maxBytes tracking"
+```
+
+---
+
+### Task 17: Consumer Pause/Resume
+
+Add pause/resume state management with advisory events. 
+ +**Files:** +- Modify: `src/NATS.Server/JetStream/ConsumerManager.cs` +- Test: `tests/NATS.Server.Tests/ConsumerPauseResumeTests.cs` +- Reference: `golang/nats-server/server/consumer.go` (pause/resume) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/ConsumerPauseResumeTests.cs`: + +```csharp +using NATS.Server.JetStream; + +namespace NATS.Server.Tests; + +public class ConsumerPauseResumeTests +{ + [Fact] + public async Task Pause_stops_delivery() + { + var mgr = ConsumerManagerTestHelper.Create(); + var handle = mgr.CreateOrUpdate("test-stream", "test-consumer", + new Models.ConsumerConfig { DeliverSubject = "deliver.test" }); + + var until = DateTime.UtcNow.AddSeconds(5); + mgr.Pause("test-consumer", until); + + mgr.IsPaused("test-consumer").ShouldBeTrue(); + mgr.GetPauseUntil("test-consumer").ShouldBe(until); + } + + [Fact] + public async Task Resume_restarts_delivery() + { + var mgr = ConsumerManagerTestHelper.Create(); + mgr.CreateOrUpdate("test-stream", "test-consumer", + new Models.ConsumerConfig { DeliverSubject = "deliver.test" }); + + mgr.Pause("test-consumer", DateTime.UtcNow.AddSeconds(5)); + mgr.Resume("test-consumer"); + + mgr.IsPaused("test-consumer").ShouldBeFalse(); + } + + [Fact] + public async Task Pause_auto_resumes_after_deadline() + { + var mgr = ConsumerManagerTestHelper.Create(); + mgr.CreateOrUpdate("test-stream", "test-consumer", + new Models.ConsumerConfig { DeliverSubject = "deliver.test" }); + + mgr.Pause("test-consumer", DateTime.UtcNow.AddMilliseconds(100)); + + await Task.Delay(200); + + mgr.IsPaused("test-consumer").ShouldBeFalse(); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ConsumerPauseResumeTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement pause/resume in ConsumerManager** + +In `ConsumerManager.cs`: +- Add `_pauseUntil` field to `ConsumerHandle` record +- Add `_pauseTimers` dictionary for auto-resume 
timers +- `Pause(consumerName, until)` — set deadline, stop delivery loop, start auto-resume timer +- `Resume(consumerName)` — clear deadline, restart delivery loop, cancel timer +- `IsPaused(consumerName)` — check if deadline is set and in the future +- `GetPauseUntil(consumerName)` — return deadline +- Auto-resume: `Timer` callback that calls `Resume()` when deadline passes + +Create `ConsumerManagerTestHelper` static class for test setup. + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ConsumerPauseResumeTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/ConsumerPauseResumeTests.cs src/NATS.Server/JetStream/ConsumerManager.cs +git commit -m "feat(consumer): add pause/resume with auto-resume timer" +``` + +--- + +### Task 18: Priority Group Pinning + +Add Pin ID generation, TTL timers, and Nats-Pin-Id header support. + +**Files:** +- Modify: `src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs` +- Test: `tests/NATS.Server.Tests/PriorityGroupPinningTests.cs` +- Reference: `golang/nats-server/server/consumer.go` (setPinnedTimer, assignNewPinId) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/PriorityGroupPinningTests.cs`: + +```csharp +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests; + +public class PriorityGroupPinningTests +{ + [Fact] + public void AssignPinId_generates_unique_ids() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group-1", "consumer-a", priority: 0); + + var pin1 = mgr.AssignPinId("group-1", "consumer-a"); + var pin2 = mgr.AssignPinId("group-1", "consumer-a"); + + pin1.ShouldNotBeNullOrEmpty(); + pin2.ShouldNotBeNullOrEmpty(); + pin1.ShouldNotBe(pin2); // each assignment is unique + } + + [Fact] + public void ValidatePinId_accepts_current() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group-1", "consumer-a", priority: 0); + + var pin = 
mgr.AssignPinId("group-1", "consumer-a"); + mgr.ValidatePinId("group-1", pin).ShouldBeTrue(); + } + + [Fact] + public void ValidatePinId_rejects_expired() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group-1", "consumer-a", priority: 0); + + var pin1 = mgr.AssignPinId("group-1", "consumer-a"); + var pin2 = mgr.AssignPinId("group-1", "consumer-a"); // replaces pin1 + + mgr.ValidatePinId("group-1", pin1).ShouldBeFalse(); + mgr.ValidatePinId("group-1", pin2).ShouldBeTrue(); + } + + [Fact] + public void UnassignPinId_clears() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group-1", "consumer-a", priority: 0); + + var pin = mgr.AssignPinId("group-1", "consumer-a"); + mgr.UnassignPinId("group-1"); + + mgr.ValidatePinId("group-1", pin).ShouldBeFalse(); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PriorityGroupPinningTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement pin management** + +In `PriorityGroupManager.cs`: +- Add `_pinIds` dictionary: `group → current pin ID` +- `AssignPinId(group, consumer)` — generate NUID string, store, return +- `ValidatePinId(group, pinId)` — compare against current +- `UnassignPinId(group)` — clear pin ID +- Pin TTL: configurable timeout, auto-unassign after expiry +- NUID generation: use `Guid.NewGuid().ToString("N")[..22]` for simplicity + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PriorityGroupPinningTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/PriorityGroupPinningTests.cs src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs +git commit -m "feat(consumer): add priority group pin ID management" +``` + +--- + +### Task 19: Stream Purge with Filtering + +Implement `PurgeEx` with subject filter, sequence range, and keep-N semantics. 
+ +**Files:** +- Modify: `src/NATS.Server/JetStream/StreamManager.cs` +- Test: `tests/NATS.Server.Tests/StreamPurgeFilterTests.cs` +- Reference: `golang/nats-server/server/stream.go` (purgeEx) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/StreamPurgeFilterTests.cs`: + +```csharp +using NATS.Server.JetStream; + +namespace NATS.Server.Tests; + +public class StreamPurgeFilterTests +{ + [Fact] + public async Task PurgeEx_subject_filter_only_removes_matching() + { + var mgr = await StreamManagerTestHelper.CreateWithMessagesAsync( + ("a.1", "m1"), ("b.1", "m2"), ("a.2", "m3"), ("b.2", "m4")); + + var purged = mgr.PurgeEx("a.*", seq: 0, keep: 0); + purged.ShouldBe(2UL); + + var state = await mgr.GetStateAsync(default); + state.Messages.ShouldBe(2UL); + } + + [Fact] + public async Task PurgeEx_seq_range_removes_below() + { + var mgr = await StreamManagerTestHelper.CreateWithMessagesAsync( + ("x.1", "m1"), ("x.2", "m2"), ("x.3", "m3"), ("x.4", "m4")); + + var purged = mgr.PurgeEx("", seq: 3, keep: 0); + purged.ShouldBe(2UL); // seq 1, 2 + + var state = await mgr.GetStateAsync(default); + state.Messages.ShouldBe(2UL); + state.FirstSeq.ShouldBe(3UL); + } + + [Fact] + public async Task PurgeEx_keep_retains_newest() + { + var mgr = await StreamManagerTestHelper.CreateWithMessagesAsync( + ("k.1", "m1"), ("k.2", "m2"), ("k.3", "m3"), ("k.4", "m4"), ("k.5", "m5")); + + var purged = mgr.PurgeEx("", seq: 0, keep: 2); + purged.ShouldBe(3UL); // keep last 2 + + var state = await mgr.GetStateAsync(default); + state.Messages.ShouldBe(2UL); + state.FirstSeq.ShouldBe(4UL); + } + + [Fact] + public async Task PurgeEx_subject_and_keep_combined() + { + var mgr = await StreamManagerTestHelper.CreateWithMessagesAsync( + ("j.1", "m1"), ("j.2", "m2"), ("other", "m3"), ("j.3", "m4"), ("j.4", "m5")); + + // Purge j.* but keep 1 + var purged = mgr.PurgeEx("j.*", seq: 0, keep: 1); + purged.ShouldBe(3UL); // j.1, j.2, j.3 purged; j.4 kept + + var state = await 
mgr.GetStateAsync(default); + state.Messages.ShouldBe(2UL); // "other" + j.4 + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StreamPurgeFilterTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement PurgeEx in StreamManager** + +In `StreamManager.cs`, enhance `PurgeEx`: +- If `subject` is set: iterate messages, match using `SubjectMatch.IsMatch()`, remove matching +- If `seq > 0`: remove all messages with sequence < `seq` +- If `keep > 0`: retain last N messages (on subject if specified, globally otherwise) +- Combined: apply subject filter first, then keep-N among the filtered set +- Return count of purged messages + +Create `StreamManagerTestHelper` for test setup. + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StreamPurgeFilterTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/StreamPurgeFilterTests.cs src/NATS.Server/JetStream/StreamManager.cs +git commit -m "feat(stream): implement PurgeEx with subject filter, seq range, and keep-N" +``` + +--- + +### Task 20: Interest Retention Policy + +Track per-consumer interest and remove messages when all consumers have acknowledged. 
+ +**Files:** +- Create: `src/NATS.Server/JetStream/InterestRetentionPolicy.cs` +- Test: `tests/NATS.Server.Tests/InterestRetentionTests.cs` +- Reference: `golang/nats-server/server/stream.go` (checkInterestState, noInterest) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/InterestRetentionTests.cs`: + +```csharp +using NATS.Server.JetStream; + +namespace NATS.Server.Tests; + +public class InterestRetentionTests +{ + [Fact] + public void ShouldRetain_true_when_consumers_have_not_acked() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("consumer-A", "orders.>"); + policy.RegisterInterest("consumer-B", "orders.>"); + + policy.ShouldRetain(1, "orders.new").ShouldBeTrue(); + } + + [Fact] + public void ShouldRetain_false_when_all_consumers_acked() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("consumer-A", "orders.>"); + policy.RegisterInterest("consumer-B", "orders.>"); + + policy.AcknowledgeDelivery("consumer-A", 1); + policy.ShouldRetain(1, "orders.new").ShouldBeTrue(); // B hasn't acked + + policy.AcknowledgeDelivery("consumer-B", 1); + policy.ShouldRetain(1, "orders.new").ShouldBeFalse(); // both acked + } + + [Fact] + public void ShouldRetain_ignores_consumers_without_interest() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("consumer-A", "orders.>"); + policy.RegisterInterest("consumer-B", "billing.>"); // no interest in orders + + policy.AcknowledgeDelivery("consumer-A", 1); + policy.ShouldRetain(1, "orders.new").ShouldBeFalse(); // B has no interest + } + + [Fact] + public void UnregisterInterest_removes_consumer() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("consumer-A", "x.>"); + policy.RegisterInterest("consumer-B", "x.>"); + + policy.UnregisterInterest("consumer-B"); + + // Only A needs to ack + policy.AcknowledgeDelivery("consumer-A", 1); + policy.ShouldRetain(1, "x.y").ShouldBeFalse(); + } +} +``` + +**Step 2: Run 
test to verify it fails**
+
+```bash
+dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterestRetentionTests" -v normal
+```
+Expected: FAIL
+
+**Step 3: Implement InterestRetentionPolicy**
+
+Create `src/NATS.Server/JetStream/InterestRetentionPolicy.cs`:
+
+```csharp
+namespace NATS.Server.JetStream;
+
+/// <summary>
+/// Tracks per-consumer interest and determines when messages can be removed
+/// under Interest retention policy.
+/// Go reference: stream.go checkInterestState/noInterest.
+/// </summary>
+public sealed class InterestRetentionPolicy
+{
+    private readonly Dictionary<string, string> _interests = new(); // consumer → filter subject
+    private readonly Dictionary<ulong, HashSet<string>> _acks = new(); // seq → consumers that acked
+
+    public void RegisterInterest(string consumer, string filterSubject) { ... }
+    public void UnregisterInterest(string consumer) { ... }
+    public void AcknowledgeDelivery(string consumer, ulong seq) { ... }
+    public bool ShouldRetain(ulong seq, string msgSubject) { ... }
+}
+```
+
+**Step 4: Run test to verify it passes**
+
+```bash
+dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterestRetentionTests" -v normal
+```
+Expected: PASS
+
+**Step 5: Commit**
+
+```bash
+git add src/NATS.Server/JetStream/InterestRetentionPolicy.cs tests/NATS.Server.Tests/InterestRetentionTests.cs
+git commit -m "feat(stream): add InterestRetentionPolicy for per-consumer ack tracking"
+```
+
+---
+
+### Task 21: Mirror/Source Retry Enhancement
+
+Add exponential backoff retry, gap detection, and error state tracking to MirrorCoordinator and SourceCoordinator. 
+ +**Files:** +- Modify: `src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs` +- Modify: `src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs` +- Test: `tests/NATS.Server.Tests/MirrorSourceRetryTests.cs` +- Reference: `golang/nats-server/server/stream.go` (setupMirrorConsumer, retrySourceConsumerAtSeq) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/MirrorSourceRetryTests.cs`: + +```csharp +using NATS.Server.JetStream.MirrorSource; + +namespace NATS.Server.Tests; + +public class MirrorSourceRetryTests +{ + [Fact] + public void Mirror_retry_uses_exponential_backoff() + { + var mirror = MirrorCoordinatorTestHelper.Create(); + + mirror.RecordFailure(); + var delay1 = mirror.GetRetryDelay(); + delay1.ShouldBeGreaterThanOrEqualTo(TimeSpan.FromMilliseconds(250)); // initial + + mirror.RecordFailure(); + var delay2 = mirror.GetRetryDelay(); + delay2.ShouldBeGreaterThan(delay1); // exponential growth + + // Cap at max + for (int i = 0; i < 20; i++) mirror.RecordFailure(); + var delayMax = mirror.GetRetryDelay(); + delayMax.ShouldBeLessThanOrEqualTo(TimeSpan.FromSeconds(30)); + } + + [Fact] + public void Mirror_success_resets_backoff() + { + var mirror = MirrorCoordinatorTestHelper.Create(); + + for (int i = 0; i < 5; i++) mirror.RecordFailure(); + mirror.RecordSuccess(); + + var delay = mirror.GetRetryDelay(); + delay.ShouldBe(TimeSpan.FromMilliseconds(250)); // reset to initial + } + + [Fact] + public void Mirror_tracks_sequence_gap() + { + var mirror = MirrorCoordinatorTestHelper.Create(); + + mirror.RecordSourceSeq(1); + mirror.RecordSourceSeq(2); + mirror.RecordSourceSeq(5); // gap: 3, 4 missing + + mirror.HasGap.ShouldBeTrue(); + mirror.GapStart.ShouldBe(3UL); + mirror.GapEnd.ShouldBe(4UL); + } + + [Fact] + public void Mirror_tracks_error_state() + { + var mirror = MirrorCoordinatorTestHelper.Create(); + + mirror.SetError("connection refused"); + mirror.HasError.ShouldBeTrue(); + mirror.ErrorMessage.ShouldBe("connection 
refused"); + + mirror.ClearError(); + mirror.HasError.ShouldBeFalse(); + } + + [Fact] + public void Source_dedup_window_prunes_expired() + { + var source = SourceCoordinatorTestHelper.Create(); + + source.RecordMsgId("msg-1"); + source.RecordMsgId("msg-2"); + + source.IsDuplicate("msg-1").ShouldBeTrue(); + source.IsDuplicate("msg-3").ShouldBeFalse(); + + // Simulate time passing beyond dedup window + source.PruneDedupWindow(DateTimeOffset.UtcNow.AddMinutes(5)); + source.IsDuplicate("msg-1").ShouldBeFalse(); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MirrorSourceRetryTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement retry and gap detection** + +In `MirrorCoordinator.cs`: +- Add `RecordFailure()` — increment `_consecutiveFailures` +- Add `RecordSuccess()` — reset `_consecutiveFailures` to 0 +- Add `GetRetryDelay()` — `min(InitialRetryDelay * 2^failures, MaxRetryDelay)` with jitter +- Add `RecordSourceSeq(seq)` — track expected vs actual, detect gaps +- Add `HasGap`, `GapStart`, `GapEnd` properties +- Add `SetError(msg)`, `ClearError()`, `HasError`, `ErrorMessage` + +In `SourceCoordinator.cs`: +- Add `PruneDedupWindow(DateTimeOffset cutoff)` — remove entries older than cutoff +- Ensure `IsDuplicate`, `RecordMsgId` work with time-based window + +Create test helpers: `MirrorCoordinatorTestHelper`, `SourceCoordinatorTestHelper`. 
+ +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MirrorSourceRetryTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/MirrorSourceRetryTests.cs src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs +git commit -m "feat(mirror): add exponential backoff retry, gap detection, and error tracking" +``` + +--- + +**Phase 3 Exit Gate Verification:** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RedeliveryTracker" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~AckProcessor" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WaitingRequestQueue" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ConsumerPauseResume" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PriorityGroupPinning" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StreamPurgeFilter" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterestRetention" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MirrorSourceRetry" -v normal +``` + +All Phase 3 tests must pass before proceeding to Phase 4. + +--- + +## Phase 4: Client Performance, MQTT, Config, Networking + +**Dependencies:** Phase 1 (storage for MQTT), Phase 3 (consumer for MQTT) +**Exit gate:** Client flush coalescing measurably reduces syscalls under load, MQTT sessions survive server restart, SIGHUP triggers config reload, implicit routes auto-discover. + +--- + +### Task 22: Client Flush Coalescing + +Reduce syscalls by coalescing multiple flush signals into a single write. 
+
+**Files:**
+- Modify: `src/NATS.Server/NatsClient.cs`
+- Test: `tests/NATS.Server.Tests/FlushCoalescingTests.cs`
+- Reference: `golang/nats-server/server/client.go` (fsp, maxFlushPending, pcd)
+
+**Step 1: Write the failing test**
+
+Create `tests/NATS.Server.Tests/FlushCoalescingTests.cs`:
+
+```csharp
+namespace NATS.Server.Tests;
+
+public class FlushCoalescingTests
+{
+    [Fact]
+    public async Task Flush_coalescing_reduces_flush_count()
+    {
+        // Start server with flush coalescing enabled
+        var port = TestHelpers.GetFreePort();
+        await using var server = new NatsServer(new NatsOptions { Port = port });
+        await server.StartAsync();
+
+        using var socket = new System.Net.Sockets.Socket(
+            System.Net.Sockets.AddressFamily.InterNetwork,
+            System.Net.Sockets.SocketType.Stream,
+            System.Net.Sockets.ProtocolType.Tcp);
+        await socket.ConnectAsync(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, port));
+        var stream = new System.Net.Sockets.NetworkStream(socket);
+
+        // Read INFO + send CONNECT
+        var buf = new byte[4096];
+        await stream.ReadAsync(buf);
+        await stream.WriteAsync("CONNECT {}\r\n"u8.ToArray());
+        await stream.WriteAsync("SUB test 1\r\n"u8.ToArray());
+
+        // Rapidly publish multiple messages — coalescing should batch flushes
+        for (int i = 0; i < 100; i++)
+            await stream.WriteAsync("PUB test 5\r\nhello\r\n"u8.ToArray());
+
+        await Task.Delay(100);
+
+        // Verify messages received (coalescing should not drop any)
+        var received = 0;
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
+        try
+        {
+            while (!cts.IsCancellationRequested)
+            {
+                var n = await stream.ReadAsync(buf, cts.Token);
+                if (n == 0) break;
+                var text = System.Text.Encoding.ASCII.GetString(buf, 0, n);
+                received += text.Split("MSG ").Length - 1;
+                if (received >= 100) break;
+            }
+        }
+        catch (OperationCanceledException) { }
+
+        received.ShouldBe(100);
+    }
+
+    [Fact]
+    public void MaxFlushPending_defaults_to_10()
+    {
+        // Verify the constant exists
+        
NatsClient.MaxFlushPending.ShouldBe(10); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FlushCoalescingTests" -v normal +``` +Expected: FAIL — `MaxFlushPending` constant doesn't exist + +**Step 3: Implement flush coalescing** + +In `NatsClient.cs`: +- Add `public const int MaxFlushPending = 10;` +- Add field `int _flushSignalsPending` +- Add field `SemaphoreSlim _flushSignal = new(0)` for write loop coordination +- Modify `QueueOutbound()`: after channel write, `Interlocked.Increment(ref _flushSignalsPending)`, release semaphore +- Modify `RunWriteLoopAsync()`: after draining channel, if `_flushSignalsPending < MaxFlushPending`, wait briefly on semaphore to coalesce more signals before flushing + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FlushCoalescingTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/FlushCoalescingTests.cs src/NATS.Server/NatsClient.cs +git commit -m "feat(client): add flush coalescing to reduce write syscalls" +``` + +--- + +### Task 23: Client Stall Gate + +Block producers when client outbound buffer is near capacity. 
+ +**Files:** +- Modify: `src/NATS.Server/NatsClient.cs` +- Test: `tests/NATS.Server.Tests/StallGateTests.cs` +- Reference: `golang/nats-server/server/client.go` (stc channel, stall gate) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/StallGateTests.cs`: + +```csharp +namespace NATS.Server.Tests; + +public class StallGateTests +{ + [Fact] + public void Stall_gate_activates_at_threshold() + { + // Unit test against the stall gate logic directly + var gate = new NatsClient.StallGate(maxPending: 1000); + + gate.IsStalled.ShouldBeFalse(); + + gate.UpdatePending(750); // 75% = threshold + gate.IsStalled.ShouldBeTrue(); + + gate.UpdatePending(500); // below threshold + gate.IsStalled.ShouldBeFalse(); + } + + [Fact] + public async Task Stall_gate_blocks_producer() + { + var gate = new NatsClient.StallGate(maxPending: 100); + gate.UpdatePending(80); // stalled + + var blocked = true; + var task = Task.Run(async () => + { + await gate.WaitAsync(TimeSpan.FromSeconds(1)); + blocked = false; + }); + + await Task.Delay(50); + blocked.ShouldBeTrue(); // still blocked + + gate.UpdatePending(50); // release + gate.Release(); + + await task; + blocked.ShouldBeFalse(); + } + + [Fact] + public async Task Stall_gate_timeout_closes_client() + { + var gate = new NatsClient.StallGate(maxPending: 100); + gate.UpdatePending(80); + + var timedOut = await gate.WaitAsync(TimeSpan.FromMilliseconds(50)); + timedOut.ShouldBeFalse(); // timed out, not released + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StallGateTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement StallGate** + +In `NatsClient.cs`, add nested class: + +```csharp +public sealed class StallGate +{ + private readonly long _threshold; + private SemaphoreSlim? 
_semaphore;
+
+    public StallGate(long maxPending) => _threshold = maxPending * 3 / 4;
+    public bool IsStalled => _semaphore != null;
+
+    public void UpdatePending(long pending)
+    {
+        if (pending >= _threshold && _semaphore == null)
+            _semaphore = new SemaphoreSlim(0, 1);
+        else if (pending < _threshold && _semaphore != null)
+            Release();
+    }
+
+    public async Task<bool> WaitAsync(TimeSpan timeout)
+    {
+        if (_semaphore == null) return true;
+        return await _semaphore.WaitAsync(timeout);
+    }
+
+    public void Release()
+    {
+        _semaphore?.Release();
+        _semaphore = null;
+    }
+}
+```
+
+Wire into `QueueOutbound()` and `RunWriteLoopAsync()`.
+
+**Step 4: Run test to verify it passes**
+
+```bash
+dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StallGateTests" -v normal
+```
+Expected: PASS
+
+**Step 5: Commit**
+
+```bash
+git add tests/NATS.Server.Tests/StallGateTests.cs src/NATS.Server/NatsClient.cs
+git commit -m "feat(client): add stall gate backpressure for slow consumers"
+```
+
+---
+
+### Task 24: Write Timeout Recovery
+
+Handle partial flush recovery with per-client-kind policies. 
+ +**Files:** +- Modify: `src/NATS.Server/NatsClient.cs` +- Test: `tests/NATS.Server.Tests/WriteTimeoutTests.cs` +- Reference: `golang/nats-server/server/client.go` (write timeout handling) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/WriteTimeoutTests.cs`: + +```csharp +namespace NATS.Server.Tests; + +public class WriteTimeoutTests +{ + [Fact] + public void WriteTimeoutPolicy_defaults_by_kind() + { + NatsClient.GetWriteTimeoutPolicy(ClientKind.Client).ShouldBe(WriteTimeoutPolicy.Close); + NatsClient.GetWriteTimeoutPolicy(ClientKind.Router).ShouldBe(WriteTimeoutPolicy.TcpFlush); + NatsClient.GetWriteTimeoutPolicy(ClientKind.Gateway).ShouldBe(WriteTimeoutPolicy.TcpFlush); + NatsClient.GetWriteTimeoutPolicy(ClientKind.Leaf).ShouldBe(WriteTimeoutPolicy.TcpFlush); + } + + [Fact] + public void PartialFlushResult_tracks_bytes() + { + var result = new NatsClient.FlushResult(bytesAttempted: 1024, bytesWritten: 512); + result.IsPartial.ShouldBeTrue(); + result.BytesRemaining.ShouldBe(512L); + } + + [Fact] + public void PartialFlushResult_complete_is_not_partial() + { + var result = new NatsClient.FlushResult(bytesAttempted: 1024, bytesWritten: 1024); + result.IsPartial.ShouldBeFalse(); + result.BytesRemaining.ShouldBe(0L); + } +} +``` + +**Step 2: Run test to verify it fails** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WriteTimeoutTests" -v normal +``` +Expected: FAIL + +**Step 3: Implement write timeout recovery** + +In `NatsClient.cs`: +- Add `enum WriteTimeoutPolicy { Close, TcpFlush }` +- Add `static WriteTimeoutPolicy GetWriteTimeoutPolicy(ClientKind kind)` — CLIENT→Close, others→TcpFlush +- Add `record FlushResult(long BytesAttempted, long BytesWritten)` with `IsPartial` and `BytesRemaining` +- Modify `RunWriteLoopAsync()` write timeout handling: + - On timeout with `bytesWritten > 0` (partial): if policy is `TcpFlush`, mark slow consumer, continue + - On timeout with `bytesWritten == 0`: close connection + 
- CLIENT kind always closes on timeout + +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WriteTimeoutTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/WriteTimeoutTests.cs src/NATS.Server/NatsClient.cs +git commit -m "feat(client): add write timeout recovery with per-kind policies" +``` + +--- + +### Task 25: MQTT JetStream Persistence + +Back MQTT session and retained stores with JetStream streams. + +**Files:** +- Modify: `src/NATS.Server/Mqtt/MqttSessionStore.cs` +- Modify: `src/NATS.Server/Mqtt/MqttRetainedStore.cs` +- Test: `tests/NATS.Server.Tests/MqttPersistenceTests.cs` +- Reference: `golang/nats-server/server/mqtt.go` ($MQTT_msgs, $MQTT_sess, $MQTT_rmsgs) + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/MqttPersistenceTests.cs`: + +```csharp +using NATS.Server.Mqtt; + +namespace NATS.Server.Tests; + +public class MqttPersistenceTests +{ + [Fact] + public async Task Session_persists_across_restart() + { + var store = MqttSessionStoreTestHelper.CreateWithJetStream(); + + await store.ConnectAsync("client-1", cleanSession: false); + await store.AddSubscriptionAsync("client-1", "topic/test", qos: 1); + await store.SaveSessionAsync("client-1"); + + // Simulate restart + var recovered = MqttSessionStoreTestHelper.CreateWithJetStream(store.BackingStore); + await recovered.ConnectAsync("client-1", cleanSession: false); + + var subs = recovered.GetSubscriptions("client-1"); + subs.ShouldContainKey("topic/test"); + } + + [Fact] + public async Task Clean_session_deletes_existing() + { + var store = MqttSessionStoreTestHelper.CreateWithJetStream(); + + await store.ConnectAsync("client-2", cleanSession: false); + await store.AddSubscriptionAsync("client-2", "persist/me", qos: 1); + await store.SaveSessionAsync("client-2"); + + // Reconnect with clean session + await store.ConnectAsync("client-2", cleanSession: true); + + 
var subs = store.GetSubscriptions("client-2");
        subs.ShouldBeEmpty();
    }

    [Fact]
    public async Task Retained_message_survives_restart()
    {
        var retained = MqttRetainedStoreTestHelper.CreateWithJetStream();

        await retained.SetRetainedAsync("sensors/temp", "72.5"u8.ToArray());

        // Simulate restart
        var recovered = MqttRetainedStoreTestHelper.CreateWithJetStream(retained.BackingStore);
        var msg = await recovered.GetRetainedAsync("sensors/temp");

        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg).ShouldBe("72.5");
    }

    [Fact]
    public async Task Retained_message_cleared_with_empty_payload()
    {
        var retained = MqttRetainedStoreTestHelper.CreateWithJetStream();

        await retained.SetRetainedAsync("sensors/temp", "72.5"u8.ToArray());
        await retained.SetRetainedAsync("sensors/temp", ReadOnlyMemory<byte>.Empty); // clear

        var msg = await retained.GetRetainedAsync("sensors/temp");
        msg.ShouldBeNull();
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MqttPersistenceTests" -v normal
```
Expected: FAIL

**Step 3: Implement JetStream backing for MQTT stores**

In `MqttSessionStore.cs`:
- Add `IStreamStore? _backingStore` field for JetStream persistence
- `ConnectAsync(clientId, cleanSession)`:
  - If `cleanSession=false`: query backing store for existing session state
  - If `cleanSession=true`: delete existing session data from backing store
- `SaveSessionAsync(clientId)` — serialize session state, store in backing store under `$MQTT.sess.{clientId}`
- `BackingStore` property for test access

In `MqttRetainedStore.cs`:
- Add `IStreamStore? _backingStore` field
- `SetRetainedAsync(topic, payload)` — store in backing store under `$MQTT.rmsgs.{topic}`
- `GetRetainedAsync(topic)` — load from backing store
- If payload is empty, remove the retained message

Create test helpers with in-memory backing stores.
+ +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MqttPersistenceTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/MqttPersistenceTests.cs src/NATS.Server/Mqtt/MqttSessionStore.cs src/NATS.Server/Mqtt/MqttRetainedStore.cs +git commit -m "feat(mqtt): add JetStream-backed session and retained message persistence" +``` + +--- + +### Task 26: SIGHUP Config Reload + +Add Unix signal handler for config hot-reload. + +**Files:** +- Create: `src/NATS.Server/Configuration/SignalHandler.cs` +- Modify: `src/NATS.Server/Configuration/ConfigReloader.cs` +- Modify: `src/NATS.Server.Host/Program.cs` +- Test: `tests/NATS.Server.Tests/SignalHandlerTests.cs` +- Reference: `golang/nats-server/server/opts.go`, `golang/nats-server/server/reload.go` + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/SignalHandlerTests.cs`: + +```csharp +using NATS.Server.Configuration; + +namespace NATS.Server.Tests; + +public class SignalHandlerTests +{ + [Fact] + public void SignalHandler_registers_without_throwing() + { + var reloader = new ConfigReloader(); + // Registration should not throw even without a real server + Should.NotThrow(() => SignalHandler.Register(reloader, null!)); + } + + [Fact] + public async Task ConfigReloader_ReloadAsync_applies_reloadable_changes() + { + var reloader = new ConfigReloader(); + var original = new NatsOptions { Port = 4222 }; + var updated = new NatsOptions { Port = 4222 }; // port can't change + + var result = await reloader.ReloadFromOptionsAsync(original, updated); + result.Success.ShouldBeTrue(); + result.RejectedChanges.ShouldBeEmpty(); + } + + [Fact] + public async Task ConfigReloader_rejects_non_reloadable_changes() + { + var reloader = new ConfigReloader(); + var original = new NatsOptions { Port = 4222 }; + var updated = new NatsOptions { Port = 5555 }; // port change is NOT reloadable + + var result = await 
reloader.ReloadFromOptionsAsync(original, updated);
        result.RejectedChanges.ShouldContain(c => c.Contains("Port"));
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SignalHandlerTests" -v normal
```
Expected: FAIL — `SignalHandler` doesn't exist

**Step 3: Implement SignalHandler and ReloadFromOptionsAsync**

Create `src/NATS.Server/Configuration/SignalHandler.cs`:

```csharp
using System.Runtime.InteropServices;

namespace NATS.Server.Configuration;

/// <summary>
/// Registers POSIX signal handlers for config reload.
/// Go reference: server/signal_unix.go, opts.go reload logic.
/// </summary>
public static class SignalHandler
{
    public static void Register(ConfigReloader reloader, NatsServer server)
    {
        PosixSignalRegistration.Create(PosixSignal.SIGHUP, ctx =>
        {
            _ = reloader.ReloadAsync(server);
        });
    }
}
```

In `ConfigReloader.cs`, add:
- `ReloadFromOptionsAsync(original, updated)` — compare options, apply reloadable, reject non-reloadable
- Reloadable: auth, TLS, logging, limits
- Non-reloadable: port, host, cluster port, store dir
- Return `ReloadResult { bool Success, List<string> RejectedChanges }`

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SignalHandlerTests" -v normal
```
Expected: PASS

**Step 5: Commit**

```bash
git add src/NATS.Server/Configuration/SignalHandler.cs tests/NATS.Server.Tests/SignalHandlerTests.cs src/NATS.Server/Configuration/ConfigReloader.cs
git commit -m "feat(config): add SIGHUP signal handler and config reload validation"
```

---

### Task 27: Implicit Route/Gateway Discovery

Auto-discover cluster peers from INFO gossip.

**Files:**
- Modify: `src/NATS.Server/Routes/RouteManager.cs`
- Modify: `src/NATS.Server/Gateways/GatewayManager.cs`
- Test: `tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs`
- Reference: `golang/nats-server/server/route.go` (processImplicitRoute), `gateway.go` (processImplicitGateway)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs`:

```csharp
namespace NATS.Server.Tests;

public class ImplicitDiscoveryTests
{
    [Fact]
    public void ProcessImplicitRoute_discovers_new_peer()
    {
        var mgr = RouteManagerTestHelper.Create();
        var serverInfo = new ServerInfo
        {
            ServerId = "server-2",
            ConnectUrls = ["nats://10.0.0.2:6222", "nats://10.0.0.3:6222"],
        };

        mgr.ProcessImplicitRoute(serverInfo);

        mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.2:6222");
        mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.3:6222");
    }

    [Fact]
    public void ProcessImplicitRoute_skips_known_peers()
    {
        var mgr = RouteManagerTestHelper.Create(knownRoutes: ["nats://10.0.0.2:6222"]);
        var serverInfo = new ServerInfo
        {
            ServerId = "server-2",
            ConnectUrls = ["nats://10.0.0.2:6222", "nats://10.0.0.3:6222"],
        };

        mgr.ProcessImplicitRoute(serverInfo);

        mgr.DiscoveredRoutes.Count.ShouldBe(1); // only 10.0.0.3 is new
        mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.3:6222");
    }

    [Fact]
    public void ProcessImplicitGateway_discovers_new_gateway()
    {
        var mgr = GatewayManagerTestHelper.Create();
        var gwInfo = new GatewayInfo
        {
            Name = "cluster-B",
            Urls = ["nats://10.0.1.1:7222"],
        };

        mgr.ProcessImplicitGateway(gwInfo);

        mgr.DiscoveredGateways.ShouldContain("cluster-B");
    }

    [Fact]
    public void ForwardNewRouteInfo_updates_known_servers()
    {
        var mgr = RouteManagerTestHelper.Create();
        var forwarded = new List<string>();
        mgr.OnForwardInfo += (urls) => forwarded.AddRange(urls);

        mgr.ForwardNewRouteInfoToKnownServers("nats://10.0.0.5:6222");

forwarded.ShouldContain("nats://10.0.0.5:6222");
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImplicitDiscoveryTests" -v normal
```
Expected: FAIL

**Step 3: Implement implicit discovery**

In `RouteManager.cs`:
- Add `HashSet<string> DiscoveredRoutes` — tracks auto-discovered route URLs
- `ProcessImplicitRoute(serverInfo)`:
  - Extract `connect_urls` from INFO
  - For each unknown URL: add to `DiscoveredRoutes`, initiate solicited connection
- `ForwardNewRouteInfoToKnownServers(newPeerUrl)`:
  - Send updated INFO containing new peer to all existing route connections
- Add `event Action<IEnumerable<string>>? OnForwardInfo` for testing

In `GatewayManager.cs`:
- Add `HashSet<string> DiscoveredGateways` — tracks auto-discovered gateway names
- `ProcessImplicitGateway(gwInfo)`:
  - Add gateway name + URLs to discovered set
  - Create outbound gateway connection

Create test helpers: `RouteManagerTestHelper`, `GatewayManagerTestHelper`.
+ +**Step 4: Run test to verify it passes** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImplicitDiscoveryTests" -v normal +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs src/NATS.Server/Routes/RouteManager.cs src/NATS.Server/Gateways/GatewayManager.cs +git commit -m "feat(cluster): add implicit route and gateway discovery from INFO gossip" +``` + +--- + +**Phase 4 Exit Gate Verification:** + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FlushCoalescing" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StallGate" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WriteTimeout" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MqttPersistence" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SignalHandler" -v normal +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImplicitDiscovery" -v normal +``` + +All Phase 4 tests must pass. + +--- + +## Final Verification + +After all 4 phases are complete, run the full test suite: + +```bash +dotnet test -v normal +``` + +All existing tests must continue to pass. No regressions. 
+ +Update parity DB with all newly mapped tests: + +```bash +sqlite3 docs/test_parity.db "SELECT status, COUNT(*) FROM go_tests GROUP BY status;" +``` + +--- + +## Task Summary + +| Task | Phase | Component | Gap | +|------|-------|-----------|-----| +| 1 | 1 | MsgBlock encryption | Gap 1.2 | +| 2 | 1 | MsgBlock compression | Gap 1.3 | +| 3 | 1 | Block rotation & lifecycle | Gap 1.1 | +| 4 | 1 | Crash recovery enhancement | Gap 1.4 | +| 5 | 1 | IStreamStore core methods | Gap 1.9 | +| 6 | 1 | IStreamStore query methods | Gap 1.9 | +| 7 | 1 | RAFT binary WAL | Gap 8.1 | +| 8 | 1 | RAFT joint consensus | Gap 8.2 | +| 9 | 2 | Meta snapshot codec | Gap 2.6 | +| 10 | 2 | Cluster monitoring loop | Gap 2.1 | +| 11 | 2 | Assignment processing | Gap 2.2 | +| 12 | 2 | Inflight tracking | Gap 2.3 | +| 13 | 2 | Leadership transitions | Gap 2.5 | +| 14 | 3 | RedeliveryTracker rewrite | Gap 3.4 | +| 15 | 3 | Ack/NAK processing | Gap 3.3 | +| 16 | 3 | Pull request pipeline | Gap 3.2 | +| 17 | 3 | Consumer pause/resume | Gap 3.7 | +| 18 | 3 | Priority group pinning | Gap 3.6 | +| 19 | 3 | Stream purge filtering | Gap 4.5 | +| 20 | 3 | Interest retention | Gap 4.6 | +| 21 | 3 | Mirror/source retry | Gap 4.1-4.3 | +| 22 | 4 | Flush coalescing | Gap 5.1 | +| 23 | 4 | Stall gate | Gap 5.2 | +| 24 | 4 | Write timeout recovery | Gap 5.3 | +| 25 | 4 | MQTT persistence | Gap 6.1 | +| 26 | 4 | SIGHUP config reload | Gap 14.1 | +| 27 | 4 | Implicit discovery | Gap 11.1, 13.1 | diff --git a/docs/plans/2026-02-25-production-gaps-plan.md.tasks.json b/docs/plans/2026-02-25-production-gaps-plan.md.tasks.json new file mode 100644 index 0000000..e11a6da --- /dev/null +++ b/docs/plans/2026-02-25-production-gaps-plan.md.tasks.json @@ -0,0 +1,33 @@ +{ + "planPath": "docs/plans/2026-02-25-production-gaps-plan.md", + "tasks": [ + {"id": 7, "subject": "Task 1: MsgBlock Encryption Integration", "status": "pending"}, + {"id": 8, "subject": "Task 2: MsgBlock Compression Integration", "status": "pending", 
"blockedBy": [7]}, + {"id": 9, "subject": "Task 3: Block Rotation & Lifecycle", "status": "pending", "blockedBy": [7, 8]}, + {"id": 10, "subject": "Task 4: Crash Recovery Enhancement", "status": "pending", "blockedBy": [7, 8, 9]}, + {"id": 11, "subject": "Task 5: IStreamStore Methods — Batch 1", "status": "pending"}, + {"id": 12, "subject": "Task 6: IStreamStore Methods — Batch 2", "status": "pending", "blockedBy": [11]}, + {"id": 13, "subject": "Task 7: RAFT Binary WAL", "status": "pending"}, + {"id": 14, "subject": "Task 8: RAFT Joint Consensus", "status": "pending", "blockedBy": [13]}, + {"id": 15, "subject": "Task 9: Meta Snapshot Codec", "status": "pending", "blockedBy": [7, 8, 9, 10, 11, 12, 13, 14]}, + {"id": 16, "subject": "Task 10: Cluster Monitoring Loop", "status": "pending", "blockedBy": [15]}, + {"id": 17, "subject": "Task 11: Stream/Consumer Assignment Processing", "status": "pending", "blockedBy": [7, 8, 9, 10, 11, 12, 13, 14]}, + {"id": 18, "subject": "Task 12: Inflight Tracking Enhancement", "status": "pending", "blockedBy": [17]}, + {"id": 19, "subject": "Task 13: Leadership Transitions", "status": "pending", "blockedBy": [16, 18]}, + {"id": 20, "subject": "Task 14: RedeliveryTracker with PriorityQueue", "status": "pending", "blockedBy": [15, 16, 17, 18, 19]}, + {"id": 21, "subject": "Task 15: Ack/NAK Processing Enhancement", "status": "pending", "blockedBy": [20]}, + {"id": 22, "subject": "Task 16: Pull Request Pipeline", "status": "pending", "blockedBy": [15, 16, 17, 18, 19]}, + {"id": 23, "subject": "Task 17: Consumer Pause/Resume", "status": "pending", "blockedBy": [15, 16, 17, 18, 19]}, + {"id": 24, "subject": "Task 18: Priority Group Pinning", "status": "pending", "blockedBy": [15, 16, 17, 18, 19]}, + {"id": 25, "subject": "Task 19: Stream Purge with Filtering", "status": "pending", "blockedBy": [15, 16, 17, 18, 19]}, + {"id": 26, "subject": "Task 20: Interest Retention Policy", "status": "pending", "blockedBy": [15, 16, 17, 18, 19]}, + 
{"id": 27, "subject": "Task 21: Mirror/Source Retry Enhancement", "status": "pending", "blockedBy": [15, 16, 17, 18, 19]}, + {"id": 28, "subject": "Task 22: Client Flush Coalescing", "status": "pending"}, + {"id": 29, "subject": "Task 23: Client Stall Gate", "status": "pending", "blockedBy": [28]}, + {"id": 30, "subject": "Task 24: Write Timeout Recovery", "status": "pending", "blockedBy": [29]}, + {"id": 31, "subject": "Task 25: MQTT JetStream Persistence", "status": "pending", "blockedBy": [7, 8, 11, 20, 21]}, + {"id": 32, "subject": "Task 26: SIGHUP Config Reload", "status": "pending"}, + {"id": 33, "subject": "Task 27: Implicit Route/Gateway Discovery", "status": "pending"} + ], + "lastUpdated": "2026-02-25T00:00:00Z" +}