# Production Gap Closure Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.

**Goal:** Port 15 CRITICAL/HIGH gaps from the Go NATS server to .NET, covering FileStore, RAFT, JetStream Cluster, Consumer/Stream engines, Client protocol, MQTT persistence, config reload, and networking discovery.

**Architecture:** Bottom-up dependency approach — Phase 1 builds storage and consensus foundations, Phase 2 adds cluster coordination on top, Phase 3 adds consumer/stream engines, Phase 4 adds client performance, MQTT, config, and networking. Each phase has an exit gate with measurable criteria.

**Tech Stack:** .NET 10 / C# 14, xUnit 3, Shouldly, NSubstitute, System.IO.Pipelines, IronSnappy (S2), ChaCha20-Poly1305/AES-GCM, SQLite (test parity DB)

**Codex review:** Verified 2026-02-25 — dependency gaps fixed, path errors corrected, feedback incorporated.

**Test strategy:** Run only targeted unit tests during implementation (`dotnet test --filter`). Run the full suite only after all 4 phases are complete. Update `docs/test_parity.db` when porting Go tests.

**Codex feedback incorporated:**

1. Added missing dependencies: Task 3←{1,2}, Task 4←{1,2,3}, Task 8←7, Task 13←{10,12}
2. Fixed paths: `RouteManager.cs` is in `Routes/`, `GatewayManager.cs` is in `Gateways/` (not `Clustering/`)
3. Phase 4A-C (client perf), 4E (SIGHUP), and 4F (discovery) are independent lanes — they can start in parallel with Phase 3
4. Phase 4D (MQTT) correctly gated on storage + consumer dependencies
5. WAL format should include a per-record checksum and record type — implement during Task 7
6. Encryption: add explicit nonce/AAD/key-version rules during Task 1 implementation

**Parity DB update pattern:**

```bash
sqlite3 docs/test_parity.db "UPDATE go_tests SET status='mapped', dotnet_test='DotNetTestName', dotnet_file='TestFile.cs' WHERE go_test='GoTestName';"
```

---

## Phase 1: FileStore + RAFT Foundation

**Dependencies:** None (pure infrastructure)

**Exit gate:** All IStreamStore methods implemented, FileStore round-trips encrypted+compressed blocks, RAFT persists and recovers state across restarts, joint consensus passes membership change tests.

---

### Task 1: MsgBlock Encryption Integration

Wire `AeadEncryptor` into `MsgBlock` so blocks are encrypted at rest.

**Files:**

- Modify: `src/NATS.Server/JetStream/Storage/MsgBlock.cs`
- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
- Modify: `src/NATS.Server/JetStream/Storage/FileStoreOptions.cs`
- Test: `tests/NATS.Server.Tests/FileStoreEncryptionTests.cs`
- Reference: `golang/nats-server/server/filestore.go:816-907` (genEncryptionKeys, recoverAEK, setupAEK)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/FileStoreEncryptionTests.cs`:

```csharp
using NATS.Server.JetStream.Storage;

namespace NATS.Server.Tests;

public class FileStoreEncryptionTests
{
    [Fact]
    public async Task Encrypted_block_round_trips_message()
    {
        // Go: TestFileStoreEncryption server/filestore_test.go
        var dir = Directory.CreateTempSubdirectory();
        var key = new byte[32];
        RandomNumberGenerator.Fill(key);

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key,
        }))
        {
            await store.AppendAsync("test.subj", "hello encrypted"u8.ToArray(), default);
        }

        // Raw block file should NOT contain plaintext
        var blkFiles = Directory.GetFiles(dir.FullName, "*.blk");
        blkFiles.ShouldNotBeEmpty();
        var raw = File.ReadAllBytes(blkFiles[0]);
        System.Text.Encoding.UTF8.GetString(raw).ShouldNotContain("hello encrypted");

        // Recover with same key should return plaintext
        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key,
        });
        var msg = await recovered.LoadAsync(1, default);
        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg.Payload).ShouldBe("hello encrypted");
    }

    [Fact]
    public async Task Encrypted_block_with_aes_round_trips()
    {
        var dir = Directory.CreateTempSubdirectory();
        var key = new byte[32];
        RandomNumberGenerator.Fill(key);

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.AES,
            EncryptionKey = key,
        }))
        {
            await store.AppendAsync("aes.subj", "aes payload"u8.ToArray(), default);
        }

        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.AES,
            EncryptionKey = key,
        });
        var msg = await recovered.LoadAsync(1, default);
        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg.Payload).ShouldBe("aes payload");
    }

    [Fact]
    public async Task Wrong_key_fails_to_decrypt()
    {
        var dir = Directory.CreateTempSubdirectory();
        var key1 = new byte[32];
        var key2 = new byte[32];
        RandomNumberGenerator.Fill(key1);
        RandomNumberGenerator.Fill(key2);

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key1,
        }))
        {
            await store.AppendAsync("secret", "data"u8.ToArray(), default);
        }

        // Recovery with the wrong key should throw
        var act = () => new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key2,
        });
        Should.Throw<Exception>(act);
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreEncryptionTests" -v normal
```

Expected: FAIL — encryption not wired into block I/O

**Step 3: Implement encryption in MsgBlock**

In `MsgBlock.cs`, add:

- New fields: `byte[]? _aek` (per-block AEAD key), `StoreCipher _cipher`
- A new constructor parameter for the encryption key and cipher
- Modify `Write`/`WriteAt` to encrypt the payload via `AeadEncryptor.Encrypt()` before the disk write
- Modify `Read`/`ReadRecord` to decrypt the payload via `AeadEncryptor.Decrypt()` after the disk read
- Add `static MsgBlock CreateEncrypted(string path, int blockId, long maxBytes, ulong firstSeq, byte[] aek, StoreCipher cipher)`
- Add `static MsgBlock RecoverEncrypted(string path, int blockId, long maxBytes, byte[] aek, StoreCipher cipher)`

In `FileStore.cs`, modify:

- `EnsureActiveBlock()` — pass the encryption key to `MsgBlock.CreateEncrypted()` when `_useAead`
- `RecoverBlocks()` — pass the encryption key to `MsgBlock.RecoverEncrypted()` when `_useAead`
- Add `GenPerBlockKey(int blockId)` — derives a per-block key from the stream key using HKDF
- Add `SaveBlockKey(int blockId, byte[] aek)` — persists the per-block key to a `.key` metadata file
- Add `LoadBlockKey(int blockId)` — loads the per-block key from the `.key` metadata file

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreEncryptionTests" -v normal
```

Expected: PASS

**Step 5: Update parity DB**

```bash
sqlite3 docs/test_parity.db "UPDATE go_tests SET status='mapped', dotnet_test='Encrypted_block_round_trips_message', dotnet_file='FileStoreEncryptionTests.cs' WHERE go_test='TestFileStoreEncryption';"
```

**Step 6: Commit**

```bash
git add tests/NATS.Server.Tests/FileStoreEncryptionTests.cs src/NATS.Server/JetStream/Storage/MsgBlock.cs src/NATS.Server/JetStream/Storage/FileStore.cs
git commit -m "feat(filestore): wire AeadEncryptor into MsgBlock for at-rest encryption"
```

---

### Task 2: MsgBlock Compression Integration

Wire `S2Codec` into `MsgBlock` for per-message compression.
**Files:**

- Modify: `src/NATS.Server/JetStream/Storage/MsgBlock.cs`
- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
- Test: `tests/NATS.Server.Tests/FileStoreCompressionTests.cs`
- Reference: `golang/nats-server/server/filestore.go:4443` (setupWriteCache with compression)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/FileStoreCompressionTests.cs`:

```csharp
using NATS.Server.JetStream.Storage;

namespace NATS.Server.Tests;

public class FileStoreCompressionTests
{
    [Fact]
    public async Task Compressed_block_round_trips_message()
    {
        // Go: TestFileStoreCompression server/filestore_test.go
        var dir = Directory.CreateTempSubdirectory();

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Compression = StoreCompression.S2Compression,
        }))
        {
            var payload = new byte[1024];
            Array.Fill(payload, (byte)'A'); // highly compressible
            await store.AppendAsync("comp.subj", payload, default);
        }

        // Block file should be smaller than the uncompressed payload
        var blkFiles = Directory.GetFiles(dir.FullName, "*.blk");
        blkFiles.ShouldNotBeEmpty();
        new FileInfo(blkFiles[0]).Length.ShouldBeLessThan(1024);

        // Recover should return the original payload
        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Compression = StoreCompression.S2Compression,
        });
        var msg = await recovered.LoadAsync(1, default);
        msg.ShouldNotBeNull();
        msg.Payload.Length.ShouldBe(1024);
        msg.Payload.All(b => b == (byte)'A').ShouldBeTrue();
    }

    [Fact]
    public async Task Compressed_and_encrypted_round_trips()
    {
        var dir = Directory.CreateTempSubdirectory();
        var key = new byte[32];
        RandomNumberGenerator.Fill(key);

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Compression = StoreCompression.S2Compression,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key,
        }))
        {
            await store.AppendAsync("both.subj", "compress+encrypt"u8.ToArray(), default);
        }

        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Compression = StoreCompression.S2Compression,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key,
        });
        var msg = await recovered.LoadAsync(1, default);
        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg.Payload).ShouldBe("compress+encrypt");
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreCompressionTests" -v normal
```

Expected: FAIL — compression not wired into block path

**Step 3: Implement compression in MsgBlock**

In `MsgBlock.cs`:

- Add field: `bool _compressed`
- Modify `Write`/`WriteAt`: if `_compressed`, call `S2Codec.Compress(payload)` before writing the record
- Modify `Read`/`ReadRecord`: if `_compressed`, call `S2Codec.Decompress(data)` after reading the record
- Order: compress first, then encrypt (on write); decrypt first, then decompress (on read)

In `FileStore.cs`:

- Pass the `_useS2` flag to the MsgBlock factory methods
- Persist the compression flag in block metadata so recovery knows to decompress

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreCompressionTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/FileStoreCompressionTests.cs src/NATS.Server/JetStream/Storage/MsgBlock.cs src/NATS.Server/JetStream/Storage/FileStore.cs
git commit -m "feat(filestore): wire S2Codec into MsgBlock for per-message compression"
```

---

### Task 3: Block Rotation & Lifecycle

Add automatic block rotation when the active block exceeds its size threshold, block sealing, and a background flusher.
**Files:**

- Modify: `src/NATS.Server/JetStream/Storage/MsgBlock.cs`
- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
- Test: `tests/NATS.Server.Tests/FileStoreBlockRotationTests.cs`
- Reference: `golang/nats-server/server/filestore.go:4485` (newMsgBlockForWrite), `5783-5842` (flushLoop)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/FileStoreBlockRotationTests.cs`:

```csharp
using NATS.Server.JetStream.Storage;

namespace NATS.Server.Tests;

public class FileStoreBlockRotationTests
{
    [Fact]
    public async Task Block_rotates_when_size_exceeded()
    {
        // Go: TestFileStoreBlockRotation server/filestore_test.go
        var dir = Directory.CreateTempSubdirectory();
        var opts = new FileStoreOptions
        {
            Directory = dir.FullName,
            BlockSizeBytes = 512, // small block for testing
        };
        await using var store = new FileStore(opts);

        var payload = new byte[200];
        // Write enough to trigger rotation (3 x 200 > 512)
        await store.AppendAsync("rot.1", payload, default);
        await store.AppendAsync("rot.2", payload, default);
        await store.AppendAsync("rot.3", payload, default);

        store.BlockCount.ShouldBeGreaterThan(1);
    }

    [Fact]
    public async Task Sealed_block_is_read_only()
    {
        var dir = Directory.CreateTempSubdirectory();
        var opts = new FileStoreOptions
        {
            Directory = dir.FullName,
            BlockSizeBytes = 256,
        };
        await using var store = new FileStore(opts);

        var payload = new byte[200];
        await store.AppendAsync("s.1", payload, default);
        await store.AppendAsync("s.2", payload, default); // triggers rotation

        // First block should be sealed
        var blkFiles = Directory.GetFiles(dir.FullName, "*.blk").OrderBy(f => f).ToArray();
        blkFiles.Length.ShouldBeGreaterThan(1);
    }

    [Fact]
    public async Task Messages_across_blocks_recover_correctly()
    {
        var dir = Directory.CreateTempSubdirectory();
        var opts = new FileStoreOptions
        {
            Directory = dir.FullName,
            BlockSizeBytes = 256,
        };
        await using (var store = new FileStore(opts))
        {
            for (int i = 1; i <= 10; i++)
                await store.AppendAsync($"msg.{i}", System.Text.Encoding.UTF8.GetBytes($"payload-{i}"), default);
        }

        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            BlockSizeBytes = 256,
        });
        var state = await recovered.GetStateAsync(default);
        state.Messages.ShouldBe(10UL);
        for (ulong seq = 1; seq <= 10; seq++)
        {
            var msg = await recovered.LoadAsync(seq, default);
            msg.ShouldNotBeNull();
            msg.Subject.ShouldBe($"msg.{seq}");
        }
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreBlockRotationTests" -v normal
```

Expected: FAIL — no automatic rotation

**Step 3: Implement block rotation**

In `MsgBlock.cs`:

- Add `bool IsSealed` property (set by `Seal()`)
- Add `void Seal()` — marks the block read-only, flushes pending writes
- Add `long BytesWritten` property — current write offset

In `FileStore.cs`:

- Modify `EnsureActiveBlock()` — check `_activeBlock.BytesWritten >= _options.BlockSizeBytes`, call `RotateBlock()` if exceeded
- Add `RotateBlock()` — seals the active block, creates a new block with `_nextBlockId++`, adds it to `_blocks`
- Modify `RecoverBlocks()` — handle multiple `.blk` files, recover each in order

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreBlockRotationTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/FileStoreBlockRotationTests.cs src/NATS.Server/JetStream/Storage/MsgBlock.cs src/NATS.Server/JetStream/Storage/FileStore.cs
git commit -m "feat(filestore): add block rotation on size threshold and sealing"
```

---

### Task 4: Crash Recovery Enhancement

Enhance recovery to rebuild the TTL wheel, validate checksums, and handle corrupt/truncated blocks.
**Files:**

- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
- Modify: `src/NATS.Server/JetStream/Storage/MsgBlock.cs`
- Modify: `src/NATS.Server/JetStream/Storage/MessageRecord.cs`
- Test: `tests/NATS.Server.Tests/FileStoreCrashRecoveryTests.cs`
- Reference: `golang/nats-server/server/filestore.go:1754-2401` (recoverFullState, recoverTTLState)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/FileStoreCrashRecoveryTests.cs`:

```csharp
using NATS.Server.JetStream.Storage;

namespace NATS.Server.Tests;

public class FileStoreCrashRecoveryTests
{
    [Fact]
    public async Task Recovery_rebuilds_ttl_wheel_and_expires_old()
    {
        var dir = Directory.CreateTempSubdirectory();
        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            MaxAgeMs = 100, // 100ms TTL
        }))
        {
            await store.AppendAsync("ttl.1", "old"u8.ToArray(), default);
        }

        // Wait for messages to expire
        await Task.Delay(200);

        // Recovery should expire old messages
        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            MaxAgeMs = 100,
        });
        var state = await recovered.GetStateAsync(default);
        state.Messages.ShouldBe(0UL);
    }

    [Fact]
    public async Task Recovery_handles_truncated_block()
    {
        var dir = Directory.CreateTempSubdirectory();
        await using (var store = new FileStore(new FileStoreOptions { Directory = dir.FullName }))
        {
            await store.AppendAsync("t.1", "data1"u8.ToArray(), default);
            await store.AppendAsync("t.2", "data2"u8.ToArray(), default);
        }

        // Truncate the block file to simulate a crash mid-write
        var blkFile = Directory.GetFiles(dir.FullName, "*.blk")[0];
        var info = new FileInfo(blkFile);
        using (var fs = File.OpenWrite(blkFile))
            fs.SetLength(info.Length - 5); // chop last 5 bytes

        // Recovery should salvage valid messages
        await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName });
        var state = await recovered.GetStateAsync(default);
        state.Messages.ShouldBeGreaterThanOrEqualTo(1UL);
    }

    [Fact]
    public async Task Atomic_state_write_survives_crash()
    {
        var dir = Directory.CreateTempSubdirectory();
        await using var store = new FileStore(new FileStoreOptions { Directory = dir.FullName });
        await store.AppendAsync("a.1", "payload"u8.ToArray(), default);

        // Force state write
        store.FlushAllPending();

        // State file should exist
        var stateFile = Path.Combine(dir.FullName, "stream.state");
        File.Exists(stateFile).ShouldBeTrue();
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreCrashRecoveryTests" -v normal
```

Expected: FAIL

**Step 3: Implement crash recovery enhancements**

In `FileStore.cs`:

- Add `RecoverTtlState()` — scan all blocks, re-register unexpired messages in `_ttlWheel`
- Add `WriteStateAtomically(path, data)` — write to a temp file, then `File.Move(temp, target, overwrite: true)`
- Enhance `RecoverBlocks()` — handle truncated records gracefully (catch decode errors, skip the corrupt tail)
- Implement `FlushAllPending()` — flush the active block, write the stream state atomically

In `MsgBlock.cs`:

- Add `RecoverWithValidation()` — validate each record during recovery, skip corrupt entries
- Add `TryReadRecord(offset)` — returns null on corrupt data instead of throwing

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreCrashRecoveryTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/FileStoreCrashRecoveryTests.cs src/NATS.Server/JetStream/Storage/FileStore.cs src/NATS.Server/JetStream/Storage/MsgBlock.cs
git commit -m "feat(filestore): enhance crash recovery with TTL rebuild and truncation handling"
```

---

### Task 5: IStreamStore Methods — Batch 1 (Core Operations)

Implement the core sync IStreamStore methods: `StoreMsg`, `StoreRawMsg`, `LoadMsg`, `LoadNextMsg`, `LoadLastMsg`, `LoadPrevMsg`, `RemoveMsg`, `EraseMsg`, `State`, `FastState`, `Type`, `Stop`, `Delete`.
**Files:**

- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
- Test: `tests/NATS.Server.Tests/FileStoreStreamStoreTests.cs`
- Reference: `golang/nats-server/server/filestore.go` (various)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/FileStoreStreamStoreTests.cs`:

```csharp
using NATS.Server.JetStream.Storage;

namespace NATS.Server.Tests;

public class FileStoreStreamStoreTests
{
    private FileStore CreateStore()
    {
        var dir = Directory.CreateTempSubdirectory();
        return new FileStore(new FileStoreOptions { Directory = dir.FullName });
    }

    [Fact]
    public void StoreMsg_appends_and_returns_seq_ts()
    {
        using var store = CreateStore();
        var (seq, ts) = store.StoreMsg("foo.bar", null, "hello"u8.ToArray(), 0);
        seq.ShouldBe(1UL);
        ts.ShouldBeGreaterThan(0L);
    }

    [Fact]
    public void LoadMsg_returns_stored_message()
    {
        using var store = CreateStore();
        store.StoreMsg("load.test", null, "payload"u8.ToArray(), 0);
        var sm = store.LoadMsg(1, null);
        sm.Seq.ShouldBe(1UL);
        sm.Subj.ShouldBe("load.test");
        System.Text.Encoding.UTF8.GetString(sm.Msg).ShouldBe("payload");
    }

    [Fact]
    public void LoadNextMsg_finds_next_matching()
    {
        using var store = CreateStore();
        store.StoreMsg("a.1", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("b.1", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("a.2", null, "m3"u8.ToArray(), 0);
        var (msg, _) = store.LoadNextMsg("a.*", true, 1, null);
        msg.Seq.ShouldBe(1UL);
        msg.Subj.ShouldBe("a.1");
    }

    [Fact]
    public void LoadLastMsg_returns_most_recent_on_subject()
    {
        using var store = CreateStore();
        store.StoreMsg("last.subj", null, "first"u8.ToArray(), 0);
        store.StoreMsg("last.subj", null, "second"u8.ToArray(), 0);
        store.StoreMsg("other", null, "other"u8.ToArray(), 0);
        var sm = store.LoadLastMsg("last.subj", null);
        sm.Seq.ShouldBe(2UL);
        System.Text.Encoding.UTF8.GetString(sm.Msg).ShouldBe("second");
    }

    [Fact]
    public void LoadPrevMsg_returns_message_before_seq()
    {
        using var store = CreateStore();
        store.StoreMsg("p.1", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("p.2", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("p.3", null, "m3"u8.ToArray(), 0);
        var sm = store.LoadPrevMsg(3, null);
        sm.Seq.ShouldBe(2UL);
    }

    [Fact]
    public void RemoveMsg_soft_deletes()
    {
        using var store = CreateStore();
        store.StoreMsg("rm.1", null, "data"u8.ToArray(), 0);
        store.RemoveMsg(1).ShouldBeTrue();
        var state = store.State();
        state.Msgs.ShouldBe(0UL);
    }

    [Fact]
    public void State_returns_full_stream_state()
    {
        using var store = CreateStore();
        store.StoreMsg("s.1", null, "a"u8.ToArray(), 0);
        store.StoreMsg("s.2", null, "b"u8.ToArray(), 0);
        var state = store.State();
        state.Msgs.ShouldBe(2UL);
        state.FirstSeq.ShouldBe(1UL);
        state.LastSeq.ShouldBe(2UL);
    }

    [Fact]
    public void Type_returns_file()
    {
        using var store = CreateStore();
        store.Type().ShouldBe(StorageType.File);
    }

    [Fact]
    public void Stop_flushes_and_closes()
    {
        using var store = CreateStore();
        store.StoreMsg("stop.1", null, "data"u8.ToArray(), 0);
        store.Stop();
        // After stop, the store should not accept writes
        Should.Throw<Exception>(() => { store.StoreMsg("stop.2", null, "more"u8.ToArray(), 0); });
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreStreamStoreTests" -v normal
```

Expected: FAIL — all methods throw `NotSupportedException`

**Step 3: Implement IStreamStore core methods in FileStore.cs**

Implement each method:

- `StoreMsg` — synchronous wrapper around the `AppendAsync` logic, returns `(seq, timestamp)`
- `StoreRawMsg` — append with caller-specified seq/ts, bypassing auto-increment
- `LoadMsg` — lookup by sequence in the `_messages` dict
- `LoadNextMsg` — scan forward from the `start` seq, matching `filter` with `SubjectMatch.IsMatch()`
- `LoadLastMsg` — reverse scan of `_messages` for a subject match
- `LoadPrevMsg` — scan backward from `start-1`
- `RemoveMsg` — mark deleted in `_messages` and `MsgBlock._deleted`
- `EraseMsg` — remove + overwrite the bytes on disk with random data
- `State()` — return a `StreamState` with first/last/msgs/bytes/deleted
- `FastState()` — populate a ref param with minimal fields
- `Type()` — return `StorageType.File`
- `Stop()` — flush the active block, close all FDs, mark the store as stopped
- `Delete(inline)` — Stop + delete all `.blk` files and the directory

Add the `StoreMsg` record type if not already defined:

```csharp
public record struct StoreMsg(ulong Seq, string Subj, byte[]? Hdr, byte[] Msg, long Ts);
```

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreStreamStoreTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/FileStoreStreamStoreTests.cs src/NATS.Server/JetStream/Storage/FileStore.cs
git commit -m "feat(filestore): implement core IStreamStore sync methods"
```

---

### Task 6: IStreamStore Methods — Batch 2 (Query & State)

Implement the remaining methods: `Purge`, `PurgeEx`, `Compact`, `Truncate`, `GetSeqFromTime`, `FilteredState`, `SubjectsState`, `SubjectsTotals`, `NumPending`, `EncodedStreamState`, `UpdateConfig`, `ResetState`, `FlushAllPending`, `SkipMsg`, `SkipMsgs`.
**Files:**

- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
- Test: `tests/NATS.Server.Tests/FileStoreQueryTests.cs`
- Reference: `golang/nats-server/server/filestore.go` (various)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/FileStoreQueryTests.cs`:

```csharp
using NATS.Server.JetStream.Storage;

namespace NATS.Server.Tests;

public class FileStoreQueryTests
{
    private FileStore CreateStore()
    {
        var dir = Directory.CreateTempSubdirectory();
        return new FileStore(new FileStoreOptions { Directory = dir.FullName });
    }

    [Fact]
    public void PurgeEx_with_subject_filter()
    {
        using var store = CreateStore();
        store.StoreMsg("a.1", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("b.1", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("a.2", null, "m3"u8.ToArray(), 0);
        var purged = store.PurgeEx("a.*", 0, 0);
        purged.ShouldBe(2UL); // only a.1 and a.2
        var state = store.State();
        state.Msgs.ShouldBe(1UL); // b.1 remains
    }

    [Fact]
    public void Compact_removes_below_seq()
    {
        using var store = CreateStore();
        for (int i = 0; i < 5; i++)
            store.StoreMsg("c.x", null, System.Text.Encoding.UTF8.GetBytes($"m{i}"), 0);
        store.Compact(3); // remove seq 1, 2
        var state = store.State();
        state.FirstSeq.ShouldBe(3UL);
    }

    [Fact]
    public void FilteredState_returns_matching_counts()
    {
        using var store = CreateStore();
        store.StoreMsg("f.a", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("f.b", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("f.a", null, "m3"u8.ToArray(), 0);
        var ss = store.FilteredState(1, "f.a");
        ss.Msgs.ShouldBe(2UL);
    }

    [Fact]
    public void SubjectsTotals_returns_per_subject_counts()
    {
        using var store = CreateStore();
        store.StoreMsg("t.a", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("t.b", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("t.a", null, "m3"u8.ToArray(), 0);
        var totals = store.SubjectsTotals("t.*");
        totals["t.a"].ShouldBe(2UL);
        totals["t.b"].ShouldBe(1UL);
    }

    [Fact]
    public void NumPending_counts_from_start_seq()
    {
        using var store = CreateStore();
        store.StoreMsg("np.x", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("np.x", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("np.y", null, "m3"u8.ToArray(), 0);
        var (total, _) = store.NumPending(2, "np.x", false);
        total.ShouldBe(1UL); // only seq 2 matches
    }

    [Fact]
    public void GetSeqFromTime_returns_first_at_or_after()
    {
        using var store = CreateStore();
        store.StoreMsg("time.1", null, "m1"u8.ToArray(), 0);
        var beforeSecond = DateTime.UtcNow;
        store.StoreMsg("time.2", null, "m2"u8.ToArray(), 0);
        var seq = store.GetSeqFromTime(beforeSecond);
        seq.ShouldBe(2UL);
    }

    [Fact]
    public void SkipMsg_reserves_sequence()
    {
        using var store = CreateStore();
        store.StoreMsg("sk.1", null, "m1"u8.ToArray(), 0);
        store.SkipMsg(2);
        store.StoreMsg("sk.3", null, "m3"u8.ToArray(), 0);
        var state = store.State();
        state.LastSeq.ShouldBe(3UL);
        state.Msgs.ShouldBe(2UL); // seq 2 is skipped, not a message
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreQueryTests" -v normal
```

Expected: FAIL

**Step 3: Implement query and state methods in FileStore.cs**

Implement each remaining method using the in-memory `_messages` dictionary and the block structures.
Key approaches:

- `PurgeEx` — iterate `_messages`, match the subject with wildcard support, remove matches
- `Compact` — remove all entries with seq below the given seq
- `Truncate` — remove all entries with seq above the given seq
- `GetSeqFromTime` — binary search or linear scan by timestamp
- `FilteredState` — count messages matching the subject filter from the given seq
- `SubjectsState` — group by subject, compute a per-subject `SimpleState`
- `SubjectsTotals` — group by subject, count
- `NumPending` — count from the start seq, matching the filter
- `EncodedStreamState` — binary-serialize the stream state (versioned format)
- `UpdateConfig` — apply new block size, retention, limits
- `ResetState` — clear caches
- `SkipMsg` / `SkipMsgs` — reserve sequence(s) without storing data

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreQueryTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/FileStoreQueryTests.cs src/NATS.Server/JetStream/Storage/FileStore.cs
git commit -m "feat(filestore): implement query and state IStreamStore methods"
```

---

### Task 7: RAFT Binary WAL

Replace in-memory JSON persistence with a binary WAL for production durability.
**Files:**

- Modify: `src/NATS.Server/Raft/RaftLog.cs`
- Modify: `src/NATS.Server/Raft/RaftNode.cs`
- Create: `src/NATS.Server/Raft/RaftWal.cs`
- Test: `tests/NATS.Server.Tests/RaftWalTests.cs`
- Reference: `golang/nats-server/server/raft.go:1000-1100`

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/RaftWalTests.cs`:

```csharp
using NATS.Server.Raft;

namespace NATS.Server.Tests;

public class RaftWalTests
{
    [Fact]
    public async Task Wal_persists_and_recovers_entries()
    {
        var dir = Directory.CreateTempSubdirectory();
        var walPath = Path.Combine(dir.FullName, "raft.wal");

        // Write entries
        {
            var wal = new RaftWal(walPath);
            await wal.AppendAsync(new RaftLogEntry(1, 1, "cmd-1"));
            await wal.AppendAsync(new RaftLogEntry(2, 1, "cmd-2"));
            await wal.AppendAsync(new RaftLogEntry(3, 2, "cmd-3"));
            await wal.SyncAsync();
            wal.Dispose();
        }

        // Recover
        var recovered = RaftWal.Load(walPath);
        var entries = recovered.Entries.ToList();
        entries.Count.ShouldBe(3);
        entries[0].Index.ShouldBe(1);
        entries[0].Term.ShouldBe(1);
        entries[0].Command.ShouldBe("cmd-1");
        entries[2].Index.ShouldBe(3);
        entries[2].Term.ShouldBe(2);
        recovered.Dispose();
    }

    [Fact]
    public async Task Wal_compact_removes_old_entries()
    {
        var dir = Directory.CreateTempSubdirectory();
        var walPath = Path.Combine(dir.FullName, "raft.wal");

        var wal = new RaftWal(walPath);
        for (int i = 1; i <= 10; i++)
            await wal.AppendAsync(new RaftLogEntry(i, 1, $"cmd-{i}"));
        await wal.SyncAsync();

        await wal.CompactAsync(5); // remove entries 1-5

        var recovered = RaftWal.Load(walPath);
        recovered.Entries.Count().ShouldBe(5);
        recovered.Entries.First().Index.ShouldBe(6);
        wal.Dispose();
        recovered.Dispose();
    }

    [Fact]
    public async Task Wal_handles_truncated_file()
    {
        var dir = Directory.CreateTempSubdirectory();
        var walPath = Path.Combine(dir.FullName, "raft.wal");

        {
            var wal = new RaftWal(walPath);
            await wal.AppendAsync(new RaftLogEntry(1, 1, "good-entry"));
            await wal.AppendAsync(new RaftLogEntry(2, 1, "will-be-truncated"));
            await wal.SyncAsync();
            wal.Dispose();
        }

        // Truncate the last few bytes
        using (var fs = File.OpenWrite(walPath))
            fs.SetLength(fs.Length - 3);

        var recovered = RaftWal.Load(walPath);
        recovered.Entries.Count().ShouldBeGreaterThanOrEqualTo(1);
        recovered.Entries.First().Command.ShouldBe("good-entry");
        recovered.Dispose();
    }

    [Fact]
    public async Task RaftNode_persists_term_and_vote()
    {
        var dir = Directory.CreateTempSubdirectory();
        var node = new RaftNode("n1", persistDirectory: dir.FullName);
        node.TermState.CurrentTerm = 5;
        node.TermState.VotedFor = "n2";
        await node.PersistAsync();

        var recovered = new RaftNode("n1", persistDirectory: dir.FullName);
        await recovered.LoadPersistedStateAsync();
        recovered.Term.ShouldBe(5);
        recovered.TermState.VotedFor.ShouldBe("n2");
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftWalTests" -v normal
```

Expected: FAIL — `RaftWal` class doesn't exist

**Step 3: Implement RaftWal**

Create `src/NATS.Server/Raft/RaftWal.cs`:

```csharp
namespace NATS.Server.Raft;

/// <summary>
/// Binary write-ahead log for RAFT entries.
/// Format: [4:magic][4:version] header, then ([4:length][8:index][4:term][N:command])*.
/// Go reference: raft.go WAL patterns.
/// </summary>
public sealed class RaftWal : IDisposable
{
    private const int Version = 1;
    private FileStream _file;
    private readonly List<RaftLogEntry> _entries = [];

    // Constructor, AppendAsync, SyncAsync, CompactAsync, Load, Dispose...
}
```

Key implementation:

- Binary record format: `[4:length_le][8:index_le][4:term_le][N:utf8_command]`
- Version header: `[4:magic][4:version]` at file start
- `AppendAsync` — write a length-prefixed record, add it to the in-memory list
- `SyncAsync` — flush to stable storage; note that `FileStream.FlushAsync()` only flushes to the OS, so use `Flush(flushToDisk: true)` for a real fsync
- `CompactAsync(upToIndex)` — rewrite the WAL from `upToIndex+1` onward using a temp file + rename
- `Load(path)` — scan the WAL, validate records, skip a corrupt tail

Modify `RaftLog.cs`:

- Add an optional `RaftWal` backing — when `_wal` is set, delegate `Append`/`Compact` to it
- Keep the in-memory list for fast access, the WAL for durability

Modify `RaftNode.cs`:

- Wire `_persistDirectory` — create the WAL at `{dir}/raft.wal`
- `PersistAsync()` — write `meta.json` with term/votedFor
- `LoadPersistedStateAsync()` — load `meta.json` + WAL entries

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftWalTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add src/NATS.Server/Raft/RaftWal.cs tests/NATS.Server.Tests/RaftWalTests.cs src/NATS.Server/Raft/RaftLog.cs src/NATS.Server/Raft/RaftNode.cs
git commit -m "feat(raft): add binary WAL for persistent log storage"
```

---

### Task 8: RAFT Joint Consensus

Implement safe two-phase membership changes per the Raft paper's joint consensus protocol.
**Files:**

- Modify: `src/NATS.Server/Raft/RaftNode.cs`
- Test: `tests/NATS.Server.Tests/RaftJointConsensusTests.cs`
- Reference: `golang/nats-server/server/raft.go`; Raft paper Section 4

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/RaftJointConsensusTests.cs`:

```csharp
using NATS.Server.Raft;

namespace NATS.Server.Tests;

public class RaftJointConsensusTests
{
    [Fact]
    public async Task AddPeer_uses_joint_consensus()
    {
        var cluster = RaftTestCluster.Create(3);
        var leader = await cluster.ElectLeaderAsync();

        await leader.ProposeAddPeerAsync("n4");

        // During joint phase, quorum requires majority of both old and new
        leader.Members.ShouldContain("n4");
        leader.Members.Count.ShouldBe(4);
    }

    [Fact]
    public async Task RemovePeer_uses_joint_consensus()
    {
        var cluster = RaftTestCluster.Create(3);
        var leader = await cluster.ElectLeaderAsync();

        await leader.ProposeRemovePeerAsync("n3");

        leader.Members.ShouldNotContain("n3");
        leader.Members.Count.ShouldBe(2);
    }

    [Fact]
    public async Task Joint_quorum_requires_both_old_and_new_majority()
    {
        var cluster = RaftTestCluster.Create(3);
        var leader = await cluster.ElectLeaderAsync();

        // Start add — enters joint config
        leader.BeginJointConsensus(["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]);

        // Joint quorum: majority of {n1,n2,n3} AND majority of {n1,n2,n3,n4}
        leader.CalculateJointQuorum(["n1", "n2"], ["n1", "n2", "n3"]).ShouldBeTrue(); // 2/3 and 3/4
        leader.CalculateJointQuorum(["n1"], ["n1", "n2"]).ShouldBeFalse(); // 1/3 fails old quorum
    }

    [Fact]
    public async Task Only_one_membership_change_at_a_time()
    {
        var cluster = RaftTestCluster.Create(3);
        var leader = await cluster.ElectLeaderAsync();

        await leader.ProposeAddPeerAsync("n4");

        // Second concurrent change should be rejected
        var ex = await Should.ThrowAsync<InvalidOperationException>(
            () => leader.ProposeAddPeerAsync("n5"));
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftJointConsensusTests" -v normal
```

Expected: FAIL

**Step 3: Implement joint consensus in RaftNode.cs**

Add to `RaftNode.cs`:

- New field: `HashSet<string>? _jointNewMembers` — Cnew during transition
- `BeginJointConsensus(cold, cnew)` — set `_jointNewMembers = cnew`, propose joint entry
- `CalculateJointQuorum(coldVotes, cnewVotes)` — `majority(cold) AND majority(cnew)`
- Modify `ProposeAddPeerAsync`:
  1. Check `MembershipChangeInProgress` — reject if true
  2. Propose joint entry (`Cold ∪ Cnew`) to log
  3. Wait for joint entry commit
  4. Propose final Cnew entry to log
  5. Wait for final entry commit
  6. Clear `_jointNewMembers`
- Modify `ProposeRemovePeerAsync` — same two-phase pattern
- Modify `ApplyMembershipChange(entry)`:
  - If joint entry: set `_jointNewMembers`
  - If final entry: replace `_members` with Cnew, clear `_jointNewMembers`

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftJointConsensusTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/RaftJointConsensusTests.cs src/NATS.Server/Raft/RaftNode.cs
git commit -m "feat(raft): implement joint consensus for safe membership changes"
```

---

**Phase 1 Exit Gate Verification:**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStore" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftWal" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftJointConsensus" -v normal
```

All Phase 1 tests must pass before proceeding to Phase 2.

---

## Phase 2: JetStream Cluster Coordination

**Dependencies:** Phase 1 (RAFT persistence, FileStore)

**Exit gate:** Meta-group processes stream/consumer assignments via RAFT, snapshots encode/decode round-trip, leadership transitions handled correctly, orphan detection works.

---

### Task 9: Meta Snapshot Encoding/Decoding

Implement binary snapshot codec for meta-group state with S2 compression.
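The codec's wire shape — a small version header followed by a compressed JSON payload — can be sketched generically. GZip from `System.IO.Compression` stands in for IronSnappy's S2 codec here (the S2 API is not assumed), and `Encode`/`Decode` are illustrative stand-ins for the planned methods:

```csharp
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text.Json;

static byte[] Encode<T>(T value)
{
    var json = JsonSerializer.SerializeToUtf8Bytes(value);
    using var ms = new MemoryStream();
    // 2-byte little-endian version header, then the compressed payload.
    Span<byte> header = stackalloc byte[2];
    BinaryPrimitives.WriteUInt16LittleEndian(header, 1);
    ms.Write(header);
    using (var gz = new GZipStream(ms, CompressionLevel.Fastest, leaveOpen: true))
        gz.Write(json);
    return ms.ToArray();
}

static T Decode<T>(byte[] data)
{
    var version = BinaryPrimitives.ReadUInt16LittleEndian(data);
    if (version != 1)
        throw new InvalidOperationException($"unknown snapshot version {version}");
    using var gz = new GZipStream(
        new MemoryStream(data, 2, data.Length - 2), CompressionMode.Decompress);
    return JsonSerializer.Deserialize<T>(gz)!;
}

var state = new Dictionary<string, string[]> { ["stream-A"] = new[] { "n1", "n2", "n3" } };
var roundTripped = Decode<Dictionary<string, string[]>>(Encode(state));
Console.WriteLine(roundTripped["stream-A"].Length); // 3
```

Checking the version before touching the payload is what lets `Decode` reject unknown formats cheaply, which is exactly what the Task 9 test `Versioned_format_rejects_unknown_version` pins down.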
**Files:**

- Create: `src/NATS.Server/JetStream/Cluster/MetaSnapshotCodec.cs`
- Test: `tests/NATS.Server.Tests/MetaSnapshotCodecTests.cs`
- Reference: `golang/nats-server/server/jetstream_cluster.go:2031-2145` (encodeMetaSnapshot, decodeMetaSnapshot)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/MetaSnapshotCodecTests.cs`:

```csharp
using NATS.Server.JetStream.Cluster;

namespace NATS.Server.Tests;

public class MetaSnapshotCodecTests
{
    [Fact]
    public void Encode_decode_round_trips()
    {
        var assignments = new Dictionary<string, StreamAssignment>
        {
            ["stream-A"] = new StreamAssignment
            {
                StreamName = "stream-A",
                Group = new RaftGroup { Name = "rg-a", Peers = ["n1", "n2", "n3"] },
                ConfigJson = """{"subjects":["foo.>"]}""",
            },
            ["stream-B"] = new StreamAssignment
            {
                StreamName = "stream-B",
                Group = new RaftGroup { Name = "rg-b", Peers = ["n1", "n2"] },
                ConfigJson = """{"subjects":["bar.>"]}""",
                Consumers =
                {
                    ["con-1"] = new ConsumerAssignment
                    {
                        ConsumerName = "con-1",
                        StreamName = "stream-B",
                        Group = new RaftGroup { Name = "rg-c1", Peers = ["n1", "n2"] },
                    },
                },
            },
        };

        var encoded = MetaSnapshotCodec.Encode(assignments);
        encoded.ShouldNotBeEmpty();

        var decoded = MetaSnapshotCodec.Decode(encoded);
        decoded.Count.ShouldBe(2);
        decoded["stream-A"].StreamName.ShouldBe("stream-A");
        decoded["stream-A"].Group.Peers.Count.ShouldBe(3);
        decoded["stream-B"].Consumers.Count.ShouldBe(1);
        decoded["stream-B"].Consumers["con-1"].ConsumerName.ShouldBe("con-1");
    }

    [Fact]
    public void Encoded_snapshot_is_compressed()
    {
        var assignments = new Dictionary<string, StreamAssignment>();
        for (int i = 0; i < 100; i++)
        {
            assignments[$"stream-{i}"] = new StreamAssignment
            {
                StreamName = $"stream-{i}",
                Group = new RaftGroup { Name = $"rg-{i}", Peers = ["n1", "n2", "n3"] },
                ConfigJson = """{"subjects":["test.>"]}""",
            };
        }

        var encoded = MetaSnapshotCodec.Encode(assignments);
        var json = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(assignments);

        // S2 compressed should be smaller than raw JSON
        encoded.Length.ShouldBeLessThan(json.Length);
    }

    [Fact]
    public void Empty_snapshot_round_trips()
    {
        var empty = new Dictionary<string, StreamAssignment>();
        var encoded = MetaSnapshotCodec.Encode(empty);
        var decoded = MetaSnapshotCodec.Decode(encoded);
        decoded.ShouldBeEmpty();
    }

    [Fact]
    public void Versioned_format_rejects_unknown_version()
    {
        var bad = new byte[] { 0xFF, 0xFF, 0, 0 }; // invalid version
        Should.Throw<InvalidOperationException>(() => MetaSnapshotCodec.Decode(bad));
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MetaSnapshotCodecTests" -v normal
```

Expected: FAIL — `MetaSnapshotCodec` doesn't exist

**Step 3: Implement MetaSnapshotCodec**

Create `src/NATS.Server/JetStream/Cluster/MetaSnapshotCodec.cs`:

```csharp
using System.Buffers.Binary;
using System.Text.Json;
using NATS.Server.JetStream.Storage;

namespace NATS.Server.JetStream.Cluster;

/// <summary>
/// Binary codec for meta-group snapshots. Format:
/// [2:version][N:S2-compressed JSON of assignment map]
/// Go reference: jetstream_cluster.go:2075-2145 (encodeMetaSnapshot/decodeMetaSnapshot)
/// </summary>
public static class MetaSnapshotCodec
{
    private const ushort CurrentVersion = 1;

    public static byte[] Encode(Dictionary<string, StreamAssignment> assignments) { ... }
    public static Dictionary<string, StreamAssignment> Decode(byte[] data) { ... }
}
```

Implementation:

- `Encode`: JSON serialize → S2 compress → prepend 2-byte version header
- `Decode`: read version → strip header → S2 decompress → JSON deserialize
- Reject unknown versions with `InvalidOperationException`

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MetaSnapshotCodecTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add src/NATS.Server/JetStream/Cluster/MetaSnapshotCodec.cs tests/NATS.Server.Tests/MetaSnapshotCodecTests.cs
git commit -m "feat(cluster): add MetaSnapshotCodec with S2 compression and versioned format"
```

---

### Task 10: Cluster Monitoring Loop

Background loop that processes meta RAFT entries and drives cluster state transitions.
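The monitor's read-parse-dispatch cycle can be sketched with a channel of JSON strings standing in for the RAFT entry stream; the `Op` switch mirrors the handlers this task adds, and all names here are illustrative:

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Channels;
using System.Threading.Tasks;

var streams = new Dictionary<string, string>(); // stream name -> config
var channel = Channel.CreateUnbounded<string>();

// Minimal dispatch loop: read each committed entry, branch on its "Op" field.
async Task RunMonitorAsync()
{
    await foreach (var raw in channel.Reader.ReadAllAsync())
    {
        using var doc = JsonDocument.Parse(raw);
        var root = doc.RootElement;
        switch (root.GetProperty("Op").GetString())
        {
            case "assignStream":
                streams[root.GetProperty("StreamName").GetString()!] =
                    root.GetProperty("Config").GetString() ?? "{}";
                break;
            case "removeStream":
                streams.Remove(root.GetProperty("StreamName").GetString()!);
                break;
            // assignConsumer / removeConsumer / snapshot would dispatch similarly.
        }
    }
}

var monitor = RunMonitorAsync();
await channel.Writer.WriteAsync("""{"Op":"assignStream","StreamName":"s1","Config":"{}"}""");
await channel.Writer.WriteAsync("""{"Op":"removeStream","StreamName":"s1"}""");
channel.Writer.Complete();
await monitor;
Console.WriteLine(streams.Count); // 0
```

Draining entries from a single reader keeps state mutations serialized, so the handlers need no extra locking — the same property the channel-fed monitor relies on.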
**Files:**

- Create: `src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs`
- Test: `tests/NATS.Server.Tests/JetStreamClusterMonitorTests.cs`
- Reference: `golang/nats-server/server/jetstream_cluster.go:1455-1825` (monitorCluster)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/JetStreamClusterMonitorTests.cs`:

```csharp
using System.Threading.Channels;
using NATS.Server.JetStream.Cluster;
using NATS.Server.Raft;

namespace NATS.Server.Tests;

public class JetStreamClusterMonitorTests
{
    [Fact]
    public async Task Monitor_processes_stream_assignment_entry()
    {
        var meta = new JetStreamMetaGroup(3);
        var channel = Channel.CreateUnbounded<RaftLogEntry>();
        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
        var monitorTask = monitor.StartAsync(cts.Token);

        // Write a stream assignment entry
        var assignJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            Op = "assignStream",
            StreamName = "test-stream",
            Peers = new[] { "n1", "n2", "n3" },
            Config = """{"subjects":["test.>"]}""",
        });
        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, assignJson));

        // Give the monitor time to process
        await Task.Delay(100);
        meta.StreamCount.ShouldBe(1);
        meta.GetStreamAssignment("test-stream").ShouldNotBeNull();

        cts.Cancel();
        await monitorTask;
    }

    [Fact]
    public async Task Monitor_processes_consumer_assignment_entry()
    {
        var meta = new JetStreamMetaGroup(3);
        var channel = Channel.CreateUnbounded<RaftLogEntry>();
        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
        var monitorTask = monitor.StartAsync(cts.Token);

        // First assign stream
        var streamJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            Op = "assignStream",
            StreamName = "s1",
            Peers = new[] { "n1", "n2", "n3" },
            Config = """{"subjects":["x.>"]}""",
        });
        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, streamJson));

        // Then assign consumer
        var consumerJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            Op = "assignConsumer",
            StreamName = "s1",
            ConsumerName = "c1",
            Peers = new[] { "n1", "n2", "n3" },
        });
        await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, consumerJson));

        await Task.Delay(100);
        meta.ConsumerCount.ShouldBe(1);

        cts.Cancel();
        await monitorTask;
    }

    [Fact]
    public async Task Monitor_processes_stream_removal()
    {
        var meta = new JetStreamMetaGroup(3);
        var channel = Channel.CreateUnbounded<RaftLogEntry>();
        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
        var monitorTask = monitor.StartAsync(cts.Token);

        // Assign then remove
        var assignJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            Op = "assignStream",
            StreamName = "to-remove",
            Peers = new[] { "n1", "n2", "n3" },
            Config = """{"subjects":["rm.>"]}""",
        });
        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, assignJson));
        await Task.Delay(50);

        var removeJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            Op = "removeStream",
            StreamName = "to-remove",
        });
        await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, removeJson));

        await Task.Delay(100);
        meta.StreamCount.ShouldBe(0);

        cts.Cancel();
        await monitorTask;
    }

    [Fact]
    public async Task Monitor_applies_meta_snapshot()
    {
        var meta = new JetStreamMetaGroup(3);
        var channel = Channel.CreateUnbounded<RaftLogEntry>();
        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
        var monitorTask = monitor.StartAsync(cts.Token);

        // Create a snapshot with known state
        var assignments = new Dictionary<string, StreamAssignment>
        {
            ["snap-stream"] = new StreamAssignment
            {
                StreamName = "snap-stream",
                Group = new RaftGroup { Name = "rg-snap", Peers = ["n1", "n2", "n3"] },
            },
        };
        var snapshot = MetaSnapshotCodec.Encode(assignments);
        var snapshotB64 = Convert.ToBase64String(snapshot);

        var snapshotJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            Op = "snapshot",
            Data = snapshotB64,
        });
        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, snapshotJson));

        await Task.Delay(100);
        meta.StreamCount.ShouldBe(1);
        meta.GetStreamAssignment("snap-stream").ShouldNotBeNull();

        cts.Cancel();
        await monitorTask;
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterMonitorTests" -v normal
```

Expected: FAIL — `JetStreamClusterMonitor` doesn't exist

**Step 3: Implement JetStreamClusterMonitor**

Create `src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs`:

```csharp
using System.Text.Json;
using System.Threading.Channels;
using NATS.Server.Raft;

namespace NATS.Server.JetStream.Cluster;

/// <summary>
/// Background loop consuming meta RAFT entries and dispatching cluster state changes.
/// Go reference: jetstream_cluster.go:1455-1825 (monitorCluster).
/// </summary>
public sealed class JetStreamClusterMonitor
{
    private readonly JetStreamMetaGroup _meta;
    private readonly ChannelReader<RaftLogEntry> _entries;

    public JetStreamClusterMonitor(JetStreamMetaGroup meta, ChannelReader<RaftLogEntry> entries) { ... }

    public async Task StartAsync(CancellationToken ct) { ... }

    private void ApplyMetaEntry(RaftLogEntry entry) { ... }
    private void ProcessStreamAssignment(JsonElement data) { ... }
    private void ProcessConsumerAssignment(JsonElement data) { ... }
    private void ProcessStreamRemoval(JsonElement data) { ... }
    private void ProcessConsumerRemoval(JsonElement data) { ... }
    private void ApplyMetaSnapshot(JsonElement data) { ... }
}
```

The main loop:

1. Read entries from channel
2. Parse JSON, dispatch by `Op` field
3. Call corresponding handler on `JetStreamMetaGroup`

Add to `JetStreamMetaGroup.cs`:

- `AddStreamAssignment(StreamAssignment sa)` — adds to `_assignments`
- `RemoveStreamAssignment(string streamName)` — removes from `_assignments`
- `AddConsumerAssignment(string streamName, ConsumerAssignment ca)` — adds to stream's consumers
- `RemoveConsumerAssignment(string streamName, string consumerName)` — removes consumer
- `ReplaceAllAssignments(Dictionary<string, StreamAssignment> newState)` — for snapshot apply

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterMonitorTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs tests/NATS.Server.Tests/JetStreamClusterMonitorTests.cs src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
git commit -m "feat(cluster): add JetStreamClusterMonitor for meta RAFT entry processing"
```

---

### Task 11: Stream/Consumer Assignment Processing

Enhance `JetStreamMetaGroup` to validate and process assignments with proper error handling.
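The validation rules this task adds — reject empty or duplicate stream names, require an existing stream before assigning a consumer, cascade stream removal to its consumers — can be sketched with plain collections standing in for the richer assignment types; all names below are illustrative:

```csharp
using System;
using System.Collections.Generic;

var streams = new HashSet<string>();
var consumers = new Dictionary<string, List<string>>(); // stream -> consumer names

var rejected = ProcessStreamAssignment("");    // empty name: rejected
ProcessStreamAssignment("cascade");
ProcessConsumerAssignment("cascade", "c1");
var removed = ProcessStreamRemoval("cascade"); // cascades to c1
Console.WriteLine($"{rejected} {removed} {streams.Count + consumers.Count}"); // False True 0

bool ProcessStreamAssignment(string name)
{
    // Validate the name and reject duplicates before accepting.
    if (string.IsNullOrEmpty(name) || streams.Contains(name)) return false;
    streams.Add(name);
    consumers[name] = new List<string>();
    return true;
}

bool ProcessConsumerAssignment(string stream, string consumer)
{
    // A consumer may only be assigned to a stream that already exists.
    if (!streams.Contains(stream)) return false;
    consumers[stream].Add(consumer);
    return true;
}

bool ProcessStreamRemoval(string name)
{
    // Removing a stream cascades to all of its consumers.
    if (!streams.Remove(name)) return false;
    consumers.Remove(name);
    return true;
}
```

The boolean return is what lets callers distinguish a rejected assignment from an applied one, matching the `ShouldBeFalse()`/`ShouldBeTrue()` expectations in the tests.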
**Files:**

- Modify: `src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs`
- Test: `tests/NATS.Server.Tests/JetStreamAssignmentProcessingTests.cs`
- Reference: `golang/nats-server/server/jetstream_cluster.go:4541-5925`

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/JetStreamAssignmentProcessingTests.cs`:

```csharp
using NATS.Server.JetStream.Cluster;

namespace NATS.Server.Tests;

public class JetStreamAssignmentProcessingTests
{
    [Fact]
    public void ProcessStreamAssignment_validates_config()
    {
        var meta = new JetStreamMetaGroup(3);
        var sa = new StreamAssignment
        {
            StreamName = "valid-stream",
            Group = new RaftGroup { Name = "rg-1", Peers = ["n1", "n2", "n3"] },
            ConfigJson = """{"subjects":["test.>"]}""",
        };

        meta.ProcessStreamAssignment(sa).ShouldBeTrue();
        meta.StreamCount.ShouldBe(1);
    }

    [Fact]
    public void ProcessStreamAssignment_rejects_empty_name()
    {
        var meta = new JetStreamMetaGroup(3);
        var sa = new StreamAssignment
        {
            StreamName = "",
            Group = new RaftGroup { Name = "rg-1", Peers = ["n1", "n2", "n3"] },
        };

        meta.ProcessStreamAssignment(sa).ShouldBeFalse();
        meta.StreamCount.ShouldBe(0);
    }

    [Fact]
    public void ProcessUpdateStreamAssignment_applies_config_change()
    {
        var meta = new JetStreamMetaGroup(3);
        var sa = new StreamAssignment
        {
            StreamName = "updatable",
            Group = new RaftGroup { Name = "rg-u", Peers = ["n1", "n2", "n3"] },
            ConfigJson = """{"subjects":["old.>"]}""",
        };
        meta.ProcessStreamAssignment(sa);

        var updated = new StreamAssignment
        {
            StreamName = "updatable",
            Group = new RaftGroup { Name = "rg-u", Peers = ["n1", "n2", "n3"] },
            ConfigJson = """{"subjects":["new.>"]}""",
        };
        meta.ProcessUpdateStreamAssignment(updated).ShouldBeTrue();

        var assignment = meta.GetStreamAssignment("updatable");
        assignment!.ConfigJson.ShouldContain("new.>");
    }

    [Fact]
    public void ProcessConsumerAssignment_requires_existing_stream()
    {
        var meta = new JetStreamMetaGroup(3);
        var ca = new ConsumerAssignment
        {
            ConsumerName = "orphan-consumer",
            StreamName = "nonexistent-stream",
            Group = new RaftGroup { Name = "rg-c", Peers = ["n1", "n2", "n3"] },
        };

        meta.ProcessConsumerAssignment(ca).ShouldBeFalse();
    }

    [Fact]
    public void ProcessStreamRemoval_cascades_to_consumers()
    {
        var meta = new JetStreamMetaGroup(3);
        meta.ProcessStreamAssignment(new StreamAssignment
        {
            StreamName = "cascade",
            Group = new RaftGroup { Name = "rg-cas", Peers = ["n1", "n2", "n3"] },
        });
        meta.ProcessConsumerAssignment(new ConsumerAssignment
        {
            ConsumerName = "c1",
            StreamName = "cascade",
            Group = new RaftGroup { Name = "rg-c1", Peers = ["n1", "n2", "n3"] },
        });

        meta.ProcessStreamRemoval("cascade").ShouldBeTrue();
        meta.StreamCount.ShouldBe(0);
        meta.ConsumerCount.ShouldBe(0);
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamAssignmentProcessingTests" -v normal
```

Expected: FAIL

**Step 3: Implement assignment processing in JetStreamMetaGroup.cs**

Add methods:

- `bool ProcessStreamAssignment(StreamAssignment sa)` — validate name, check duplicates, add
- `bool ProcessUpdateStreamAssignment(StreamAssignment sa)` — find existing, update config
- `bool ProcessStreamRemoval(string streamName)` — remove stream + cascade consumers
- `bool ProcessConsumerAssignment(ConsumerAssignment ca)` — validate stream exists, add consumer
- `bool ProcessConsumerRemoval(string streamName, string consumerName)` — remove consumer

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamAssignmentProcessingTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/JetStreamAssignmentProcessingTests.cs src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
git commit -m "feat(cluster): add stream/consumer assignment processing with validation"
```

---

### Task 12: Inflight Tracking Enhancement

Replace simple string-based inflight maps with structured tracking.
**Files:**

- Modify: `src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs`
- Test: `tests/NATS.Server.Tests/JetStreamInflightTrackingTests.cs`
- Reference: `golang/nats-server/server/jetstream_cluster.go:1193-1278`

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/JetStreamInflightTrackingTests.cs`:

```csharp
using NATS.Server.JetStream.Cluster;

namespace NATS.Server.Tests;

public class JetStreamInflightTrackingTests
{
    [Fact]
    public void TrackInflightStreamProposal_increments_ops()
    {
        var meta = new JetStreamMetaGroup(3);
        var sa = new StreamAssignment
        {
            StreamName = "inflight-1",
            Group = new RaftGroup { Name = "rg-inf", Peers = ["n1", "n2", "n3"] },
        };

        meta.TrackInflightStreamProposal("ACC", sa);
        meta.InflightStreamCount.ShouldBe(1);
        meta.IsStreamInflight("ACC", "inflight-1").ShouldBeTrue();
    }

    [Fact]
    public void RemoveInflightStreamProposal_clears_when_zero()
    {
        var meta = new JetStreamMetaGroup(3);
        var sa = new StreamAssignment
        {
            StreamName = "inflight-2",
            Group = new RaftGroup { Name = "rg-inf2", Peers = ["n1", "n2", "n3"] },
        };

        meta.TrackInflightStreamProposal("ACC", sa);
        meta.RemoveInflightStreamProposal("ACC", "inflight-2");
        meta.IsStreamInflight("ACC", "inflight-2").ShouldBeFalse();
    }

    [Fact]
    public void Duplicate_proposal_increments_ops_count()
    {
        var meta = new JetStreamMetaGroup(3);
        var sa = new StreamAssignment
        {
            StreamName = "dup-stream",
            Group = new RaftGroup { Name = "rg-dup", Peers = ["n1", "n2", "n3"] },
        };

        meta.TrackInflightStreamProposal("ACC", sa);
        meta.TrackInflightStreamProposal("ACC", sa);
        meta.InflightStreamCount.ShouldBe(1); // still one unique stream

        // Need two removes to fully clear
        meta.RemoveInflightStreamProposal("ACC", "dup-stream");
        meta.IsStreamInflight("ACC", "dup-stream").ShouldBeTrue(); // ops > 0
        meta.RemoveInflightStreamProposal("ACC", "dup-stream");
        meta.IsStreamInflight("ACC", "dup-stream").ShouldBeFalse();
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInflightTrackingTests" -v normal
```

Expected: FAIL

**Step 3: Implement enhanced inflight tracking**

In `JetStreamMetaGroup.cs`:

- Add `record InflightInfo(int OpsCount, bool Deleted, StreamAssignment? Assignment)`
- Replace the string-based `_inflightStreams` map with `ConcurrentDictionary<string, ConcurrentDictionary<string, InflightInfo>>` (account → stream name → info)
- `TrackInflightStreamProposal(account, sa)` — increment ops, store assignment
- `RemoveInflightStreamProposal(account, streamName)` — decrement ops, remove when zero
- `IsStreamInflight(account, streamName)` — check if ops > 0
- Same pattern for consumer inflight tracking

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInflightTrackingTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/JetStreamInflightTrackingTests.cs src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
git commit -m "feat(cluster): add structured inflight proposal tracking with ops counting"
```

---

### Task 13: Leadership Transitions

Handle meta-group, stream, and consumer leadership changes.
**Files:**

- Modify: `src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs`
- Modify: `src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs`
- Test: `tests/NATS.Server.Tests/JetStreamLeadershipTests.cs`
- Reference: `golang/nats-server/server/jetstream_cluster.go:7001-7074` (processLeaderChange)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/JetStreamLeadershipTests.cs`:

```csharp
using NATS.Server.JetStream.Cluster;

namespace NATS.Server.Tests;

public class JetStreamLeadershipTests
{
    [Fact]
    public void ProcessLeaderChange_clears_inflight_on_step_down()
    {
        var meta = new JetStreamMetaGroup(3);
        meta.TrackInflightStreamProposal("ACC", new StreamAssignment
        {
            StreamName = "s1",
            Group = new RaftGroup { Name = "rg", Peers = ["n1", "n2", "n3"] },
        });

        meta.ProcessLeaderChange(isLeader: false);
        meta.InflightStreamCount.ShouldBe(0);
    }

    [Fact]
    public void ProcessLeaderChange_becoming_leader_resets_state()
    {
        var meta = new JetStreamMetaGroup(3);
        var leaderChanged = false;
        meta.OnLeaderChange += (isLeader) => leaderChanged = true;

        meta.ProcessLeaderChange(isLeader: true);
        leaderChanged.ShouldBeTrue();
    }

    [Fact]
    public void StepDown_triggers_leader_change()
    {
        var meta = new JetStreamMetaGroup(3);
        meta.BecomeLeader();
        meta.IsLeader().ShouldBeTrue();

        meta.StepDown();
        meta.IsLeader().ShouldBeFalse();
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamLeadershipTests" -v normal
```

Expected: FAIL

**Step 3: Implement leadership transition**

In `JetStreamMetaGroup.cs`:

- Add `event Action<bool>? OnLeaderChange`
- `ProcessLeaderChange(isLeader)`:
  - If stepping down: clear all inflight maps
  - If becoming leader: fire `OnLeaderChange`, initialize update subscriptions
- Modify `StepDown()` to call `ProcessLeaderChange(false)`

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamLeadershipTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/JetStreamLeadershipTests.cs src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs
git commit -m "feat(cluster): implement leadership transition with inflight cleanup"
```

---

**Phase 2 Exit Gate Verification:**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MetaSnapshotCodec" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterMonitor" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamAssignment" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInflight" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamLeadership" -v normal
```

All Phase 2 tests must pass before proceeding to Phase 3.

---

## Phase 3: Consumer + Stream Engines

**Dependencies:** Phase 1 (storage), Phase 2 (cluster coordination)

**Exit gate:** Consumer delivery loop delivers messages with redelivery, pull requests expire and respect batch/maxBytes, purge with subject filtering works, mirror/source retry recovers from failures.

---

### Task 14: RedeliveryTracker with PriorityQueue

Replace Dictionary-based redelivery tracking with a min-heap for efficient deadline-based scheduling.
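The min-heap approach rests on `PriorityQueue<TElement, TPriority>`: sequences enqueued with their deadline as the priority pop out earliest-deadline-first in O(log n) per item, instead of scanning a whole map. A minimal sketch (names illustrative; acked entries are skipped lazily on dequeue here, though the `PriorityQueue.Remove` overload added in .NET 9 is another option):

```csharp
using System;
using System.Collections.Generic;

var queue = new PriorityQueue<ulong, DateTimeOffset>();
var acked = new HashSet<ulong>();

void Schedule(ulong seq, DateTimeOffset deadline) => queue.Enqueue(seq, deadline);

IEnumerable<ulong> GetDue(DateTimeOffset now)
{
    // Pop everything whose deadline has passed, earliest first.
    while (queue.TryPeek(out var seq, out var deadline) && deadline <= now)
    {
        queue.Dequeue();
        if (!acked.Contains(seq)) yield return seq; // lazily drop acked entries
    }
}

var now = DateTimeOffset.UtcNow;
Schedule(3, now.AddMilliseconds(-300));
Schedule(1, now.AddMilliseconds(-100));
Schedule(2, now.AddMilliseconds(-200));
acked.Add(2); // Acknowledge(2): skipped when it surfaces

var dueList = string.Join(",", GetDue(now));
Console.WriteLine(dueList); // 3,1
```

The deadline ordering here is exactly what the `GetDue_returns_in_deadline_order` test below asserts.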
**Files:**

- Modify: `src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs`
- Test: `tests/NATS.Server.Tests/RedeliveryTrackerTests.cs`
- Reference: `golang/nats-server/server/consumer.go` (rdq redelivery queue)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/RedeliveryTrackerTests.cs`:

```csharp
using NATS.Server.JetStream.Consumers;

namespace NATS.Server.Tests;

public class RedeliveryTrackerTests
{
    [Fact]
    public void Schedule_and_get_due_returns_expired()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
        var past = DateTimeOffset.UtcNow.AddMilliseconds(-100);
        tracker.Schedule(1, past);
        tracker.Schedule(2, DateTimeOffset.UtcNow.AddSeconds(60)); // future

        var due = tracker.GetDue(DateTimeOffset.UtcNow).ToList();
        due.Count.ShouldBe(1);
        due[0].ShouldBe(1UL);
    }

    [Fact]
    public void Acknowledge_removes_from_queue()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
        tracker.Schedule(1, DateTimeOffset.UtcNow.AddMilliseconds(-100));
        tracker.Acknowledge(1);

        var due = tracker.GetDue(DateTimeOffset.UtcNow).ToList();
        due.ShouldBeEmpty();
    }

    [Fact]
    public void IsMaxDeliveries_returns_true_at_threshold()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 3, ackWaitMs: 1000);
        tracker.IncrementDeliveryCount(1);
        tracker.IncrementDeliveryCount(1);
        tracker.IsMaxDeliveries(1).ShouldBeFalse();
        tracker.IncrementDeliveryCount(1);
        tracker.IsMaxDeliveries(1).ShouldBeTrue();
    }

    [Fact]
    public void Backoff_schedule_uses_delivery_count()
    {
        var backoff = new long[] { 100, 500, 2000 };
        var tracker = new RedeliveryTracker(maxDeliveries: 10, ackWaitMs: 1000, backoffMs: backoff);

        // First redeliver: 100ms
        var delay1 = tracker.GetBackoffDelay(deliveryCount: 1);
        delay1.ShouldBe(100L);

        // Second: 500ms
        var delay2 = tracker.GetBackoffDelay(deliveryCount: 2);
        delay2.ShouldBe(500L);

        // Beyond schedule: use last value
        var delay4 = tracker.GetBackoffDelay(deliveryCount: 4);
        delay4.ShouldBe(2000L);
    }

    [Fact]
    public void GetDue_returns_in_deadline_order()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
        var now = DateTimeOffset.UtcNow;
        tracker.Schedule(3, now.AddMilliseconds(-300));
        tracker.Schedule(1, now.AddMilliseconds(-100));
        tracker.Schedule(2, now.AddMilliseconds(-200));

        var due = tracker.GetDue(now).ToList();
        due.Count.ShouldBe(3);
        due[0].ShouldBe(3UL); // earliest deadline first
        due[1].ShouldBe(2UL);
        due[2].ShouldBe(1UL);
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RedeliveryTrackerTests" -v normal
```

Expected: FAIL

**Step 3: Rewrite RedeliveryTracker internals**

In `RedeliveryTracker.cs`:

- Replace the `Dictionary<ulong, DateTimeOffset>` deadline map with `PriorityQueue<ulong, DateTimeOffset>`
- Add `Dictionary<ulong, int> _deliveryCounts` for per-sequence delivery count
- `Schedule(seq, deadline)` — enqueue with deadline priority
- `GetDue(now)` — dequeue all entries past deadline, return in order
- `Acknowledge(seq)` — remove from queue and counts
- `IncrementDeliveryCount(seq)` — increment counter
- `IsMaxDeliveries(seq)` — check against `_maxDeliveries`
- `GetBackoffDelay(deliveryCount)` — index into `_backoffMs` array, clamp to last
- Constructor: `(int maxDeliveries, long ackWaitMs, long[]? backoffMs = null)`

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RedeliveryTrackerTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/RedeliveryTrackerTests.cs src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs
git commit -m "feat(consumer): rewrite RedeliveryTracker with PriorityQueue min-heap"
```

---

### Task 15: Ack/NAK Processing Enhancement

Add background ack subscription processing with NAK delay, TERM, and WPI support.
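Ack verbs arrive as raw payload bytes, so parsing is a prefix match over the span. A minimal sketch of that dispatch; the plan's version returns the `AckType` enum, while a string stands in here to keep the sketch self-contained:

```csharp
using System;

var nak = ParseAckType("-NAK {\"delay\":500}"u8);
var wpi = ParseAckType("+WPI"u8);
Console.WriteLine($"{nak} {wpi}"); // Nak Progress

// Prefix-match the protocol verbs named in this task over the payload bytes.
static string ParseAckType(ReadOnlySpan<byte> data)
{
    if (data.StartsWith("+ACK"u8)) return "Ack";
    if (data.StartsWith("-NAK"u8)) return "Nak";
    if (data.StartsWith("+TERM"u8)) return "Term";
    if (data.StartsWith("+WPI"u8)) return "Progress";
    return "Unknown";
}
```

Using `u8` literals and `ReadOnlySpan<byte>` keeps the hot ack path allocation-free, which is why the planned `ParseAckType` signature takes a span rather than a string.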
**Files:**

- Modify: `src/NATS.Server/JetStream/Consumers/AckProcessor.cs`
- Test: `tests/NATS.Server.Tests/AckProcessorTests.cs`
- Reference: `golang/nats-server/server/consumer.go:4854` (processInboundAcks)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/AckProcessorTests.cs`:

```csharp
using NATS.Server.JetStream.Consumers;

namespace NATS.Server.Tests;

public class AckProcessorTests
{
    [Fact]
    public void ProcessAck_removes_from_pending()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
        var processor = new AckProcessor(tracker);
        processor.Register(1, "deliver.subj");
        processor.PendingCount.ShouldBe(1);

        processor.ProcessAck(1);
        processor.PendingCount.ShouldBe(0);
    }

    [Fact]
    public void ProcessNak_schedules_redelivery()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
        var processor = new AckProcessor(tracker);
        processor.Register(1, "deliver.subj");

        processor.ProcessNak(1, delayMs: 500);
        processor.PendingCount.ShouldBe(1); // still pending until redelivered
    }

    [Fact]
    public void ProcessNak_with_backoff_uses_schedule()
    {
        var backoff = new long[] { 100, 500, 2000 };
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000, backoffMs: backoff);
        var processor = new AckProcessor(tracker);
        processor.Register(1, "deliver.subj");

        processor.ProcessNak(1, delayMs: -1); // -1 means use backoff schedule

        // Should use first backoff value (100ms)
        processor.PendingCount.ShouldBe(1);
    }

    [Fact]
    public void ProcessTerm_removes_permanently()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
        var processor = new AckProcessor(tracker);
        processor.Register(1, "deliver.subj");

        processor.ProcessTerm(1);
        processor.PendingCount.ShouldBe(0);
        processor.TerminatedCount.ShouldBe(1);
    }

    [Fact]
    public void ProcessProgress_extends_ack_deadline()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
        var processor = new AckProcessor(tracker);
        processor.Register(1, "deliver.subj");

        var originalDeadline = processor.GetDeadline(1);
        processor.ProcessProgress(1);
        var newDeadline = processor.GetDeadline(1);
        newDeadline.ShouldBeGreaterThan(originalDeadline);
    }

    [Fact]
    public void MaxAckPending_blocks_new_registrations()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
        var processor = new AckProcessor(tracker, maxAckPending: 2);
        processor.Register(1, "d.1");
        processor.Register(2, "d.2");

        processor.CanRegister().ShouldBeFalse();
    }

    [Fact]
    public void ParseAckType_identifies_all_types()
    {
        AckProcessor.ParseAckType("+ACK"u8).ShouldBe(AckType.Ack);
        AckProcessor.ParseAckType("-NAK"u8).ShouldBe(AckType.Nak);
        AckProcessor.ParseAckType("+TERM"u8).ShouldBe(AckType.Term);
        AckProcessor.ParseAckType("+WPI"u8).ShouldBe(AckType.Progress);
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~AckProcessorTests" -v normal
```

Expected: FAIL

**Step 3: Enhance AckProcessor**

In `AckProcessor.cs`:

- Add `enum AckType { Ack, Nak, Term, Progress }`
- Add `static AckType ParseAckType(ReadOnlySpan<byte> data)` — parse `+ACK`, `-NAK`, `+TERM`, `+WPI`
- Add `int PendingCount` property
- Add `int TerminatedCount` property (via `_terminated.Count`)
- Add `bool CanRegister()` — checks `PendingCount < _maxAckPending`
- Add `DateTimeOffset GetDeadline(ulong seq)` — returns ack deadline for pending message
- Enhance `ProcessNak(seq, delayMs)` — if `delayMs == -1`, use backoff schedule from tracker
- Add `ProcessProgress(seq)` — extend deadline by `_ackWaitMs`
- Constructor: add optional `maxAckPending` parameter

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~AckProcessorTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/AckProcessorTests.cs src/NATS.Server/JetStream/Consumers/AckProcessor.cs
git commit -m "feat(consumer): enhance AckProcessor with NAK delay, TERM, WPI, and maxAckPending"
``` --- ### Task 16: Pull Request Pipeline Implement waiting request queue with expiry, batch/maxBytes enforcement. **Files:** - Create: `src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs` - Modify: `src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs` - Test: `tests/NATS.Server.Tests/WaitingRequestQueueTests.cs` - Reference: `golang/nats-server/server/consumer.go:4276-4450` (processNextMsgRequest) **Step 1: Write the failing test** Create `tests/NATS.Server.Tests/WaitingRequestQueueTests.cs`: ```csharp using NATS.Server.JetStream.Consumers; namespace NATS.Server.Tests; public class WaitingRequestQueueTests { [Fact] public void Enqueue_and_dequeue_fifo() { var queue = new WaitingRequestQueue(); queue.Enqueue(new PullRequest("reply.1", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false)); queue.Enqueue(new PullRequest("reply.2", Batch: 5, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false)); queue.Count.ShouldBe(2); var first = queue.TryDequeue(); first.ShouldNotBeNull(); first.ReplyTo.ShouldBe("reply.1"); } [Fact] public void Expired_requests_are_removed() { var queue = new WaitingRequestQueue(); queue.Enqueue(new PullRequest("expired", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMilliseconds(-100), NoWait: false)); queue.Enqueue(new PullRequest("valid", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false)); queue.RemoveExpired(DateTimeOffset.UtcNow); queue.Count.ShouldBe(1); var next = queue.TryDequeue(); next!.ReplyTo.ShouldBe("valid"); } [Fact] public void NoWait_request_returns_immediately_when_empty() { var queue = new WaitingRequestQueue(); queue.Enqueue(new PullRequest("nowait", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: true)); var req = queue.TryDequeue(); req.ShouldNotBeNull(); req.NoWait.ShouldBeTrue(); } [Fact] public void MaxBytes_tracks_accumulation() { var queue = new WaitingRequestQueue(); var req = 
new PullRequest("mb", Batch: 100, MaxBytes: 1024, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false); queue.Enqueue(req); var dequeued = queue.TryDequeue()!; dequeued.MaxBytes.ShouldBe(1024L); dequeued.RemainingBytes.ShouldBe(1024L); dequeued.ConsumeBytes(256); dequeued.RemainingBytes.ShouldBe(768L); dequeued.IsExhausted.ShouldBeFalse(); dequeued.ConsumeBytes(800); dequeued.IsExhausted.ShouldBeTrue(); } [Fact] public void Batch_decrements_on_delivery() { var queue = new WaitingRequestQueue(); var req = new PullRequest("batch", Batch: 3, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false); queue.Enqueue(req); var dequeued = queue.TryDequeue()!; dequeued.RemainingBatch.ShouldBe(3); dequeued.ConsumeBatch(); dequeued.RemainingBatch.ShouldBe(2); dequeued.ConsumeBatch(); dequeued.ConsumeBatch(); dequeued.IsExhausted.ShouldBeTrue(); } } ``` **Step 2: Run test to verify it fails** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WaitingRequestQueueTests" -v normal ``` Expected: FAIL **Step 3: Implement WaitingRequestQueue** Create `src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs`: ```csharp namespace NATS.Server.JetStream.Consumers; public sealed record PullRequest( string ReplyTo, int Batch, long MaxBytes, DateTimeOffset Expires, bool NoWait, string? PinId = null) { public int RemainingBatch { get; private set; } = Batch; public long RemainingBytes { get; private set; } = MaxBytes; public bool IsExhausted => RemainingBatch <= 0 || (MaxBytes > 0 && RemainingBytes <= 0); public void ConsumeBatch() => RemainingBatch--; public void ConsumeBytes(long bytes) => RemainingBytes -= bytes; } public sealed class WaitingRequestQueue { private readonly LinkedList<PullRequest> _queue = new(); public int Count => _queue.Count; public bool IsEmpty => _queue.Count == 0; public void Enqueue(PullRequest request) => _queue.AddLast(request); public PullRequest?
TryDequeue() { if (_queue.Count == 0) return null; var first = _queue.First!.Value; _queue.RemoveFirst(); return first; } public void RemoveExpired(DateTimeOffset now) { var node = _queue.First; while (node != null) { var next = node.Next; if (node.Value.Expires <= now) _queue.Remove(node); node = next; } } } ``` **Step 4: Run test to verify it passes** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WaitingRequestQueueTests" -v normal ``` Expected: PASS **Step 5: Commit** ```bash git add src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs tests/NATS.Server.Tests/WaitingRequestQueueTests.cs git commit -m "feat(consumer): add WaitingRequestQueue with expiry and batch/maxBytes tracking" ``` --- ### Task 17: Consumer Pause/Resume Add pause/resume state management with advisory events. **Files:** - Modify: `src/NATS.Server/JetStream/ConsumerManager.cs` - Test: `tests/NATS.Server.Tests/ConsumerPauseResumeTests.cs` - Reference: `golang/nats-server/server/consumer.go` (pause/resume) **Step 1: Write the failing test** Create `tests/NATS.Server.Tests/ConsumerPauseResumeTests.cs`: ```csharp using NATS.Server.JetStream; namespace NATS.Server.Tests; public class ConsumerPauseResumeTests { [Fact] public async Task Pause_stops_delivery() { var mgr = ConsumerManagerTestHelper.Create(); var handle = mgr.CreateOrUpdate("test-stream", "test-consumer", new Models.ConsumerConfig { DeliverSubject = "deliver.test" }); var until = DateTime.UtcNow.AddSeconds(5); mgr.Pause("test-consumer", until); mgr.IsPaused("test-consumer").ShouldBeTrue(); mgr.GetPauseUntil("test-consumer").ShouldBe(until); } [Fact] public async Task Resume_restarts_delivery() { var mgr = ConsumerManagerTestHelper.Create(); mgr.CreateOrUpdate("test-stream", "test-consumer", new Models.ConsumerConfig { DeliverSubject = "deliver.test" }); mgr.Pause("test-consumer", DateTime.UtcNow.AddSeconds(5)); mgr.Resume("test-consumer"); mgr.IsPaused("test-consumer").ShouldBeFalse(); } [Fact] public 
async Task Pause_auto_resumes_after_deadline() { var mgr = ConsumerManagerTestHelper.Create(); mgr.CreateOrUpdate("test-stream", "test-consumer", new Models.ConsumerConfig { DeliverSubject = "deliver.test" }); mgr.Pause("test-consumer", DateTime.UtcNow.AddMilliseconds(100)); await Task.Delay(200); mgr.IsPaused("test-consumer").ShouldBeFalse(); } } ``` **Step 2: Run test to verify it fails** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ConsumerPauseResumeTests" -v normal ``` Expected: FAIL **Step 3: Implement pause/resume in ConsumerManager** In `ConsumerManager.cs`: - Add `_pauseUntil` field to `ConsumerHandle` record - Add `_pauseTimers` dictionary for auto-resume timers - `Pause(consumerName, until)` — set deadline, stop delivery loop, start auto-resume timer - `Resume(consumerName)` — clear deadline, restart delivery loop, cancel timer - `IsPaused(consumerName)` — check if deadline is set and in the future - `GetPauseUntil(consumerName)` — return deadline - Auto-resume: `Timer` callback that calls `Resume()` when deadline passes Create `ConsumerManagerTestHelper` static class for test setup. **Step 4: Run test to verify it passes** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ConsumerPauseResumeTests" -v normal ``` Expected: PASS **Step 5: Commit** ```bash git add tests/NATS.Server.Tests/ConsumerPauseResumeTests.cs src/NATS.Server/JetStream/ConsumerManager.cs git commit -m "feat(consumer): add pause/resume with auto-resume timer" ``` --- ### Task 18: Priority Group Pinning Add Pin ID generation, TTL timers, and Nats-Pin-Id header support. 
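Before the steps, the intended pin semantics — a new assignment supersedes the old pin, validation compares against the current pin, unassign clears it — can be sketched in isolation. `PinRegistry` below is a hypothetical stand-in for the pin-related slice of `PriorityGroupManager`; every name in it is an assumption, not the final API:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of per-group pin bookkeeping. The real implementation
// lives inside PriorityGroupManager and additionally runs a TTL timer.
public sealed class PinRegistry
{
    private readonly Dictionary<string, string> _pinIds = new(); // group → current pin ID

    // Assigning a new pin replaces any previous pin for the group.
    public string AssignPinId(string group)
    {
        var pin = Guid.NewGuid().ToString("N")[..22]; // NUID-like opaque token
        _pinIds[group] = pin;
        return pin;
    }

    // Only the most recently assigned pin is valid.
    public bool ValidatePinId(string group, string pinId) =>
        _pinIds.TryGetValue(group, out var current) && current == pinId;

    public void UnassignPinId(string group) => _pinIds.Remove(group);
}
```

The TTL behavior described below would wrap `UnassignPinId` in a timer callback so a pin that is never refreshed is eventually dropped.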
**Files:** - Modify: `src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs` - Test: `tests/NATS.Server.Tests/PriorityGroupPinningTests.cs` - Reference: `golang/nats-server/server/consumer.go` (setPinnedTimer, assignNewPinId) **Step 1: Write the failing test** Create `tests/NATS.Server.Tests/PriorityGroupPinningTests.cs`: ```csharp using NATS.Server.JetStream.Consumers; namespace NATS.Server.Tests; public class PriorityGroupPinningTests { [Fact] public void AssignPinId_generates_unique_ids() { var mgr = new PriorityGroupManager(); mgr.Register("group-1", "consumer-a", priority: 0); var pin1 = mgr.AssignPinId("group-1", "consumer-a"); var pin2 = mgr.AssignPinId("group-1", "consumer-a"); pin1.ShouldNotBeNullOrEmpty(); pin2.ShouldNotBeNullOrEmpty(); pin1.ShouldNotBe(pin2); // each assignment is unique } [Fact] public void ValidatePinId_accepts_current() { var mgr = new PriorityGroupManager(); mgr.Register("group-1", "consumer-a", priority: 0); var pin = mgr.AssignPinId("group-1", "consumer-a"); mgr.ValidatePinId("group-1", pin).ShouldBeTrue(); } [Fact] public void ValidatePinId_rejects_replaced() { var mgr = new PriorityGroupManager(); mgr.Register("group-1", "consumer-a", priority: 0); var pin1 = mgr.AssignPinId("group-1", "consumer-a"); var pin2 = mgr.AssignPinId("group-1", "consumer-a"); // replaces pin1 mgr.ValidatePinId("group-1", pin1).ShouldBeFalse(); mgr.ValidatePinId("group-1", pin2).ShouldBeTrue(); } [Fact] public void UnassignPinId_clears() { var mgr = new PriorityGroupManager(); mgr.Register("group-1", "consumer-a", priority: 0); var pin = mgr.AssignPinId("group-1", "consumer-a"); mgr.UnassignPinId("group-1"); mgr.ValidatePinId("group-1", pin).ShouldBeFalse(); } } ``` **Step 2: Run test to verify it fails** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PriorityGroupPinningTests" -v normal ``` Expected: FAIL **Step 3: Implement pin management** In `PriorityGroupManager.cs`: - Add `_pinIds` dictionary: `group → current pin ID`
- `AssignPinId(group, consumer)` — generate NUID string, store, return - `ValidatePinId(group, pinId)` — compare against current - `UnassignPinId(group)` — clear pin ID - Pin TTL: configurable timeout, auto-unassign after expiry - NUID generation: use `Guid.NewGuid().ToString("N")[..22]` for simplicity **Step 4: Run test to verify it passes** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PriorityGroupPinningTests" -v normal ``` Expected: PASS **Step 5: Commit** ```bash git add tests/NATS.Server.Tests/PriorityGroupPinningTests.cs src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs git commit -m "feat(consumer): add priority group pin ID management" ``` --- ### Task 19: Stream Purge with Filtering Implement `PurgeEx` with subject filter, sequence range, and keep-N semantics. **Files:** - Modify: `src/NATS.Server/JetStream/StreamManager.cs` - Test: `tests/NATS.Server.Tests/StreamPurgeFilterTests.cs` - Reference: `golang/nats-server/server/stream.go` (purgeEx) **Step 1: Write the failing test** Create `tests/NATS.Server.Tests/StreamPurgeFilterTests.cs`: ```csharp using NATS.Server.JetStream; namespace NATS.Server.Tests; public class StreamPurgeFilterTests { [Fact] public async Task PurgeEx_subject_filter_only_removes_matching() { var mgr = await StreamManagerTestHelper.CreateWithMessagesAsync( ("a.1", "m1"), ("b.1", "m2"), ("a.2", "m3"), ("b.2", "m4")); var purged = mgr.PurgeEx("a.*", seq: 0, keep: 0); purged.ShouldBe(2UL); var state = await mgr.GetStateAsync(default); state.Messages.ShouldBe(2UL); } [Fact] public async Task PurgeEx_seq_range_removes_below() { var mgr = await StreamManagerTestHelper.CreateWithMessagesAsync( ("x.1", "m1"), ("x.2", "m2"), ("x.3", "m3"), ("x.4", "m4")); var purged = mgr.PurgeEx("", seq: 3, keep: 0); purged.ShouldBe(2UL); // seq 1, 2 var state = await mgr.GetStateAsync(default); state.Messages.ShouldBe(2UL); state.FirstSeq.ShouldBe(3UL); } [Fact] public async Task PurgeEx_keep_retains_newest() { var mgr = 
await StreamManagerTestHelper.CreateWithMessagesAsync( ("k.1", "m1"), ("k.2", "m2"), ("k.3", "m3"), ("k.4", "m4"), ("k.5", "m5")); var purged = mgr.PurgeEx("", seq: 0, keep: 2); purged.ShouldBe(3UL); // keep last 2 var state = await mgr.GetStateAsync(default); state.Messages.ShouldBe(2UL); state.FirstSeq.ShouldBe(4UL); } [Fact] public async Task PurgeEx_subject_and_keep_combined() { var mgr = await StreamManagerTestHelper.CreateWithMessagesAsync( ("j.1", "m1"), ("j.2", "m2"), ("other", "m3"), ("j.3", "m4"), ("j.4", "m5")); // Purge j.* but keep 1 var purged = mgr.PurgeEx("j.*", seq: 0, keep: 1); purged.ShouldBe(3UL); // j.1, j.2, j.3 purged; j.4 kept var state = await mgr.GetStateAsync(default); state.Messages.ShouldBe(2UL); // "other" + j.4 } } ``` **Step 2: Run test to verify it fails** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StreamPurgeFilterTests" -v normal ``` Expected: FAIL **Step 3: Implement PurgeEx in StreamManager** In `StreamManager.cs`, enhance `PurgeEx`: - If `subject` is set: iterate messages, match using `SubjectMatch.IsMatch()`, remove matching - If `seq > 0`: remove all messages with sequence < `seq` - If `keep > 0`: retain last N messages (on subject if specified, globally otherwise) - Combined: apply subject filter first, then keep-N among the filtered set - Return count of purged messages Create `StreamManagerTestHelper` for test setup. **Step 4: Run test to verify it passes** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StreamPurgeFilterTests" -v normal ``` Expected: PASS **Step 5: Commit** ```bash git add tests/NATS.Server.Tests/StreamPurgeFilterTests.cs src/NATS.Server/JetStream/StreamManager.cs git commit -m "feat(stream): implement PurgeEx with subject filter, seq range, and keep-N" ``` --- ### Task 20: Interest Retention Policy Track per-consumer interest and remove messages when all consumers have acknowledged. 
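The retention rule — keep a message while any consumer whose filter matches it has not yet acked it — can be sketched self-contained. The class shape mirrors the planned `InterestRetentionPolicy` API, but the bodies are illustrative assumptions (simple prefix matching stands in for full NATS wildcard matching via `SubjectMatch.IsMatch`):

```csharp
using System.Collections.Generic;

// Sketch of the interest-retention decision; bodies are assumptions.
public sealed class InterestRetentionSketch
{
    private readonly Dictionary<string, string> _interests = new();    // consumer → filter subject
    private readonly Dictionary<ulong, HashSet<string>> _acks = new(); // seq → consumers that acked

    public void RegisterInterest(string consumer, string filter) => _interests[consumer] = filter;

    public void UnregisterInterest(string consumer) => _interests.Remove(consumer);

    public void AcknowledgeDelivery(string consumer, ulong seq)
    {
        if (!_acks.TryGetValue(seq, out var set)) _acks[seq] = set = new HashSet<string>();
        set.Add(consumer);
    }

    // Retain while any interested consumer has not yet acked this sequence.
    public bool ShouldRetain(ulong seq, string msgSubject)
    {
        _acks.TryGetValue(seq, out var acked);
        foreach (var (consumer, filter) in _interests)
        {
            if (!Matches(filter, msgSubject)) continue; // no interest → doesn't block removal
            if (acked == null || !acked.Contains(consumer)) return true;
        }
        return false;
    }

    // Placeholder: real code would use SubjectMatch.IsMatch with full wildcard rules.
    private static bool Matches(string filter, string subject) =>
        filter.EndsWith(".>") ? subject.StartsWith(filter[..^1]) : filter == subject;
}
```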
**Files:** - Create: `src/NATS.Server/JetStream/InterestRetentionPolicy.cs` - Test: `tests/NATS.Server.Tests/InterestRetentionTests.cs` - Reference: `golang/nats-server/server/stream.go` (checkInterestState, noInterest) **Step 1: Write the failing test** Create `tests/NATS.Server.Tests/InterestRetentionTests.cs`: ```csharp using NATS.Server.JetStream; namespace NATS.Server.Tests; public class InterestRetentionTests { [Fact] public void ShouldRetain_true_when_consumers_have_not_acked() { var policy = new InterestRetentionPolicy(); policy.RegisterInterest("consumer-A", "orders.>"); policy.RegisterInterest("consumer-B", "orders.>"); policy.ShouldRetain(1, "orders.new").ShouldBeTrue(); } [Fact] public void ShouldRetain_false_when_all_consumers_acked() { var policy = new InterestRetentionPolicy(); policy.RegisterInterest("consumer-A", "orders.>"); policy.RegisterInterest("consumer-B", "orders.>"); policy.AcknowledgeDelivery("consumer-A", 1); policy.ShouldRetain(1, "orders.new").ShouldBeTrue(); // B hasn't acked policy.AcknowledgeDelivery("consumer-B", 1); policy.ShouldRetain(1, "orders.new").ShouldBeFalse(); // both acked } [Fact] public void ShouldRetain_ignores_consumers_without_interest() { var policy = new InterestRetentionPolicy(); policy.RegisterInterest("consumer-A", "orders.>"); policy.RegisterInterest("consumer-B", "billing.>"); // no interest in orders policy.AcknowledgeDelivery("consumer-A", 1); policy.ShouldRetain(1, "orders.new").ShouldBeFalse(); // B has no interest } [Fact] public void UnregisterInterest_removes_consumer() { var policy = new InterestRetentionPolicy(); policy.RegisterInterest("consumer-A", "x.>"); policy.RegisterInterest("consumer-B", "x.>"); policy.UnregisterInterest("consumer-B"); // Only A needs to ack policy.AcknowledgeDelivery("consumer-A", 1); policy.ShouldRetain(1, "x.y").ShouldBeFalse(); } } ``` **Step 2: Run test to verify it fails** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterestRetentionTests" 
-v normal ``` Expected: FAIL **Step 3: Implement InterestRetentionPolicy** Create `src/NATS.Server/JetStream/InterestRetentionPolicy.cs`: ```csharp namespace NATS.Server.JetStream; /// <summary> /// Tracks per-consumer interest and determines when messages can be removed /// under Interest retention policy. /// Go reference: stream.go checkInterestState/noInterest. /// </summary> public sealed class InterestRetentionPolicy { private readonly Dictionary<string, string> _interests = new(); // consumer → filter subject private readonly Dictionary<ulong, HashSet<string>> _acks = new(); // seq → consumers that acked public void RegisterInterest(string consumer, string filterSubject) { ... } public void UnregisterInterest(string consumer) { ... } public void AcknowledgeDelivery(string consumer, ulong seq) { ... } public bool ShouldRetain(ulong seq, string msgSubject) { ... } } ``` **Step 4: Run test to verify it passes** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterestRetentionTests" -v normal ``` Expected: PASS **Step 5: Commit** ```bash git add src/NATS.Server/JetStream/InterestRetentionPolicy.cs tests/NATS.Server.Tests/InterestRetentionTests.cs git commit -m "feat(stream): add InterestRetentionPolicy for per-consumer ack tracking" ``` --- ### Task 21: Mirror/Source Retry Enhancement Add exponential backoff retry, gap detection, and error state tracking to MirrorCoordinator and SourceCoordinator.
**Files:** - Modify: `src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs` - Modify: `src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs` - Test: `tests/NATS.Server.Tests/MirrorSourceRetryTests.cs` - Reference: `golang/nats-server/server/stream.go` (setupMirrorConsumer, retrySourceConsumerAtSeq) **Step 1: Write the failing test** Create `tests/NATS.Server.Tests/MirrorSourceRetryTests.cs`: ```csharp using NATS.Server.JetStream.MirrorSource; namespace NATS.Server.Tests; public class MirrorSourceRetryTests { [Fact] public void Mirror_retry_uses_exponential_backoff() { var mirror = MirrorCoordinatorTestHelper.Create(); mirror.RecordFailure(); var delay1 = mirror.GetRetryDelay(); delay1.ShouldBeGreaterThanOrEqualTo(TimeSpan.FromMilliseconds(250)); // initial mirror.RecordFailure(); var delay2 = mirror.GetRetryDelay(); delay2.ShouldBeGreaterThan(delay1); // exponential growth // Cap at max for (int i = 0; i < 20; i++) mirror.RecordFailure(); var delayMax = mirror.GetRetryDelay(); delayMax.ShouldBeLessThanOrEqualTo(TimeSpan.FromSeconds(30)); } [Fact] public void Mirror_success_resets_backoff() { var mirror = MirrorCoordinatorTestHelper.Create(); for (int i = 0; i < 5; i++) mirror.RecordFailure(); mirror.RecordSuccess(); var delay = mirror.GetRetryDelay(); delay.ShouldBe(TimeSpan.FromMilliseconds(250)); // reset to initial } [Fact] public void Mirror_tracks_sequence_gap() { var mirror = MirrorCoordinatorTestHelper.Create(); mirror.RecordSourceSeq(1); mirror.RecordSourceSeq(2); mirror.RecordSourceSeq(5); // gap: 3, 4 missing mirror.HasGap.ShouldBeTrue(); mirror.GapStart.ShouldBe(3UL); mirror.GapEnd.ShouldBe(4UL); } [Fact] public void Mirror_tracks_error_state() { var mirror = MirrorCoordinatorTestHelper.Create(); mirror.SetError("connection refused"); mirror.HasError.ShouldBeTrue(); mirror.ErrorMessage.ShouldBe("connection refused"); mirror.ClearError(); mirror.HasError.ShouldBeFalse(); } [Fact] public void Source_dedup_window_prunes_expired() { var source 
= SourceCoordinatorTestHelper.Create(); source.RecordMsgId("msg-1"); source.RecordMsgId("msg-2"); source.IsDuplicate("msg-1").ShouldBeTrue(); source.IsDuplicate("msg-3").ShouldBeFalse(); // Simulate time passing beyond dedup window source.PruneDedupWindow(DateTimeOffset.UtcNow.AddMinutes(5)); source.IsDuplicate("msg-1").ShouldBeFalse(); } } ``` **Step 2: Run test to verify it fails** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MirrorSourceRetryTests" -v normal ``` Expected: FAIL **Step 3: Implement retry and gap detection** In `MirrorCoordinator.cs`: - Add `RecordFailure()` — increment `_consecutiveFailures` - Add `RecordSuccess()` — reset `_consecutiveFailures` to 0 - Add `GetRetryDelay()` — `min(InitialRetryDelay * 2^failures, MaxRetryDelay)`, with jitter added only when `_consecutiveFailures > 0` so that after `RecordSuccess()` the delay is exactly `InitialRetryDelay` (the reset test asserts 250 ms exactly) - Add `RecordSourceSeq(seq)` — track expected vs actual, detect gaps - Add `HasGap`, `GapStart`, `GapEnd` properties - Add `SetError(msg)`, `ClearError()`, `HasError`, `ErrorMessage` In `SourceCoordinator.cs`: - Add `PruneDedupWindow(DateTimeOffset cutoff)` — remove entries older than cutoff - Ensure `IsDuplicate`, `RecordMsgId` work with time-based window Create test helpers: `MirrorCoordinatorTestHelper`, `SourceCoordinatorTestHelper`.
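The backoff and gap rules above can be exercised in isolation. `RetryState` is a hypothetical helper (the real state lives inside `MirrorCoordinator`); constants match the values the tests assert, and jitter is applied only once backoff has grown so a freshly reset state returns exactly the initial delay:

```csharp
using System;

// Sketch of retry backoff with cap plus sequence-gap detection; names assumed.
public sealed class RetryState
{
    private static readonly TimeSpan Initial = TimeSpan.FromMilliseconds(250);
    private static readonly TimeSpan Max = TimeSpan.FromSeconds(30);
    private readonly Random _rng = new();
    private int _failures;
    private ulong _lastSeq;

    public bool HasGap { get; private set; }
    public ulong GapStart { get; private set; }
    public ulong GapEnd { get; private set; }

    public void RecordFailure() => _failures++;
    public void RecordSuccess() => _failures = 0;

    // min(Initial * 2^failures, Max); jitter only after at least one failure.
    public TimeSpan GetRetryDelay()
    {
        var ticks = Initial.Ticks;
        for (int i = 0; i < _failures && ticks < Max.Ticks; i++) ticks *= 2;
        if (ticks >= Max.Ticks) return Max;
        var jitter = _failures > 0 ? (long)(_rng.NextDouble() * ticks / 10) : 0;
        return TimeSpan.FromTicks(Math.Min(ticks + jitter, Max.Ticks));
    }

    // Detect a hole between the last observed source sequence and this one.
    public void RecordSourceSeq(ulong seq)
    {
        if (_lastSeq != 0 && seq > _lastSeq + 1)
        {
            HasGap = true;
            GapStart = _lastSeq + 1;
            GapEnd = seq - 1;
        }
        _lastSeq = seq;
    }
}
```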
**Step 4: Run test to verify it passes** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MirrorSourceRetryTests" -v normal ``` Expected: PASS **Step 5: Commit** ```bash git add tests/NATS.Server.Tests/MirrorSourceRetryTests.cs src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs git commit -m "feat(mirror): add exponential backoff retry, gap detection, and error tracking" ``` --- **Phase 3 Exit Gate Verification:** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RedeliveryTracker" -v normal dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~AckProcessor" -v normal dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WaitingRequestQueue" -v normal dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ConsumerPauseResume" -v normal dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PriorityGroupPinning" -v normal dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StreamPurgeFilter" -v normal dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterestRetention" -v normal dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MirrorSourceRetry" -v normal ``` All Phase 3 tests must pass before proceeding to Phase 4. --- ## Phase 4: Client Performance, MQTT, Config, Networking **Dependencies:** Phase 1 (storage for MQTT), Phase 3 (consumer for MQTT) **Exit gate:** Client flush coalescing measurably reduces syscalls under load, MQTT sessions survive server restart, SIGHUP triggers config reload, implicit routes auto-discover. --- ### Task 22: Client Flush Coalescing Reduce syscalls by coalescing multiple flush signals into a single write. 
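The idea — producers signal once per queued write, the writer drains a batch of signals and performs one flush per batch — can be shown as a standalone sketch. This is conceptual, not the `NatsClient` wiring; the class and member names are assumptions:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Conceptual sketch of flush-signal coalescing: many enqueue signals, one flush.
public sealed class CoalescingFlusher
{
    public const int MaxFlushPending = 10;

    private readonly SemaphoreSlim _signal = new(0);
    private int _pending;
    public int FlushCount { get; private set; }

    // Producer side: one signal per queued write.
    public void NotifyQueued()
    {
        Interlocked.Increment(ref _pending);
        _signal.Release();
    }

    // Writer side: wait for a signal, then soak up further signals
    // (up to MaxFlushPending, with a tiny grace wait) before flushing once.
    public async Task RunOnceAsync()
    {
        await _signal.WaitAsync();
        var drained = 1;
        while (drained < MaxFlushPending && await _signal.WaitAsync(TimeSpan.FromMilliseconds(1)))
            drained++;
        Interlocked.Add(ref _pending, -drained);
        FlushCount++; // one flush for up to MaxFlushPending signals
    }
}
```

The grace wait is the tunable part: too long adds latency to lone messages, too short defeats the coalescing under bursty load.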
**Files:** - Modify: `src/NATS.Server/NatsClient.cs` - Test: `tests/NATS.Server.Tests/FlushCoalescingTests.cs` - Reference: `golang/nats-server/server/client.go` (fsp, maxFlushPending, pcd) **Step 1: Write the failing test** Create `tests/NATS.Server.Tests/FlushCoalescingTests.cs`: ```csharp namespace NATS.Server.Tests; public class FlushCoalescingTests { [Fact] public async Task Flush_coalescing_reduces_flush_count() { // Start server with flush coalescing enabled var port = TestHelpers.GetFreePort(); await using var server = new NatsServer(new NatsOptions { Port = port }); await server.StartAsync(); using var socket = new System.Net.Sockets.Socket( System.Net.Sockets.AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Stream, System.Net.Sockets.ProtocolType.Tcp); await socket.ConnectAsync(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, port)); var stream = new System.Net.Sockets.NetworkStream(socket); // Read INFO + send CONNECT var buf = new byte[4096]; await stream.ReadAsync(buf); await stream.WriteAsync("CONNECT {}\r\n"u8.ToArray()); await stream.WriteAsync("SUB test 1\r\n"u8.ToArray()); // Rapidly publish multiple messages — coalescing should batch flushes for (int i = 0; i < 100; i++) await stream.WriteAsync("PUB test 5\r\nhello\r\n"u8.ToArray()); await Task.Delay(100); // Verify messages received (coalescing should not drop any) var received = 0; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); try { while (!cts.IsCancellationRequested) { var n = await stream.ReadAsync(buf, cts.Token); if (n == 0) break; var text = System.Text.Encoding.ASCII.GetString(buf, 0, n); received += text.Split("MSG ").Length - 1; if (received >= 100) break; } } catch (OperationCanceledException) { } received.ShouldBe(100); } [Fact] public void MaxFlushPending_defaults_to_10() { // Verify the constant exists NatsClient.MaxFlushPending.ShouldBe(10); } } ``` **Step 2: Run test to verify it fails** ```bash dotnet test tests/NATS.Server.Tests
--filter "FullyQualifiedName~FlushCoalescingTests" -v normal ``` Expected: FAIL — `MaxFlushPending` constant doesn't exist **Step 3: Implement flush coalescing** In `NatsClient.cs`: - Add `public const int MaxFlushPending = 10;` - Add field `int _flushSignalsPending` - Add field `SemaphoreSlim _flushSignal = new(0)` for write loop coordination - Modify `QueueOutbound()`: after channel write, `Interlocked.Increment(ref _flushSignalsPending)`, release semaphore - Modify `RunWriteLoopAsync()`: after draining channel, if `_flushSignalsPending < MaxFlushPending`, wait briefly on semaphore to coalesce more signals before flushing **Step 4: Run test to verify it passes** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FlushCoalescingTests" -v normal ``` Expected: PASS **Step 5: Commit** ```bash git add tests/NATS.Server.Tests/FlushCoalescingTests.cs src/NATS.Server/NatsClient.cs git commit -m "feat(client): add flush coalescing to reduce write syscalls" ``` --- ### Task 23: Client Stall Gate Block producers when client outbound buffer is near capacity. 
**Files:** - Modify: `src/NATS.Server/NatsClient.cs` - Test: `tests/NATS.Server.Tests/StallGateTests.cs` - Reference: `golang/nats-server/server/client.go` (stc channel, stall gate) **Step 1: Write the failing test** Create `tests/NATS.Server.Tests/StallGateTests.cs`: ```csharp namespace NATS.Server.Tests; public class StallGateTests { [Fact] public void Stall_gate_activates_at_threshold() { // Unit test against the stall gate logic directly var gate = new NatsClient.StallGate(maxPending: 1000); gate.IsStalled.ShouldBeFalse(); gate.UpdatePending(750); // 75% = threshold gate.IsStalled.ShouldBeTrue(); gate.UpdatePending(500); // below threshold gate.IsStalled.ShouldBeFalse(); } [Fact] public async Task Stall_gate_blocks_producer() { var gate = new NatsClient.StallGate(maxPending: 100); gate.UpdatePending(80); // stalled var blocked = true; var task = Task.Run(async () => { await gate.WaitAsync(TimeSpan.FromSeconds(1)); blocked = false; }); await Task.Delay(50); blocked.ShouldBeTrue(); // still blocked gate.UpdatePending(50); // release gate.Release(); await task; blocked.ShouldBeFalse(); } [Fact] public async Task Stall_gate_timeout_closes_client() { var gate = new NatsClient.StallGate(maxPending: 100); gate.UpdatePending(80); var released = await gate.WaitAsync(TimeSpan.FromMilliseconds(50)); released.ShouldBeFalse(); // timed out, not released } } ``` **Step 2: Run test to verify it fails** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StallGateTests" -v normal ``` Expected: FAIL **Step 3: Implement StallGate** In `NatsClient.cs`, add nested class: ```csharp public sealed class StallGate { private readonly long _threshold; private SemaphoreSlim?
_semaphore; public StallGate(long maxPending) => _threshold = maxPending * 3 / 4; public bool IsStalled => _semaphore != null; public void UpdatePending(long pending) { if (pending >= _threshold && _semaphore == null) _semaphore = new SemaphoreSlim(0, 1); else if (pending < _threshold && _semaphore != null) Release(); } public async Task<bool> WaitAsync(TimeSpan timeout) { if (_semaphore == null) return true; return await _semaphore.WaitAsync(timeout); } public void Release() { _semaphore?.Release(); _semaphore = null; } } ``` Wire into `QueueOutbound()` and `RunWriteLoopAsync()`. **Step 4: Run test to verify it passes** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StallGateTests" -v normal ``` Expected: PASS **Step 5: Commit** ```bash git add tests/NATS.Server.Tests/StallGateTests.cs src/NATS.Server/NatsClient.cs git commit -m "feat(client): add stall gate backpressure for slow consumers" ``` --- ### Task 24: Write Timeout Recovery Handle partial flush recovery with per-client-kind policies.
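The policy split — plain clients close on write timeout, internal links (routes, gateways, leafnodes) are kept and marked slow — and the partial-flush accounting can be sketched standalone. The `*Sketch` names are hypothetical; the real enum/record shapes are specified in the steps of this task:

```csharp
// Sketch of per-kind write-timeout policy and partial-flush accounting.
public enum ClientKindSketch { Client, Router, Gateway, Leaf }
public enum WriteTimeoutPolicySketch { Close, TcpFlush }

public static class WriteTimeoutSketch
{
    // Plain clients are closed on write timeout; internal links are
    // flushed/marked slow instead so the topology stays connected.
    public static WriteTimeoutPolicySketch For(ClientKindSketch kind) =>
        kind == ClientKindSketch.Client
            ? WriteTimeoutPolicySketch.Close
            : WriteTimeoutPolicySketch.TcpFlush;
}

// A flush that wrote fewer bytes than attempted is partial and leaves a remainder.
public readonly record struct FlushResultSketch(long BytesAttempted, long BytesWritten)
{
    public bool IsPartial => BytesWritten < BytesAttempted;
    public long BytesRemaining => BytesAttempted - BytesWritten;
}
```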
**Files:** - Modify: `src/NATS.Server/NatsClient.cs` - Test: `tests/NATS.Server.Tests/WriteTimeoutTests.cs` - Reference: `golang/nats-server/server/client.go` (write timeout handling) **Step 1: Write the failing test** Create `tests/NATS.Server.Tests/WriteTimeoutTests.cs`: ```csharp namespace NATS.Server.Tests; public class WriteTimeoutTests { [Fact] public void WriteTimeoutPolicy_defaults_by_kind() { NatsClient.GetWriteTimeoutPolicy(ClientKind.Client).ShouldBe(WriteTimeoutPolicy.Close); NatsClient.GetWriteTimeoutPolicy(ClientKind.Router).ShouldBe(WriteTimeoutPolicy.TcpFlush); NatsClient.GetWriteTimeoutPolicy(ClientKind.Gateway).ShouldBe(WriteTimeoutPolicy.TcpFlush); NatsClient.GetWriteTimeoutPolicy(ClientKind.Leaf).ShouldBe(WriteTimeoutPolicy.TcpFlush); } [Fact] public void PartialFlushResult_tracks_bytes() { var result = new NatsClient.FlushResult(BytesAttempted: 1024, BytesWritten: 512); result.IsPartial.ShouldBeTrue(); result.BytesRemaining.ShouldBe(512L); } [Fact] public void PartialFlushResult_complete_is_not_partial() { var result = new NatsClient.FlushResult(BytesAttempted: 1024, BytesWritten: 1024); result.IsPartial.ShouldBeFalse(); result.BytesRemaining.ShouldBe(0L); } } ``` **Step 2: Run test to verify it fails** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WriteTimeoutTests" -v normal ``` Expected: FAIL **Step 3: Implement write timeout recovery** In `NatsClient.cs`: - Add `enum WriteTimeoutPolicy { Close, TcpFlush }` - Add `static WriteTimeoutPolicy GetWriteTimeoutPolicy(ClientKind kind)` — CLIENT→Close, others→TcpFlush - Add `record FlushResult(long BytesAttempted, long BytesWritten)` with `IsPartial` and `BytesRemaining` - Modify `RunWriteLoopAsync()` write timeout handling: - On timeout with `bytesWritten > 0` (partial): if policy is `TcpFlush`, mark slow consumer, continue - On timeout with `bytesWritten == 0`: close connection - CLIENT kind always closes on timeout **Step 4: Run test to verify it passes** ```bash dotnet
test tests/NATS.Server.Tests --filter "FullyQualifiedName~WriteTimeoutTests" -v normal ``` Expected: PASS **Step 5: Commit** ```bash git add tests/NATS.Server.Tests/WriteTimeoutTests.cs src/NATS.Server/NatsClient.cs git commit -m "feat(client): add write timeout recovery with per-kind policies" ``` --- ### Task 25: MQTT JetStream Persistence Back MQTT session and retained stores with JetStream streams. **Files:** - Modify: `src/NATS.Server/Mqtt/MqttSessionStore.cs` - Modify: `src/NATS.Server/Mqtt/MqttRetainedStore.cs` - Test: `tests/NATS.Server.Tests/MqttPersistenceTests.cs` - Reference: `golang/nats-server/server/mqtt.go` ($MQTT_msgs, $MQTT_sess, $MQTT_rmsgs) **Step 1: Write the failing test** Create `tests/NATS.Server.Tests/MqttPersistenceTests.cs`: ```csharp using NATS.Server.Mqtt; namespace NATS.Server.Tests; public class MqttPersistenceTests { [Fact] public async Task Session_persists_across_restart() { var store = MqttSessionStoreTestHelper.CreateWithJetStream(); await store.ConnectAsync("client-1", cleanSession: false); await store.AddSubscriptionAsync("client-1", "topic/test", qos: 1); await store.SaveSessionAsync("client-1"); // Simulate restart var recovered = MqttSessionStoreTestHelper.CreateWithJetStream(store.BackingStore); await recovered.ConnectAsync("client-1", cleanSession: false); var subs = recovered.GetSubscriptions("client-1"); subs.ShouldContainKey("topic/test"); } [Fact] public async Task Clean_session_deletes_existing() { var store = MqttSessionStoreTestHelper.CreateWithJetStream(); await store.ConnectAsync("client-2", cleanSession: false); await store.AddSubscriptionAsync("client-2", "persist/me", qos: 1); await store.SaveSessionAsync("client-2"); // Reconnect with clean session await store.ConnectAsync("client-2", cleanSession: true); var subs = store.GetSubscriptions("client-2"); subs.ShouldBeEmpty(); } [Fact] public async Task Retained_message_survives_restart() { var retained = MqttRetainedStoreTestHelper.CreateWithJetStream(); await 
retained.SetRetainedAsync("sensors/temp", "72.5"u8.ToArray()); // Simulate restart var recovered = MqttRetainedStoreTestHelper.CreateWithJetStream(retained.BackingStore); var msg = await recovered.GetRetainedAsync("sensors/temp"); msg.ShouldNotBeNull(); System.Text.Encoding.UTF8.GetString(msg).ShouldBe("72.5"); } [Fact] public async Task Retained_message_cleared_with_empty_payload() { var retained = MqttRetainedStoreTestHelper.CreateWithJetStream(); await retained.SetRetainedAsync("sensors/temp", "72.5"u8.ToArray()); await retained.SetRetainedAsync("sensors/temp", ReadOnlyMemory<byte>.Empty); // clear var msg = await retained.GetRetainedAsync("sensors/temp"); msg.ShouldBeNull(); } } ``` **Step 2: Run test to verify it fails** ```bash dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MqttPersistenceTests" -v normal ``` Expected: FAIL **Step 3: Implement JetStream backing for MQTT stores** In `MqttSessionStore.cs`: - Add `IStreamStore? _backingStore` field for JetStream persistence - `ConnectAsync(clientId, cleanSession)`: - If `cleanSession=false`: query backing store for existing session state - If `cleanSession=true`: delete existing session data from backing store - `SaveSessionAsync(clientId)` — serialize session state, store in backing store under `$MQTT.sess.{clientId}` - `BackingStore` property for test access In `MqttRetainedStore.cs`: - Add `IStreamStore? _backingStore` field - `SetRetainedAsync(topic, payload)` — store in backing store under `$MQTT.rmsgs.{topic}` - `GetRetainedAsync(topic)` — load from backing store - If payload is empty, remove the retained message Create test helpers with in-memory backing stores.
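The retained-message semantics above reduce to a small in-memory sketch: set stores the payload under the topic, and an empty payload clears the retained message (standard MQTT retained-publish behavior). The dictionary stands in for the `$MQTT.rmsgs`-backed stream, and the names here are assumptions:

```csharp
using System;
using System.Collections.Generic;

// In-memory sketch of retained-message semantics; the JetStream-backed
// store replaces the dictionary with stream lookups.
public sealed class RetainedStoreSketch
{
    private readonly Dictionary<string, byte[]> _retained = new();

    public void SetRetained(string topic, ReadOnlyMemory<byte> payload)
    {
        if (payload.IsEmpty)
            _retained.Remove(topic);   // empty retained publish → clear
        else
            _retained[topic] = payload.ToArray();
    }

    public byte[]? GetRetained(string topic) =>
        _retained.TryGetValue(topic, out var msg) ? msg : null;
}
```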
**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MqttPersistenceTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/MqttPersistenceTests.cs src/NATS.Server/Mqtt/MqttSessionStore.cs src/NATS.Server/Mqtt/MqttRetainedStore.cs
git commit -m "feat(mqtt): add JetStream-backed session and retained message persistence"
```

---

### Task 26: SIGHUP Config Reload

Add a Unix signal handler for config hot-reload.

**Files:**

- Create: `src/NATS.Server/Configuration/SignalHandler.cs`
- Modify: `src/NATS.Server/Configuration/ConfigReloader.cs`
- Modify: `src/NATS.Server.Host/Program.cs`
- Test: `tests/NATS.Server.Tests/SignalHandlerTests.cs`
- Reference: `golang/nats-server/server/opts.go`, `golang/nats-server/server/reload.go`

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/SignalHandlerTests.cs`:

```csharp
using NATS.Server.Configuration;

namespace NATS.Server.Tests;

public class SignalHandlerTests
{
    [Fact]
    public void SignalHandler_registers_without_throwing()
    {
        var reloader = new ConfigReloader();
        // Registration should not throw even without a real server
        Should.NotThrow(() => SignalHandler.Register(reloader, null!));
    }

    [Fact]
    public async Task ReloadFromOptionsAsync_applies_reloadable_changes()
    {
        var reloader = new ConfigReloader();
        var original = new NatsOptions { Port = 4222 };
        var updated = new NatsOptions { Port = 4222 }; // same port — no non-reloadable change
        var result = await reloader.ReloadFromOptionsAsync(original, updated);
        result.Success.ShouldBeTrue();
        result.RejectedChanges.ShouldBeEmpty();
    }

    [Fact]
    public async Task ReloadFromOptionsAsync_rejects_non_reloadable_changes()
    {
        var reloader = new ConfigReloader();
        var original = new NatsOptions { Port = 4222 };
        var updated = new NatsOptions { Port = 5555 }; // port change is NOT reloadable
        var result = await reloader.ReloadFromOptionsAsync(original, updated);
        result.RejectedChanges.ShouldContain(c =>
c.Contains("Port"));
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SignalHandlerTests" -v normal
```

Expected: FAIL — `SignalHandler` doesn't exist

**Step 3: Implement SignalHandler and ReloadFromOptionsAsync**

Create `src/NATS.Server/Configuration/SignalHandler.cs`:

```csharp
using System.Runtime.InteropServices;

namespace NATS.Server.Configuration;

/// <summary>
/// Registers POSIX signal handlers for config reload.
/// Go reference: server/signal_unix.go, opts.go reload logic.
/// </summary>
public static class SignalHandler
{
    // Keep the registration alive; disposing it unregisters the handler.
    private static PosixSignalRegistration? _sighup;

    public static void Register(ConfigReloader reloader, NatsServer server)
    {
        _sighup = PosixSignalRegistration.Create(PosixSignal.SIGHUP, ctx =>
        {
            _ = reloader.ReloadAsync(server);
        });
    }
}
```

In `ConfigReloader.cs`, add:

- `ReloadFromOptionsAsync(original, updated)` — compare options, apply reloadable, reject non-reloadable
- Reloadable: auth, TLS, logging, limits
- Non-reloadable: port, host, cluster port, store dir
- Return `ReloadResult { bool Success, List<string> RejectedChanges }`

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SignalHandlerTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add src/NATS.Server/Configuration/SignalHandler.cs tests/NATS.Server.Tests/SignalHandlerTests.cs src/NATS.Server/Configuration/ConfigReloader.cs
git commit -m "feat(config): add SIGHUP signal handler and config reload validation"
```

---

### Task 27: Implicit Route/Gateway Discovery

Auto-discover cluster peers from INFO gossip.
**Files:**

- Modify: `src/NATS.Server/Routes/RouteManager.cs`
- Modify: `src/NATS.Server/Gateways/GatewayManager.cs`
- Test: `tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs`
- Reference: `golang/nats-server/server/route.go` (processImplicitRoute), `gateway.go` (processImplicitGateway)

**Step 1: Write the failing test**

Create `tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs`:

```csharp
namespace NATS.Server.Tests;

public class ImplicitDiscoveryTests
{
    [Fact]
    public void ProcessImplicitRoute_discovers_new_peer()
    {
        var mgr = RouteManagerTestHelper.Create();
        var serverInfo = new ServerInfo
        {
            ServerId = "server-2",
            ConnectUrls = ["nats://10.0.0.2:6222", "nats://10.0.0.3:6222"],
        };
        mgr.ProcessImplicitRoute(serverInfo);
        mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.2:6222");
        mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.3:6222");
    }

    [Fact]
    public void ProcessImplicitRoute_skips_known_peers()
    {
        var mgr = RouteManagerTestHelper.Create(knownRoutes: ["nats://10.0.0.2:6222"]);
        var serverInfo = new ServerInfo
        {
            ServerId = "server-2",
            ConnectUrls = ["nats://10.0.0.2:6222", "nats://10.0.0.3:6222"],
        };
        mgr.ProcessImplicitRoute(serverInfo);
        mgr.DiscoveredRoutes.Count.ShouldBe(1); // only 10.0.0.3 is new
        mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.3:6222");
    }

    [Fact]
    public void ProcessImplicitGateway_discovers_new_gateway()
    {
        var mgr = GatewayManagerTestHelper.Create();
        var gwInfo = new GatewayInfo
        {
            Name = "cluster-B",
            Urls = ["nats://10.0.1.1:7222"],
        };
        mgr.ProcessImplicitGateway(gwInfo);
        mgr.DiscoveredGateways.ShouldContain("cluster-B");
    }

    [Fact]
    public void ForwardNewRouteInfo_updates_known_servers()
    {
        var mgr = RouteManagerTestHelper.Create();
        var forwarded = new List<string>();
        mgr.OnForwardInfo += (urls) => forwarded.AddRange(urls);
        mgr.ForwardNewRouteInfoToKnownServers("nats://10.0.0.5:6222");
        forwarded.ShouldContain("nats://10.0.0.5:6222");
    }
}
```

**Step 2: Run test to verify it fails**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImplicitDiscoveryTests" -v normal
```

Expected: FAIL

**Step 3: Implement implicit discovery**

In `RouteManager.cs`:

- Add `HashSet<string> DiscoveredRoutes` — tracks auto-discovered route URLs
- `ProcessImplicitRoute(serverInfo)`:
  - Extract `connect_urls` from INFO
  - For each unknown URL: add to `DiscoveredRoutes`, initiate solicited connection
- `ForwardNewRouteInfoToKnownServers(newPeerUrl)`:
  - Send updated INFO containing new peer to all existing route connections
- Add `event Action<IEnumerable<string>>? OnForwardInfo` for testing

In `GatewayManager.cs`:

- Add `HashSet<string> DiscoveredGateways` — tracks auto-discovered gateway names
- `ProcessImplicitGateway(gwInfo)`:
  - Add gateway name + URLs to discovered set
  - Create outbound gateway connection

Create test helpers: `RouteManagerTestHelper`, `GatewayManagerTestHelper`.

**Step 4: Run test to verify it passes**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImplicitDiscoveryTests" -v normal
```

Expected: PASS

**Step 5: Commit**

```bash
git add tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs src/NATS.Server/Routes/RouteManager.cs src/NATS.Server/Gateways/GatewayManager.cs
git commit -m "feat(cluster): add implicit route and gateway discovery from INFO gossip"
```

---

**Phase 4 Exit Gate Verification:**

```bash
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FlushCoalescing" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StallGate" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WriteTimeout" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MqttPersistence" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SignalHandler" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImplicitDiscovery" -v normal
```

All Phase 4 tests must pass.
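The dedupe-and-forward core of Task 27's `ProcessImplicitRoute` can be sketched as follows. This is a minimal stand-in, not the real `RouteManager`: the constructor, the URL-only parameter, and the event signature are assumptions (the real method takes a full `ServerInfo` and opens solicited route connections where the comment indicates).

```csharp
using System;
using System.Collections.Generic;

// Sketch of implicit route discovery: skip known peers, record new ones,
// and gossip a newly learned peer to existing route connections.
public sealed class RouteManagerSketch
{
    private readonly HashSet<string> _known;

    public HashSet<string> DiscoveredRoutes { get; } = new();
    public event Action<IEnumerable<string>>? OnForwardInfo;

    public RouteManagerSketch(IEnumerable<string>? knownRoutes = null) =>
        _known = new HashSet<string>(knownRoutes ?? Array.Empty<string>());

    public void ProcessImplicitRoute(IEnumerable<string> connectUrls)
    {
        foreach (var url in connectUrls)
        {
            // Skip URLs we already route to, explicitly or via earlier discovery.
            if (!_known.Contains(url) && DiscoveredRoutes.Add(url))
            {
                // Real implementation: initiate a solicited route connection here.
            }
        }
    }

    public void ForwardNewRouteInfoToKnownServers(string newPeerUrl) =>
        // Real implementation: send an updated INFO to every existing route.
        OnForwardInfo?.Invoke(new[] { newPeerUrl });
}
```

The `HashSet.Add` return value doubles as the "is this new?" check, which keeps discovery idempotent when the same `connect_urls` arrive in repeated INFO messages.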
---

## Final Verification

After all 4 phases are complete, run the full test suite:

```bash
dotnet test -v normal
```

All existing tests must continue to pass. No regressions.

Update parity DB with all newly mapped tests:

```bash
sqlite3 docs/test_parity.db "SELECT status, COUNT(*) FROM go_tests GROUP BY status;"
```

---

## Task Summary

| Task | Phase | Component | Gap |
|------|-------|-----------|-----|
| 1 | 1 | MsgBlock encryption | Gap 1.2 |
| 2 | 1 | MsgBlock compression | Gap 1.3 |
| 3 | 1 | Block rotation & lifecycle | Gap 1.1 |
| 4 | 1 | Crash recovery enhancement | Gap 1.4 |
| 5 | 1 | IStreamStore core methods | Gap 1.9 |
| 6 | 1 | IStreamStore query methods | Gap 1.9 |
| 7 | 1 | RAFT binary WAL | Gap 8.1 |
| 8 | 1 | RAFT joint consensus | Gap 8.2 |
| 9 | 2 | Meta snapshot codec | Gap 2.6 |
| 10 | 2 | Cluster monitoring loop | Gap 2.1 |
| 11 | 2 | Assignment processing | Gap 2.2 |
| 12 | 2 | Inflight tracking | Gap 2.3 |
| 13 | 2 | Leadership transitions | Gap 2.5 |
| 14 | 3 | RedeliveryTracker rewrite | Gap 3.4 |
| 15 | 3 | Ack/NAK processing | Gap 3.3 |
| 16 | 3 | Pull request pipeline | Gap 3.2 |
| 17 | 3 | Consumer pause/resume | Gap 3.7 |
| 18 | 3 | Priority group pinning | Gap 3.6 |
| 19 | 3 | Stream purge filtering | Gap 4.5 |
| 20 | 3 | Interest retention | Gap 4.6 |
| 21 | 3 | Mirror/source retry | Gap 4.1-4.3 |
| 22 | 4 | Flush coalescing | Gap 5.1 |
| 23 | 4 | Stall gate | Gap 5.2 |
| 24 | 4 | Write timeout recovery | Gap 5.3 |
| 25 | 4 | MQTT persistence | Gap 6.1 |
| 26 | 4 | SIGHUP config reload | Gap 14.1 |
| 27 | 4 | Implicit discovery | Gap 11.1, 13.1 |