natsdotnet/docs/plans/2026-02-25-production-gaps-plan.md
Joseph Doherty 6f354baae9 docs: add implementation plan files for gap closure phases
Includes production gaps plan (15 gaps, 4 phases) and remaining gaps
plan task persistence file (93 gaps, 8 phases) — both fully executed.
2026-02-25 13:27:45 -05:00


Production Gap Closure Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.

Goal: Port 15 CRITICAL/HIGH gaps from Go NATS server to .NET, covering FileStore, RAFT, JetStream Cluster, Consumer/Stream engines, Client protocol, MQTT persistence, config reload, and networking discovery.

Architecture: Bottom-up dependency approach — Phase 1 builds storage and consensus foundations, Phase 2 adds cluster coordination on top, Phase 3 adds consumer/stream engines, Phase 4 adds client performance, MQTT, config, and networking. Each phase has an exit gate with measurable criteria.

Tech Stack: .NET 10 / C# 14, xUnit 3, Shouldly, NSubstitute, System.IO.Pipelines, IronSnappy (S2), ChaCha20-Poly1305/AES-GCM, SQLite (test parity DB)

Codex review: Verified 2026-02-25 — dependency gaps fixed, path errors corrected, feedback incorporated.

Test strategy: Only run targeted unit tests during implementation (dotnet test --filter). Run full suite only after all 4 phases complete. Update docs/test_parity.db when porting Go tests.

Codex feedback incorporated:

  1. Added missing dependencies: Task 3←{1,2}, Task 4←{1,2,3}, Task 8←7, Task 13←{10,12}
  2. Fixed paths: RouteManager.cs is in Routes/, GatewayManager.cs is in Gateways/ (not Clustering/)
  3. Phase 4A-C (client perf), 4E (SIGHUP), 4F (discovery) are independent lanes — can start in parallel with Phase 3
  4. Phase 4D (MQTT) correctly gated on storage + consumer dependencies
  5. WAL format should include per-record checksum and record type — implement during Task 7
  6. Encryption: add explicit nonce/AAD/key-version rules during Task 1 implementation
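Point 6 leaves the nonce/AAD rules to Task 1; as a starting assumption (not the committed design), one scheme that satisfies it: a fresh random 96-bit nonce per sealed write, with AAD binding the block id and key version so a block cannot be decrypted under another block's id or a rotated key. A Go sketch with stdlib AES-GCM (function names are hypothetical; the plan also targets ChaCha20-Poly1305):

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"encoding/binary"
	"fmt"
)

// buildAAD binds ciphertext to its block id and key version (assumed layout).
func buildAAD(blockID, keyVersion uint32) []byte {
	aad := make([]byte, 8)
	binary.LittleEndian.PutUint32(aad[0:], blockID)
	binary.LittleEndian.PutUint32(aad[4:], keyVersion)
	return aad
}

// sealBlock encrypts with a fresh random nonce, prepending it to the output.
func sealBlock(key, plaintext []byte, blockID, keyVersion uint32) ([]byte, error) {
	blk, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(blk)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return gcm.Seal(nonce, nonce, plaintext, buildAAD(blockID, keyVersion)), nil
}

// openBlock reverses sealBlock; it fails if block id or key version differ.
func openBlock(key, sealed []byte, blockID, keyVersion uint32) ([]byte, error) {
	blk, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(blk)
	if err != nil {
		return nil, err
	}
	ns := gcm.NonceSize()
	return gcm.Open(nil, sealed[:ns], sealed[ns:], buildAAD(blockID, keyVersion))
}

func main() {
	key := make([]byte, 32)
	rand.Read(key)
	sealed, _ := sealBlock(key, []byte("hello"), 7, 1)
	pt, err := openBlock(key, sealed, 7, 1)
	fmt.Println(string(pt), err)
	// Opening under a different block id must fail (AAD mismatch).
	_, err = openBlock(key, sealed, 8, 1)
	fmt.Println(err != nil)
}
```

The same AAD trick gives key rotation a cheap integrity check: after bumping keyVersion, old blocks fail to open until re-encrypted.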

Parity DB update pattern:

sqlite3 docs/test_parity.db "UPDATE go_tests SET status='mapped', dotnet_test='DotNetTestName', dotnet_file='TestFile.cs' WHERE go_test='GoTestName';"

Phase 1: FileStore + RAFT Foundation

Dependencies: None (pure infrastructure)

Exit gate: All IStreamStore methods implemented, FileStore round-trips encrypted+compressed blocks, RAFT persists and recovers state across restarts, joint consensus passes membership change tests.


Task 1: MsgBlock Encryption Integration

Wire AeadEncryptor into MsgBlock so blocks are encrypted at rest.

Files:

  • Modify: src/NATS.Server/JetStream/Storage/MsgBlock.cs
  • Modify: src/NATS.Server/JetStream/Storage/FileStore.cs
  • Modify: src/NATS.Server/JetStream/Storage/FileStoreOptions.cs
  • Test: tests/NATS.Server.Tests/FileStoreEncryptionTests.cs
  • Reference: golang/nats-server/server/filestore.go:816-907 (genEncryptionKeys, recoverAEK, setupAEK)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/FileStoreEncryptionTests.cs:

using System.Security.Cryptography;
using NATS.Server.JetStream.Storage;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;

public class FileStoreEncryptionTests
{
    [Fact]
    public async Task Encrypted_block_round_trips_message()
    {
        // Go: TestFileStoreEncryption server/filestore_test.go
        var dir = Directory.CreateTempSubdirectory();
        var key = new byte[32];
        RandomNumberGenerator.Fill(key);

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key,
        }))
        {
            await store.AppendAsync("test.subj", "hello encrypted"u8.ToArray(), default);
        }

        // Raw block file should NOT contain plaintext
        var blkFiles = Directory.GetFiles(dir.FullName, "*.blk");
        blkFiles.ShouldNotBeEmpty();
        var raw = File.ReadAllBytes(blkFiles[0]);
        System.Text.Encoding.UTF8.GetString(raw).ShouldNotContain("hello encrypted");

        // Recover with same key should return plaintext
        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key,
        });
        var msg = await recovered.LoadAsync(1, default);
        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg.Payload).ShouldBe("hello encrypted");
    }

    [Fact]
    public async Task Encrypted_block_with_aes_round_trips()
    {
        var dir = Directory.CreateTempSubdirectory();
        var key = new byte[32];
        RandomNumberGenerator.Fill(key);

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.AES,
            EncryptionKey = key,
        }))
        {
            await store.AppendAsync("aes.subj", "aes payload"u8.ToArray(), default);
        }

        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.AES,
            EncryptionKey = key,
        });
        var msg = await recovered.LoadAsync(1, default);
        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg.Payload).ShouldBe("aes payload");
    }

    [Fact]
    public async Task Wrong_key_fails_to_decrypt()
    {
        var dir = Directory.CreateTempSubdirectory();
        var key1 = new byte[32];
        var key2 = new byte[32];
        RandomNumberGenerator.Fill(key1);
        RandomNumberGenerator.Fill(key2);

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key1,
        }))
        {
            await store.AppendAsync("secret", "data"u8.ToArray(), default);
        }

        // Recovery with the wrong key should fail to authenticate the blocks
        var act = () =>
        {
            _ = new FileStore(new FileStoreOptions
            {
                Directory = dir.FullName,
                Cipher = StoreCipher.ChaCha,
                EncryptionKey = key2,
            });
        };
        Should.Throw<Exception>(act);
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreEncryptionTests" -v normal

Expected: FAIL — encryption not wired into block I/O

Step 3: Implement encryption in MsgBlock

In MsgBlock.cs, add:

  • New fields: byte[]? _aek (per-block AEAD key), StoreCipher _cipher
  • New constructor parameter for encryption key and cipher
  • Modify Write/WriteAt to encrypt payload via AeadEncryptor.Encrypt() before disk write
  • Modify Read/ReadRecord to decrypt payload via AeadEncryptor.Decrypt() after disk read
  • Add static MsgBlock CreateEncrypted(string path, int blockId, long maxBytes, ulong firstSeq, byte[] aek, StoreCipher cipher)
  • Add static MsgBlock RecoverEncrypted(string path, int blockId, long maxBytes, byte[] aek, StoreCipher cipher)

In FileStore.cs, modify:

  • EnsureActiveBlock() — pass encryption key to MsgBlock.CreateEncrypted() when _useAead
  • RecoverBlocks() — pass encryption key to MsgBlock.RecoverEncrypted() when _useAead
  • Add GenPerBlockKey(int blockId) — derives per-block key from stream key using HKDF
  • Add SaveBlockKey(int blockId, byte[] aek) — persists per-block key to .key metadata file
  • Add LoadBlockKey(int blockId) — loads per-block key from .key metadata file
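The HKDF derivation in GenPerBlockKey could look like the following Go sketch (Go for parity with the referenced server code; the "nats-blk" salt and the block-id info layout are assumptions for illustration, not the Go server's actual parameters):

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// hkdf32 is HKDF-SHA256 (RFC 5869) specialized to a single 32-byte output.
func hkdf32(ikm, salt, info []byte) []byte {
	// Extract: PRK = HMAC(salt, ikm)
	ext := hmac.New(sha256.New, salt)
	ext.Write(ikm)
	prk := ext.Sum(nil)
	// Expand, one round: T(1) = HMAC(PRK, info || 0x01)
	exp := hmac.New(sha256.New, prk)
	exp.Write(info)
	exp.Write([]byte{1})
	return exp.Sum(nil)
}

// genPerBlockKey derives a per-block AEAD key from the stream key, so that
// compromising one block key never exposes sibling blocks.
func genPerBlockKey(streamKey []byte, blockID uint32) []byte {
	info := make([]byte, 4)
	binary.LittleEndian.PutUint32(info, blockID)
	return hkdf32(streamKey, []byte("nats-blk"), info)
}

func main() {
	key := []byte("0123456789abcdef0123456789abcdef")
	fmt.Printf("block 1: %x\n", genPerBlockKey(key, 1))
	fmt.Printf("block 2: %x\n", genPerBlockKey(key, 2))
}
```

In .NET the equivalent one-liner is `HKDF.DeriveKey(HashAlgorithmName.SHA256, ...)`; the derivation must stay deterministic so recovery can re-derive keys when no .key metadata file survives.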

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreEncryptionTests" -v normal

Expected: PASS

Step 5: Update parity DB

sqlite3 docs/test_parity.db "UPDATE go_tests SET status='mapped', dotnet_test='Encrypted_block_round_trips_message', dotnet_file='FileStoreEncryptionTests.cs' WHERE go_test='TestFileStoreEncryption';"

Step 6: Commit

git add tests/NATS.Server.Tests/FileStoreEncryptionTests.cs src/NATS.Server/JetStream/Storage/MsgBlock.cs src/NATS.Server/JetStream/Storage/FileStore.cs
git commit -m "feat(filestore): wire AeadEncryptor into MsgBlock for at-rest encryption"

Task 2: MsgBlock Compression Integration

Wire S2Codec into MsgBlock for per-message compression.

Files:

  • Modify: src/NATS.Server/JetStream/Storage/MsgBlock.cs
  • Modify: src/NATS.Server/JetStream/Storage/FileStore.cs
  • Test: tests/NATS.Server.Tests/FileStoreCompressionTests.cs
  • Reference: golang/nats-server/server/filestore.go:4443 (setupWriteCache with compression)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/FileStoreCompressionTests.cs:

using System.Security.Cryptography;
using NATS.Server.JetStream.Storage;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;

public class FileStoreCompressionTests
{
    [Fact]
    public async Task Compressed_block_round_trips_message()
    {
        // Go: TestFileStoreCompression server/filestore_test.go
        var dir = Directory.CreateTempSubdirectory();

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Compression = StoreCompression.S2Compression,
        }))
        {
            var payload = new byte[1024];
            Array.Fill(payload, (byte)'A'); // highly compressible
            await store.AppendAsync("comp.subj", payload, default);
        }

        // Block file should be smaller than uncompressed payload
        var blkFiles = Directory.GetFiles(dir.FullName, "*.blk");
        blkFiles.ShouldNotBeEmpty();
        new FileInfo(blkFiles[0]).Length.ShouldBeLessThan(1024);

        // Recover should return original payload
        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Compression = StoreCompression.S2Compression,
        });
        var msg = await recovered.LoadAsync(1, default);
        msg.ShouldNotBeNull();
        msg.Payload.Length.ShouldBe(1024);
        msg.Payload.All(b => b == (byte)'A').ShouldBeTrue();
    }

    [Fact]
    public async Task Compressed_and_encrypted_round_trips()
    {
        var dir = Directory.CreateTempSubdirectory();
        var key = new byte[32];
        RandomNumberGenerator.Fill(key);

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Compression = StoreCompression.S2Compression,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key,
        }))
        {
            await store.AppendAsync("both.subj", "compress+encrypt"u8.ToArray(), default);
        }

        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            Compression = StoreCompression.S2Compression,
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key,
        });
        var msg = await recovered.LoadAsync(1, default);
        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg.Payload).ShouldBe("compress+encrypt");
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreCompressionTests" -v normal

Expected: FAIL — compression not wired into block path

Step 3: Implement compression in MsgBlock

In MsgBlock.cs:

  • Add field: bool _compressed
  • Modify Write/WriteAt: if _compressed, call S2Codec.Compress(payload) before writing record
  • Modify Read/ReadRecord: if _compressed, call S2Codec.Decompress(data) after reading record
  • Order: compress first, then encrypt (on write); decrypt first, then decompress (on read)

In FileStore.cs:

  • Pass _useS2 flag to MsgBlock factory methods
  • Persist compression flag in block metadata so recovery knows to decompress

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreCompressionTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/FileStoreCompressionTests.cs src/NATS.Server/JetStream/Storage/MsgBlock.cs src/NATS.Server/JetStream/Storage/FileStore.cs
git commit -m "feat(filestore): wire S2Codec into MsgBlock for per-message compression"

Task 3: Block Rotation & Lifecycle

Add automatic block rotation when active block exceeds size threshold, block sealing, and background flusher.

Files:

  • Modify: src/NATS.Server/JetStream/Storage/MsgBlock.cs
  • Modify: src/NATS.Server/JetStream/Storage/FileStore.cs
  • Test: tests/NATS.Server.Tests/FileStoreBlockRotationTests.cs
  • Reference: golang/nats-server/server/filestore.go:4485 (newMsgBlockForWrite), 5783-5842 (flushLoop)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/FileStoreBlockRotationTests.cs:

using NATS.Server.JetStream.Storage;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;

public class FileStoreBlockRotationTests
{
    [Fact]
    public async Task Block_rotates_when_size_exceeded()
    {
        // Go: TestFileStoreBlockRotation server/filestore_test.go
        var dir = Directory.CreateTempSubdirectory();
        var opts = new FileStoreOptions
        {
            Directory = dir.FullName,
            BlockSizeBytes = 512, // small block for testing
        };

        await using var store = new FileStore(opts);
        var payload = new byte[200];

        // Write enough to trigger rotation (3 x 200 > 512)
        await store.AppendAsync("rot.1", payload, default);
        await store.AppendAsync("rot.2", payload, default);
        await store.AppendAsync("rot.3", payload, default);

        store.BlockCount.ShouldBeGreaterThan(1);
    }

    [Fact]
    public async Task Sealed_block_is_read_only()
    {
        var dir = Directory.CreateTempSubdirectory();
        var opts = new FileStoreOptions
        {
            Directory = dir.FullName,
            BlockSizeBytes = 256,
        };

        await using var store = new FileStore(opts);
        var payload = new byte[200];

        await store.AppendAsync("s.1", payload, default);
        await store.AppendAsync("s.2", payload, default); // triggers rotation

        // First block should be sealed
        var blkFiles = Directory.GetFiles(dir.FullName, "*.blk").OrderBy(f => f).ToArray();
        blkFiles.Length.ShouldBeGreaterThan(1);
    }

    [Fact]
    public async Task Messages_across_blocks_recover_correctly()
    {
        var dir = Directory.CreateTempSubdirectory();
        var opts = new FileStoreOptions
        {
            Directory = dir.FullName,
            BlockSizeBytes = 256,
        };

        await using (var store = new FileStore(opts))
        {
            for (int i = 1; i <= 10; i++)
                await store.AppendAsync($"msg.{i}", System.Text.Encoding.UTF8.GetBytes($"payload-{i}"), default);
        }

        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            BlockSizeBytes = 256,
        });
        var state = await recovered.GetStateAsync(default);
        state.Messages.ShouldBe(10UL);

        for (ulong seq = 1; seq <= 10; seq++)
        {
            var msg = await recovered.LoadAsync(seq, default);
            msg.ShouldNotBeNull();
            msg.Subject.ShouldBe($"msg.{seq}");
        }
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreBlockRotationTests" -v normal

Expected: FAIL — no automatic rotation

Step 3: Implement block rotation

In MsgBlock.cs:

  • Add bool IsSealed property (set by Seal())
  • Add void Seal() — marks block read-only, flushes pending writes
  • Add long BytesWritten property — current write offset

In FileStore.cs:

  • Modify EnsureActiveBlock() — check _activeBlock.BytesWritten >= _options.BlockSizeBytes, call RotateBlock() if exceeded
  • Add RotateBlock() — seals active block, creates new block with _nextBlockId++, adds to _blocks
  • Modify RecoverBlocks() — handle multiple .blk files, recover each in order
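The rotation check reduces to comparing the active block's write offset against the configured block size before each append. A toy Go sketch of that control flow (types and names are hypothetical, mirroring the EnsureActiveBlock/RotateBlock split above):

```go
package main

import "fmt"

type block struct {
	id      int
	written int
	sealed  bool
}

type store struct {
	blockSize int
	nextID    int
	blocks    []*block
}

// active returns the current write block, rotating first if the pending
// write would not fit. The outgoing block is sealed before rotation.
func (s *store) active(need int) *block {
	n := len(s.blocks)
	if n == 0 || s.blocks[n-1].written+need > s.blockSize {
		if n > 0 {
			s.blocks[n-1].sealed = true
		}
		s.nextID++
		s.blocks = append(s.blocks, &block{id: s.nextID})
	}
	return s.blocks[len(s.blocks)-1]
}

func (s *store) append(payload []byte) {
	b := s.active(len(payload))
	b.written += len(payload)
}

func main() {
	s := &store{blockSize: 512}
	for i := 0; i < 3; i++ {
		s.append(make([]byte, 200)) // 3 x 200 = 600 > 512, forces rotation
	}
	fmt.Println(len(s.blocks), s.blocks[0].sealed) // 2 true
}
```

Checking before the write (rather than after) keeps any single record from straddling two block files, which simplifies recovery.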

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreBlockRotationTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/FileStoreBlockRotationTests.cs src/NATS.Server/JetStream/Storage/MsgBlock.cs src/NATS.Server/JetStream/Storage/FileStore.cs
git commit -m "feat(filestore): add block rotation on size threshold and sealing"

Task 4: Crash Recovery Enhancement

Enhance recovery to rebuild TTL wheel, validate checksums, and handle corrupt/truncated blocks.

Files:

  • Modify: src/NATS.Server/JetStream/Storage/FileStore.cs
  • Modify: src/NATS.Server/JetStream/Storage/MsgBlock.cs
  • Modify: src/NATS.Server/JetStream/Storage/MessageRecord.cs
  • Test: tests/NATS.Server.Tests/FileStoreCrashRecoveryTests.cs
  • Reference: golang/nats-server/server/filestore.go:1754-2401 (recoverFullState, recoverTTLState)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/FileStoreCrashRecoveryTests.cs:

using NATS.Server.JetStream.Storage;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;

public class FileStoreCrashRecoveryTests
{
    [Fact]
    public async Task Recovery_rebuilds_ttl_wheel_and_expires_old()
    {
        var dir = Directory.CreateTempSubdirectory();

        await using (var store = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            MaxAgeMs = 100, // 100ms TTL
        }))
        {
            await store.AppendAsync("ttl.1", "old"u8.ToArray(), default);
        }

        // Wait for messages to expire
        await Task.Delay(200);

        // Recovery should expire old messages
        await using var recovered = new FileStore(new FileStoreOptions
        {
            Directory = dir.FullName,
            MaxAgeMs = 100,
        });
        var state = await recovered.GetStateAsync(default);
        state.Messages.ShouldBe(0UL);
    }

    [Fact]
    public async Task Recovery_handles_truncated_block()
    {
        var dir = Directory.CreateTempSubdirectory();

        await using (var store = new FileStore(new FileStoreOptions { Directory = dir.FullName }))
        {
            await store.AppendAsync("t.1", "data1"u8.ToArray(), default);
            await store.AppendAsync("t.2", "data2"u8.ToArray(), default);
        }

        // Truncate the block file to simulate crash mid-write
        var blkFile = Directory.GetFiles(dir.FullName, "*.blk")[0];
        var info = new FileInfo(blkFile);
        using (var fs = File.OpenWrite(blkFile))
            fs.SetLength(info.Length - 5); // chop last 5 bytes

        // Recovery should salvage valid messages
        await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName });
        var state = await recovered.GetStateAsync(default);
        state.Messages.ShouldBeGreaterThanOrEqualTo(1UL);
    }

    [Fact]
    public async Task Atomic_state_write_survives_crash()
    {
        var dir = Directory.CreateTempSubdirectory();

        await using var store = new FileStore(new FileStoreOptions { Directory = dir.FullName });
        await store.AppendAsync("a.1", "payload"u8.ToArray(), default);

        // Force state write
        store.FlushAllPending();

        // State file should exist
        var stateFile = Path.Combine(dir.FullName, "stream.state");
        File.Exists(stateFile).ShouldBeTrue();
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreCrashRecoveryTests" -v normal

Expected: FAIL

Step 3: Implement crash recovery enhancements

In FileStore.cs:

  • Add RecoverTtlState() — scan all blocks, re-register unexpired messages in _ttlWheel
  • Add WriteStateAtomically(path, data) — write to temp file, then File.Move(temp, target, overwrite: true)
  • Enhance RecoverBlocks() — handle truncated records gracefully (catch decode errors, skip corrupt tail)
  • Implement FlushAllPending() — flush active block, write stream state atomically

In MsgBlock.cs:

  • Add RecoverWithValidation() — validate each record during recovery, skip corrupt entries
  • Add TryReadRecord(offset) — returns null on corrupt data instead of throwing
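Truncation handling falls out of length-prefixed, checksummed records: recovery replays records until the first short or checksum-failing one and discards the tail. A self-contained Go sketch (the [len][crc][payload] layout is an assumption for illustration):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// appendRecord appends [len u32][crc u32][payload] to buf.
func appendRecord(buf, payload []byte) []byte {
	var hdr [8]byte
	binary.LittleEndian.PutUint32(hdr[0:], uint32(len(payload)))
	binary.LittleEndian.PutUint32(hdr[4:], crc32.ChecksumIEEE(payload))
	return append(append(buf, hdr[:]...), payload...)
}

// recoverRecords returns every record up to the first truncated or corrupt
// one — the "salvage valid messages, skip corrupt tail" behavior above.
func recoverRecords(buf []byte) [][]byte {
	var out [][]byte
	for len(buf) >= 8 {
		n := binary.LittleEndian.Uint32(buf[0:])
		crc := binary.LittleEndian.Uint32(buf[4:])
		if len(buf) < 8+int(n) {
			break // truncated mid-write
		}
		payload := buf[8 : 8+n]
		if crc32.ChecksumIEEE(payload) != crc {
			break // corrupt record: stop salvage here
		}
		out = append(out, payload)
		buf = buf[8+n:]
	}
	return out
}

func main() {
	var blk []byte
	blk = appendRecord(blk, []byte("data1"))
	blk = appendRecord(blk, []byte("data2"))
	blk = blk[:len(blk)-5] // simulate a crash mid-write of the second record
	recs := recoverRecords(blk)
	fmt.Println(len(recs), string(recs[0])) // 1 data1
}
```

WriteStateAtomically is the complementary half: write the new state to a temp file, fsync, then rename over the target, so a crash leaves either the old state or the new one, never a partial file.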

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreCrashRecoveryTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/FileStoreCrashRecoveryTests.cs src/NATS.Server/JetStream/Storage/FileStore.cs src/NATS.Server/JetStream/Storage/MsgBlock.cs
git commit -m "feat(filestore): enhance crash recovery with TTL rebuild and truncation handling"

Task 5: IStreamStore Methods — Batch 1 (Core Operations)

Implement the core sync IStreamStore methods: StoreMsg, StoreRawMsg, LoadMsg, LoadNextMsg, LoadLastMsg, LoadPrevMsg, RemoveMsg, EraseMsg, State, FastState, Type, Stop, Delete.

Files:

  • Modify: src/NATS.Server/JetStream/Storage/FileStore.cs
  • Test: tests/NATS.Server.Tests/FileStoreStreamStoreTests.cs
  • Reference: golang/nats-server/server/filestore.go (various)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/FileStoreStreamStoreTests.cs:

using NATS.Server.JetStream.Storage;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;

public class FileStoreStreamStoreTests
{
    private FileStore CreateStore()
    {
        var dir = Directory.CreateTempSubdirectory();
        return new FileStore(new FileStoreOptions { Directory = dir.FullName });
    }

    [Fact]
    public void StoreMsg_appends_and_returns_seq_ts()
    {
        using var store = CreateStore();
        var (seq, ts) = store.StoreMsg("foo.bar", null, "hello"u8.ToArray(), 0);
        seq.ShouldBe(1UL);
        ts.ShouldBeGreaterThan(0L);
    }

    [Fact]
    public void LoadMsg_returns_stored_message()
    {
        using var store = CreateStore();
        store.StoreMsg("load.test", null, "payload"u8.ToArray(), 0);
        var sm = store.LoadMsg(1, null);
        sm.Seq.ShouldBe(1UL);
        sm.Subj.ShouldBe("load.test");
        System.Text.Encoding.UTF8.GetString(sm.Msg).ShouldBe("payload");
    }

    [Fact]
    public void LoadNextMsg_finds_next_matching()
    {
        using var store = CreateStore();
        store.StoreMsg("a.1", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("b.1", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("a.2", null, "m3"u8.ToArray(), 0);

        var (msg, _) = store.LoadNextMsg("a.*", true, 1, null);
        msg.Seq.ShouldBe(1UL);
        msg.Subj.ShouldBe("a.1");
    }

    [Fact]
    public void LoadLastMsg_returns_most_recent_on_subject()
    {
        using var store = CreateStore();
        store.StoreMsg("last.subj", null, "first"u8.ToArray(), 0);
        store.StoreMsg("last.subj", null, "second"u8.ToArray(), 0);
        store.StoreMsg("other", null, "other"u8.ToArray(), 0);

        var sm = store.LoadLastMsg("last.subj", null);
        sm.Seq.ShouldBe(2UL);
        System.Text.Encoding.UTF8.GetString(sm.Msg).ShouldBe("second");
    }

    [Fact]
    public void LoadPrevMsg_returns_message_before_seq()
    {
        using var store = CreateStore();
        store.StoreMsg("p.1", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("p.2", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("p.3", null, "m3"u8.ToArray(), 0);

        var sm = store.LoadPrevMsg(3, null);
        sm.Seq.ShouldBe(2UL);
    }

    [Fact]
    public void RemoveMsg_soft_deletes()
    {
        using var store = CreateStore();
        store.StoreMsg("rm.1", null, "data"u8.ToArray(), 0);
        store.RemoveMsg(1).ShouldBeTrue();

        var state = store.State();
        state.Msgs.ShouldBe(0UL);
    }

    [Fact]
    public void State_returns_full_stream_state()
    {
        using var store = CreateStore();
        store.StoreMsg("s.1", null, "a"u8.ToArray(), 0);
        store.StoreMsg("s.2", null, "b"u8.ToArray(), 0);

        var state = store.State();
        state.Msgs.ShouldBe(2UL);
        state.FirstSeq.ShouldBe(1UL);
        state.LastSeq.ShouldBe(2UL);
    }

    [Fact]
    public void Type_returns_file()
    {
        using var store = CreateStore();
        store.Type().ShouldBe(StorageType.File);
    }

    [Fact]
    public void Stop_flushes_and_closes()
    {
        using var store = CreateStore();
        store.StoreMsg("stop.1", null, "data"u8.ToArray(), 0);
        store.Stop();
        // After stop, store should not accept writes
        Should.Throw<Exception>(() => store.StoreMsg("stop.2", null, "more"u8.ToArray(), 0));
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreStreamStoreTests" -v normal

Expected: FAIL — all methods throw NotSupportedException

Step 3: Implement IStreamStore core methods in FileStore.cs

Implement each method:

  • StoreMsg — synchronous wrapper around AppendAsync logic, return (seq, timestamp)
  • StoreRawMsg — append with caller-specified seq/ts, bypass auto-increment
  • LoadMsg — lookup by sequence in _messages dict
  • LoadNextMsg — scan from start seq forward, match filter with SubjectMatch.IsMatch()
  • LoadLastMsg — reverse scan _messages for subject match
  • LoadPrevMsg — scan backward from start-1
  • RemoveMsg — mark deleted in _messages and MsgBlock._deleted
  • EraseMsg — remove + overwrite bytes on disk with random data
  • State() — return StreamState with first/last/msgs/bytes/deleted
  • FastState() — populate ref param with minimal fields
  • Type() — return StorageType.File
  • Stop() — flush active block, close all FDs, mark store as stopped
  • Delete(inline) — Stop + delete all .blk files and directory

Add StoreMsg record type if not already defined:

public record struct StoreMsg(ulong Seq, string Subj, byte[]? Hdr, byte[] Msg, long Ts);
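LoadNextMsg, LoadLastMsg, and the purge/filter methods all depend on NATS subject matching, where `*` matches exactly one token and `>` matches one or more trailing tokens. A Go sketch of that token-wise match:

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches implements NATS wildcard semantics: '*' matches exactly
// one token, '>' matches one or more trailing tokens.
func subjectMatches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, ft := range f {
		if ft == ">" {
			return i < len(s) // '>' needs at least one remaining token
		}
		if i >= len(s) {
			return false
		}
		if ft != "*" && ft != s[i] {
			return false
		}
	}
	return len(f) == len(s)
}

func main() {
	fmt.Println(subjectMatches("a.*", "a.1"))   // true
	fmt.Println(subjectMatches("a.*", "a.1.2")) // false
	fmt.Println(subjectMatches("a.>", "a.b.c")) // true
	fmt.Println(subjectMatches("a.>", "a"))     // false
}
```

The existing SubjectMatch.IsMatch() should already cover this; the sketch pins down the edge cases the batch-1 tests rely on (a.* matching a.1 but not a.1.2).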

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreStreamStoreTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/FileStoreStreamStoreTests.cs src/NATS.Server/JetStream/Storage/FileStore.cs
git commit -m "feat(filestore): implement core IStreamStore sync methods"

Task 6: IStreamStore Methods — Batch 2 (Query & State)

Implement remaining methods: Purge, PurgeEx, Compact, Truncate, GetSeqFromTime, FilteredState, SubjectsState, SubjectsTotals, NumPending, EncodedStreamState, UpdateConfig, ResetState, FlushAllPending, SkipMsg, SkipMsgs.

Files:

  • Modify: src/NATS.Server/JetStream/Storage/FileStore.cs
  • Test: tests/NATS.Server.Tests/FileStoreQueryTests.cs
  • Reference: golang/nats-server/server/filestore.go (various)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/FileStoreQueryTests.cs:

using NATS.Server.JetStream.Storage;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;

public class FileStoreQueryTests
{
    private FileStore CreateStore()
    {
        var dir = Directory.CreateTempSubdirectory();
        return new FileStore(new FileStoreOptions { Directory = dir.FullName });
    }

    [Fact]
    public void PurgeEx_with_subject_filter()
    {
        using var store = CreateStore();
        store.StoreMsg("a.1", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("b.1", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("a.2", null, "m3"u8.ToArray(), 0);

        var purged = store.PurgeEx("a.*", 0, 0);
        purged.ShouldBe(2UL); // only a.1 and a.2

        var state = store.State();
        state.Msgs.ShouldBe(1UL); // b.1 remains
    }

    [Fact]
    public void Compact_removes_below_seq()
    {
        using var store = CreateStore();
        for (int i = 0; i < 5; i++)
            store.StoreMsg("c.x", null, System.Text.Encoding.UTF8.GetBytes($"m{i}"), 0);

        store.Compact(3); // remove seq 1, 2
        var state = store.State();
        state.FirstSeq.ShouldBe(3UL);
    }

    [Fact]
    public void FilteredState_returns_matching_counts()
    {
        using var store = CreateStore();
        store.StoreMsg("f.a", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("f.b", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("f.a", null, "m3"u8.ToArray(), 0);

        var ss = store.FilteredState(1, "f.a");
        ss.Msgs.ShouldBe(2UL);
    }

    [Fact]
    public void SubjectsTotals_returns_per_subject_counts()
    {
        using var store = CreateStore();
        store.StoreMsg("t.a", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("t.b", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("t.a", null, "m3"u8.ToArray(), 0);

        var totals = store.SubjectsTotals("t.*");
        totals["t.a"].ShouldBe(2UL);
        totals["t.b"].ShouldBe(1UL);
    }

    [Fact]
    public void NumPending_counts_from_start_seq()
    {
        using var store = CreateStore();
        store.StoreMsg("np.x", null, "m1"u8.ToArray(), 0);
        store.StoreMsg("np.x", null, "m2"u8.ToArray(), 0);
        store.StoreMsg("np.y", null, "m3"u8.ToArray(), 0);

        var (total, _) = store.NumPending(2, "np.x", false);
        total.ShouldBe(1UL); // only seq 2 matches
    }

    [Fact]
    public void GetSeqFromTime_returns_first_at_or_after()
    {
        using var store = CreateStore();
        store.StoreMsg("time.1", null, "m1"u8.ToArray(), 0);
        var beforeSecond = DateTime.UtcNow;
        store.StoreMsg("time.2", null, "m2"u8.ToArray(), 0);

        var seq = store.GetSeqFromTime(beforeSecond);
        seq.ShouldBe(2UL);
    }

    [Fact]
    public void SkipMsg_reserves_sequence()
    {
        using var store = CreateStore();
        store.StoreMsg("sk.1", null, "m1"u8.ToArray(), 0);
        store.SkipMsg(2);
        store.StoreMsg("sk.3", null, "m3"u8.ToArray(), 0);

        var state = store.State();
        state.LastSeq.ShouldBe(3UL);
        state.Msgs.ShouldBe(2UL); // seq 2 is skipped, not a message
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreQueryTests" -v normal

Expected: FAIL

Step 3: Implement query and state methods in FileStore.cs

Implement each remaining method using the in-memory _messages dictionary and block structures. Key approaches:

  • PurgeEx — iterate _messages, match subject with wildcard support, remove matching
  • Compact — remove all entries with seq < given
  • Truncate — remove all entries with seq > given
  • GetSeqFromTime — binary search or linear scan by timestamp
  • FilteredState — count messages matching subject filter from given seq
  • SubjectsState — group by subject, compute per-subject SimpleState
  • SubjectsTotals — group by subject, count
  • NumPending — count from start seq matching filter
  • EncodedStreamState — binary serialize stream state (versioned format)
  • UpdateConfig — apply new block size, retention, limits
  • ResetState — clear caches
  • SkipMsg / SkipMsgs — reserve sequence(s) without storing data
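Because timestamps are assigned in append order, GetSeqFromTime can binary search rather than scan. A Go sketch over a toy in-memory layout (names hypothetical):

```go
package main

import (
	"fmt"
	"sort"
)

// getSeqFromTime returns the sequence of the first message whose timestamp
// is at or after ts. seqs[i] and times[i] describe message i; timestamps
// are appended in order, so sort.Search finds the boundary in O(log n).
func getSeqFromTime(seqs []uint64, times []int64, ts int64) (uint64, bool) {
	i := sort.Search(len(times), func(i int) bool { return times[i] >= ts })
	if i == len(times) {
		return 0, false // nothing at or after ts
	}
	return seqs[i], true
}

func main() {
	seqs := []uint64{1, 2, 3}
	times := []int64{100, 200, 300}
	seq, ok := getSeqFromTime(seqs, times, 150)
	fmt.Println(seq, ok) // 2 true
}
```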

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStoreQueryTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/FileStoreQueryTests.cs src/NATS.Server/JetStream/Storage/FileStore.cs
git commit -m "feat(filestore): implement query and state IStreamStore methods"

Task 7: RAFT Binary WAL

Replace in-memory JSON persistence with binary WAL for production durability.

Files:

  • Modify: src/NATS.Server/Raft/RaftLog.cs
  • Modify: src/NATS.Server/Raft/RaftNode.cs
  • Create: src/NATS.Server/Raft/RaftWal.cs
  • Test: tests/NATS.Server.Tests/RaftWalTests.cs
  • Reference: golang/nats-server/server/raft.go:1000-1100
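Per Codex feedback point 5, each WAL record should carry a record type and a checksum. One possible layout (an assumption for this sketch, not the Go server's wire format) with a decoder that salvages entries up to the first truncated or corrupt record:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

const recTypeEntry = 1 // record type byte, per Codex feedback point 5

type entry struct {
	index, term uint64
	cmd         string
}

// encode writes [type u8][len u32][crc u32][index u64][term u64][cmd].
func encode(e entry) []byte {
	body := make([]byte, 16+len(e.cmd))
	binary.LittleEndian.PutUint64(body[0:], e.index)
	binary.LittleEndian.PutUint64(body[8:], e.term)
	copy(body[16:], e.cmd)
	rec := make([]byte, 9, 9+len(body))
	rec[0] = recTypeEntry
	binary.LittleEndian.PutUint32(rec[1:], uint32(len(body)))
	binary.LittleEndian.PutUint32(rec[5:], crc32.ChecksumIEEE(body))
	return append(rec, body...)
}

// decodeAll replays records, stopping at the first truncated or corrupt one.
func decodeAll(wal []byte) []entry {
	var out []entry
	for len(wal) >= 9 && wal[0] == recTypeEntry {
		n := binary.LittleEndian.Uint32(wal[1:])
		crc := binary.LittleEndian.Uint32(wal[5:])
		if len(wal) < 9+int(n) || n < 16 {
			break // truncated mid-write
		}
		body := wal[9 : 9+n]
		if crc32.ChecksumIEEE(body) != crc {
			break // corrupt record
		}
		out = append(out, entry{
			index: binary.LittleEndian.Uint64(body[0:]),
			term:  binary.LittleEndian.Uint64(body[8:]),
			cmd:   string(body[16:]),
		})
		wal = wal[9+n:]
	}
	return out
}

func main() {
	wal := append(encode(entry{1, 1, "cmd-1"}), encode(entry{2, 1, "cmd-2"})...)
	fmt.Println(decodeAll(wal[:len(wal)-3])) // truncated tail: only cmd-1 survives
}
```

Extra record types (snapshot marker, term/vote record) slot into the same framing, which is why the type byte comes before the length.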

Step 1: Write the failing test

Create tests/NATS.Server.Tests/RaftWalTests.cs:

using NATS.Server.Raft;
using Shouldly;
using Xunit;

namespace NATS.Server.Tests;

public class RaftWalTests
{
    [Fact]
    public async Task Wal_persists_and_recovers_entries()
    {
        var dir = Directory.CreateTempSubdirectory();
        var walPath = Path.Combine(dir.FullName, "raft.wal");

        // Write entries
        {
            var wal = new RaftWal(walPath);
            await wal.AppendAsync(new RaftLogEntry(1, 1, "cmd-1"));
            await wal.AppendAsync(new RaftLogEntry(2, 1, "cmd-2"));
            await wal.AppendAsync(new RaftLogEntry(3, 2, "cmd-3"));
            await wal.SyncAsync();
            wal.Dispose();
        }

        // Recover
        var recovered = RaftWal.Load(walPath);
        var entries = recovered.Entries.ToList();
        entries.Count.ShouldBe(3);
        entries[0].Index.ShouldBe(1);
        entries[0].Term.ShouldBe(1);
        entries[0].Command.ShouldBe("cmd-1");
        entries[2].Index.ShouldBe(3);
        entries[2].Term.ShouldBe(2);
        recovered.Dispose();
    }

    [Fact]
    public async Task Wal_compact_removes_old_entries()
    {
        var dir = Directory.CreateTempSubdirectory();
        var walPath = Path.Combine(dir.FullName, "raft.wal");

        var wal = new RaftWal(walPath);
        for (int i = 1; i <= 10; i++)
            await wal.AppendAsync(new RaftLogEntry(i, 1, $"cmd-{i}"));
        await wal.SyncAsync();

        await wal.CompactAsync(5); // remove entries 1-5

        var recovered = RaftWal.Load(walPath);
        recovered.Entries.Count().ShouldBe(5);
        recovered.Entries.First().Index.ShouldBe(6);
        wal.Dispose();
        recovered.Dispose();
    }

    [Fact]
    public async Task Wal_handles_truncated_file()
    {
        var dir = Directory.CreateTempSubdirectory();
        var walPath = Path.Combine(dir.FullName, "raft.wal");

        {
            var wal = new RaftWal(walPath);
            await wal.AppendAsync(new RaftLogEntry(1, 1, "good-entry"));
            await wal.AppendAsync(new RaftLogEntry(2, 1, "will-be-truncated"));
            await wal.SyncAsync();
            wal.Dispose();
        }

        // Truncate last few bytes
        using (var fs = File.OpenWrite(walPath))
            fs.SetLength(fs.Length - 3);

        var recovered = RaftWal.Load(walPath);
        recovered.Entries.Count().ShouldBeGreaterThanOrEqualTo(1);
        recovered.Entries.First().Command.ShouldBe("good-entry");
        recovered.Dispose();
    }

    [Fact]
    public async Task RaftNode_persists_term_and_vote()
    {
        var dir = Directory.CreateTempSubdirectory();
        var node = new RaftNode("n1", persistDirectory: dir.FullName);
        node.TermState.CurrentTerm = 5;
        node.TermState.VotedFor = "n2";

        await node.PersistAsync();

        var recovered = new RaftNode("n1", persistDirectory: dir.FullName);
        await recovered.LoadPersistedStateAsync();
        recovered.Term.ShouldBe(5);
        recovered.TermState.VotedFor.ShouldBe("n2");
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftWalTests" -v normal

Expected: FAIL — RaftWal class doesn't exist

Step 3: Implement RaftWal

Create src/NATS.Server/Raft/RaftWal.cs:

namespace NATS.Server.Raft;

/// <summary>
/// Binary write-ahead log for RAFT entries.
/// Format: [4:magic][4:version] file header followed by length-prefixed entry records.
/// Go reference: raft.go WAL patterns.
/// </summary>
public sealed class RaftWal : IDisposable
{
    private const int Version = 1;
    private FileStream _file;
    private readonly List<RaftLogEntry> _entries = [];

    // Constructor, AppendAsync, SyncAsync, CompactAsync, Load, Dispose...
}

Key implementation:

  • Binary record format: [4:length_le][1:record_type][8:index_le][4:term_le][N:utf8_command][4:crc32_le] — the per-record checksum and record type cover the Codex feedback above and let Load detect a corrupt tail
  • Version header: [4:magic][4:version] at file start
  • AppendAsync — write length-prefixed record, add to in-memory list
  • SyncAsyncFileStream.FlushAsync() for fsync
  • CompactAsync(upToIndex) — rewrite WAL from upToIndex+1 onward using temp file + rename
  • Load(path) — scan WAL, validate records, skip corrupt tail
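
The record framing and corrupt-tail recovery can be sketched like this — Python for illustration (the C# version frames records over a FileStream). Per the Codex feedback above, each record carries a CRC32 checksum; the record-type byte is omitted here for brevity:

```python
import struct, zlib

def encode_record(index, term, command):
    """Frame one WAL record: [4:length][8:index][4:term][payload][4:crc32 of body]."""
    body = struct.pack("<QI", index, term) + command.encode()
    return struct.pack("<I", len(body)) + body + struct.pack("<I", zlib.crc32(body))

def load_records(buf):
    """Scan records in order, stopping at a short or checksum-corrupt tail (crash recovery)."""
    out, off = [], 0
    while off + 4 <= len(buf):
        (length,) = struct.unpack_from("<I", buf, off)
        end = off + 4 + length + 4
        if end > len(buf):
            break  # truncated tail: keep what we have so far
        body = buf[off + 4 : off + 4 + length]
        (crc,) = struct.unpack_from("<I", buf, off + 4 + length)
        if zlib.crc32(body) != crc:
            break  # corrupt tail
        index, term = struct.unpack_from("<QI", body)
        out.append((index, term, body[12:].decode()))
        off = end
    return out

wal = encode_record(1, 1, "good-entry") + encode_record(2, 1, "will-be-truncated")
assert load_records(wal[:-3]) == [(1, 1, "good-entry")]  # simulate a torn write
```

The final assertion mirrors the Wal_handles_truncated_file test: a torn final record is dropped, everything before it survives.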

Modify RaftLog.cs:

  • Add optional RaftWal backing — when _wal is set, delegate Append/Compact to it
  • Keep in-memory list for fast access, WAL for durability

Modify RaftNode.cs:

  • Wire _persistDirectory — create WAL at {dir}/raft.wal
  • PersistAsync() — write meta.json with term/votedFor
  • LoadPersistedStateAsync() — load meta.json + WAL entries

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftWalTests" -v normal

Expected: PASS

Step 5: Commit

git add src/NATS.Server/Raft/RaftWal.cs tests/NATS.Server.Tests/RaftWalTests.cs src/NATS.Server/Raft/RaftLog.cs src/NATS.Server/Raft/RaftNode.cs
git commit -m "feat(raft): add binary WAL for persistent log storage"

Task 8: RAFT Joint Consensus

Implement safe two-phase membership changes per Raft paper Section 4.

Files:

  • Modify: src/NATS.Server/Raft/RaftNode.cs
  • Test: tests/NATS.Server.Tests/RaftJointConsensusTests.cs
  • Reference: golang/nats-server/server/raft.go (membership changes); Raft paper Section 4 (joint consensus)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/RaftJointConsensusTests.cs:

using NATS.Server.Raft;

namespace NATS.Server.Tests;

public class RaftJointConsensusTests
{
    [Fact]
    public async Task AddPeer_uses_joint_consensus()
    {
        var cluster = RaftTestCluster.Create(3);
        var leader = await cluster.ElectLeaderAsync();

        await leader.ProposeAddPeerAsync("n4");

        // During joint phase, quorum requires majority of both old and new
        leader.Members.ShouldContain("n4");
        leader.Members.Count.ShouldBe(4);
    }

    [Fact]
    public async Task RemovePeer_uses_joint_consensus()
    {
        var cluster = RaftTestCluster.Create(3);
        var leader = await cluster.ElectLeaderAsync();

        await leader.ProposeRemovePeerAsync("n3");

        leader.Members.ShouldNotContain("n3");
        leader.Members.Count.ShouldBe(2);
    }

    [Fact]
    public async Task Joint_quorum_requires_both_old_and_new_majority()
    {
        var cluster = RaftTestCluster.Create(3);
        var leader = await cluster.ElectLeaderAsync();

        // Start add — enters joint config
        leader.BeginJointConsensus(["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]);

        // Joint quorum: majority of {n1,n2,n3} AND majority of {n1,n2,n3,n4}
        leader.CalculateJointQuorum(["n1", "n2"], ["n1", "n2", "n3"]).ShouldBeTrue(); // 2/3 and 3/4
        leader.CalculateJointQuorum(["n1"], ["n1", "n2"]).ShouldBeFalse(); // 1/3 fails old quorum
    }

    [Fact]
    public async Task Only_one_membership_change_at_a_time()
    {
        var cluster = RaftTestCluster.Create(3);
        var leader = await cluster.ElectLeaderAsync();

        await leader.ProposeAddPeerAsync("n4");

        // Second concurrent change should be rejected
        await Should.ThrowAsync<InvalidOperationException>(
            () => leader.ProposeAddPeerAsync("n5"));
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftJointConsensusTests" -v normal

Expected: FAIL

Step 3: Implement joint consensus in RaftNode.cs

Add to RaftNode.cs:

  • New field: HashSet<string>? _jointNewMembers — Cnew during transition
  • BeginJointConsensus(cold, cnew) — set _jointNewMembers = cnew, propose joint entry
  • CalculateJointQuorum(coldVotes, cnewVotes)majority(cold) AND majority(cnew)
  • Modify ProposeAddPeerAsync:
    1. Check MembershipChangeInProgress — reject if true
    2. Propose joint entry (Cold ∪ Cnew) to log
    3. Wait for joint entry commit
    4. Propose final Cnew entry to log
    5. Wait for final entry commit
    6. Clear _jointNewMembers
  • Modify ProposeRemovePeerAsync — same two-phase pattern
  • Modify ApplyMembershipChange(entry):
    • If joint entry: set _jointNewMembers
    • If final entry: replace _members with Cnew, clear _jointNewMembers
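
The joint-quorum rule itself is small enough to sketch directly (Python for illustration; the member names match the test above):

```python
def majority(votes, members):
    """True when votes contains a strict majority of members."""
    return len(set(votes) & set(members)) * 2 > len(members)

def joint_quorum(votes_old, votes_new, c_old, c_new):
    """Joint consensus (Raft Section 4): need a majority of BOTH the old and new configs."""
    return majority(votes_old, c_old) and majority(votes_new, c_new)

c_old, c_new = ["n1", "n2", "n3"], ["n1", "n2", "n3", "n4"]
assert joint_quorum(["n1", "n2"], ["n1", "n2", "n3"], c_old, c_new)   # 2/3 and 3/4
assert not joint_quorum(["n1"], ["n1", "n2"], c_old, c_new)           # fails old majority
```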

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftJointConsensusTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/RaftJointConsensusTests.cs src/NATS.Server/Raft/RaftNode.cs
git commit -m "feat(raft): implement joint consensus for safe membership changes"

Phase 1 Exit Gate Verification:

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FileStore" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftWal" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftJointConsensus" -v normal

All Phase 1 tests must pass before proceeding to Phase 2.


Phase 2: JetStream Cluster Coordination

Dependencies: Phase 1 (RAFT persistence, FileStore)

Exit gate: Meta-group processes stream/consumer assignments via RAFT, snapshots encode/decode round-trip, leadership transitions are handled correctly, and orphan detection works.


Task 9: Meta Snapshot Encoding/Decoding

Implement binary snapshot codec for meta-group state with S2 compression.

Files:

  • Create: src/NATS.Server/JetStream/Cluster/MetaSnapshotCodec.cs
  • Test: tests/NATS.Server.Tests/MetaSnapshotCodecTests.cs
  • Reference: golang/nats-server/server/jetstream_cluster.go:2031-2145 (encodeMetaSnapshot, decodeMetaSnapshot)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/MetaSnapshotCodecTests.cs:

using NATS.Server.JetStream.Cluster;

namespace NATS.Server.Tests;

public class MetaSnapshotCodecTests
{
    [Fact]
    public void Encode_decode_round_trips()
    {
        var assignments = new Dictionary<string, StreamAssignment>
        {
            ["stream-A"] = new StreamAssignment
            {
                StreamName = "stream-A",
                Group = new RaftGroup { Name = "rg-a", Peers = ["n1", "n2", "n3"] },
                ConfigJson = """{"subjects":["foo.>"]}""",
            },
            ["stream-B"] = new StreamAssignment
            {
                StreamName = "stream-B",
                Group = new RaftGroup { Name = "rg-b", Peers = ["n1", "n2"] },
                ConfigJson = """{"subjects":["bar.>"]}""",
                Consumers =
                {
                    ["con-1"] = new ConsumerAssignment
                    {
                        ConsumerName = "con-1",
                        StreamName = "stream-B",
                        Group = new RaftGroup { Name = "rg-c1", Peers = ["n1", "n2"] },
                    },
                },
            },
        };

        var encoded = MetaSnapshotCodec.Encode(assignments);
        encoded.ShouldNotBeEmpty();

        var decoded = MetaSnapshotCodec.Decode(encoded);
        decoded.Count.ShouldBe(2);
        decoded["stream-A"].StreamName.ShouldBe("stream-A");
        decoded["stream-A"].Group.Peers.Count.ShouldBe(3);
        decoded["stream-B"].Consumers.Count.ShouldBe(1);
        decoded["stream-B"].Consumers["con-1"].ConsumerName.ShouldBe("con-1");
    }

    [Fact]
    public void Encoded_snapshot_is_compressed()
    {
        var assignments = new Dictionary<string, StreamAssignment>();
        for (int i = 0; i < 100; i++)
        {
            assignments[$"stream-{i}"] = new StreamAssignment
            {
                StreamName = $"stream-{i}",
                Group = new RaftGroup { Name = $"rg-{i}", Peers = ["n1", "n2", "n3"] },
                ConfigJson = """{"subjects":["test.>"]}""",
            };
        }

        var encoded = MetaSnapshotCodec.Encode(assignments);
        var json = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(assignments);

        // S2 compressed should be smaller than raw JSON
        encoded.Length.ShouldBeLessThan(json.Length);
    }

    [Fact]
    public void Empty_snapshot_round_trips()
    {
        var empty = new Dictionary<string, StreamAssignment>();
        var encoded = MetaSnapshotCodec.Encode(empty);
        var decoded = MetaSnapshotCodec.Decode(encoded);
        decoded.ShouldBeEmpty();
    }

    [Fact]
    public void Versioned_format_rejects_unknown_version()
    {
        var bad = new byte[] { 0xFF, 0xFF, 0, 0 }; // invalid version
        Should.Throw<InvalidOperationException>(() => MetaSnapshotCodec.Decode(bad));
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MetaSnapshotCodecTests" -v normal

Expected: FAIL — MetaSnapshotCodec doesn't exist

Step 3: Implement MetaSnapshotCodec

Create src/NATS.Server/JetStream/Cluster/MetaSnapshotCodec.cs:

using System.Buffers.Binary;
using System.Text.Json;
using NATS.Server.JetStream.Storage;

namespace NATS.Server.JetStream.Cluster;

/// <summary>
/// Binary codec for meta-group snapshots. Format:
/// [2:version][N:S2-compressed JSON of assignment map]
/// Go reference: jetstream_cluster.go:2075-2145 (encodeMetaSnapshot/decodeMetaSnapshot)
/// </summary>
public static class MetaSnapshotCodec
{
    private const ushort CurrentVersion = 1;

    public static byte[] Encode(Dictionary<string, StreamAssignment> assignments) { ... }
    public static Dictionary<string, StreamAssignment> Decode(byte[] data) { ... }
}

Implementation:

  • Encode: JSON serialize → S2 compress → prepend 2-byte version header
  • Decode: read version → strip header → S2 decompress → JSON deserialize
  • Reject unknown versions with InvalidOperationException
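
The codec shape can be sketched as follows — Python for illustration, with zlib standing in for the S2 compressor the C# implementation uses:

```python
import json, struct, zlib

VERSION = 1

def encode_snapshot(assignments):
    """[2:version][compressed JSON]; zlib stands in for S2 here."""
    payload = zlib.compress(json.dumps(assignments).encode())
    return struct.pack("<H", VERSION) + payload

def decode_snapshot(data):
    """Read the version header, reject unknown versions, then decompress and deserialize."""
    (version,) = struct.unpack_from("<H", data)
    if version != VERSION:
        raise ValueError(f"unknown snapshot version {version}")
    return json.loads(zlib.decompress(data[2:]))

snap = {"stream-A": {"peers": ["n1", "n2", "n3"]}}
assert decode_snapshot(encode_snapshot(snap)) == snap
```

The version header goes on the outside of the compressed payload so a decoder can reject an incompatible snapshot without attempting decompression.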

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MetaSnapshotCodecTests" -v normal

Expected: PASS

Step 5: Commit

git add src/NATS.Server/JetStream/Cluster/MetaSnapshotCodec.cs tests/NATS.Server.Tests/MetaSnapshotCodecTests.cs
git commit -m "feat(cluster): add MetaSnapshotCodec with S2 compression and versioned format"

Task 10: Cluster Monitoring Loop

Background loop that processes meta RAFT entries and drives cluster state transitions.

Files:

  • Create: src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs
  • Test: tests/NATS.Server.Tests/JetStreamClusterMonitorTests.cs
  • Reference: golang/nats-server/server/jetstream_cluster.go:1455-1825 (monitorCluster)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/JetStreamClusterMonitorTests.cs:

using System.Threading.Channels;
using NATS.Server.JetStream.Cluster;
using NATS.Server.Raft;

namespace NATS.Server.Tests;

public class JetStreamClusterMonitorTests
{
    [Fact]
    public async Task Monitor_processes_stream_assignment_entry()
    {
        var meta = new JetStreamMetaGroup(3);
        var channel = Channel.CreateUnbounded<RaftLogEntry>();
        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
        var monitorTask = monitor.StartAsync(cts.Token);

        // Write a stream assignment entry
        var assignJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            Op = "assignStream",
            StreamName = "test-stream",
            Peers = new[] { "n1", "n2", "n3" },
            Config = """{"subjects":["test.>"]}""",
        });
        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, assignJson));

        // Give the monitor time to process
        await Task.Delay(100);

        meta.StreamCount.ShouldBe(1);
        meta.GetStreamAssignment("test-stream").ShouldNotBeNull();

        cts.Cancel();
        await monitorTask;
    }

    [Fact]
    public async Task Monitor_processes_consumer_assignment_entry()
    {
        var meta = new JetStreamMetaGroup(3);
        var channel = Channel.CreateUnbounded<RaftLogEntry>();
        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
        var monitorTask = monitor.StartAsync(cts.Token);

        // First assign stream
        var streamJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            Op = "assignStream",
            StreamName = "s1",
            Peers = new[] { "n1", "n2", "n3" },
            Config = """{"subjects":["x.>"]}""",
        });
        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, streamJson));

        // Then assign consumer
        var consumerJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            Op = "assignConsumer",
            StreamName = "s1",
            ConsumerName = "c1",
            Peers = new[] { "n1", "n2", "n3" },
        });
        await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, consumerJson));

        await Task.Delay(100);

        meta.ConsumerCount.ShouldBe(1);

        cts.Cancel();
        await monitorTask;
    }

    [Fact]
    public async Task Monitor_processes_stream_removal()
    {
        var meta = new JetStreamMetaGroup(3);
        var channel = Channel.CreateUnbounded<RaftLogEntry>();
        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
        var monitorTask = monitor.StartAsync(cts.Token);

        // Assign then remove
        var assignJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            Op = "assignStream",
            StreamName = "to-remove",
            Peers = new[] { "n1", "n2", "n3" },
            Config = """{"subjects":["rm.>"]}""",
        });
        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, assignJson));
        await Task.Delay(50);

        var removeJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            Op = "removeStream",
            StreamName = "to-remove",
        });
        await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, removeJson));
        await Task.Delay(100);

        meta.StreamCount.ShouldBe(0);

        cts.Cancel();
        await monitorTask;
    }

    [Fact]
    public async Task Monitor_applies_meta_snapshot()
    {
        var meta = new JetStreamMetaGroup(3);
        var channel = Channel.CreateUnbounded<RaftLogEntry>();
        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
        var monitorTask = monitor.StartAsync(cts.Token);

        // Create a snapshot with known state
        var assignments = new Dictionary<string, StreamAssignment>
        {
            ["snap-stream"] = new StreamAssignment
            {
                StreamName = "snap-stream",
                Group = new RaftGroup { Name = "rg-snap", Peers = ["n1", "n2", "n3"] },
            },
        };
        var snapshot = MetaSnapshotCodec.Encode(assignments);
        var snapshotB64 = Convert.ToBase64String(snapshot);

        var snapshotJson = System.Text.Json.JsonSerializer.Serialize(new
        {
            Op = "snapshot",
            Data = snapshotB64,
        });
        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, snapshotJson));
        await Task.Delay(100);

        meta.StreamCount.ShouldBe(1);
        meta.GetStreamAssignment("snap-stream").ShouldNotBeNull();

        cts.Cancel();
        await monitorTask;
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterMonitorTests" -v normal

Expected: FAIL — JetStreamClusterMonitor doesn't exist

Step 3: Implement JetStreamClusterMonitor

Create src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs:

using System.Text.Json;
using System.Threading.Channels;
using NATS.Server.Raft;

namespace NATS.Server.JetStream.Cluster;

/// <summary>
/// Background loop consuming meta RAFT entries and dispatching cluster state changes.
/// Go reference: jetstream_cluster.go:1455-1825 (monitorCluster).
/// </summary>
public sealed class JetStreamClusterMonitor
{
    private readonly JetStreamMetaGroup _meta;
    private readonly ChannelReader<RaftLogEntry> _entries;

    public JetStreamClusterMonitor(JetStreamMetaGroup meta, ChannelReader<RaftLogEntry> entries) { ... }

    public async Task StartAsync(CancellationToken ct) { ... }
    private void ApplyMetaEntry(RaftLogEntry entry) { ... }
    private void ProcessStreamAssignment(JsonElement data) { ... }
    private void ProcessConsumerAssignment(JsonElement data) { ... }
    private void ProcessStreamRemoval(JsonElement data) { ... }
    private void ProcessConsumerRemoval(JsonElement data) { ... }
    private void ApplyMetaSnapshot(JsonElement data) { ... }
}

The main loop:

  1. Read entries from channel
  2. Parse JSON, dispatch by Op field
  3. Call corresponding handler on JetStreamMetaGroup
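
The dispatch-by-Op step can be sketched like this (Python for illustration; field names follow the test payloads above, and the handler set is a subset of the real one):

```python
import json

state = {"streams": {}, "consumers": {}}

def assign_stream(d):
    state["streams"][d["StreamName"]] = d

def remove_stream(d):
    """Removing a stream cascades to its consumers."""
    state["streams"].pop(d["StreamName"], None)
    state["consumers"] = {k: v for k, v in state["consumers"].items()
                          if v["StreamName"] != d["StreamName"]}

def assign_consumer(d):
    state["consumers"][d["ConsumerName"]] = d

HANDLERS = {"assignStream": assign_stream,
            "removeStream": remove_stream,
            "assignConsumer": assign_consumer}

def apply_entry(raw):
    """Parse one committed meta entry and dispatch on its Op field; unknown ops are ignored."""
    d = json.loads(raw)
    handler = HANDLERS.get(d.get("Op"))
    if handler:
        handler(d)

apply_entry('{"Op":"assignStream","StreamName":"s1"}')
apply_entry('{"Op":"assignConsumer","ConsumerName":"c1","StreamName":"s1"}')
assert len(state["streams"]) == 1 and len(state["consumers"]) == 1
```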

Add to JetStreamMetaGroup.cs:

  • AddStreamAssignment(StreamAssignment sa) — adds to _assignments
  • RemoveStreamAssignment(string streamName) — removes from _assignments
  • AddConsumerAssignment(string streamName, ConsumerAssignment ca) — adds to stream's consumers
  • RemoveConsumerAssignment(string streamName, string consumerName) — removes consumer
  • ReplaceAllAssignments(Dictionary<string, StreamAssignment> newState) — for snapshot apply

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterMonitorTests" -v normal

Expected: PASS

Step 5: Commit

git add src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs tests/NATS.Server.Tests/JetStreamClusterMonitorTests.cs src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
git commit -m "feat(cluster): add JetStreamClusterMonitor for meta RAFT entry processing"

Task 11: Stream/Consumer Assignment Processing

Enhance JetStreamMetaGroup to validate and process assignments with proper error handling.

Files:

  • Modify: src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
  • Test: tests/NATS.Server.Tests/JetStreamAssignmentProcessingTests.cs
  • Reference: golang/nats-server/server/jetstream_cluster.go:4541-5925

Step 1: Write the failing test

Create tests/NATS.Server.Tests/JetStreamAssignmentProcessingTests.cs:

using NATS.Server.JetStream.Cluster;

namespace NATS.Server.Tests;

public class JetStreamAssignmentProcessingTests
{
    [Fact]
    public void ProcessStreamAssignment_validates_config()
    {
        var meta = new JetStreamMetaGroup(3);
        var sa = new StreamAssignment
        {
            StreamName = "valid-stream",
            Group = new RaftGroup { Name = "rg-1", Peers = ["n1", "n2", "n3"] },
            ConfigJson = """{"subjects":["test.>"]}""",
        };

        meta.ProcessStreamAssignment(sa).ShouldBeTrue();
        meta.StreamCount.ShouldBe(1);
    }

    [Fact]
    public void ProcessStreamAssignment_rejects_empty_name()
    {
        var meta = new JetStreamMetaGroup(3);
        var sa = new StreamAssignment
        {
            StreamName = "",
            Group = new RaftGroup { Name = "rg-1", Peers = ["n1", "n2", "n3"] },
        };

        meta.ProcessStreamAssignment(sa).ShouldBeFalse();
        meta.StreamCount.ShouldBe(0);
    }

    [Fact]
    public void ProcessUpdateStreamAssignment_applies_config_change()
    {
        var meta = new JetStreamMetaGroup(3);
        var sa = new StreamAssignment
        {
            StreamName = "updatable",
            Group = new RaftGroup { Name = "rg-u", Peers = ["n1", "n2", "n3"] },
            ConfigJson = """{"subjects":["old.>"]}""",
        };
        meta.ProcessStreamAssignment(sa);

        var updated = new StreamAssignment
        {
            StreamName = "updatable",
            Group = new RaftGroup { Name = "rg-u", Peers = ["n1", "n2", "n3"] },
            ConfigJson = """{"subjects":["new.>"]}""",
        };
        meta.ProcessUpdateStreamAssignment(updated).ShouldBeTrue();

        var assignment = meta.GetStreamAssignment("updatable");
        assignment!.ConfigJson.ShouldContain("new.>");
    }

    [Fact]
    public void ProcessConsumerAssignment_requires_existing_stream()
    {
        var meta = new JetStreamMetaGroup(3);
        var ca = new ConsumerAssignment
        {
            ConsumerName = "orphan-consumer",
            StreamName = "nonexistent-stream",
            Group = new RaftGroup { Name = "rg-c", Peers = ["n1", "n2", "n3"] },
        };

        meta.ProcessConsumerAssignment(ca).ShouldBeFalse();
    }

    [Fact]
    public void ProcessStreamRemoval_cascades_to_consumers()
    {
        var meta = new JetStreamMetaGroup(3);
        meta.ProcessStreamAssignment(new StreamAssignment
        {
            StreamName = "cascade",
            Group = new RaftGroup { Name = "rg-cas", Peers = ["n1", "n2", "n3"] },
        });
        meta.ProcessConsumerAssignment(new ConsumerAssignment
        {
            ConsumerName = "c1",
            StreamName = "cascade",
            Group = new RaftGroup { Name = "rg-c1", Peers = ["n1", "n2", "n3"] },
        });

        meta.ProcessStreamRemoval("cascade").ShouldBeTrue();
        meta.StreamCount.ShouldBe(0);
        meta.ConsumerCount.ShouldBe(0);
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamAssignmentProcessingTests" -v normal

Expected: FAIL

Step 3: Implement assignment processing in JetStreamMetaGroup.cs

Add methods:

  • bool ProcessStreamAssignment(StreamAssignment sa) — validate name, check duplicates, add
  • bool ProcessUpdateStreamAssignment(StreamAssignment sa) — find existing, update config
  • bool ProcessStreamRemoval(string streamName) — remove stream + cascade consumers
  • bool ProcessConsumerAssignment(ConsumerAssignment ca) — validate stream exists, add consumer
  • bool ProcessConsumerRemoval(string streamName, string consumerName) — remove consumer
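
The validation rules reduce to a few guard clauses, sketched here in Python (illustrative names; the real methods return bool on the C# JetStreamMetaGroup):

```python
streams = {}

def process_stream_assignment(name, config):
    """Validate then add: a non-empty name is required; re-assigning an existing name updates it."""
    if not name:
        return False
    streams[name] = {"config": config, "consumers": {}}
    return True

def process_consumer_assignment(stream, consumer):
    """A consumer may only be assigned to a stream that already exists."""
    if stream not in streams:
        return False
    streams[stream]["consumers"][consumer] = {}
    return True

assert process_stream_assignment("s1", {"subjects": ["x.>"]})
assert not process_stream_assignment("", {})
assert not process_consumer_assignment("nope", "c1")
assert process_consumer_assignment("s1", "c1")
```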

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamAssignmentProcessingTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/JetStreamAssignmentProcessingTests.cs src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
git commit -m "feat(cluster): add stream/consumer assignment processing with validation"

Task 12: Inflight Tracking Enhancement

Replace simple string-based inflight maps with structured tracking.

Files:

  • Modify: src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
  • Test: tests/NATS.Server.Tests/JetStreamInflightTrackingTests.cs
  • Reference: golang/nats-server/server/jetstream_cluster.go:1193-1278

Step 1: Write the failing test

Create tests/NATS.Server.Tests/JetStreamInflightTrackingTests.cs:

using NATS.Server.JetStream.Cluster;

namespace NATS.Server.Tests;

public class JetStreamInflightTrackingTests
{
    [Fact]
    public void TrackInflightStreamProposal_increments_ops()
    {
        var meta = new JetStreamMetaGroup(3);
        var sa = new StreamAssignment
        {
            StreamName = "inflight-1",
            Group = new RaftGroup { Name = "rg-inf", Peers = ["n1", "n2", "n3"] },
        };

        meta.TrackInflightStreamProposal("ACC", sa);
        meta.InflightStreamCount.ShouldBe(1);
        meta.IsStreamInflight("ACC", "inflight-1").ShouldBeTrue();
    }

    [Fact]
    public void RemoveInflightStreamProposal_clears_when_zero()
    {
        var meta = new JetStreamMetaGroup(3);
        var sa = new StreamAssignment
        {
            StreamName = "inflight-2",
            Group = new RaftGroup { Name = "rg-inf2", Peers = ["n1", "n2", "n3"] },
        };

        meta.TrackInflightStreamProposal("ACC", sa);
        meta.RemoveInflightStreamProposal("ACC", "inflight-2");
        meta.IsStreamInflight("ACC", "inflight-2").ShouldBeFalse();
    }

    [Fact]
    public void Duplicate_proposal_increments_ops_count()
    {
        var meta = new JetStreamMetaGroup(3);
        var sa = new StreamAssignment
        {
            StreamName = "dup-stream",
            Group = new RaftGroup { Name = "rg-dup", Peers = ["n1", "n2", "n3"] },
        };

        meta.TrackInflightStreamProposal("ACC", sa);
        meta.TrackInflightStreamProposal("ACC", sa);
        meta.InflightStreamCount.ShouldBe(1); // still one unique stream

        // Need two removes to fully clear
        meta.RemoveInflightStreamProposal("ACC", "dup-stream");
        meta.IsStreamInflight("ACC", "dup-stream").ShouldBeTrue(); // ops > 0
        meta.RemoveInflightStreamProposal("ACC", "dup-stream");
        meta.IsStreamInflight("ACC", "dup-stream").ShouldBeFalse();
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInflightTrackingTests" -v normal

Expected: FAIL

Step 3: Implement enhanced inflight tracking

In JetStreamMetaGroup.cs:

  • Add record InflightInfo(int OpsCount, bool Deleted, StreamAssignment? Assignment)
  • Replace ConcurrentDictionary<string, string> _inflightStreams with ConcurrentDictionary<string, Dictionary<string, InflightInfo>> (lock the inner map, or nest ConcurrentDictionary, since it is mutated from multiple paths)
  • TrackInflightStreamProposal(account, sa) — increment ops, store assignment
  • RemoveInflightStreamProposal(account, streamName) — decrement ops, remove when zero
  • IsStreamInflight(account, streamName) — check if ops > 0
  • Same pattern for consumer inflight tracking
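
The ops-counting semantics from the tests above can be sketched in a few lines (Python for illustration; the C# version uses the InflightInfo record and per-account dictionaries):

```python
inflight = {}  # account -> {stream: ops_count}

def track(account, stream):
    """Duplicate proposals for the same stream bump an ops counter instead of adding an entry."""
    inflight.setdefault(account, {})
    inflight[account][stream] = inflight[account].get(stream, 0) + 1

def remove(account, stream):
    """Decrement; the entry only disappears once every outstanding proposal is settled."""
    per_account = inflight.get(account, {})
    if stream in per_account:
        per_account[stream] -= 1
        if per_account[stream] <= 0:
            del per_account[stream]

def is_inflight(account, stream):
    return inflight.get(account, {}).get(stream, 0) > 0

track("ACC", "dup"); track("ACC", "dup")
remove("ACC", "dup")
assert is_inflight("ACC", "dup")      # ops still 1
remove("ACC", "dup")
assert not is_inflight("ACC", "dup")
```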

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInflightTrackingTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/JetStreamInflightTrackingTests.cs src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
git commit -m "feat(cluster): add structured inflight proposal tracking with ops counting"

Task 13: Leadership Transitions

Handle meta-group, stream, and consumer leadership changes.

Files:

  • Modify: src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs
  • Modify: src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
  • Test: tests/NATS.Server.Tests/JetStreamLeadershipTests.cs
  • Reference: golang/nats-server/server/jetstream_cluster.go:7001-7074 (processLeaderChange)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/JetStreamLeadershipTests.cs:

using NATS.Server.JetStream.Cluster;

namespace NATS.Server.Tests;

public class JetStreamLeadershipTests
{
    [Fact]
    public void ProcessLeaderChange_clears_inflight_on_step_down()
    {
        var meta = new JetStreamMetaGroup(3);
        meta.TrackInflightStreamProposal("ACC", new StreamAssignment
        {
            StreamName = "s1",
            Group = new RaftGroup { Name = "rg", Peers = ["n1", "n2", "n3"] },
        });

        meta.ProcessLeaderChange(isLeader: false);

        meta.InflightStreamCount.ShouldBe(0);
    }

    [Fact]
    public void ProcessLeaderChange_becoming_leader_resets_state()
    {
        var meta = new JetStreamMetaGroup(3);
        var leaderChanged = false;
        meta.OnLeaderChange += (isLeader) => leaderChanged = true;

        meta.ProcessLeaderChange(isLeader: true);

        leaderChanged.ShouldBeTrue();
    }

    [Fact]
    public void StepDown_triggers_leader_change()
    {
        var meta = new JetStreamMetaGroup(3);
        meta.BecomeLeader();
        meta.IsLeader().ShouldBeTrue();

        meta.StepDown();

        meta.IsLeader().ShouldBeFalse();
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamLeadershipTests" -v normal

Expected: FAIL

Step 3: Implement leadership transition

In JetStreamMetaGroup.cs:

  • Add event Action<bool>? OnLeaderChange
  • ProcessLeaderChange(isLeader):
    • If stepping down: clear all inflight maps
    • If becoming leader: fire OnLeaderChange, initialize update subscriptions
  • Modify StepDown() to call ProcessLeaderChange(false)
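
A minimal sketch of the step-down path (Python for illustration; the C# version also fires OnLeaderChange and initializes update subscriptions when becoming leader):

```python
class MetaGroup:
    def __init__(self):
        self.is_leader = False
        self.inflight = {"ACC": {"s1": 1}}  # pretend one proposal is outstanding
        self.events = []

    def process_leader_change(self, is_leader):
        """Step-down clears inflight proposals: they were only meaningful while leading."""
        self.is_leader = is_leader
        if not is_leader:
            self.inflight.clear()
        self.events.append(is_leader)

g = MetaGroup()
g.process_leader_change(True)
g.process_leader_change(False)
assert g.inflight == {} and g.events == [True, False]
```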

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamLeadershipTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/JetStreamLeadershipTests.cs src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs
git commit -m "feat(cluster): implement leadership transition with inflight cleanup"

Phase 2 Exit Gate Verification:

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MetaSnapshotCodec" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterMonitor" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamAssignment" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamInflight" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamLeadership" -v normal

All Phase 2 tests must pass before proceeding to Phase 3.


Phase 3: Consumer + Stream Engines

Dependencies: Phase 1 (storage), Phase 2 (cluster coordination)

Exit gate: Consumer delivery loop delivers messages with redelivery, pull requests expire and respect batch/maxBytes, purge with subject filtering works, and mirror/source retry recovers from failures.


Task 14: RedeliveryTracker with PriorityQueue

Replace Dictionary-based redelivery tracking with min-heap for efficient deadline-based scheduling.

Files:

  • Modify: src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs
  • Test: tests/NATS.Server.Tests/RedeliveryTrackerTests.cs
  • Reference: golang/nats-server/server/consumer.go (rdq redelivery queue)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/RedeliveryTrackerTests.cs:

using NATS.Server.JetStream.Consumers;

namespace NATS.Server.Tests;

public class RedeliveryTrackerTests
{
    [Fact]
    public void Schedule_and_get_due_returns_expired()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
        var past = DateTimeOffset.UtcNow.AddMilliseconds(-100);

        tracker.Schedule(1, past);
        tracker.Schedule(2, DateTimeOffset.UtcNow.AddSeconds(60)); // future

        var due = tracker.GetDue(DateTimeOffset.UtcNow).ToList();
        due.Count.ShouldBe(1);
        due[0].ShouldBe(1UL);
    }

    [Fact]
    public void Acknowledge_removes_from_queue()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
        tracker.Schedule(1, DateTimeOffset.UtcNow.AddMilliseconds(-100));

        tracker.Acknowledge(1);

        var due = tracker.GetDue(DateTimeOffset.UtcNow).ToList();
        due.ShouldBeEmpty();
    }

    [Fact]
    public void IsMaxDeliveries_returns_true_at_threshold()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 3, ackWaitMs: 1000);

        tracker.IncrementDeliveryCount(1);
        tracker.IncrementDeliveryCount(1);
        tracker.IsMaxDeliveries(1).ShouldBeFalse();

        tracker.IncrementDeliveryCount(1);
        tracker.IsMaxDeliveries(1).ShouldBeTrue();
    }

    [Fact]
    public void Backoff_schedule_uses_delivery_count()
    {
        var backoff = new long[] { 100, 500, 2000 };
        var tracker = new RedeliveryTracker(maxDeliveries: 10, ackWaitMs: 1000, backoffMs: backoff);

        // First redeliver: 100ms
        var delay1 = tracker.GetBackoffDelay(deliveryCount: 1);
        delay1.ShouldBe(100L);

        // Second: 500ms
        var delay2 = tracker.GetBackoffDelay(deliveryCount: 2);
        delay2.ShouldBe(500L);

        // Beyond schedule: use last value
        var delay4 = tracker.GetBackoffDelay(deliveryCount: 4);
        delay4.ShouldBe(2000L);
    }

    [Fact]
    public void GetDue_returns_in_deadline_order()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
        var now = DateTimeOffset.UtcNow;

        tracker.Schedule(3, now.AddMilliseconds(-300));
        tracker.Schedule(1, now.AddMilliseconds(-100));
        tracker.Schedule(2, now.AddMilliseconds(-200));

        var due = tracker.GetDue(now).ToList();
        due.Count.ShouldBe(3);
        due[0].ShouldBe(3UL); // earliest deadline first
        due[1].ShouldBe(2UL);
        due[2].ShouldBe(1UL);
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RedeliveryTrackerTests" -v normal

Expected: FAIL

Step 3: Rewrite RedeliveryTracker internals

In RedeliveryTracker.cs:

  • Replace Dictionary<ulong, RedeliveryEntry> with PriorityQueue<ulong, DateTimeOffset>
  • Add Dictionary<ulong, int> _deliveryCounts for per-sequence delivery count
  • Schedule(seq, deadline) — enqueue with deadline priority
  • GetDue(now) — dequeue all entries past deadline, return in order
  • Acknowledge(seq) — remove from queue and counts
  • IncrementDeliveryCount(seq) — increment counter
  • IsMaxDeliveries(seq) — check against _maxDeliveries
  • GetBackoffDelay(deliveryCount) — index into _backoffMs array, clamp to last
  • Constructor: (int maxDeliveries, long ackWaitMs, long[]? backoffMs = null)

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RedeliveryTrackerTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/RedeliveryTrackerTests.cs src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs
git commit -m "feat(consumer): rewrite RedeliveryTracker with PriorityQueue min-heap"

Task 15: Ack/NAK Processing Enhancement

Add background ack subscription processing with NAK delay, TERM, and WPI support.

Files:

  • Modify: src/NATS.Server/JetStream/Consumers/AckProcessor.cs
  • Test: tests/NATS.Server.Tests/AckProcessorTests.cs
  • Reference: golang/nats-server/server/consumer.go:4854 (processInboundAcks)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/AckProcessorTests.cs:

using NATS.Server.JetStream.Consumers;

namespace NATS.Server.Tests;

public class AckProcessorTests
{
    [Fact]
    public void ProcessAck_removes_from_pending()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
        var processor = new AckProcessor(tracker);

        processor.Register(1, "deliver.subj");
        processor.PendingCount.ShouldBe(1);

        processor.ProcessAck(1);
        processor.PendingCount.ShouldBe(0);
    }

    [Fact]
    public void ProcessNak_schedules_redelivery()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
        var processor = new AckProcessor(tracker);

        processor.Register(1, "deliver.subj");
        processor.ProcessNak(1, delayMs: 500);

        processor.PendingCount.ShouldBe(1); // still pending until redelivered
    }

    [Fact]
    public void ProcessNak_with_backoff_uses_schedule()
    {
        var backoff = new long[] { 100, 500, 2000 };
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000, backoffMs: backoff);
        var processor = new AckProcessor(tracker);

        processor.Register(1, "deliver.subj");
        processor.ProcessNak(1, delayMs: -1); // -1 means use backoff schedule

        // Should use first backoff value (100ms)
        processor.PendingCount.ShouldBe(1);
    }

    [Fact]
    public void ProcessTerm_removes_permanently()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
        var processor = new AckProcessor(tracker);

        processor.Register(1, "deliver.subj");
        processor.ProcessTerm(1);

        processor.PendingCount.ShouldBe(0);
        processor.TerminatedCount.ShouldBe(1);
    }

    [Fact]
    public void ProcessProgress_extends_ack_deadline()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
        var processor = new AckProcessor(tracker);

        processor.Register(1, "deliver.subj");
        var originalDeadline = processor.GetDeadline(1);

        processor.ProcessProgress(1);
        var newDeadline = processor.GetDeadline(1);

        newDeadline.ShouldBeGreaterThan(originalDeadline);
    }

    [Fact]
    public void MaxAckPending_blocks_new_registrations()
    {
        var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
        var processor = new AckProcessor(tracker, maxAckPending: 2);

        processor.Register(1, "d.1");
        processor.Register(2, "d.2");
        processor.CanRegister().ShouldBeFalse();
    }

    [Fact]
    public void ParseAckType_identifies_all_types()
    {
        AckProcessor.ParseAckType("+ACK"u8).ShouldBe(AckType.Ack);
        AckProcessor.ParseAckType("-NAK"u8).ShouldBe(AckType.Nak);
        AckProcessor.ParseAckType("+TERM"u8).ShouldBe(AckType.Term);
        AckProcessor.ParseAckType("+WPI"u8).ShouldBe(AckType.Progress);
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~AckProcessorTests" -v normal

Expected: FAIL

Step 3: Enhance AckProcessor

In AckProcessor.cs:

  • Add enum AckType { Ack, Nak, Term, Progress }
  • Add static AckType ParseAckType(ReadOnlySpan<byte> data) — parse +ACK, -NAK, +TERM, +WPI
  • Add int PendingCount property
  • Add int TerminatedCount property (via _terminated.Count)
  • Add bool CanRegister() — checks PendingCount < _maxAckPending
  • Add DateTimeOffset GetDeadline(ulong seq) — returns ack deadline for pending message
  • Enhance ProcessNak(seq, delayMs) — if delayMs == -1, use backoff schedule from tracker
  • Add ProcessProgress(seq) — extend deadline by _ackWaitMs
  • Constructor: add optional maxAckPending parameter

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~AckProcessorTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/AckProcessorTests.cs src/NATS.Server/JetStream/Consumers/AckProcessor.cs
git commit -m "feat(consumer): enhance AckProcessor with NAK delay, TERM, WPI, and maxAckPending"

Task 16: Pull Request Pipeline

Implement waiting request queue with expiry, batch/maxBytes enforcement.

Files:

  • Create: src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs
  • Modify: src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs
  • Test: tests/NATS.Server.Tests/WaitingRequestQueueTests.cs
  • Reference: golang/nats-server/server/consumer.go:4276-4450 (processNextMsgRequest)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/WaitingRequestQueueTests.cs:

using NATS.Server.JetStream.Consumers;

namespace NATS.Server.Tests;

public class WaitingRequestQueueTests
{
    [Fact]
    public void Enqueue_and_dequeue_fifo()
    {
        var queue = new WaitingRequestQueue();
        queue.Enqueue(new PullRequest("reply.1", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false));
        queue.Enqueue(new PullRequest("reply.2", Batch: 5, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false));

        queue.Count.ShouldBe(2);

        var first = queue.TryDequeue();
        first.ShouldNotBeNull();
        first.ReplyTo.ShouldBe("reply.1");
    }

    [Fact]
    public void Expired_requests_are_removed()
    {
        var queue = new WaitingRequestQueue();
        queue.Enqueue(new PullRequest("expired", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMilliseconds(-100), NoWait: false));
        queue.Enqueue(new PullRequest("valid", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false));

        queue.RemoveExpired(DateTimeOffset.UtcNow);
        queue.Count.ShouldBe(1);

        var next = queue.TryDequeue();
        next!.ReplyTo.ShouldBe("valid");
    }

    [Fact]
    public void NoWait_request_returns_immediately_when_empty()
    {
        var queue = new WaitingRequestQueue();
        queue.Enqueue(new PullRequest("nowait", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: true));

        var req = queue.TryDequeue();
        req.ShouldNotBeNull();
        req.NoWait.ShouldBeTrue();
    }

    [Fact]
    public void MaxBytes_tracks_accumulation()
    {
        var queue = new WaitingRequestQueue();
        var req = new PullRequest("mb", Batch: 100, MaxBytes: 1024, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false);
        queue.Enqueue(req);

        var dequeued = queue.TryDequeue()!;
        dequeued.MaxBytes.ShouldBe(1024L);
        dequeued.RemainingBytes.ShouldBe(1024L);

        dequeued.ConsumeBytes(256);
        dequeued.RemainingBytes.ShouldBe(768L);
        dequeued.IsExhausted.ShouldBeFalse();

        dequeued.ConsumeBytes(800);
        dequeued.IsExhausted.ShouldBeTrue();
    }

    [Fact]
    public void Batch_decrements_on_delivery()
    {
        var queue = new WaitingRequestQueue();
        var req = new PullRequest("batch", Batch: 3, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false);
        queue.Enqueue(req);

        var dequeued = queue.TryDequeue()!;
        dequeued.RemainingBatch.ShouldBe(3);

        dequeued.ConsumeBatch();
        dequeued.RemainingBatch.ShouldBe(2);

        dequeued.ConsumeBatch();
        dequeued.ConsumeBatch();
        dequeued.IsExhausted.ShouldBeTrue();
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WaitingRequestQueueTests" -v normal

Expected: FAIL

Step 3: Implement WaitingRequestQueue

Create src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs:

namespace NATS.Server.JetStream.Consumers;

public sealed record PullRequest(
    string ReplyTo,
    int Batch,
    long MaxBytes,
    DateTimeOffset Expires,
    bool NoWait,
    string? PinId = null)
{
    public int RemainingBatch { get; private set; } = Batch;
    public long RemainingBytes { get; private set; } = MaxBytes;
    public bool IsExhausted => RemainingBatch <= 0 || (MaxBytes > 0 && RemainingBytes <= 0);
    public void ConsumeBatch() => RemainingBatch--;
    public void ConsumeBytes(long bytes) => RemainingBytes -= bytes;
}

public sealed class WaitingRequestQueue
{
    private readonly LinkedList<PullRequest> _queue = new();

    public int Count => _queue.Count;
    public bool IsEmpty => _queue.Count == 0;

    public void Enqueue(PullRequest request) => _queue.AddLast(request);

    public PullRequest? TryDequeue()
    {
        if (_queue.Count == 0) return null;
        var first = _queue.First!.Value;
        _queue.RemoveFirst();
        return first;
    }

    public void RemoveExpired(DateTimeOffset now)
    {
        var node = _queue.First;
        while (node != null)
        {
            var next = node.Next;
            if (node.Value.Expires <= now)
                _queue.Remove(node);
            node = next;
        }
    }
}

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WaitingRequestQueueTests" -v normal

Expected: PASS

Step 5: Commit

git add src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs tests/NATS.Server.Tests/WaitingRequestQueueTests.cs
git commit -m "feat(consumer): add WaitingRequestQueue with expiry and batch/maxBytes tracking"

Task 17: Consumer Pause/Resume

Add pause/resume state management with advisory events.

Files:

  • Modify: src/NATS.Server/JetStream/ConsumerManager.cs
  • Test: tests/NATS.Server.Tests/ConsumerPauseResumeTests.cs
  • Reference: golang/nats-server/server/consumer.go (pause/resume)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/ConsumerPauseResumeTests.cs:

using NATS.Server.JetStream;

namespace NATS.Server.Tests;

public class ConsumerPauseResumeTests
{
    [Fact]
    public void Pause_stops_delivery()
    {
        var mgr = ConsumerManagerTestHelper.Create();
        var handle = mgr.CreateOrUpdate("test-stream", "test-consumer",
            new Models.ConsumerConfig { DeliverSubject = "deliver.test" });

        var until = DateTime.UtcNow.AddSeconds(5);
        mgr.Pause("test-consumer", until);

        mgr.IsPaused("test-consumer").ShouldBeTrue();
        mgr.GetPauseUntil("test-consumer").ShouldBe(until);
    }

    [Fact]
    public void Resume_restarts_delivery()
    {
        var mgr = ConsumerManagerTestHelper.Create();
        mgr.CreateOrUpdate("test-stream", "test-consumer",
            new Models.ConsumerConfig { DeliverSubject = "deliver.test" });

        mgr.Pause("test-consumer", DateTime.UtcNow.AddSeconds(5));
        mgr.Resume("test-consumer");

        mgr.IsPaused("test-consumer").ShouldBeFalse();
    }

    [Fact]
    public async Task Pause_auto_resumes_after_deadline()
    {
        var mgr = ConsumerManagerTestHelper.Create();
        mgr.CreateOrUpdate("test-stream", "test-consumer",
            new Models.ConsumerConfig { DeliverSubject = "deliver.test" });

        mgr.Pause("test-consumer", DateTime.UtcNow.AddMilliseconds(100));

        await Task.Delay(200);

        mgr.IsPaused("test-consumer").ShouldBeFalse();
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ConsumerPauseResumeTests" -v normal

Expected: FAIL

Step 3: Implement pause/resume in ConsumerManager

In ConsumerManager.cs:

  • Add _pauseUntil field to ConsumerHandle record
  • Add _pauseTimers dictionary for auto-resume timers
  • Pause(consumerName, until) — set deadline, stop delivery loop, start auto-resume timer
  • Resume(consumerName) — clear deadline, restart delivery loop, cancel timer
  • IsPaused(consumerName) — check if deadline is set and in the future
  • GetPauseUntil(consumerName) — return deadline
  • Auto-resume: Timer callback that calls Resume() when deadline passes

Create ConsumerManagerTestHelper static class for test setup.

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ConsumerPauseResumeTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/ConsumerPauseResumeTests.cs src/NATS.Server/JetStream/ConsumerManager.cs
git commit -m "feat(consumer): add pause/resume with auto-resume timer"

Task 18: Priority Group Pinning

Add Pin ID generation, TTL timers, and Nats-Pin-Id header support.

Files:

  • Modify: src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs
  • Test: tests/NATS.Server.Tests/PriorityGroupPinningTests.cs
  • Reference: golang/nats-server/server/consumer.go (setPinnedTimer, assignNewPinId)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/PriorityGroupPinningTests.cs:

using NATS.Server.JetStream.Consumers;

namespace NATS.Server.Tests;

public class PriorityGroupPinningTests
{
    [Fact]
    public void AssignPinId_generates_unique_ids()
    {
        var mgr = new PriorityGroupManager();
        mgr.Register("group-1", "consumer-a", priority: 0);

        var pin1 = mgr.AssignPinId("group-1", "consumer-a");
        var pin2 = mgr.AssignPinId("group-1", "consumer-a");

        pin1.ShouldNotBeNullOrEmpty();
        pin2.ShouldNotBeNullOrEmpty();
        pin1.ShouldNotBe(pin2); // each assignment is unique
    }

    [Fact]
    public void ValidatePinId_accepts_current()
    {
        var mgr = new PriorityGroupManager();
        mgr.Register("group-1", "consumer-a", priority: 0);

        var pin = mgr.AssignPinId("group-1", "consumer-a");
        mgr.ValidatePinId("group-1", pin).ShouldBeTrue();
    }

    [Fact]
    public void ValidatePinId_rejects_expired()
    {
        var mgr = new PriorityGroupManager();
        mgr.Register("group-1", "consumer-a", priority: 0);

        var pin1 = mgr.AssignPinId("group-1", "consumer-a");
        var pin2 = mgr.AssignPinId("group-1", "consumer-a"); // replaces pin1

        mgr.ValidatePinId("group-1", pin1).ShouldBeFalse();
        mgr.ValidatePinId("group-1", pin2).ShouldBeTrue();
    }

    [Fact]
    public void UnassignPinId_clears()
    {
        var mgr = new PriorityGroupManager();
        mgr.Register("group-1", "consumer-a", priority: 0);

        var pin = mgr.AssignPinId("group-1", "consumer-a");
        mgr.UnassignPinId("group-1");

        mgr.ValidatePinId("group-1", pin).ShouldBeFalse();
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PriorityGroupPinningTests" -v normal

Expected: FAIL

Step 3: Implement pin management

In PriorityGroupManager.cs:

  • Add _pinIds dictionary: group → current pin ID
  • AssignPinId(group, consumer) — generate NUID string, store, return
  • ValidatePinId(group, pinId) — compare against current
  • UnassignPinId(group) — clear pin ID
  • Pin TTL: configurable timeout, auto-unassign after expiry
  • NUID generation: use Guid.NewGuid().ToString("N")[..22] for simplicity

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PriorityGroupPinningTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/PriorityGroupPinningTests.cs src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs
git commit -m "feat(consumer): add priority group pin ID management"

Task 19: Stream Purge with Filtering

Implement PurgeEx with subject filter, sequence range, and keep-N semantics.

Files:

  • Modify: src/NATS.Server/JetStream/StreamManager.cs
  • Test: tests/NATS.Server.Tests/StreamPurgeFilterTests.cs
  • Reference: golang/nats-server/server/stream.go (purgeEx)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/StreamPurgeFilterTests.cs:

using NATS.Server.JetStream;

namespace NATS.Server.Tests;

public class StreamPurgeFilterTests
{
    [Fact]
    public async Task PurgeEx_subject_filter_only_removes_matching()
    {
        var mgr = await StreamManagerTestHelper.CreateWithMessagesAsync(
            ("a.1", "m1"), ("b.1", "m2"), ("a.2", "m3"), ("b.2", "m4"));

        var purged = mgr.PurgeEx("a.*", seq: 0, keep: 0);
        purged.ShouldBe(2UL);

        var state = await mgr.GetStateAsync(default);
        state.Messages.ShouldBe(2UL);
    }

    [Fact]
    public async Task PurgeEx_seq_range_removes_below()
    {
        var mgr = await StreamManagerTestHelper.CreateWithMessagesAsync(
            ("x.1", "m1"), ("x.2", "m2"), ("x.3", "m3"), ("x.4", "m4"));

        var purged = mgr.PurgeEx("", seq: 3, keep: 0);
        purged.ShouldBe(2UL); // seq 1, 2

        var state = await mgr.GetStateAsync(default);
        state.Messages.ShouldBe(2UL);
        state.FirstSeq.ShouldBe(3UL);
    }

    [Fact]
    public async Task PurgeEx_keep_retains_newest()
    {
        var mgr = await StreamManagerTestHelper.CreateWithMessagesAsync(
            ("k.1", "m1"), ("k.2", "m2"), ("k.3", "m3"), ("k.4", "m4"), ("k.5", "m5"));

        var purged = mgr.PurgeEx("", seq: 0, keep: 2);
        purged.ShouldBe(3UL); // keep last 2

        var state = await mgr.GetStateAsync(default);
        state.Messages.ShouldBe(2UL);
        state.FirstSeq.ShouldBe(4UL);
    }

    [Fact]
    public async Task PurgeEx_subject_and_keep_combined()
    {
        var mgr = await StreamManagerTestHelper.CreateWithMessagesAsync(
            ("j.1", "m1"), ("j.2", "m2"), ("other", "m3"), ("j.3", "m4"), ("j.4", "m5"));

        // Purge j.* but keep 1
        var purged = mgr.PurgeEx("j.*", seq: 0, keep: 1);
        purged.ShouldBe(3UL); // j.1, j.2, j.3 purged; j.4 kept

        var state = await mgr.GetStateAsync(default);
        state.Messages.ShouldBe(2UL); // "other" + j.4
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StreamPurgeFilterTests" -v normal

Expected: FAIL

Step 3: Implement PurgeEx in StreamManager

In StreamManager.cs, enhance PurgeEx:

  • If subject is set: iterate messages, match using SubjectMatch.IsMatch(), remove matching
  • If seq > 0: remove all messages with sequence < seq
  • If keep > 0: retain last N messages (on subject if specified, globally otherwise)
  • Combined: apply subject filter first, then keep-N among the filtered set
  • Return count of purged messages

Create StreamManagerTestHelper for test setup.

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StreamPurgeFilterTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/StreamPurgeFilterTests.cs src/NATS.Server/JetStream/StreamManager.cs
git commit -m "feat(stream): implement PurgeEx with subject filter, seq range, and keep-N"

Task 20: Interest Retention Policy

Track per-consumer interest and remove messages when all consumers have acknowledged.

Files:

  • Create: src/NATS.Server/JetStream/InterestRetentionPolicy.cs
  • Test: tests/NATS.Server.Tests/InterestRetentionTests.cs
  • Reference: golang/nats-server/server/stream.go (checkInterestState, noInterest)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/InterestRetentionTests.cs:

using NATS.Server.JetStream;

namespace NATS.Server.Tests;

public class InterestRetentionTests
{
    [Fact]
    public void ShouldRetain_true_when_consumers_have_not_acked()
    {
        var policy = new InterestRetentionPolicy();
        policy.RegisterInterest("consumer-A", "orders.>");
        policy.RegisterInterest("consumer-B", "orders.>");

        policy.ShouldRetain(1, "orders.new").ShouldBeTrue();
    }

    [Fact]
    public void ShouldRetain_false_when_all_consumers_acked()
    {
        var policy = new InterestRetentionPolicy();
        policy.RegisterInterest("consumer-A", "orders.>");
        policy.RegisterInterest("consumer-B", "orders.>");

        policy.AcknowledgeDelivery("consumer-A", 1);
        policy.ShouldRetain(1, "orders.new").ShouldBeTrue(); // B hasn't acked

        policy.AcknowledgeDelivery("consumer-B", 1);
        policy.ShouldRetain(1, "orders.new").ShouldBeFalse(); // both acked
    }

    [Fact]
    public void ShouldRetain_ignores_consumers_without_interest()
    {
        var policy = new InterestRetentionPolicy();
        policy.RegisterInterest("consumer-A", "orders.>");
        policy.RegisterInterest("consumer-B", "billing.>"); // no interest in orders

        policy.AcknowledgeDelivery("consumer-A", 1);
        policy.ShouldRetain(1, "orders.new").ShouldBeFalse(); // B has no interest
    }

    [Fact]
    public void UnregisterInterest_removes_consumer()
    {
        var policy = new InterestRetentionPolicy();
        policy.RegisterInterest("consumer-A", "x.>");
        policy.RegisterInterest("consumer-B", "x.>");

        policy.UnregisterInterest("consumer-B");

        // Only A needs to ack
        policy.AcknowledgeDelivery("consumer-A", 1);
        policy.ShouldRetain(1, "x.y").ShouldBeFalse();
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterestRetentionTests" -v normal

Expected: FAIL

Step 3: Implement InterestRetentionPolicy

Create src/NATS.Server/JetStream/InterestRetentionPolicy.cs:

namespace NATS.Server.JetStream;

/// <summary>
/// Tracks per-consumer interest and determines when messages can be removed
/// under Interest retention policy.
/// Go reference: stream.go checkInterestState/noInterest.
/// </summary>
public sealed class InterestRetentionPolicy
{
    private readonly Dictionary<string, string> _interests = new(); // consumer → filter subject
    private readonly Dictionary<ulong, HashSet<string>> _acks = new(); // seq → consumers that acked

    public void RegisterInterest(string consumer, string filterSubject) =>
        _interests[consumer] = filterSubject;

    public void UnregisterInterest(string consumer) => _interests.Remove(consumer);

    public void AcknowledgeDelivery(string consumer, ulong seq)
    {
        if (!_acks.TryGetValue(seq, out var acked))
            _acks[seq] = acked = new HashSet<string>();
        acked.Add(consumer);
    }

    // SubjectMatch.IsMatch is the same wildcard matcher PurgeEx uses (Task 19).
    public bool ShouldRetain(ulong seq, string msgSubject)
    {
        foreach (var (consumer, filter) in _interests)
        {
            if (!SubjectMatch.IsMatch(filter, msgSubject))
                continue; // this consumer has no interest in the subject
            if (!_acks.TryGetValue(seq, out var acked) || !acked.Contains(consumer))
                return true; // an interested consumer has not acked yet
        }
        return false; // every interested consumer acked, or none had interest
    }
}

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterestRetentionTests" -v normal

Expected: PASS

Step 5: Commit

git add src/NATS.Server/JetStream/InterestRetentionPolicy.cs tests/NATS.Server.Tests/InterestRetentionTests.cs
git commit -m "feat(stream): add InterestRetentionPolicy for per-consumer ack tracking"

Task 21: Mirror/Source Retry Enhancement

Add exponential backoff retry, gap detection, and error state tracking to MirrorCoordinator and SourceCoordinator.

Files:

  • Modify: src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs
  • Modify: src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs
  • Test: tests/NATS.Server.Tests/MirrorSourceRetryTests.cs
  • Reference: golang/nats-server/server/stream.go (setupMirrorConsumer, retrySourceConsumerAtSeq)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/MirrorSourceRetryTests.cs:

using NATS.Server.JetStream.MirrorSource;

namespace NATS.Server.Tests;

public class MirrorSourceRetryTests
{
    [Fact]
    public void Mirror_retry_uses_exponential_backoff()
    {
        var mirror = MirrorCoordinatorTestHelper.Create();

        mirror.RecordFailure();
        var delay1 = mirror.GetRetryDelay();
        delay1.ShouldBeGreaterThanOrEqualTo(TimeSpan.FromMilliseconds(250)); // initial

        mirror.RecordFailure();
        var delay2 = mirror.GetRetryDelay();
        delay2.ShouldBeGreaterThan(delay1); // exponential growth

        // Cap at max
        for (int i = 0; i < 20; i++) mirror.RecordFailure();
        var delayMax = mirror.GetRetryDelay();
        delayMax.ShouldBeLessThanOrEqualTo(TimeSpan.FromSeconds(30));
    }

    [Fact]
    public void Mirror_success_resets_backoff()
    {
        var mirror = MirrorCoordinatorTestHelper.Create();

        for (int i = 0; i < 5; i++) mirror.RecordFailure();
        mirror.RecordSuccess();

        var delay = mirror.GetRetryDelay();
        delay.ShouldBe(TimeSpan.FromMilliseconds(250)); // reset to initial
    }

    [Fact]
    public void Mirror_tracks_sequence_gap()
    {
        var mirror = MirrorCoordinatorTestHelper.Create();

        mirror.RecordSourceSeq(1);
        mirror.RecordSourceSeq(2);
        mirror.RecordSourceSeq(5); // gap: 3, 4 missing

        mirror.HasGap.ShouldBeTrue();
        mirror.GapStart.ShouldBe(3UL);
        mirror.GapEnd.ShouldBe(4UL);
    }

    [Fact]
    public void Mirror_tracks_error_state()
    {
        var mirror = MirrorCoordinatorTestHelper.Create();

        mirror.SetError("connection refused");
        mirror.HasError.ShouldBeTrue();
        mirror.ErrorMessage.ShouldBe("connection refused");

        mirror.ClearError();
        mirror.HasError.ShouldBeFalse();
    }

    [Fact]
    public void Source_dedup_window_prunes_expired()
    {
        var source = SourceCoordinatorTestHelper.Create();

        source.RecordMsgId("msg-1");
        source.RecordMsgId("msg-2");

        source.IsDuplicate("msg-1").ShouldBeTrue();
        source.IsDuplicate("msg-3").ShouldBeFalse();

        // Simulate time passing beyond dedup window
        source.PruneDedupWindow(DateTimeOffset.UtcNow.AddMinutes(5));
        source.IsDuplicate("msg-1").ShouldBeFalse();
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MirrorSourceRetryTests" -v normal

Expected: FAIL

Step 3: Implement retry and gap detection

In MirrorCoordinator.cs:

  • Add RecordFailure() — increment _consecutiveFailures
  • Add RecordSuccess() — reset _consecutiveFailures to 0
  • Add GetRetryDelay() — InitialRetryDelay (250ms) doubled per consecutive failure beyond the first, capped at MaxRetryDelay (30s); apply jitter when arming the retry timer rather than inside GetRetryDelay(), so the reset test's exact 250ms assertion stays deterministic
  • Add RecordSourceSeq(seq) — track expected vs actual, detect gaps
  • Add HasGap, GapStart, GapEnd properties
  • Add SetError(msg), ClearError(), HasError, ErrorMessage

In SourceCoordinator.cs:

  • Add PruneDedupWindow(DateTimeOffset cutoff) — remove entries older than cutoff
  • Ensure IsDuplicate, RecordMsgId work with time-based window

Create test helpers: MirrorCoordinatorTestHelper, SourceCoordinatorTestHelper.

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MirrorSourceRetryTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/MirrorSourceRetryTests.cs src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs
git commit -m "feat(mirror): add exponential backoff retry, gap detection, and error tracking"

Phase 3 Exit Gate Verification:

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RedeliveryTracker" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~AckProcessor" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WaitingRequestQueue" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ConsumerPauseResume" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PriorityGroupPinning" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StreamPurgeFilter" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InterestRetention" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MirrorSourceRetry" -v normal

All Phase 3 tests must pass before proceeding to Phase 4.


Phase 4: Client Performance, MQTT, Config, Networking

Dependencies: Phase 1 (storage for MQTT), Phase 3 (consumer for MQTT).

Exit gate: Client flush coalescing measurably reduces syscalls under load, MQTT sessions survive server restart, SIGHUP triggers config reload, implicit routes auto-discover.


Task 22: Client Flush Coalescing

Reduce syscalls by coalescing multiple flush signals into a single write.

Files:

  • Modify: src/NATS.Server/NatsClient.cs
  • Test: tests/NATS.Server.Tests/FlushCoalescingTests.cs
  • Reference: golang/nats-server/server/client.go (fsp, maxFlushPending, pcd)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/FlushCoalescingTests.cs:

namespace NATS.Server.Tests;

public class FlushCoalescingTests
{
    [Fact]
    public async Task Flush_coalescing_reduces_flush_count()
    {
        // Start server with flush coalescing enabled
        var port = TestHelpers.GetFreePort();
        await using var server = new NatsServer(new NatsOptions { Port = port });
        await server.StartAsync();

        using var socket = new System.Net.Sockets.Socket(
            System.Net.Sockets.AddressFamily.InterNetwork,
            System.Net.Sockets.SocketType.Stream,
            System.Net.Sockets.ProtocolType.Tcp);
        await socket.ConnectAsync(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, port));
        var stream = new System.Net.Sockets.NetworkStream(socket);

        // Read INFO + send CONNECT
        var buf = new byte[4096];
        await stream.ReadAsync(buf);
        await stream.WriteAsync("CONNECT {}\r\n"u8.ToArray());
        await stream.WriteAsync("SUB test 1\r\n"u8.ToArray());

        // Rapidly publish multiple messages — coalescing should batch flushes
        for (int i = 0; i < 100; i++)
            await stream.WriteAsync("PUB test 5\r\nhello\r\n"u8.ToArray());

        await Task.Delay(100);

        // Verify messages received (coalescing should not drop any)
        var received = 0;
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
        try
        {
            while (!cts.IsCancellationRequested)
            {
                var n = await stream.ReadAsync(buf, cts.Token);
                if (n == 0) break;
                var text = System.Text.Encoding.ASCII.GetString(buf, 0, n);
                received += text.Split("MSG ").Length - 1;
                if (received >= 100) break;
            }
        }
        catch (OperationCanceledException) { }

        received.ShouldBe(100);
    }

    [Fact]
    public void MaxFlushPending_defaults_to_10()
    {
        // Verify the constant exists
        NatsClient.MaxFlushPending.ShouldBe(10);
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FlushCoalescingTests" -v normal

Expected: FAIL — MaxFlushPending constant doesn't exist

Step 3: Implement flush coalescing

In NatsClient.cs:

  • Add public const int MaxFlushPending = 10;
  • Add field int _flushSignalsPending
  • Add field SemaphoreSlim _flushSignal = new(0) for write loop coordination
  • Modify QueueOutbound(): after channel write, Interlocked.Increment(ref _flushSignalsPending), release semaphore
  • Modify RunWriteLoopAsync(): after draining channel, if _flushSignalsPending < MaxFlushPending, wait briefly on semaphore to coalesce more signals before flushing

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FlushCoalescingTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/FlushCoalescingTests.cs src/NATS.Server/NatsClient.cs
git commit -m "feat(client): add flush coalescing to reduce write syscalls"

Task 23: Client Stall Gate

Block producers when client outbound buffer is near capacity.

Files:

  • Modify: src/NATS.Server/NatsClient.cs
  • Test: tests/NATS.Server.Tests/StallGateTests.cs
  • Reference: golang/nats-server/server/client.go (stc channel, stall gate)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/StallGateTests.cs:

namespace NATS.Server.Tests;

public class StallGateTests
{
    [Fact]
    public void Stall_gate_activates_at_threshold()
    {
        // Unit test against the stall gate logic directly
        var gate = new NatsClient.StallGate(maxPending: 1000);

        gate.IsStalled.ShouldBeFalse();

        gate.UpdatePending(750); // 75% = threshold
        gate.IsStalled.ShouldBeTrue();

        gate.UpdatePending(500); // below threshold
        gate.IsStalled.ShouldBeFalse();
    }

    [Fact]
    public async Task Stall_gate_blocks_producer()
    {
        var gate = new NatsClient.StallGate(maxPending: 100);
        gate.UpdatePending(80); // stalled

        var blocked = true;
        var task = Task.Run(async () =>
        {
            await gate.WaitAsync(TimeSpan.FromSeconds(1));
            blocked = false;
        });

        await Task.Delay(50);
        blocked.ShouldBeTrue(); // still blocked

        gate.UpdatePending(50); // release
        gate.Release();

        await task;
        blocked.ShouldBeFalse();
    }

    [Fact]
    public async Task Stall_gate_timeout_closes_client()
    {
        var gate = new NatsClient.StallGate(maxPending: 100);
        gate.UpdatePending(80);

        var released = await gate.WaitAsync(TimeSpan.FromMilliseconds(50));
        released.ShouldBeFalse(); // timed out without being released
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StallGateTests" -v normal

Expected: FAIL

Step 3: Implement StallGate

In NatsClient.cs, add nested class:

public sealed class StallGate
{
    private readonly long _threshold;
    private SemaphoreSlim? _semaphore;

    public StallGate(long maxPending) => _threshold = maxPending * 3 / 4;
    public bool IsStalled => _semaphore != null;

    public void UpdatePending(long pending)
    {
        if (pending >= _threshold && _semaphore == null)
            _semaphore = new SemaphoreSlim(0, 1);
        else if (pending < _threshold && _semaphore != null)
            Release();
    }

    public async Task<bool> WaitAsync(TimeSpan timeout)
    {
        if (_semaphore == null) return true;
        return await _semaphore.WaitAsync(timeout);
    }

    public void Release()
    {
        _semaphore?.Release();
        _semaphore = null;
    }
}

Wire into QueueOutbound() and RunWriteLoopAsync().

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StallGateTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/StallGateTests.cs src/NATS.Server/NatsClient.cs
git commit -m "feat(client): add stall gate backpressure for slow consumers"

Task 24: Write Timeout Recovery

Handle partial flush recovery with per-client-kind policies.

Files:

  • Modify: src/NATS.Server/NatsClient.cs
  • Test: tests/NATS.Server.Tests/WriteTimeoutTests.cs
  • Reference: golang/nats-server/server/client.go (write timeout handling)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/WriteTimeoutTests.cs:

namespace NATS.Server.Tests;

public class WriteTimeoutTests
{
    [Fact]
    public void WriteTimeoutPolicy_defaults_by_kind()
    {
        NatsClient.GetWriteTimeoutPolicy(ClientKind.Client).ShouldBe(WriteTimeoutPolicy.Close);
        NatsClient.GetWriteTimeoutPolicy(ClientKind.Router).ShouldBe(WriteTimeoutPolicy.TcpFlush);
        NatsClient.GetWriteTimeoutPolicy(ClientKind.Gateway).ShouldBe(WriteTimeoutPolicy.TcpFlush);
        NatsClient.GetWriteTimeoutPolicy(ClientKind.Leaf).ShouldBe(WriteTimeoutPolicy.TcpFlush);
    }

    [Fact]
    public void PartialFlushResult_tracks_bytes()
    {
        var result = new NatsClient.FlushResult(BytesAttempted: 1024, BytesWritten: 512);
        result.IsPartial.ShouldBeTrue();
        result.BytesRemaining.ShouldBe(512L);
    }

    [Fact]
    public void PartialFlushResult_complete_is_not_partial()
    {
        var result = new NatsClient.FlushResult(BytesAttempted: 1024, BytesWritten: 1024);
        result.IsPartial.ShouldBeFalse();
        result.BytesRemaining.ShouldBe(0L);
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WriteTimeoutTests" -v normal

Expected: FAIL

Step 3: Implement write timeout recovery

In NatsClient.cs:

  • Add enum WriteTimeoutPolicy { Close, TcpFlush }
  • Add static WriteTimeoutPolicy GetWriteTimeoutPolicy(ClientKind kind) — CLIENT→Close, others→TcpFlush
  • Add record FlushResult(long BytesAttempted, long BytesWritten) with IsPartial and BytesRemaining
  • Modify RunWriteLoopAsync() write timeout handling:
    • On timeout with bytesWritten > 0 (partial): if policy is TcpFlush, mark slow consumer, continue
    • On timeout with bytesWritten == 0: close connection
    • CLIENT kind always closes on timeout

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WriteTimeoutTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/WriteTimeoutTests.cs src/NATS.Server/NatsClient.cs
git commit -m "feat(client): add write timeout recovery with per-kind policies"

Task 25: MQTT JetStream Persistence

Back MQTT session and retained stores with JetStream streams.

Files:

  • Modify: src/NATS.Server/Mqtt/MqttSessionStore.cs
  • Modify: src/NATS.Server/Mqtt/MqttRetainedStore.cs
  • Test: tests/NATS.Server.Tests/MqttPersistenceTests.cs
  • Reference: golang/nats-server/server/mqtt.go ($MQTT_msgs, $MQTT_sess, $MQTT_rmsgs)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/MqttPersistenceTests.cs:

using NATS.Server.Mqtt;

namespace NATS.Server.Tests;

public class MqttPersistenceTests
{
    [Fact]
    public async Task Session_persists_across_restart()
    {
        var store = MqttSessionStoreTestHelper.CreateWithJetStream();

        await store.ConnectAsync("client-1", cleanSession: false);
        await store.AddSubscriptionAsync("client-1", "topic/test", qos: 1);
        await store.SaveSessionAsync("client-1");

        // Simulate restart
        var recovered = MqttSessionStoreTestHelper.CreateWithJetStream(store.BackingStore);
        await recovered.ConnectAsync("client-1", cleanSession: false);

        var subs = recovered.GetSubscriptions("client-1");
        subs.ShouldContainKey("topic/test");
    }

    [Fact]
    public async Task Clean_session_deletes_existing()
    {
        var store = MqttSessionStoreTestHelper.CreateWithJetStream();

        await store.ConnectAsync("client-2", cleanSession: false);
        await store.AddSubscriptionAsync("client-2", "persist/me", qos: 1);
        await store.SaveSessionAsync("client-2");

        // Reconnect with clean session
        await store.ConnectAsync("client-2", cleanSession: true);

        var subs = store.GetSubscriptions("client-2");
        subs.ShouldBeEmpty();
    }

    [Fact]
    public async Task Retained_message_survives_restart()
    {
        var retained = MqttRetainedStoreTestHelper.CreateWithJetStream();

        await retained.SetRetainedAsync("sensors/temp", "72.5"u8.ToArray());

        // Simulate restart
        var recovered = MqttRetainedStoreTestHelper.CreateWithJetStream(retained.BackingStore);
        var msg = await recovered.GetRetainedAsync("sensors/temp");

        msg.ShouldNotBeNull();
        System.Text.Encoding.UTF8.GetString(msg).ShouldBe("72.5");
    }

    [Fact]
    public async Task Retained_message_cleared_with_empty_payload()
    {
        var retained = MqttRetainedStoreTestHelper.CreateWithJetStream();

        await retained.SetRetainedAsync("sensors/temp", "72.5"u8.ToArray());
        await retained.SetRetainedAsync("sensors/temp", ReadOnlyMemory<byte>.Empty); // clear

        var msg = await retained.GetRetainedAsync("sensors/temp");
        msg.ShouldBeNull();
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MqttPersistenceTests" -v normal

Expected: FAIL

Step 3: Implement JetStream backing for MQTT stores

In MqttSessionStore.cs:

  • Add IStreamStore? _backingStore field for JetStream persistence
  • ConnectAsync(clientId, cleanSession):
    • If cleanSession=false: query backing store for existing session state
    • If cleanSession=true: delete existing session data from backing store
  • SaveSessionAsync(clientId) — serialize session state, store in backing store under $MQTT.sess.{clientId}
  • BackingStore property for test access

In MqttRetainedStore.cs:

  • Add IStreamStore? _backingStore field
  • SetRetainedAsync(topic, payload) — store in backing store under $MQTT.rmsgs.{topic}
  • GetRetainedAsync(topic) — load from backing store
  • If payload is empty, remove the retained message

Create test helpers with in-memory backing stores.
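
The retained-message semantics above (store under $MQTT.rmsgs.{topic}, empty payload clears) can be sketched with an in-memory map standing in for the JetStream backing stream; all names here are illustrative:

```go
package main

import "fmt"

// retainedStore sketches the $MQTT.rmsgs.{topic} mapping with a
// map in place of a JetStream stream.
type retainedStore struct {
	msgs map[string][]byte
}

func (s *retainedStore) subjectFor(topic string) string {
	return "$MQTT.rmsgs." + topic
}

func (s *retainedStore) SetRetained(topic string, payload []byte) {
	key := s.subjectFor(topic)
	if len(payload) == 0 {
		// MQTT semantics: an empty retained payload clears the
		// retained message for that topic.
		delete(s.msgs, key)
		return
	}
	s.msgs[key] = payload
}

func (s *retainedStore) GetRetained(topic string) []byte {
	return s.msgs[s.subjectFor(topic)]
}

func main() {
	s := &retainedStore{msgs: map[string][]byte{}}
	s.SetRetained("sensors/temp", []byte("72.5"))
	fmt.Println(string(s.GetRetained("sensors/temp")))
	s.SetRetained("sensors/temp", nil)
	fmt.Println(s.GetRetained("sensors/temp") == nil)
}
```

The real implementation swaps the map operations for stream writes and per-subject reads, which is what makes the state survive a restart.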

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MqttPersistenceTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/MqttPersistenceTests.cs src/NATS.Server/Mqtt/MqttSessionStore.cs src/NATS.Server/Mqtt/MqttRetainedStore.cs
git commit -m "feat(mqtt): add JetStream-backed session and retained message persistence"

Task 26: SIGHUP Config Reload

Add Unix signal handler for config hot-reload.

Files:

  • Create: src/NATS.Server/Configuration/SignalHandler.cs
  • Modify: src/NATS.Server/Configuration/ConfigReloader.cs
  • Modify: src/NATS.Server.Host/Program.cs
  • Test: tests/NATS.Server.Tests/SignalHandlerTests.cs
  • Reference: golang/nats-server/server/opts.go, golang/nats-server/server/reload.go

Step 1: Write the failing test

Create tests/NATS.Server.Tests/SignalHandlerTests.cs:

using NATS.Server.Configuration;

namespace NATS.Server.Tests;

public class SignalHandlerTests
{
    [Fact]
    public void SignalHandler_registers_without_throwing()
    {
        var reloader = new ConfigReloader();
        // Registration should not throw even without a real server
        Should.NotThrow(() => SignalHandler.Register(reloader, null!));
    }

    [Fact]
    public async Task ConfigReloader_ReloadAsync_applies_reloadable_changes()
    {
        var reloader = new ConfigReloader();
        var original = new NatsOptions { Port = 4222 };
        var updated = new NatsOptions { Port = 4222 }; // port can't change

        var result = await reloader.ReloadFromOptionsAsync(original, updated);
        result.Success.ShouldBeTrue();
        result.RejectedChanges.ShouldBeEmpty();
    }

    [Fact]
    public async Task ConfigReloader_rejects_non_reloadable_changes()
    {
        var reloader = new ConfigReloader();
        var original = new NatsOptions { Port = 4222 };
        var updated = new NatsOptions { Port = 5555 }; // port change is NOT reloadable

        var result = await reloader.ReloadFromOptionsAsync(original, updated);
        result.RejectedChanges.ShouldContain(c => c.Contains("Port"));
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SignalHandlerTests" -v normal

Expected: FAIL — SignalHandler doesn't exist

Step 3: Implement SignalHandler and ReloadFromOptionsAsync

Create src/NATS.Server/Configuration/SignalHandler.cs:

using System.Runtime.InteropServices;

namespace NATS.Server.Configuration;

/// <summary>
/// Registers POSIX signal handlers for config reload.
/// Go reference: server/signal_unix.go, opts.go reload logic.
/// </summary>
public static class SignalHandler
{
    private static PosixSignalRegistration? _sighup;

    public static void Register(ConfigReloader reloader, NatsServer server)
    {
        // Hold the registration in a static field: disposing it (or
        // letting it be collected) unregisters the handler.
        _sighup = PosixSignalRegistration.Create(PosixSignal.SIGHUP, ctx =>
        {
            _ = reloader.ReloadAsync(server);
        });
    }
}

In ConfigReloader.cs, add:

  • ReloadFromOptionsAsync(original, updated) — compare options, apply reloadable, reject non-reloadable
  • Reloadable: auth, TLS, logging, limits
  • Non-reloadable: port, host, cluster port, store dir
  • Return ReloadResult { bool Success, List<string> RejectedChanges }
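
The accept/reject split in ReloadFromOptionsAsync amounts to a diff over the options, as in this sketch; the field set and reload rules are trimmed to a few representative fields and are illustrative, not the full NatsOptions surface:

```go
package main

import "fmt"

type options struct {
	Port     int    // non-reloadable
	Host     string // non-reloadable
	StoreDir string // non-reloadable
	Debug    bool   // reloadable (logging)
	MaxConns int    // reloadable (limits)
}

type reloadResult struct {
	Success         bool
	RejectedChanges []string
}

// diffOptions rejects changes to non-reloadable fields; changes to
// reloadable fields (Debug, MaxConns) would be applied in place.
func diffOptions(orig, upd options) reloadResult {
	var rejected []string
	if orig.Port != upd.Port {
		rejected = append(rejected, "Port")
	}
	if orig.Host != upd.Host {
		rejected = append(rejected, "Host")
	}
	if orig.StoreDir != upd.StoreDir {
		rejected = append(rejected, "StoreDir")
	}
	return reloadResult{Success: len(rejected) == 0, RejectedChanges: rejected}
}

func main() {
	orig := options{Port: 4222, Host: "0.0.0.0"}
	fmt.Println(diffOptions(orig, options{Port: 4222, Host: "0.0.0.0", Debug: true}).Success)
	fmt.Println(diffOptions(orig, options{Port: 5555, Host: "0.0.0.0"}).RejectedChanges)
}
```

Listing the rejected field names, rather than failing wholesale, lets the operator see exactly which parts of the edited config need a restart.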

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SignalHandlerTests" -v normal

Expected: PASS

Step 5: Commit

git add src/NATS.Server/Configuration/SignalHandler.cs tests/NATS.Server.Tests/SignalHandlerTests.cs src/NATS.Server/Configuration/ConfigReloader.cs
git commit -m "feat(config): add SIGHUP signal handler and config reload validation"

Task 27: Implicit Route/Gateway Discovery

Auto-discover cluster peers from INFO gossip.

Files:

  • Modify: src/NATS.Server/Routes/RouteManager.cs
  • Modify: src/NATS.Server/Gateways/GatewayManager.cs
  • Test: tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs
  • Reference: golang/nats-server/server/route.go (processImplicitRoute), gateway.go (processImplicitGateway)

Step 1: Write the failing test

Create tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs:

namespace NATS.Server.Tests;

public class ImplicitDiscoveryTests
{
    [Fact]
    public void ProcessImplicitRoute_discovers_new_peer()
    {
        var mgr = RouteManagerTestHelper.Create();
        var serverInfo = new ServerInfo
        {
            ServerId = "server-2",
            ConnectUrls = ["nats://10.0.0.2:6222", "nats://10.0.0.3:6222"],
        };

        mgr.ProcessImplicitRoute(serverInfo);

        mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.2:6222");
        mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.3:6222");
    }

    [Fact]
    public void ProcessImplicitRoute_skips_known_peers()
    {
        var mgr = RouteManagerTestHelper.Create(knownRoutes: ["nats://10.0.0.2:6222"]);
        var serverInfo = new ServerInfo
        {
            ServerId = "server-2",
            ConnectUrls = ["nats://10.0.0.2:6222", "nats://10.0.0.3:6222"],
        };

        mgr.ProcessImplicitRoute(serverInfo);

        mgr.DiscoveredRoutes.Count.ShouldBe(1); // only 10.0.0.3 is new
        mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.3:6222");
    }

    [Fact]
    public void ProcessImplicitGateway_discovers_new_gateway()
    {
        var mgr = GatewayManagerTestHelper.Create();
        var gwInfo = new GatewayInfo
        {
            Name = "cluster-B",
            Urls = ["nats://10.0.1.1:7222"],
        };

        mgr.ProcessImplicitGateway(gwInfo);

        mgr.DiscoveredGateways.ShouldContain("cluster-B");
    }

    [Fact]
    public void ForwardNewRouteInfo_updates_known_servers()
    {
        var mgr = RouteManagerTestHelper.Create();
        var forwarded = new List<string>();
        mgr.OnForwardInfo += (urls) => forwarded.AddRange(urls);

        mgr.ForwardNewRouteInfoToKnownServers("nats://10.0.0.5:6222");

        forwarded.ShouldContain("nats://10.0.0.5:6222");
    }
}

Step 2: Run test to verify it fails

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImplicitDiscoveryTests" -v normal

Expected: FAIL

Step 3: Implement implicit discovery

In RouteManager.cs:

  • Add HashSet<string> DiscoveredRoutes — tracks auto-discovered route URLs
  • ProcessImplicitRoute(serverInfo):
    • Extract connect_urls from INFO
    • For each unknown URL: add to DiscoveredRoutes, initiate solicited connection
  • ForwardNewRouteInfoToKnownServers(newPeerUrl):
    • Send updated INFO containing new peer to all existing route connections
  • Add event Action<List<string>>? OnForwardInfo for testing

In GatewayManager.cs:

  • Add HashSet<string> DiscoveredGateways — tracks auto-discovered gateway names
  • ProcessImplicitGateway(gwInfo):
    • Add gateway name + URLs to discovered set
    • Create outbound gateway connection

Create test helpers: RouteManagerTestHelper, GatewayManagerTestHelper.
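
The dedup-then-gossip flow of ProcessImplicitRoute can be sketched as follows; the structure is illustrative, not the Go server's route.go:

```go
package main

import "fmt"

type routeManager struct {
	known      map[string]bool
	discovered []string
}

// processImplicitRoute adds only URLs we have no route to yet and
// returns the new ones, so the caller can solicit connections and
// forward updated INFO to existing peers.
func (m *routeManager) processImplicitRoute(connectURLs []string) []string {
	var fresh []string
	for _, u := range connectURLs {
		if m.known[u] {
			continue // already connected or already discovered
		}
		m.known[u] = true
		m.discovered = append(m.discovered, u)
		fresh = append(fresh, u)
	}
	return fresh
}

func main() {
	m := &routeManager{known: map[string]bool{"nats://10.0.0.2:6222": true}}
	fresh := m.processImplicitRoute([]string{"nats://10.0.0.2:6222", "nats://10.0.0.3:6222"})
	fmt.Println(fresh) // only the previously unknown peer
}
```

The dedup set is what keeps gossip convergent: each peer URL is solicited and forwarded at most once per server, no matter how many INFO messages mention it.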

Step 4: Run test to verify it passes

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImplicitDiscoveryTests" -v normal

Expected: PASS

Step 5: Commit

git add tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs src/NATS.Server/Routes/RouteManager.cs src/NATS.Server/Gateways/GatewayManager.cs
git commit -m "feat(cluster): add implicit route and gateway discovery from INFO gossip"

Phase 4 Exit Gate Verification:

dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~FlushCoalescing" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~StallGate" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~WriteTimeout" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MqttPersistence" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SignalHandler" -v normal
dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImplicitDiscovery" -v normal

All Phase 4 tests must pass.


Final Verification

After all 4 phases are complete, run the full test suite:

dotnet test -v normal

All existing tests must continue to pass. No regressions.

Update parity DB with all newly mapped tests:

sqlite3 docs/test_parity.db "SELECT status, COUNT(*) FROM go_tests GROUP BY status;"

Task Summary

Task Phase Component Gap
1 1 MsgBlock encryption Gap 1.2
2 1 MsgBlock compression Gap 1.3
3 1 Block rotation & lifecycle Gap 1.1
4 1 Crash recovery enhancement Gap 1.4
5 1 IStreamStore core methods Gap 1.9
6 1 IStreamStore query methods Gap 1.9
7 1 RAFT binary WAL Gap 8.1
8 1 RAFT joint consensus Gap 8.2
9 2 Meta snapshot codec Gap 2.6
10 2 Cluster monitoring loop Gap 2.1
11 2 Assignment processing Gap 2.2
12 2 Inflight tracking Gap 2.3
13 2 Leadership transitions Gap 2.5
14 3 RedeliveryTracker rewrite Gap 3.4
15 3 Ack/NAK processing Gap 3.3
16 3 Pull request pipeline Gap 3.2
17 3 Consumer pause/resume Gap 3.7
18 3 Priority group pinning Gap 3.6
19 3 Stream purge filtering Gap 4.5
20 3 Interest retention Gap 4.6
21 3 Mirror/source retry Gap 4.1-4.3
22 4 Flush coalescing Gap 5.1
23 4 Stall gate Gap 5.2
24 4 Write timeout recovery Gap 5.3
25 4 MQTT persistence Gap 6.1
26 4 SIGHUP config reload Gap 14.1
27 4 Implicit discovery Gap 11.1, 13.1