diff --git a/docs/plans/2026-02-24-structuregaps-full-parity-plan.md b/docs/plans/2026-02-24-structuregaps-full-parity-plan.md new file mode 100644 index 0000000..84992ab --- /dev/null +++ b/docs/plans/2026-02-24-structuregaps-full-parity-plan.md @@ -0,0 +1,1268 @@ +# Full Go Parity: All 15 Structure Gaps — Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task. + +**Goal:** Port all missing functionality from `docs/structuregaps.md` (15 gaps) from Go NATS server to .NET, update `test_parity.db` for each ported test, target ~1,194 additional mapped Go tests (29% → ~70%). + +**Architecture:** 5 parallel tracks (A/D/E independent, C depends on A, B depends on A+C). Feature-first TDD: write failing test, implement minimum code, verify, commit. Batch DB updates at end of each sub-phase. + +**Tech Stack:** .NET 10 / C# 14, xUnit 3, Shouldly, NSubstitute, IronSnappy (S2), System.Security.Cryptography (ChaCha20/AES-GCM), System.IO.Pipelines, SQLite (test_parity.db) + +--- + +## Track A: Storage (FileStore Block Management) + +### Task A1: Message Block Binary Record Encoding + +**Files:** +- Create: `src/NATS.Server/JetStream/Storage/MessageRecord.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Storage/MessageRecordTests.cs` + +**Context:** Go stores messages as binary records in block files: `[1:flags][varint:subject_len][N:subject][varint:hdr_len][M:headers][varint:msg_len][P:payload][8:checksum]`. The .NET store currently uses JSONL. This task creates the binary encoding/decoding layer. 
+
+**Go reference:** `filestore.go:8770-8783` (message record format), `filestore.go:5720-5790` (writeMsgRecord)
+
+**Step 1: Write the failing test**
+
+```csharp
+// MessageRecordTests.cs
+using Shouldly;
+
+namespace NATS.Server.Tests.JetStream.Storage;
+
+public class MessageRecordTests
+{
+    [Fact]
+    public void RoundTrip_SimpleMessage()
+    {
+        var record = new MessageRecord
+        {
+            Sequence = 1,
+            Subject = "test.subject",
+            Headers = ReadOnlyMemory<byte>.Empty,
+            Payload = "hello"u8.ToArray(),
+            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
+        };
+
+        var encoded = MessageRecord.Encode(record);
+        var decoded = MessageRecord.Decode(encoded);
+
+        decoded.Sequence.ShouldBe(1UL);
+        decoded.Subject.ShouldBe("test.subject");
+        decoded.Payload.Span.SequenceEqual("hello"u8).ShouldBeTrue();
+    }
+
+    [Fact]
+    public void RoundTrip_WithHeaders()
+    {
+        var headers = "NATS/1.0\r\nNats-Msg-Id: abc\r\n\r\n"u8.ToArray();
+        var record = new MessageRecord
+        {
+            Sequence = 42,
+            Subject = "orders.new",
+            Headers = headers,
+            Payload = "{\"id\":1}"u8.ToArray(),
+            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
+        };
+
+        var encoded = MessageRecord.Encode(record);
+        var decoded = MessageRecord.Decode(encoded);
+
+        decoded.Sequence.ShouldBe(42UL);
+        decoded.Headers.Length.ShouldBe(headers.Length);
+    }
+
+    [Fact]
+    public void Encode_SetsChecksumInTrailer()
+    {
+        var record = new MessageRecord { Sequence = 1, Subject = "x", Payload = "y"u8.ToArray() };
+        var encoded = MessageRecord.Encode(record);
+        // Last 8 bytes are checksum
+        encoded.Length.ShouldBeGreaterThan(8);
+        var checksum = BitConverter.ToUInt64(encoded.AsSpan()[^8..]);
+        checksum.ShouldNotBe(0UL);
+    }
+
+    [Fact]
+    public void Decode_DetectsCorruptChecksum()
+    {
+        var record = new MessageRecord { Sequence = 1, Subject = "x", Payload = "y"u8.ToArray() };
+        var encoded = MessageRecord.Encode(record);
+        // Corrupt payload byte
+        encoded[encoded.Length / 2] ^= 0xFF;
+        Should.Throw<InvalidDataException>(() =>
MessageRecord.Decode(encoded));
+    }
+
+    [Theory]
+    [InlineData(0)]
+    [InlineData(1)]
+    [InlineData(127)]
+    [InlineData(128)]
+    [InlineData(16383)]
+    [InlineData(16384)]
+    public void Varint_RoundTrip(int value)
+    {
+        Span<byte> buf = stackalloc byte[8];
+        var written = MessageRecord.WriteVarint(buf, (ulong)value);
+        var (decoded, read) = MessageRecord.ReadVarint(buf);
+        decoded.ShouldBe((ulong)value);
+        read.ShouldBe(written);
+    }
+}
+```
+
+**Step 2: Run test to verify it fails**
+
+Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MessageRecordTests" -v normal`
+Expected: FAIL — `MessageRecord` type does not exist.
+
+**Step 3: Write minimal implementation**
+
+```csharp
+// MessageRecord.cs
+using System.Buffers.Binary;
+using System.IO.Hashing;
+using System.Text;
+
+namespace NATS.Server.JetStream.Storage;
+
+/// <summary>
+/// Binary message record encoding matching Go filestore.go wire format.
+/// Format: [1:flags][varint:subj_len][N:subject][varint:hdr_len][M:headers][varint:payload_len][P:payload][8:sequence][8:checksum]
+/// Go reference: filestore.go:8770-8783
+/// </summary>
+public sealed class MessageRecord
+{
+    public ulong Sequence { get; init; }
+    public required string Subject { get; init; }
+    public ReadOnlyMemory<byte> Headers { get; init; }
+    public ReadOnlyMemory<byte> Payload { get; init; }
+    public long Timestamp { get; init; }
+    public bool Deleted { get; init; }
+
+    private const byte DeletedFlag = 0x80; // ebit in Go
+
+    public static byte[] Encode(MessageRecord record)
+    {
+        var subjectBytes = Encoding.ASCII.GetBytes(record.Subject);
+        // Calculate total size
+        var size = 1 // flags
+            + VarintSize((ulong)subjectBytes.Length) + subjectBytes.Length
+            + VarintSize((ulong)record.Headers.Length) + record.Headers.Length
+            + VarintSize((ulong)record.Payload.Length) + record.Payload.Length
+            + 8; // sequence
+        size += 8; // checksum
+
+        var buf = new byte[size];
+        var offset = 0;
+
+        // Flags
+        buf[offset++] = record.Deleted ?
DeletedFlag : (byte)0;
+
+        // Subject
+        offset += WriteVarint(buf.AsSpan(offset), (ulong)subjectBytes.Length);
+        subjectBytes.CopyTo(buf.AsSpan(offset));
+        offset += subjectBytes.Length;
+
+        // Headers
+        offset += WriteVarint(buf.AsSpan(offset), (ulong)record.Headers.Length);
+        record.Headers.Span.CopyTo(buf.AsSpan(offset));
+        offset += record.Headers.Length;
+
+        // Payload
+        offset += WriteVarint(buf.AsSpan(offset), (ulong)record.Payload.Length);
+        record.Payload.Span.CopyTo(buf.AsSpan(offset));
+        offset += record.Payload.Length;
+
+        // Sequence (8 bytes LE)
+        BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(offset), record.Sequence);
+        offset += 8;
+
+        // Checksum (XxHash64 of everything before checksum)
+        var hash = XxHash64.HashToUInt64(buf.AsSpan(0, offset));
+        BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(offset), hash);
+
+        return buf;
+    }
+
+    public static MessageRecord Decode(ReadOnlySpan<byte> data)
+    {
+        if (data.Length < 10)
+            throw new InvalidDataException("Record too short");
+
+        // Verify checksum first
+        var payloadSpan = data[..^8];
+        var storedChecksum = BinaryPrimitives.ReadUInt64LittleEndian(data[^8..]);
+        var computedChecksum = XxHash64.HashToUInt64(payloadSpan);
+        if (storedChecksum != computedChecksum)
+            throw new InvalidDataException("Checksum mismatch");
+
+        var offset = 0;
+
+        // Flags
+        var flags = data[offset++];
+        var deleted = (flags & DeletedFlag) != 0;
+
+        // Subject
+        var (subjLen, read) = ReadVarint(data[offset..]);
+        offset += read;
+        var subject = Encoding.ASCII.GetString(data.Slice(offset, (int)subjLen));
+        offset += (int)subjLen;
+
+        // Headers
+        var (hdrLen, hdrRead) = ReadVarint(data[offset..]);
+        offset += hdrRead;
+        var headers = data.Slice(offset, (int)hdrLen).ToArray();
+        offset += (int)hdrLen;
+
+        // Payload
+        var (payLen, payRead) = ReadVarint(data[offset..]);
+        offset += payRead;
+        var payload = data.Slice(offset, (int)payLen).ToArray();
+        offset += (int)payLen;
+
+        // Sequence
+        var sequence =
BinaryPrimitives.ReadUInt64LittleEndian(data[offset..]);
+
+        return new MessageRecord
+        {
+            Sequence = sequence,
+            Subject = subject,
+            Headers = headers,
+            Payload = payload,
+            Deleted = deleted,
+        };
+    }
+
+    public static int WriteVarint(Span<byte> buf, ulong value)
+    {
+        var i = 0;
+        while (value >= 0x80)
+        {
+            buf[i++] = (byte)(value | 0x80);
+            value >>= 7;
+        }
+        buf[i++] = (byte)value;
+        return i;
+    }
+
+    public static (ulong value, int bytesRead) ReadVarint(ReadOnlySpan<byte> buf)
+    {
+        ulong value = 0;
+        var shift = 0;
+        var i = 0;
+        while (i < buf.Length)
+        {
+            var b = buf[i++];
+            value |= (ulong)(b & 0x7F) << shift;
+            if ((b & 0x80) == 0)
+                return (value, i);
+            shift += 7;
+        }
+        throw new InvalidDataException("Unterminated varint");
+    }
+
+    private static int VarintSize(ulong value)
+    {
+        var size = 1;
+        while (value >= 0x80) { size++; value >>= 7; }
+        return size;
+    }
+}
+```
+
+**Step 4: Run test to verify it passes**
+
+Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MessageRecordTests" -v normal`
+Expected: PASS (all 7 tests)
+
+**Step 5: Commit**
+
+```bash
+git add src/NATS.Server/JetStream/Storage/MessageRecord.cs tests/NATS.Server.Tests/JetStream/Storage/MessageRecordTests.cs
+git commit -m "feat(storage): add binary message record encoding (Go parity)"
+```
+
+---
+
+### Task A2: Message Block Abstraction
+
+**Files:**
+- Create: `src/NATS.Server/JetStream/Storage/MsgBlock.cs`
+- Test: `tests/NATS.Server.Tests/JetStream/Storage/MsgBlockTests.cs`
+
+**Context:** Go's `msgBlock` is the unit of storage — a file containing sequential message records with an in-memory index. Blocks are sealed (read-only) when they reach a size limit, and a new block is created. This maps to Go's `type msgBlock struct` in `filestore.go:108-180`.
+
+**Step 1: Write the failing test**
+
+```csharp
+// MsgBlockTests.cs
+namespace NATS.Server.Tests.JetStream.Storage;
+
+public class MsgBlockTests
+{
+    [Fact]
+    public void Write_SingleMessage_ReturnsSequence()
+    {
+        using var block = MsgBlock.Create(0, GetTempPath(), maxBytes: 64 * 1024);
+        var seq = block.Write("test.subj", ReadOnlyMemory<byte>.Empty, "payload"u8.ToArray());
+        seq.ShouldBe(1UL);
+    }
+
+    [Fact]
+    public void Write_MultipleMessages_IncrementsSequence()
+    {
+        using var block = MsgBlock.Create(0, GetTempPath(), maxBytes: 64 * 1024);
+        block.Write("a", ReadOnlyMemory<byte>.Empty, "1"u8.ToArray()).ShouldBe(1UL);
+        block.Write("b", ReadOnlyMemory<byte>.Empty, "2"u8.ToArray()).ShouldBe(2UL);
+        block.Write("c", ReadOnlyMemory<byte>.Empty, "3"u8.ToArray()).ShouldBe(3UL);
+        block.MessageCount.ShouldBe(3UL);
+    }
+
+    [Fact]
+    public void Read_BySequence_ReturnsMessage()
+    {
+        using var block = MsgBlock.Create(0, GetTempPath(), maxBytes: 64 * 1024);
+        block.Write("test.subj", ReadOnlyMemory<byte>.Empty, "hello"u8.ToArray());
+        var msg = block.Read(1);
+        msg.ShouldNotBeNull();
+        msg.Subject.ShouldBe("test.subj");
+        msg.Payload.Span.SequenceEqual("hello"u8).ShouldBeTrue();
+    }
+
+    [Fact]
+    public void Read_NonexistentSequence_ReturnsNull()
+    {
+        using var block = MsgBlock.Create(0, GetTempPath(), maxBytes: 64 * 1024);
+        block.Read(999).ShouldBeNull();
+    }
+
+    [Fact]
+    public void IsSealed_ReturnsTrueWhenFull()
+    {
+        using var block = MsgBlock.Create(0, GetTempPath(), maxBytes: 200);
+        // Write until block is sealed
+        while (!block.IsSealed)
+            block.Write("s", ReadOnlyMemory<byte>.Empty, new byte[50]);
+        block.IsSealed.ShouldBeTrue();
+    }
+
+    [Fact]
+    public void Delete_MarksSequenceAsDeleted()
+    {
+        using var block = MsgBlock.Create(0, GetTempPath(), maxBytes: 64 * 1024);
+        block.Write("s", ReadOnlyMemory<byte>.Empty, "data"u8.ToArray());
+        block.Delete(1).ShouldBeTrue();
+        block.Read(1).ShouldBeNull();
+        block.DeletedCount.ShouldBe(1UL);
+    }
+
+    [Fact]
+    public void Recover_RebuildsIndexFromFile()
+    {
+        var path = GetTempPath();
+        // Write some messages
+        using (var block = MsgBlock.Create(0, path, maxBytes: 64 * 1024))
+        {
+            block.Write("a", ReadOnlyMemory<byte>.Empty, "1"u8.ToArray());
+            block.Write("b", ReadOnlyMemory<byte>.Empty, "2"u8.ToArray());
+            block.Flush();
+        }
+        // Recover from file
+        using var recovered = MsgBlock.Recover(0, path);
+        recovered.MessageCount.ShouldBe(2UL);
+        recovered.Read(1)!.Subject.ShouldBe("a");
+        recovered.Read(2)!.Subject.ShouldBe("b");
+    }
+
+    private static string GetTempPath() =>
+        Path.Combine(Path.GetTempPath(), $"nats_block_{Guid.NewGuid():N}");
+}
+```
+
+**Step 2: Run test to verify it fails**
+
+Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MsgBlockTests" -v normal`
+Expected: FAIL — `MsgBlock` type does not exist.
+
+**Step 3: Write minimal implementation**
+
+Create `src/NATS.Server/JetStream/Storage/MsgBlock.cs` implementing:
+- `Create(id, path, maxBytes)` — factory for new block
+- `Recover(id, path)` — factory that rebuilds index from existing file
+- `Write(subject, headers, payload)` → returns sequence, appends binary record to file buffer
+- `Read(sequence)` → looks up in index, reads from file or cache, returns MessageRecord
+- `Delete(sequence)` → marks in deletion set (SequenceSet), returns bool
+- `Flush()` → writes buffer to disk
+- `IsSealed` → true when `_bytesWritten >= _maxBytes`
+- Properties: `MessageCount`, `DeletedCount`, `FirstSequence`, `LastSequence`, `BytesUsed`
+- Internal: `Dictionary` index, `FileStream` for I/O
+- `IDisposable` for file handle cleanup
+
+**Go reference:** `filestore.go:108-180` (msgBlock struct), `filestore.go:5720-5800` (writeMsgRecord)
+
+**Step 4:** Run tests, verify pass. **Step 5:** Commit with `feat(storage): add MsgBlock block-based message storage`.
+ +--- + +### Task A3: FileStore Block Manager Rewrite + +**Files:** +- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Storage/FileStoreBlockTests.cs` + +**Context:** Replace the JSONL append-only FileStore with a block manager that creates, seals, and rotates MsgBlocks. The existing async API (`AppendAsync`, `LoadAsync`, etc.) delegates to the block engine. The sync API (`StoreMsg`, `LoadMsg`, etc.) is implemented directly. + +**Key changes to FileStore.cs:** +1. Replace `Dictionary` with `List` and `_lastBlock` pointer +2. `AppendAsync` → delegates to `StoreMsg` which writes to `_lastBlock`, rotates if sealed +3. `LoadAsync` → delegates to `LoadMsg` which finds block by sequence, reads from block +4. `RemoveAsync` → calls `_blocks[blockIdx].Delete(seq)`, updates state +5. Block rotation: when `_lastBlock.IsSealed`, create new block, add to list +6. Recovery: on construction, scan directory for `.blk` files, call `MsgBlock.Recover` for each +7. Wire S2Codec on write path (compress record before writing to block) +8. Wire AeadEncryptor on write path (encrypt after compression) +9. Atomic flush: write to temp file, rename on seal + +**Tests verify:** +- Multi-block rotation (write >64KB to trigger new block) +- Cross-block reads (read from sealed + active blocks) +- Restart recovery (create store, write, dispose, re-create, verify data intact) +- Compression round-trip (enable S2, store+load, verify data matches) +- Encryption round-trip (enable AEAD, store+load, verify data matches) +- Purge by subject (only messages matching subject removed) +- Purge by sequence range +- Concurrent read/write safety + +**Step flow:** Write 8 failing tests → implement block manager → verify all pass → commit. 
+ +--- + +### Task A4: Tombstone Tracking and Purge Operations + +**Files:** +- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs` +- Modify: `src/NATS.Server/JetStream/Storage/MsgBlock.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Storage/FileStorePurgeBlockTests.cs` + +**Context:** Go tracks deleted messages via `dmap` (AVL-based SequenceSet per block). Purge operations: `PurgeEx(subject, seq, keep)`, `Compact(seq)`, `Truncate(seq)`. The .NET `SequenceSet.cs` (777 lines) already exists and can be used. + +**Go reference:** `filestore.go:2200-2400` (purge), `filestore.go:2900-3100` (compact) + +**Implementation:** +- Each `MsgBlock` gets a `SequenceSet _deleted` field +- `Delete(seq)` adds to `_deleted`, updates byte counts +- `Compact()` rewrites block excluding deleted sequences +- `FilteredState(seq, subject)` walks blocks, skips deleted +- `SubjectsState(filter)` aggregates per-subject counts excluding deleted +- `PurgeEx` on FileStore iterates blocks, calls `Delete` for matching sequences + +**Tests:** Purge by subject, purge by sequence range, compact removes gaps, state queries reflect deletions. + +--- + +### Task A5: Write Cache and TTL Scheduling + +**Files:** +- Modify: `src/NATS.Server/JetStream/Storage/MsgBlock.cs` (add cache) +- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs` (add TTL) +- Test: `tests/NATS.Server.Tests/JetStream/Storage/FileStoreTtlTests.cs` + +**Context:** Active block has an in-memory write cache for recent messages. On startup, walk blocks to reconstruct TTL expirations and register with `HashWheel`. 
+
+**Go reference:** `filestore.go:180-220` (cache struct), `filestore.go:8900-9000` (age check)
+
+**Implementation:**
+- `MsgBlock._cache`: `Dictionary` for hot messages, evicted on timer
+- `FileStore._ttlWheel`: Reference to `HashWheel` for scheduling expirations
+- On `StoreMsg` with TTL > 0: register expiration callback
+- On startup recovery: walk all blocks, check timestamps, re-register unexpired TTLs
+
+---
+
+### Task A6: Port FileStore Go Tests + DB Update
+
+**Files:**
+- Modify: existing test files under `tests/NATS.Server.Tests/JetStream/Storage/`
+- Create: `tests/NATS.Server.Tests/JetStream/Storage/FileStoreGoParityTests.cs`
+
+**Context:** Port ~159 unmapped tests from `filestore_test.go`. Group by functionality: basic store/load (~30), compression (~15), encryption (~15), recovery (~20), purge/compact (~25), TTL (~10), concurrent (~15), state queries (~20), edge cases (~9).
+
+**For each test ported, update DB:**
+```sql
+UPDATE go_tests SET status='mapped', dotnet_test='<DotnetTestName>', dotnet_file='FileStoreGoParityTests.cs',
+  notes='Ported from <GoTestName> in filestore_test.go' WHERE go_file='filestore_test.go' AND go_test='<GoTestName>';
+```
+
+**Batch the DB update in a single SQL script at the end of this task.**
+
+---
+
+## Track B: Consensus (RAFT + JetStream Cluster)
+
+**Blocked by:** Tracks A and C
+
+### Task B1: RAFT Apply Queue and Commit Tracking
+
+**Files:**
+- Modify: `src/NATS.Server/Raft/RaftNode.cs`
+- Create: `src/NATS.Server/Raft/CommitQueue.cs`
+- Test: `tests/NATS.Server.Tests/Raft/RaftApplyQueueTests.cs`
+
+**Context:** Go RAFT tracks `applied`, `processed`, `commit` indexes separately. The apply queue enables the state machine to consume committed entries asynchronously. Currently .NET only has `AppliedIndex`.
+ +**Go reference:** `raft.go:150-160` (applied/processed fields), `raft.go:2100-2150` (ApplyQ) + +**Implementation:** +- `CommitQueue`: Channel-based queue for committed entries awaiting state machine application +- `RaftNode.CommitIndex`, `RaftNode.ProcessedIndex` alongside existing `AppliedIndex` +- `Applied(index)` method returns entries applied since last call +- `Processed(index, applied)` tracks pipeline efficiency + +--- + +### Task B2: Campaign Timeout and Election Management + +**Files:** +- Modify: `src/NATS.Server/Raft/RaftNode.cs` +- Test: `tests/NATS.Server.Tests/Raft/RaftElectionTimerTests.cs` + +**Context:** Go uses randomized election delays (150-300ms) to prevent split votes. Currently .NET elections are on-demand only. + +**Go reference:** `raft.go:1400-1450` (resetElectionTimeout), `raft.go:1500-1550` (campaign logic) + +**Implementation:** +- `_electionTimer`: `PeriodicTimer` with randomized interval (150-300ms) +- `ResetElectionTimeout()` restarts timer on any heartbeat/append from leader +- Timer expiry triggers `Campaign()` if in Follower state +- `CampaignImmediately()` bypasses timer for testing + +--- + +### Task B3: Health Classification and Peer Tracking + +**Files:** +- Modify: `src/NATS.Server/Raft/RaftNode.cs` +- Create: `src/NATS.Server/Raft/RaftPeerState.cs` +- Test: `tests/NATS.Server.Tests/Raft/RaftHealthTests.cs` + +**Context:** Go classifies peers as current/catching-up/leaderless based on `lastContact`, `nextIndex`, `matchIndex`. + +**Implementation:** +- `RaftPeerState`: record with `LastContact`, `NextIndex`, `MatchIndex`, `Active` +- `Current()` — is this node's log within election timeout of leader? 
+- `Healthy()` — node health based on last contact time +- `Peers()` — returns list of peer states +- Leader tracks peer progress during replication + +--- + +### Task B4: Membership Changes (Add/Remove Peer) + +**Files:** +- Create: `src/NATS.Server/Raft/RaftMembership.cs` +- Modify: `src/NATS.Server/Raft/RaftNode.cs` +- Test: `tests/NATS.Server.Tests/Raft/RaftMembershipTests.cs` + +**Context:** Go uses single-server membership changes (not joint consensus). `ProposeAddPeer`/`ProposeRemovePeer` return error if change already in flight. + +**Go reference:** `raft.go:2500-2600` (ProposeAddPeer/RemovePeer) + +**Implementation:** +- Special RAFT log entry type for membership changes +- `_membershipChangeIndex` tracks uncommitted change +- On commit: update peer list, recompute quorum +- On leader change: re-propose inflight membership changes + +--- + +### Task B5: Snapshot Checkpoints and Log Compaction + +**Files:** +- Create: `src/NATS.Server/Raft/RaftSnapshotCheckpoint.cs` +- Modify: `src/NATS.Server/Raft/RaftSnapshotStore.cs` +- Modify: `src/NATS.Server/Raft/RaftLog.cs` +- Test: `tests/NATS.Server.Tests/Raft/RaftSnapshotCheckpointTests.cs` + +**Context:** After snapshot, truncate log entries before snapshot index. Streaming snapshot transfer for catching-up followers. + +**Go reference:** `raft.go:3200-3400` (CreateSnapshotCheckpoint), `raft.go:3500-3700` (installSnapshot) + +**Implementation:** +- `RaftSnapshotCheckpoint`: saves state at a point, allows install or abort +- `InstallSnapshot(chunks)`: chunked streaming transfer to follower +- `CompactLog(upToIndex)`: truncate entries before index, update base index +- `DrainAndReplaySnapshot()`: recovery path + +--- + +### Task B6: Pre-Vote Protocol + +**Files:** +- Modify: `src/NATS.Server/Raft/RaftNode.cs` +- Modify: `src/NATS.Server/Raft/RaftWireFormat.cs` +- Test: `tests/NATS.Server.Tests/Raft/RaftPreVoteTests.cs` + +**Context:** Before incrementing term, candidate sends pre-vote requests. 
Prevents partitioned nodes from disrupting stable clusters. + +**Go reference:** `raft.go:1600-1700` (pre-vote logic) + +**Implementation:** +- New wire message: `RaftPreVoteRequest`/`RaftPreVoteResponse` +- Pre-vote granted only if candidate's log is at least as up-to-date +- Successful pre-vote → proceed to real election with term increment + +--- + +### Task B7: Stream/Consumer Assignment Types + +**Files:** +- Create: `src/NATS.Server/JetStream/Cluster/StreamAssignment.cs` +- Create: `src/NATS.Server/JetStream/Cluster/ConsumerAssignment.cs` +- Create: `src/NATS.Server/JetStream/Cluster/RaftGroup.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Cluster/AssignmentSerializationTests.cs` + +**Context:** Core data types for cluster coordination. Must serialize/deserialize for RAFT log entries. + +**Go reference:** `jetstream_cluster.go:60-120` (streamAssignment, consumerAssignment structs) + +--- + +### Task B8: JetStream Meta-Group RAFT Proposal Workflow + +**Files:** +- Rewrite: `src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs` (51 → ~1,500+ lines) +- Test: `tests/NATS.Server.Tests/JetStream/Cluster/MetaGroupProposalTests.cs` + +**Context:** The meta-controller receives API requests, validates, proposes stream/consumer assignments to the meta-group RAFT, and on commit all nodes apply the assignment. 
+ +**Go reference:** `jetstream_cluster.go:500-1000` (processStreamAssignment), `jetstream_cluster.go:1500-2000` (processConsumerAssignment) + +**Implementation:** +- `ProposeStreamCreate(config)` → validate → create StreamAssignment → propose to RAFT +- `ProposeStreamDelete(name)` → propose deletion entry +- `ProposeConsumerCreate(stream, config)` → validate → create ConsumerAssignment → propose +- `ApplyEntry(entry)` → dispatch based on entry type (create/delete/update stream/consumer) +- Inflight tracking: `_inflightStreams`, `_inflightConsumers` dictionaries +- On leader change: re-propose inflight entries + +--- + +### Task B9: Placement Engine + +**Files:** +- Rewrite: `src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs` (17 → ~300+ lines) +- Rename to: `src/NATS.Server/JetStream/Cluster/PlacementEngine.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Cluster/PlacementEngineTests.cs` + +**Context:** Topology-aware placement: unique nodes, tag matching, cluster affinity. + +**Go reference:** `jetstream_cluster.go:2500-2800` (selectPeerGroup) + +**Implementation:** +- `SelectPeers(replicas, tags, cluster, exclude)` → returns peer list +- Ensures no two replicas on same node +- Tag filtering: only select nodes matching required tags +- Cluster affinity: prefer peers in same cluster for locality + +--- + +### Task B10: Per-Stream RAFT Groups + +**Files:** +- Modify: `src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Cluster/StreamRaftGroupTests.cs` + +**Context:** Each stream gets its own RAFT group for message replication. The meta-group assigns the replica set, then the stream RAFT group handles message proposals. 
+ +**Implementation:** +- `StreamReplicaGroup` wraps a `RaftNode` with stream-specific apply logic +- `ProposeMessage(subject, headers, payload)` → proposes to stream RAFT +- On commit: apply message to local FileStore +- Leadership transfer triggers mirror/source reconnection + +--- + +### Task B11: Port RAFT + Cluster Go Tests + DB Update + +**Tests to port:** ~85 from `raft_test.go`, ~358 from `jetstream_cluster_*_test.go`, ~47 from `jetstream_super_cluster_test.go` + +**Batch DB update SQL script at end of task.** + +--- + +## Track C: Protocol (Client, Consumer, JetStream API, Mirrors/Sources) + +### Task C1: Client Adaptive Buffers and Slow Consumer Detection + +**Files:** +- Modify: `src/NATS.Server/NatsClient.cs` +- Test: `tests/NATS.Server.Tests/ClientAdaptiveBufferTests.cs` + +**Context:** NatsClient.cs already has adaptive read buffers (lines 252-274) and slow consumer detection (lines 808-850). The gaps are: write buffer pooling with flush coalescing, max control line enforcement (4096 bytes), and write timeout with partial flush recovery. + +**Implementation:** +1. Write buffer pool: use `ArrayPool.Shared` for outbound buffers, coalesce multiple small writes +2. Max control line: in parser, reject control lines >4096 bytes with protocol error +3. Write timeout: `CancellationTokenSource` with configurable timeout on `FlushAsync` + +**Tests:** Write coalescing under load, control line overflow rejection, write timeout recovery. + +--- + +### Task C2: AckProcessor NAK/TERM/PROGRESS Support + +**Files:** +- Modify: `src/NATS.Server/JetStream/Consumers/AckProcessor.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Consumers/AckProcessorNakTests.cs` + +**Context:** AckProcessor currently only handles basic ACK. Go dispatches on ack type: ACK, NAK, TERM, PROGRESS. NAK schedules redelivery with backoff array. 
+ +**Go reference:** `consumer.go:2550-2650` (processAck), `consumer.go:2700-2750` (processNak) + +**Implementation:** +- Parse ack payload to determine type (Go uses `-NAK`, `+NXT`, `+WPI`, `+TERM`) +- `ProcessNak(seq, delay?)` → schedule redelivery with optional delay +- `ProcessTerm(seq)` → remove from pending, do not redeliver +- `ProcessProgress(seq)` → reset ack deadline (work in progress) +- Thread-safe: replace Dictionary with ConcurrentDictionary or use locking +- Priority queue for expiry: `SortedSet<(DateTimeOffset deadline, ulong seq)>` + +--- + +### Task C3: PushConsumer Delivery Dispatch + +**Files:** +- Modify: `src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Consumers/PushConsumerDeliveryTests.cs` + +**Context:** PushConsumerEngine.Enqueue queues frames but NO delivery mechanism sends them to the consumer's deliver subject. This is the critical missing piece. + +**Go reference:** `consumer.go:2700-3500` (deliverMsg, sendMsg) + +**Implementation:** +- Add `DeliverSubject` property from ConsumerConfig +- Background delivery loop: poll `PushFrames` channel, format as HMSG, send via `NatsClient.SendMessage` +- Headers: `Nats-Sequence`, `Nats-Time-Stamp`, `Nats-Subject` (original subject) +- Delivery counter tracking per message +- Flow control: pause delivery when client's pending bytes exceed threshold + +--- + +### Task C4: Redelivery Tracker with Backoff Schedules + +**Files:** +- Create: `src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Consumers/RedeliveryTrackerTests.cs` + +**Context:** Track per-message redelivery count and apply configurable backoff arrays. 
+ +**Go reference:** `consumer.go:400-500` (pending map), `consumer.go:2800-2900` (backoff logic) + +**Implementation:** +- `RedeliveryTracker(backoffMs[])`: configurable backoff schedule +- `Schedule(seq, deliveryCount)` → returns next delivery time based on backoff[min(count, len-1)] +- `GetDue()` → returns sequences ready for redelivery +- `MaxDeliveries(seq)` → true when delivery count exceeds max + +--- + +### Task C5: Priority Group Pinning and Idle Heartbeats + +**Files:** +- Create: `src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs` +- Modify: `src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs` (heartbeat timer) +- Test: `tests/NATS.Server.Tests/JetStream/Consumers/PriorityGroupTests.cs` + +**Context:** Sticky consumer assignment within priority groups. Idle heartbeat timer sends empty messages when no data available. + +**Go reference:** `consumer.go:500-600` (priority groups), `consumer.go:3100-3200` (heartbeat) + +--- + +### Task C6: PullConsumer Timeout and Filter Compilation + +**Files:** +- Modify: `src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Consumers/PullConsumerTimeoutTests.cs` + +**Context:** Enforce `ExpiresMs` on long-running fetches. Compile filter subjects for efficient matching. + +**Implementation:** +- `CancellationTokenSource` with ExpiresMs timeout on fetch +- Pre-compile filter subjects into `SubjectMatch` patterns for O(1) matching +- Skip non-matching subjects without counting against pending limits + +--- + +### Task C7: JetStream API Leader Forwarding + +**Files:** +- Modify: `src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs` +- Modify: `src/NATS.Server/JetStream/JetStreamService.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs` + +**Context:** API requests at non-leader nodes must be forwarded to the current leader. 
+ +**Go reference:** `jetstream_api.go:200-300` (isLeader check, forward logic) + +**Implementation:** +- Before handling any API request, check `_metaGroup.IsLeader()` +- If not leader: serialize request, publish to leader's reply subject, return leader's response +- Timeout on forwarding with error response + +--- + +### Task C8: Stream Purge with Options + +**Files:** +- Modify: `src/NATS.Server/JetStream/Api/StreamApiHandlers.cs` +- Modify: `src/NATS.Server/JetStream/StreamManager.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Api/StreamPurgeOptionsTests.cs` + +**Context:** Go supports: purge by subject filter, keep N messages, sequence-based purge. + +**Go reference:** `jetstream_api.go:1200-1350` (handleStreamPurge) + +**Implementation:** +- Parse purge request: `{ "filter": "orders.*", "seq": 100, "keep": 5 }` +- `StreamManager.PurgeEx(filter, seq, keep)` delegates to FileStore +- FileStore iterates blocks, deletes matching sequences + +--- + +### Task C9: Mirror Synchronization Loop + +**Files:** +- Rewrite: `src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs` (23 → ~500+ lines) +- Test: `tests/NATS.Server.Tests/JetStream/MirrorSource/MirrorSyncTests.cs` + +**Context:** Continuous pull from origin stream, apply messages locally. Maintain origin→current sequence alignment. 
+ +**Go reference:** `stream.go:500-800` (processMirrorMsgs, setupMirrorConsumer) + +**Implementation:** +- Background task: create ephemeral pull consumer on origin stream +- Fetch batches of messages, apply to local store +- Track `LastOriginSequence` for catchup after restarts +- Retry with exponential backoff on failures +- Health reporting: lag (origin last seq - local last seq) + +--- + +### Task C10: Source Coordination with Filtering + +**Files:** +- Rewrite: `src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs` (37 → ~500+ lines) +- Test: `tests/NATS.Server.Tests/JetStream/MirrorSource/SourceFilterTests.cs` + +**Context:** Source consumption with subject filtering, account isolation, and deduplication. + +**Implementation:** +- `FilterSubject` support: only forward messages matching filter +- `SubjectTransformPrefix` applied before storing +- Deduplication via `Nats-Msg-Id` header with configurable window +- Multiple sources per stream supported +- Lag tracking per source + +--- + +### Task C11: Port Protocol Go Tests + DB Update + +**Tests to port:** ~43 client_test.go, ~134 jetstream_consumer_test.go, ~184 jetstream_test.go, ~30 mirror/source +**Batch DB update at end.** + +--- + +## Track D: Networking (Gateway, Leaf Node, Routes) + +### Task D1: Gateway Interest-Only Mode State Machine + +**Files:** +- Modify: `src/NATS.Server/Gateways/GatewayConnection.cs` +- Create: `src/NATS.Server/Gateways/GatewayInterestTracker.cs` +- Test: `tests/NATS.Server.Tests/Gateways/GatewayInterestModeTests.cs` + +**Context:** After initial flooding, gateways switch to sending only messages for subjects with known remote subscribers. Currently .NET forwards everything. 
+ +**Go reference:** `gateway.go:100-150` (InterestMode enum), `gateway.go:1500-1600` (switchToInterestOnlyMode) + +**Implementation:** +- `GatewayInterestMode` enum: `Optimistic`, `Transitioning`, `InterestOnly` +- Per-account interest state tracking +- `outsie` equivalent: mode + no-interest set (optimistic) or SubList (interest-only) +- Switch threshold: after N unsubscribes, switch from Optimistic → InterestOnly +- `ShouldForward(account, subject)` → checks interest state before sending + +**Tests:** +- Start in Optimistic mode, all messages forwarded +- After threshold unsubscribes, switch to InterestOnly +- In InterestOnly, only messages with RS+ subscribers forwarded +- Mode persists across reconnection + +--- + +### Task D2: Route Pool Accounting per Account + +**Files:** +- Modify: `src/NATS.Server/Routes/RouteManager.cs` +- Modify: `src/NATS.Server/Routes/RouteConnection.cs` +- Test: `tests/NATS.Server.Tests/Routes/RoutePoolAccountTests.cs` + +**Context:** Go hashes account name to determine which pool connection carries its traffic. + +**Go reference:** `route.go:533-545` (computeRoutePoolIdx) + +**Implementation:** +- `ComputeRoutePoolIdx(poolSize, accountName)` → FNV-1a hash → mod poolSize +- Each route connection tagged with pool index +- Message dispatch routes through appropriate pool connection +- Account-specific dedicated routes bypass pool (optional) + +--- + +### Task D3: Route S2 Compression + +**Files:** +- Modify: `src/NATS.Server/Routes/RouteCompressionCodec.cs` +- Test: `tests/NATS.Server.Tests/Routes/RouteS2CompressionTests.cs` + +**Context:** Replace Deflate with S2 (IronSnappy). Add compression level auto-tuning. 
+ +**Implementation:** +- Replace `DeflateStream` with `IronSnappy` calls +- `CompressionLevel` enum: `Off`, `Fast` (S2 default), `Better`, `Best` +- `Auto` mode: adjust based on RTT measurements +- Negotiation during route handshake + +--- + +### Task D4: Gateway Reply Mapper Expansion + +**Files:** +- Modify: `src/NATS.Server/Gateways/ReplyMapper.cs` +- Test: `tests/NATS.Server.Tests/Gateways/ReplyMapperFullTests.cs` + +**Context:** Full `_GR_.{clusterId}.{hash}.{originalReply}` handling. + +**Go reference:** `gateway.go:2000-2100` (replyMapper logic) + +--- + +### Task D5: Leaf Node Solicited Connections and JetStream Domains + +**Files:** +- Modify: `src/NATS.Server/LeafNodes/LeafNodeManager.cs` +- Modify: `src/NATS.Server/LeafNodes/LeafConnection.cs` +- Test: `tests/NATS.Server.Tests/LeafNodes/LeafSolicitedConnectionTests.cs` + +**Context:** Outbound leaf connections with retry/reconnect. Forward JetStream domain headers. + +**Implementation:** +- `ConnectSolicitedAsync(url, account)` → establish outbound leaf connection +- Retry with exponential backoff (1s, 2s, 4s, ..., max 60s) +- `LeafNodeOptions.JetStreamDomain` propagated in connection handshake +- Domain header forwarded in LMSG frames + +--- + +### Task D6: Leaf Subject Filtering + +**Files:** +- Modify: `src/NATS.Server/LeafNodes/LeafHubSpokeMapper.cs` +- Test: `tests/NATS.Server.Tests/LeafNodes/LeafSubjectFilterTests.cs` + +**Context:** Export/import specific subjects on leaf connections. Currently only account mapping. 
+
+---
+
+### Task D7: Port Networking Go Tests + DB Update
+
+**Tests to port:** ~61 gateway_test.go, ~59 leafnode_test.go, ~39 routes_test.go
+**Batch DB update at end.**
+
+---
+
+## Track E: Services (MQTT, Accounts, Config, WebSocket, Monitoring)
+
+### Task E1: MQTT Binary Protocol Parser
+
+**Files:**
+- Rewrite: `src/NATS.Server/Mqtt/MqttProtocolParser.cs`
+- Rewrite: `src/NATS.Server/Mqtt/MqttPacketReader.cs`
+- Rewrite: `src/NATS.Server/Mqtt/MqttPacketWriter.cs`
+- Test: `tests/NATS.Server.Tests/Mqtt/MqttBinaryProtocolTests.cs`
+
+**Context:** Replace text-based MQTT parser with proper MQTT v3.1.1 binary encoding. Go's implementation processes binary frames directly.
+
+**Go reference:** `mqtt.go:1000-1200` (parse CONNECT, SUBSCRIBE, PUBLISH packets)
+
+**Implementation:**
+- Fixed header: [4 bits: packet type][4 bits: flags] in one byte, then [1-4 bytes: remaining length, variable-length encoded]
+- CONNECT: protocol name, version, flags, keepalive, client ID, will topic/message, credentials
+- SUBSCRIBE: packet ID, topic filter + QoS pairs
+- PUBLISH: topic name, packet ID (QoS>0), payload
+- SUBACK, PUBACK, PUBREC, PUBREL, PUBCOMP responses
+
+---
+
+### Task E2: MQTT Session Persistence with JetStream
+
+**Files:**
+- Modify: `src/NATS.Server/Mqtt/MqttListener.cs`
+- Create: `src/NATS.Server/Mqtt/MqttSessionStore.cs`
+- Test: `tests/NATS.Server.Tests/Mqtt/MqttSessionPersistenceTests.cs`
+
+**Context:** Go stores MQTT sessions in a dedicated `$MQTT_sess` JetStream stream. Session state includes: subscriptions with QoS levels, pending publishes, will messages. 
+
+**Go reference:** `mqtt.go:253-300` (mqttSession struct)
+
+**Implementation:**
+- `MqttSessionStore`: wraps JetStream stream `$MQTT_sess`
+- `SaveSession(clientId, state)` → store JSON session snapshot
+- `LoadSession(clientId)` → restore on reconnect
+- `DeleteSession(clientId)` → clean session on CONNECT with clean=true
+- Flapper detection: track connect/disconnect timestamps, apply backoff
+
+---
+
+### Task E3: MQTT QoS and Retained Messages
+
+**Files:**
+- Modify: `src/NATS.Server/Mqtt/MqttConnection.cs`
+- Create: `src/NATS.Server/Mqtt/MqttRetainedStore.cs`
+- Test: `tests/NATS.Server.Tests/Mqtt/MqttQosTests.cs`
+
+**Context:** QoS 1 (at-least-once) with PUBACK tracking. QoS 2 (exactly-once) with PUBREC/PUBREL/PUBCOMP. Retained messages stored per-account in JetStream.
+
+**Implementation:**
+- `_pendingPublish`: `ConcurrentDictionary` keyed by packet ID
+- QoS 1: track PUBLISH → wait PUBACK → remove from pending
+- QoS 2: PUBLISH → PUBREC → PUBREL → PUBCOMP state machine
+- Retained messages: `$MQTT_retained` stream, one message per topic (overwrite)
+- MQTT wildcard translation: `+` → `*`, `#` → `>`, `/` → `.`
+
+---
+
+### Task E4: Account Import/Export with Cycle Detection
+
+**Files:**
+- Create: `src/NATS.Server/Auth/AccountImportExport.cs`
+- Modify: `src/NATS.Server/Auth/Account.cs`
+- Test: `tests/NATS.Server.Tests/Auth/AccountImportExportTests.cs`
+
+**Context:** Service/stream exports with whitelist enforcement. Imports with weighted destination selection. Cycle detection prevents A→B→A import chains. 
+
+**Go reference:** `accounts.go:1500-2000` (addServiceImport, addStreamImport)
+
+**Implementation:**
+- `AddServiceExport(subject, accounts[])` → register export with optional account whitelist
+- `AddServiceImport(from, subject, to)` → bind import, check cycle
+- `DetectCycle(from, to, visited)` → DFS through import graph
+- Weighted imports: `AddServiceImport(from, subject, to, weight)` → round-robin or weighted random
+
+---
+
+### Task E5: Account JetStream Limits
+
+**Files:**
+- Create: `src/NATS.Server/Auth/AccountLimits.cs`
+- Modify: `src/NATS.Server/Auth/Account.cs`
+- Test: `tests/NATS.Server.Tests/Auth/AccountLimitsTests.cs`
+
+**Context:** Per-account limits on JetStream resources: max storage, max streams, max consumers.
+
+**Implementation:**
+- `AccountLimits`: MaxStorage, MaxStreams, MaxConsumers, MaxAckPending, MaxConnections
+- `Account.TryReserveStream()` → checks against limits
+- `Account.TryReserveConsumer()` → checks against limits
+- `Account.TrackStorage(delta)` → updates storage usage atomically
+- Evict oldest client when MaxConnections exceeded
+
+---
+
+### Task E6: System Account and $SYS Handling
+
+**Files:**
+- Modify: `src/NATS.Server/Auth/Account.cs`
+- Modify: `src/NATS.Server/NatsServer.cs`
+- Test: `tests/NATS.Server.Tests/Auth/SystemAccountTests.cs`
+
+**Context:** The system account handles `$SYS.>` subjects for internal server-to-server communication.
+
+**Implementation:**
+- Designate one account as system (from config or JWT)
+- Route `$SYS.>` subscriptions to system account's SubList
+- Internal event system publishes to system account subjects
+- Non-system accounts cannot subscribe to `$SYS.>`
+
+---
+
+### Task E7: Config Signal Handling (SIGHUP)
+
+**Files:**
+- Modify: `src/NATS.Server/Configuration/ConfigReloader.cs`
+- Modify: `src/NATS.Server.Host/Program.cs`
+- Test: `tests/NATS.Server.Tests/Configuration/SignalReloadTests.cs`
+
+**Context:** On Unix, SIGHUP triggers config reload. Use .NET 10's `PosixSignalRegistration`. 
+ +**Implementation:** +- `PosixSignalRegistration.Create(PosixSignal.SIGHUP, handler)` +- Handler calls `ConfigReloader.ReloadAsync()` which diffs and applies changes +- Wire `ConfigReloader.ApplyDiff(diff)` to actually reload subsystems (currently validation-only) + +--- + +### Task E8: Auth Change Propagation on Reload + +**Files:** +- Modify: `src/NATS.Server/Configuration/ConfigReloader.cs` +- Modify: `src/NATS.Server/NatsServer.cs` +- Test: `tests/NATS.Server.Tests/Configuration/AuthReloadTests.cs` + +**Context:** On config reload, disconnect clients whose credentials are no longer valid. + +**Implementation:** +- After diff identifies auth changes, iterate connected clients +- Re-evaluate each client's auth against new config +- Disconnect clients that fail re-evaluation with `AUTH_EXPIRED` error + +--- + +### Task E9: TLS Certificate Reload + +**Files:** +- Modify: `src/NATS.Server/Configuration/ConfigReloader.cs` +- Test: `tests/NATS.Server.Tests/Configuration/TlsReloadTests.cs` + +**Context:** Reload TLS certificates without restarting the server. + +**Implementation:** +- On reload, if TLS config changed, load new certificate +- New connections use new certificate +- Existing connections continue with old certificate until reconnect + +--- + +### Task E10: WebSocket Compression Negotiation + +**Files:** +- Modify: `src/NATS.Server/WebSocket/WsUpgrade.cs` +- Modify: `src/NATS.Server/WebSocket/WsCompression.cs` +- Test: `tests/NATS.Server.Tests/WebSocket/WsCompressionNegotiationTests.cs` + +**Context:** Full `permessage-deflate` parameter negotiation per RFC 7692. 
+ +**Implementation:** +- Parse `Sec-WebSocket-Extensions` header for deflate parameters +- `server_no_context_takeover`, `client_no_context_takeover` +- `server_max_window_bits`, `client_max_window_bits` +- Return negotiated parameters in upgrade response + +--- + +### Task E11: WebSocket JWT Authentication + +**Files:** +- Modify: `src/NATS.Server/WebSocket/WsUpgrade.cs` +- Test: `tests/NATS.Server.Tests/WebSocket/WsJwtAuthTests.cs` + +**Context:** Extract JWT token from WebSocket upgrade request (cookie or query parameter) and authenticate. + +**Implementation:** +- Check `Authorization` header, `jwt` cookie, or `?jwt=` query parameter +- Pass to `AuthService.Authenticate()` pipeline +- On failure, reject WebSocket upgrade with 401 + +--- + +### Task E12: Monitoring Connz Filtering and Sort + +**Files:** +- Modify: `src/NATS.Server/Monitoring/ConnzHandler.cs` +- Test: `tests/NATS.Server.Tests/Monitoring/ConnzFilterTests.cs` + +**Context:** `/connz?acc=ACCOUNT&sort=bytes_to&limit=10&offset=0` + +**Implementation:** +- Parse query parameters: `acc`, `sort`, `limit`, `offset`, `state` (open/closed/all) +- Sort by: `cid`, `start`, `subs`, `pending`, `msgs_to`, `msgs_from`, `bytes_to`, `bytes_from` +- Account filtering: only return connections for specified account +- Closed connection ring buffer for disconnect history + +--- + +### Task E13: Full System Event Payloads + +**Files:** +- Modify: `src/NATS.Server/Events/EventTypes.cs` +- Test: `tests/NATS.Server.Tests/Events/EventPayloadTests.cs` + +**Context:** Ensure all event DTOs have complete JSON fields matching Go's output. 
+ +**Go reference:** `events.go:100-300` (event struct fields) + +--- + +### Task E14: Message Trace Propagation + +**Files:** +- Rewrite: `src/NATS.Server/Internal/MessageTraceContext.cs` (22 → ~200+ lines) +- Modify: `src/NATS.Server/NatsClient.cs` (trace on publish/deliver) +- Test: `tests/NATS.Server.Tests/Internal/MessageTraceTests.cs` + +**Context:** Trace messages through the full delivery pipeline: publish → route → gateway → leaf → deliver. + +**Go reference:** `msgtrace.go` (799 lines) + +**Implementation:** +- `MessageTraceContext`: collects trace events as message traverses the system +- Trace header: `Nats-Trace-Dest` specifies where to publish trace report +- On each hop: add trace entry with timestamp, node, action +- On delivery: publish collected trace to `Nats-Trace-Dest` subject + +--- + +### Task E15: Port Services Go Tests + DB Update + +**Tests to port:** ~59 mqtt_test.go, ~34 accounts_test.go, ~105 reload+opts, ~53 websocket, ~118 monitor/events/msgtrace +**Batch DB update at end of each sub-phase.** + +--- + +## DB Update Script Template + +At the end of each track, run a batch SQL update. Example for Track A: + +```bash +sqlite3 docs/test_parity.db <<'SQL' +-- Track A: FileStore tests ported +UPDATE go_tests SET status='mapped', dotnet_test='RoundTrip_SimpleMessage', dotnet_file='FileStoreGoParityTests.cs', + notes='Ported from TestFileStoreBasic in filestore_test.go:42' + WHERE go_file='filestore_test.go' AND go_test='TestFileStoreBasic'; +-- ... repeat for each ported test ... + +-- Verify +SELECT status, COUNT(*) FROM go_tests GROUP BY status; +SQL +``` + +--- + +## Execution Summary + +| Track | Tasks | Est. 
Tests | Dependencies | Priority | +|-------|-------|-----------|--------------|----------| +| A: Storage | A1-A6 | ~159 | None | Start immediately | +| D: Networking | D1-D7 | ~159 | None | Start immediately | +| E: Services | E1-E15 | ~369 | None | Start immediately | +| C: Protocol | C1-C11 | ~391 | Track A | After A merges | +| B: Consensus | B1-B11 | ~490 | Tracks A + C | After A+C merge | +| **Total** | **50 tasks** | **~1,568** | | | + +**Success criteria:** +- All 50 tasks complete +- All new tests passing (`dotnet test` green) +- `test_parity.db` updated: mapped ratio 29% → ~70%+ +- Each track committed and merged to main diff --git a/docs/plans/2026-02-24-structuregaps-full-parity-plan.md.tasks.json b/docs/plans/2026-02-24-structuregaps-full-parity-plan.md.tasks.json new file mode 100644 index 0000000..f4e27ba --- /dev/null +++ b/docs/plans/2026-02-24-structuregaps-full-parity-plan.md.tasks.json @@ -0,0 +1,56 @@ +{ + "planPath": "docs/plans/2026-02-24-structuregaps-full-parity-plan.md", + "tasks": [ + {"id": 0, "subject": "Task A1: Message Block Binary Record Encoding", "status": "pending"}, + {"id": 1, "subject": "Task A2: Message Block Abstraction", "status": "pending", "blockedBy": [0]}, + {"id": 2, "subject": "Task A3: FileStore Block Manager Rewrite", "status": "pending", "blockedBy": [1]}, + {"id": 3, "subject": "Task A4: Tombstone Tracking and Purge", "status": "pending", "blockedBy": [2]}, + {"id": 4, "subject": "Task A5: Write Cache and TTL Scheduling", "status": "pending", "blockedBy": [2]}, + {"id": 5, "subject": "Task A6: Port FileStore Go Tests + DB Update", "status": "pending", "blockedBy": [3, 4]}, + {"id": 6, "subject": "Task B1: RAFT Apply Queue and Commit Tracking", "status": "pending", "blockedBy": [5, 26]}, + {"id": 7, "subject": "Task B2: Campaign Timeout and Election Management", "status": "pending", "blockedBy": [6]}, + {"id": 8, "subject": "Task B3: Health Classification and Peer Tracking", "status": "pending", "blockedBy": [6]}, + 
{"id": 9, "subject": "Task B4: Membership Changes (Add/Remove Peer)", "status": "pending", "blockedBy": [7, 8]}, + {"id": 10, "subject": "Task B5: Snapshot Checkpoints and Log Compaction", "status": "pending", "blockedBy": [9]}, + {"id": 11, "subject": "Task B6: Pre-Vote Protocol", "status": "pending", "blockedBy": [7]}, + {"id": 12, "subject": "Task B7: Stream/Consumer Assignment Types", "status": "pending", "blockedBy": [10]}, + {"id": 13, "subject": "Task B8: JetStream Meta-Group Proposal Workflow", "status": "pending", "blockedBy": [12]}, + {"id": 14, "subject": "Task B9: Placement Engine", "status": "pending", "blockedBy": [12]}, + {"id": 15, "subject": "Task B10: Per-Stream RAFT Groups", "status": "pending", "blockedBy": [13]}, + {"id": 16, "subject": "Task B11: Port RAFT + Cluster Go Tests + DB Update", "status": "pending", "blockedBy": [15, 14, 11]}, + {"id": 17, "subject": "Task C1: Client Adaptive Buffers + Slow Consumer", "status": "pending"}, + {"id": 18, "subject": "Task C2: AckProcessor NAK/TERM/PROGRESS", "status": "pending"}, + {"id": 19, "subject": "Task C3: PushConsumer Delivery Dispatch", "status": "pending"}, + {"id": 20, "subject": "Task C4: Redelivery Tracker with Backoff", "status": "pending", "blockedBy": [18]}, + {"id": 21, "subject": "Task C5: Priority Groups and Idle Heartbeats", "status": "pending", "blockedBy": [19]}, + {"id": 22, "subject": "Task C6: PullConsumer Timeout + Filter", "status": "pending"}, + {"id": 23, "subject": "Task C7: JetStream API Leader Forwarding", "status": "pending"}, + {"id": 24, "subject": "Task C8: Stream Purge with Options", "status": "pending", "blockedBy": [5]}, + {"id": 25, "subject": "Task C9: Mirror Synchronization Loop", "status": "pending", "blockedBy": [5]}, + {"id": 26, "subject": "Task C10: Source Coordination with Filtering", "status": "pending", "blockedBy": [5]}, + {"id": 27, "subject": "Task C11: Port Protocol Go Tests + DB Update", "status": "pending", "blockedBy": [17, 20, 21, 22, 23, 24, 25, 
26]}, + {"id": 28, "subject": "Task D1: Gateway Interest-Only Mode", "status": "pending"}, + {"id": 29, "subject": "Task D2: Route Pool Accounting per Account", "status": "pending"}, + {"id": 30, "subject": "Task D3: Route S2 Compression", "status": "pending"}, + {"id": 31, "subject": "Task D4: Gateway Reply Mapper Expansion", "status": "pending"}, + {"id": 32, "subject": "Task D5: Leaf Solicited Connections + JetStream Domains", "status": "pending"}, + {"id": 33, "subject": "Task D6: Leaf Subject Filtering", "status": "pending"}, + {"id": 34, "subject": "Task D7: Port Networking Go Tests + DB Update", "status": "pending", "blockedBy": [28, 29, 30, 31, 32, 33]}, + {"id": 35, "subject": "Task E1: MQTT Binary Protocol Parser", "status": "pending"}, + {"id": 36, "subject": "Task E2: MQTT Session Persistence (JetStream)", "status": "pending", "blockedBy": [35]}, + {"id": 37, "subject": "Task E3: MQTT QoS and Retained Messages", "status": "pending", "blockedBy": [36]}, + {"id": 38, "subject": "Task E4: Account Import/Export + Cycle Detection", "status": "pending"}, + {"id": 39, "subject": "Task E5: Account JetStream Limits", "status": "pending"}, + {"id": 40, "subject": "Task E6: System Account + $SYS Handling", "status": "pending", "blockedBy": [38]}, + {"id": 41, "subject": "Task E7: Config Signal Handling (SIGHUP)", "status": "pending"}, + {"id": 42, "subject": "Task E8: Auth Change Propagation on Reload", "status": "pending", "blockedBy": [41]}, + {"id": 43, "subject": "Task E9: TLS Certificate Reload", "status": "pending", "blockedBy": [41]}, + {"id": 44, "subject": "Task E10: WebSocket Compression Negotiation", "status": "pending"}, + {"id": 45, "subject": "Task E11: WebSocket JWT Authentication", "status": "pending"}, + {"id": 46, "subject": "Task E12: Monitoring Connz Filtering + Sort", "status": "pending"}, + {"id": 47, "subject": "Task E13: Full System Event Payloads", "status": "pending"}, + {"id": 48, "subject": "Task E14: Message Trace Propagation", 
"status": "pending"}, + {"id": 49, "subject": "Task E15: Port Services Go Tests + DB Update", "status": "pending", "blockedBy": [35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48]} + ], + "lastUpdated": "2026-02-24T12:00:00Z" +}