natsdotnet/docs/plans/2026-02-24-structuregaps-full-parity-plan.md
Joseph Doherty 2a240c6355 docs: add implementation plan for all 15 structure gaps
50 tasks across 5 parallel tracks (A-E) with full TDD steps,
Go reference citations, file paths, and test_parity.db update
protocol. Task persistence file for session resumption.
2026-02-24 12:05:22 -05:00


# Full Go Parity: All 15 Structure Gaps — Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.
**Goal:** Port all missing functionality from `docs/structuregaps.md` (15 gaps) from Go NATS server to .NET, update `test_parity.db` for each ported test, target ~1,194 additional mapped Go tests (29% → ~70%).
**Architecture:** 5 parallel tracks (A/D/E independent, C depends on A, B depends on A+C). Feature-first TDD: write failing test, implement minimum code, verify, commit. Batch DB updates at end of each sub-phase.
**Tech Stack:** .NET 10 / C# 14, xUnit 3, Shouldly, NSubstitute, IronSnappy (S2), System.Security.Cryptography (ChaCha20/AES-GCM), System.IO.Pipelines, SQLite (test_parity.db)
---
## Track A: Storage (FileStore Block Management)
### Task A1: Message Block Binary Record Encoding
**Files:**
- Create: `src/NATS.Server/JetStream/Storage/MessageRecord.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Storage/MessageRecordTests.cs`
**Context:** Go stores messages as binary records in block files: `[1:flags][varint:subject_len][N:subject][varint:hdr_len][M:headers][varint:msg_len][P:payload][8:seq][8:ts][8:checksum]`. The .NET store currently uses JSONL. This task creates the binary encoding/decoding layer.
**Go reference:** `filestore.go:8770-8783` (message record format), `filestore.go:5720-5790` (writeMsgRecord)
**Step 1: Write the failing test**
```csharp
// MessageRecordTests.cs
using Shouldly;
using Xunit;
namespace NATS.Server.Tests.JetStream.Storage;
public class MessageRecordTests
{
[Fact]
public void RoundTrip_SimpleMessage()
{
var record = new MessageRecord
{
Sequence = 1,
Subject = "test.subject",
Headers = ReadOnlyMemory<byte>.Empty,
Payload = "hello"u8.ToArray(),
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
var encoded = MessageRecord.Encode(record);
var decoded = MessageRecord.Decode(encoded);
decoded.Sequence.ShouldBe(1UL);
decoded.Subject.ShouldBe("test.subject");
decoded.Payload.Span.SequenceEqual("hello"u8).ShouldBeTrue();
}
[Fact]
public void RoundTrip_WithHeaders()
{
var headers = "NATS/1.0\r\nNats-Msg-Id: abc\r\n\r\n"u8.ToArray();
var record = new MessageRecord
{
Sequence = 42,
Subject = "orders.new",
Headers = headers,
Payload = "{\"id\":1}"u8.ToArray(),
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
var encoded = MessageRecord.Encode(record);
var decoded = MessageRecord.Decode(encoded);
decoded.Sequence.ShouldBe(42UL);
decoded.Headers.Length.ShouldBe(headers.Length);
}
[Fact]
public void Encode_SetsChecksumInTrailer()
{
var record = new MessageRecord { Sequence = 1, Subject = "x", Payload = "y"u8.ToArray() };
var encoded = MessageRecord.Encode(record);
// Last 8 bytes are checksum
encoded.Length.ShouldBeGreaterThan(8);
var checksum = BitConverter.ToUInt64(encoded.AsSpan()[^8..]);
checksum.ShouldNotBe(0UL);
}
[Fact]
public void Decode_DetectsCorruptChecksum()
{
var record = new MessageRecord { Sequence = 1, Subject = "x", Payload = "y"u8.ToArray() };
var encoded = MessageRecord.Encode(record);
// Corrupt payload byte
encoded[encoded.Length / 2] ^= 0xFF;
Should.Throw<InvalidDataException>(() => MessageRecord.Decode(encoded));
}
[Theory]
[InlineData(0)]
[InlineData(1)]
[InlineData(127)]
[InlineData(128)]
[InlineData(16383)]
[InlineData(16384)]
public void Varint_RoundTrip(int value)
{
Span<byte> buf = stackalloc byte[8];
var written = MessageRecord.WriteVarint(buf, (ulong)value);
var (decoded, read) = MessageRecord.ReadVarint(buf);
decoded.ShouldBe((ulong)value);
read.ShouldBe(written);
}
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MessageRecordTests" -v normal`
Expected: FAIL — `MessageRecord` type does not exist.
**Step 3: Write minimal implementation**
```csharp
// MessageRecord.cs
using System.Buffers.Binary;
using System.IO.Hashing; // XxHash64 — requires the System.IO.Hashing NuGet package
using System.Text;
namespace NATS.Server.JetStream.Storage;
/// <summary>
/// Binary message record encoding matching Go filestore.go wire format.
/// Format: [1:flags][varint:subj_len][N:subject][varint:hdr_len][M:headers][varint:payload_len][P:payload][8:seq][8:ts][8:checksum]
/// Go reference: filestore.go:8770-8783
/// </summary>
public sealed class MessageRecord
{
public ulong Sequence { get; init; }
public required string Subject { get; init; }
public ReadOnlyMemory<byte> Headers { get; init; }
public ReadOnlyMemory<byte> Payload { get; init; }
public long Timestamp { get; init; }
public bool Deleted { get; init; }
private const byte DeletedFlag = 0x80; // ebit in Go
public static byte[] Encode(MessageRecord record)
{
var subjectBytes = Encoding.ASCII.GetBytes(record.Subject);
// Calculate total size
var size = 1 // flags
+ VarintSize((ulong)subjectBytes.Length) + subjectBytes.Length
+ VarintSize((ulong)record.Headers.Length) + record.Headers.Length
+ VarintSize((ulong)record.Payload.Length) + record.Payload.Length
+ 8 + 8 // sequence + timestamp
+ 8; // checksum
var buf = new byte[size];
var offset = 0;
// Flags
buf[offset++] = record.Deleted ? DeletedFlag : (byte)0;
// Subject
offset += WriteVarint(buf.AsSpan(offset), (ulong)subjectBytes.Length);
subjectBytes.CopyTo(buf.AsSpan(offset));
offset += subjectBytes.Length;
// Headers
offset += WriteVarint(buf.AsSpan(offset), (ulong)record.Headers.Length);
record.Headers.Span.CopyTo(buf.AsSpan(offset));
offset += record.Headers.Length;
// Payload
offset += WriteVarint(buf.AsSpan(offset), (ulong)record.Payload.Length);
record.Payload.Span.CopyTo(buf.AsSpan(offset));
offset += record.Payload.Length;
// Sequence (8 bytes LE)
BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(offset), record.Sequence);
offset += 8;
// Timestamp (8 bytes LE, Unix millis) — must round-trip or the field is silently lost
BinaryPrimitives.WriteInt64LittleEndian(buf.AsSpan(offset), record.Timestamp);
offset += 8;
// Checksum (XxHash64 of everything before checksum)
var hash = XxHash64.HashToUInt64(buf.AsSpan(0, offset));
BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(offset), hash);
return buf;
}
public static MessageRecord Decode(ReadOnlySpan<byte> data)
{
// Minimum: flags + three zero-length varints + seq + ts + checksum
if (data.Length < 1 + 3 + 8 + 8 + 8)
throw new InvalidDataException("Record too short");
// Verify checksum first
var payloadSpan = data[..^8];
var storedChecksum = BinaryPrimitives.ReadUInt64LittleEndian(data[^8..]);
var computedChecksum = XxHash64.HashToUInt64(payloadSpan);
if (storedChecksum != computedChecksum)
throw new InvalidDataException("Checksum mismatch");
var offset = 0;
// Flags
var flags = data[offset++];
var deleted = (flags & DeletedFlag) != 0;
// Subject
var (subjLen, read) = ReadVarint(data[offset..]);
offset += read;
var subject = Encoding.ASCII.GetString(data.Slice(offset, (int)subjLen));
offset += (int)subjLen;
// Headers
var (hdrLen, hdrRead) = ReadVarint(data[offset..]);
offset += hdrRead;
var headers = data.Slice(offset, (int)hdrLen).ToArray();
offset += (int)hdrLen;
// Payload
var (payLen, payRead) = ReadVarint(data[offset..]);
offset += payRead;
var payload = data.Slice(offset, (int)payLen).ToArray();
offset += (int)payLen;
// Sequence
var sequence = BinaryPrimitives.ReadUInt64LittleEndian(data[offset..]);
offset += 8;
// Timestamp
var timestamp = BinaryPrimitives.ReadInt64LittleEndian(data[offset..]);
return new MessageRecord
{
Sequence = sequence,
Subject = subject,
Headers = headers,
Payload = payload,
Timestamp = timestamp,
Deleted = deleted,
};
}
public static int WriteVarint(Span<byte> buf, ulong value)
{
var i = 0;
while (value >= 0x80)
{
buf[i++] = (byte)(value | 0x80);
value >>= 7;
}
buf[i++] = (byte)value;
return i;
}
public static (ulong value, int bytesRead) ReadVarint(ReadOnlySpan<byte> buf)
{
ulong value = 0;
var shift = 0;
var i = 0;
while (i < buf.Length)
{
var b = buf[i++];
value |= (ulong)(b & 0x7F) << shift;
if ((b & 0x80) == 0)
return (value, i);
shift += 7;
}
throw new InvalidDataException("Unterminated varint");
}
private static int VarintSize(ulong value)
{
var size = 1;
while (value >= 0x80) { size++; value >>= 7; }
return size;
}
}
```
**Step 4: Run test to verify it passes**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MessageRecordTests" -v normal`
Expected: PASS (10 test cases: 4 facts plus 6 varint theory cases)
**Step 5: Commit**
```bash
git add src/NATS.Server/JetStream/Storage/MessageRecord.cs tests/NATS.Server.Tests/JetStream/Storage/MessageRecordTests.cs
git commit -m "feat(storage): add binary message record encoding (Go parity)"
```
---
### Task A2: Message Block Abstraction
**Files:**
- Create: `src/NATS.Server/JetStream/Storage/MsgBlock.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Storage/MsgBlockTests.cs`
**Context:** Go's `msgBlock` is the unit of storage — a file containing sequential message records with an in-memory index. Blocks are sealed (read-only) when they reach a size limit, and a new block is created. This maps to Go's `type msgBlock struct` in `filestore.go:108-180`.
**Step 1: Write the failing test**
```csharp
// MsgBlockTests.cs
using Shouldly;
using Xunit;
namespace NATS.Server.Tests.JetStream.Storage;
public class MsgBlockTests
{
[Fact]
public void Write_SingleMessage_ReturnsSequence()
{
using var block = MsgBlock.Create(0, GetTempPath(), maxBytes: 64 * 1024);
var seq = block.Write("test.subj", ReadOnlyMemory<byte>.Empty, "payload"u8.ToArray());
seq.ShouldBe(1UL);
}
[Fact]
public void Write_MultipleMessages_IncrementsSequence()
{
using var block = MsgBlock.Create(0, GetTempPath(), maxBytes: 64 * 1024);
block.Write("a", ReadOnlyMemory<byte>.Empty, "1"u8.ToArray()).ShouldBe(1UL);
block.Write("b", ReadOnlyMemory<byte>.Empty, "2"u8.ToArray()).ShouldBe(2UL);
block.Write("c", ReadOnlyMemory<byte>.Empty, "3"u8.ToArray()).ShouldBe(3UL);
block.MessageCount.ShouldBe(3UL);
}
[Fact]
public void Read_BySequence_ReturnsMessage()
{
using var block = MsgBlock.Create(0, GetTempPath(), maxBytes: 64 * 1024);
block.Write("test.subj", ReadOnlyMemory<byte>.Empty, "hello"u8.ToArray());
var msg = block.Read(1);
msg.ShouldNotBeNull();
msg.Subject.ShouldBe("test.subj");
msg.Payload.Span.SequenceEqual("hello"u8).ShouldBeTrue();
}
[Fact]
public void Read_NonexistentSequence_ReturnsNull()
{
using var block = MsgBlock.Create(0, GetTempPath(), maxBytes: 64 * 1024);
block.Read(999).ShouldBeNull();
}
[Fact]
public void IsSealed_ReturnsTrueWhenFull()
{
using var block = MsgBlock.Create(0, GetTempPath(), maxBytes: 200);
// Write until block is sealed
while (!block.IsSealed)
block.Write("s", ReadOnlyMemory<byte>.Empty, new byte[50]);
block.IsSealed.ShouldBeTrue();
}
[Fact]
public void Delete_MarksSequenceAsDeleted()
{
using var block = MsgBlock.Create(0, GetTempPath(), maxBytes: 64 * 1024);
block.Write("s", ReadOnlyMemory<byte>.Empty, "data"u8.ToArray());
block.Delete(1).ShouldBeTrue();
block.Read(1).ShouldBeNull();
block.DeletedCount.ShouldBe(1UL);
}
[Fact]
public void Recover_RebuildsIndexFromFile()
{
var path = GetTempPath();
// Write some messages
using (var block = MsgBlock.Create(0, path, maxBytes: 64 * 1024))
{
block.Write("a", ReadOnlyMemory<byte>.Empty, "1"u8.ToArray());
block.Write("b", ReadOnlyMemory<byte>.Empty, "2"u8.ToArray());
block.Flush();
}
// Recover from file
using var recovered = MsgBlock.Recover(0, path);
recovered.MessageCount.ShouldBe(2UL);
recovered.Read(1)!.Subject.ShouldBe("a");
recovered.Read(2)!.Subject.ShouldBe("b");
}
private static string GetTempPath() =>
Path.Combine(Path.GetTempPath(), $"nats_block_{Guid.NewGuid():N}");
}
```
**Step 2: Run test to verify it fails**
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~MsgBlockTests" -v normal`
Expected: FAIL — `MsgBlock` type does not exist.
**Step 3: Write minimal implementation**
Create `src/NATS.Server/JetStream/Storage/MsgBlock.cs` implementing:
- `Create(id, path, maxBytes)` — factory for new block
- `Recover(id, path)` — factory that rebuilds index from existing file
- `Write(subject, headers, payload)` → returns sequence, appends binary record to file buffer
- `Read(sequence)` → looks up in index, reads from file or cache, returns MessageRecord
- `Delete(sequence)` → marks in deletion set (SequenceSet), returns bool
- `Flush()` → writes buffer to disk
- `IsSealed` → true when `_bytesWritten >= _maxBytes`
- Properties: `MessageCount`, `DeletedCount`, `FirstSequence`, `LastSequence`, `BytesUsed`
- Internal: `Dictionary<ulong, (long offset, int length)>` index, `FileStream` for I/O
- `IDisposable` for file handle cleanup
**Go reference:** `filestore.go:108-180` (msgBlock struct), `filestore.go:5720-5800` (writeMsgRecord)
**Step 4:** Run tests, verify pass. **Step 5:** Commit with `feat(storage): add MsgBlock block-based message storage`.
---
### Task A3: FileStore Block Manager Rewrite
**Files:**
- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Storage/FileStoreBlockTests.cs`
**Context:** Replace the JSONL append-only FileStore with a block manager that creates, seals, and rotates MsgBlocks. The existing async API (`AppendAsync`, `LoadAsync`, etc.) delegates to the block engine. The sync API (`StoreMsg`, `LoadMsg`, etc.) is implemented directly.
**Key changes to FileStore.cs:**
1. Replace `Dictionary<ulong, StoredMessage>` with `List<MsgBlock>` and `_lastBlock` pointer
2. `AppendAsync` → delegates to `StoreMsg` which writes to `_lastBlock`, rotates if sealed
3. `LoadAsync` → delegates to `LoadMsg` which finds block by sequence, reads from block
4. `RemoveAsync` → calls `_blocks[blockIdx].Delete(seq)`, updates state
5. Block rotation: when `_lastBlock.IsSealed`, create new block, add to list
6. Recovery: on construction, scan directory for `.blk` files, call `MsgBlock.Recover` for each
7. Wire S2Codec on write path (compress record before writing to block)
8. Wire AeadEncryptor on write path (encrypt after compression)
9. Atomic flush: write to temp file, rename on seal
**Tests verify:**
- Multi-block rotation (write >64KB to trigger new block)
- Cross-block reads (read from sealed + active blocks)
- Restart recovery (create store, write, dispose, re-create, verify data intact)
- Compression round-trip (enable S2, store+load, verify data matches)
- Encryption round-trip (enable AEAD, store+load, verify data matches)
- Purge by subject (only messages matching subject removed)
- Purge by sequence range
- Concurrent read/write safety
**Step flow:** Write 8 failing tests → implement block manager → verify all pass → commit.
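The rotation rule in changes 2 and 5 can be sketched in miniature (Go, matching the plan's Go references; all type and field names here are illustrative, not the server's actual structures):

```go
package main

import "fmt"

// Toy block manager showing the rotation rule: writes always go to the
// last block; once it reaches maxBytes it is sealed and a fresh block
// is appended. Sizes and fields are illustrative only.
type block struct {
	id   int
	used int
	max  int
	seqs []uint64
}

func (b *block) sealed() bool { return b.used >= b.max }

type blockManager struct {
	blocks  []*block
	nextSeq uint64
}

func (m *blockManager) store(size int) uint64 {
	last := m.blocks[len(m.blocks)-1]
	if last.sealed() {
		// Rotate: the sealed block stays read-only, a new block becomes active.
		last = &block{id: last.id + 1, max: last.max}
		m.blocks = append(m.blocks, last)
	}
	m.nextSeq++
	last.used += size
	last.seqs = append(last.seqs, m.nextSeq)
	return m.nextSeq
}

func main() {
	m := &blockManager{blocks: []*block{{max: 100}}}
	for i := 0; i < 5; i++ {
		m.store(40) // the third 40-byte record seals the 100-byte block
	}
	fmt.Println(len(m.blocks), m.nextSeq) // 2 5
}
```

Note the asymmetry: a block is only checked for sealing on the next write, so the active block may overshoot `maxBytes` by one record, which is also how size limits behave in append-oriented stores.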
---
### Task A4: Tombstone Tracking and Purge Operations
**Files:**
- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs`
- Modify: `src/NATS.Server/JetStream/Storage/MsgBlock.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Storage/FileStorePurgeBlockTests.cs`
**Context:** Go tracks deleted messages via `dmap` (AVL-based SequenceSet per block). Purge operations: `PurgeEx(subject, seq, keep)`, `Compact(seq)`, `Truncate(seq)`. The .NET `SequenceSet.cs` (777 lines) already exists and can be used.
**Go reference:** `filestore.go:2200-2400` (purge), `filestore.go:2900-3100` (compact)
**Implementation:**
- Each `MsgBlock` gets a `SequenceSet _deleted` field
- `Delete(seq)` adds to `_deleted`, updates byte counts
- `Compact()` rewrites block excluding deleted sequences
- `FilteredState(seq, subject)` walks blocks, skips deleted
- `SubjectsState(filter)` aggregates per-subject counts excluding deleted
- `PurgeEx` on FileStore iterates blocks, calls `Delete` for matching sequences
**Tests:** Purge by subject, purge by sequence range, compact removes gaps, state queries reflect deletions.
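The deletion-aware state query can be illustrated with a toy version (Go; a plain map stands in for the AVL-backed `SequenceSet`):

```go
package main

import "fmt"

// filteredCount sketches how state queries skip tombstoned sequences:
// walk a block's sequences and count only those not in the deleted set.
func filteredCount(seqs []uint64, deleted map[uint64]bool) int {
	n := 0
	for _, s := range seqs {
		if !deleted[s] {
			n++
		}
	}
	return n
}

func main() {
	deleted := map[uint64]bool{2: true, 3: true}
	fmt.Println(filteredCount([]uint64{1, 2, 3, 4}, deleted)) // 2
}
```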
---
### Task A5: Write Cache and TTL Scheduling
**Files:**
- Modify: `src/NATS.Server/JetStream/Storage/MsgBlock.cs` (add cache)
- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs` (add TTL)
- Test: `tests/NATS.Server.Tests/JetStream/Storage/FileStoreTtlTests.cs`
**Context:** Active block has an in-memory write cache for recent messages. On startup, walk blocks to reconstruct TTL expirations and register with `HashWheel`.
**Go reference:** `filestore.go:180-220` (cache struct), `filestore.go:8900-9000` (age check)
**Implementation:**
- `MsgBlock._cache`: `Dictionary<ulong, MessageRecord>` for hot messages, evicted on timer
- `FileStore._ttlWheel`: Reference to `HashWheel` for scheduling expirations
- On `StoreMsg` with TTL > 0: register expiration callback
- On startup recovery: walk all blocks, check timestamps, re-register unexpired TTLs
---
### Task A6: Port FileStore Go Tests + DB Update
**Files:**
- Modify: existing test files under `tests/NATS.Server.Tests/JetStream/Storage/`
- Create: `tests/NATS.Server.Tests/JetStream/Storage/FileStoreGoParityTests.cs`
**Context:** Port ~159 unmapped tests from `filestore_test.go`. Group by functionality: basic store/load (~30), compression (~15), encryption (~15), recovery (~20), purge/compact (~25), TTL (~10), concurrent (~15), state queries (~20), edge cases (~9).
**For each test ported, update DB:**
```sql
UPDATE go_tests SET status='mapped', dotnet_test='<Name>', dotnet_file='FileStoreGoParityTests.cs',
notes='Ported from <GoFunc> in filestore_test.go' WHERE go_file='filestore_test.go' AND go_test='<GoFunc>';
```
**Batch the DB update in a single SQL script at the end of this task.**
---
## Track B: Consensus (RAFT + JetStream Cluster)
**Blocked by:** Tracks A and C
### Task B1: RAFT Apply Queue and Commit Tracking
**Files:**
- Modify: `src/NATS.Server/Raft/RaftNode.cs`
- Create: `src/NATS.Server/Raft/CommitQueue.cs`
- Test: `tests/NATS.Server.Tests/Raft/RaftApplyQueueTests.cs`
**Context:** Go RAFT tracks `applied`, `processed`, `commit` indexes separately. The apply queue enables the state machine to consume committed entries asynchronously. Currently .NET only has `AppliedIndex`.
**Go reference:** `raft.go:150-160` (applied/processed fields), `raft.go:2100-2150` (ApplyQ)
**Implementation:**
- `CommitQueue<T>`: Channel-based queue for committed entries awaiting state machine application
- `RaftNode.CommitIndex`, `RaftNode.ProcessedIndex` alongside existing `AppliedIndex`
- `Applied(index)` method returns entries applied since last call
- `Processed(index, applied)` tracks pipeline efficiency
---
### Task B2: Campaign Timeout and Election Management
**Files:**
- Modify: `src/NATS.Server/Raft/RaftNode.cs`
- Test: `tests/NATS.Server.Tests/Raft/RaftElectionTimerTests.cs`
**Context:** Go uses randomized election delays (150-300ms) to prevent split votes. Currently .NET elections are on-demand only.
**Go reference:** `raft.go:1400-1450` (resetElectionTimeout), `raft.go:1500-1550` (campaign logic)
**Implementation:**
- `_electionTimer`: `PeriodicTimer` with randomized interval (150-300ms)
- `ResetElectionTimeout()` restarts timer on any heartbeat/append from leader
- Timer expiry triggers `Campaign()` if in Follower state
- `CampaignImmediately()` bypasses timer for testing
---
### Task B3: Health Classification and Peer Tracking
**Files:**
- Modify: `src/NATS.Server/Raft/RaftNode.cs`
- Create: `src/NATS.Server/Raft/RaftPeerState.cs`
- Test: `tests/NATS.Server.Tests/Raft/RaftHealthTests.cs`
**Context:** Go classifies peers as current/catching-up/leaderless based on `lastContact`, `nextIndex`, `matchIndex`.
**Implementation:**
- `RaftPeerState`: record with `LastContact`, `NextIndex`, `MatchIndex`, `Active`
- `Current()` — is this node's log within election timeout of leader?
- `Healthy()` — node health based on last contact time
- `Peers()` — returns list of peer states
- Leader tracks peer progress during replication
---
### Task B4: Membership Changes (Add/Remove Peer)
**Files:**
- Create: `src/NATS.Server/Raft/RaftMembership.cs`
- Modify: `src/NATS.Server/Raft/RaftNode.cs`
- Test: `tests/NATS.Server.Tests/Raft/RaftMembershipTests.cs`
**Context:** Go uses single-server membership changes (not joint consensus). `ProposeAddPeer`/`ProposeRemovePeer` return error if change already in flight.
**Go reference:** `raft.go:2500-2600` (ProposeAddPeer/RemovePeer)
**Implementation:**
- Special RAFT log entry type for membership changes
- `_membershipChangeIndex` tracks uncommitted change
- On commit: update peer list, recompute quorum
- On leader change: re-propose inflight membership changes
---
### Task B5: Snapshot Checkpoints and Log Compaction
**Files:**
- Create: `src/NATS.Server/Raft/RaftSnapshotCheckpoint.cs`
- Modify: `src/NATS.Server/Raft/RaftSnapshotStore.cs`
- Modify: `src/NATS.Server/Raft/RaftLog.cs`
- Test: `tests/NATS.Server.Tests/Raft/RaftSnapshotCheckpointTests.cs`
**Context:** After snapshot, truncate log entries before snapshot index. Streaming snapshot transfer for catching-up followers.
**Go reference:** `raft.go:3200-3400` (CreateSnapshotCheckpoint), `raft.go:3500-3700` (installSnapshot)
**Implementation:**
- `RaftSnapshotCheckpoint`: saves state at a point, allows install or abort
- `InstallSnapshot(chunks)`: chunked streaming transfer to follower
- `CompactLog(upToIndex)`: truncate entries before index, update base index
- `DrainAndReplaySnapshot()`: recovery path
---
### Task B6: Pre-Vote Protocol
**Files:**
- Modify: `src/NATS.Server/Raft/RaftNode.cs`
- Modify: `src/NATS.Server/Raft/RaftWireFormat.cs`
- Test: `tests/NATS.Server.Tests/Raft/RaftPreVoteTests.cs`
**Context:** Before incrementing term, candidate sends pre-vote requests. Prevents partitioned nodes from disrupting stable clusters.
**Go reference:** `raft.go:1600-1700` (pre-vote logic)
**Implementation:**
- New wire message: `RaftPreVoteRequest`/`RaftPreVoteResponse`
- Pre-vote granted only if candidate's log is at least as up-to-date
- Successful pre-vote → proceed to real election with term increment
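The grant rule can be sketched as the standard up-to-date check from the RAFT paper (Go; illustrative, not the server's raft.go code):

```go
package main

import "fmt"

// preVoteGranted grants a pre-vote only if the candidate's last log
// entry is at least as recent as ours: higher term wins, equal terms
// compare by index. No term is incremented during pre-vote, so a
// partitioned node cannot disrupt a stable cluster.
func preVoteGranted(candLastTerm, candLastIndex, myLastTerm, myLastIndex uint64) bool {
	if candLastTerm != myLastTerm {
		return candLastTerm > myLastTerm
	}
	return candLastIndex >= myLastIndex
}

func main() {
	fmt.Println(preVoteGranted(5, 10, 5, 10)) // equal log: grant (true)
	fmt.Println(preVoteGranted(4, 99, 5, 10)) // stale term: deny (false)
}
```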
---
### Task B7: Stream/Consumer Assignment Types
**Files:**
- Create: `src/NATS.Server/JetStream/Cluster/StreamAssignment.cs`
- Create: `src/NATS.Server/JetStream/Cluster/ConsumerAssignment.cs`
- Create: `src/NATS.Server/JetStream/Cluster/RaftGroup.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Cluster/AssignmentSerializationTests.cs`
**Context:** Core data types for cluster coordination. Must serialize/deserialize for RAFT log entries.
**Go reference:** `jetstream_cluster.go:60-120` (streamAssignment, consumerAssignment structs)
---
### Task B8: JetStream Meta-Group RAFT Proposal Workflow
**Files:**
- Rewrite: `src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs` (51 → ~1,500+ lines)
- Test: `tests/NATS.Server.Tests/JetStream/Cluster/MetaGroupProposalTests.cs`
**Context:** The meta-controller receives API requests, validates, proposes stream/consumer assignments to the meta-group RAFT, and on commit all nodes apply the assignment.
**Go reference:** `jetstream_cluster.go:500-1000` (processStreamAssignment), `jetstream_cluster.go:1500-2000` (processConsumerAssignment)
**Implementation:**
- `ProposeStreamCreate(config)` → validate → create StreamAssignment → propose to RAFT
- `ProposeStreamDelete(name)` → propose deletion entry
- `ProposeConsumerCreate(stream, config)` → validate → create ConsumerAssignment → propose
- `ApplyEntry(entry)` → dispatch based on entry type (create/delete/update stream/consumer)
- Inflight tracking: `_inflightStreams`, `_inflightConsumers` dictionaries
- On leader change: re-propose inflight entries
---
### Task B9: Placement Engine
**Files:**
- Rewrite: `src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs` (17 → ~300+ lines)
- Rename to: `src/NATS.Server/JetStream/Cluster/PlacementEngine.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Cluster/PlacementEngineTests.cs`
**Context:** Topology-aware placement: unique nodes, tag matching, cluster affinity.
**Go reference:** `jetstream_cluster.go:2500-2800` (selectPeerGroup)
**Implementation:**
- `SelectPeers(replicas, tags, cluster, exclude)` → returns peer list
- Ensures no two replicas on same node
- Tag filtering: only select nodes matching required tags
- Cluster affinity: prefer peers in same cluster for locality
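A toy version of the selection constraints (Go; the real `selectPeerGroup` also weighs load and storage, which this sketch omits, and all names are illustrative):

```go
package main

import "fmt"

type node struct {
	name    string
	cluster string
	tags    map[string]bool
}

// selectPeers picks `replicas` distinct nodes that carry every required
// tag, preferring nodes in the requested cluster. Returns nil if the
// constraints cannot be met.
func selectPeers(nodes []node, replicas int, tags []string, cluster string) []string {
	var preferred, fallback []string
	for _, n := range nodes {
		ok := true
		for _, t := range tags {
			if !n.tags[t] {
				ok = false
				break
			}
		}
		if !ok {
			continue
		}
		if cluster == "" || n.cluster == cluster {
			preferred = append(preferred, n.name)
		} else {
			fallback = append(fallback, n.name)
		}
	}
	picked := append(preferred, fallback...)
	if len(picked) < replicas {
		return nil
	}
	return picked[:replicas]
}

func main() {
	nodes := []node{
		{"n1", "east", map[string]bool{"ssd": true}},
		{"n2", "east", map[string]bool{"ssd": true}},
		{"n3", "west", map[string]bool{"ssd": true}},
	}
	fmt.Println(selectPeers(nodes, 3, []string{"ssd"}, "east")) // [n1 n2 n3]
}
```

Uniqueness falls out of iterating the node list once; each node can contribute at most one replica.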
---
### Task B10: Per-Stream RAFT Groups
**Files:**
- Modify: `src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Cluster/StreamRaftGroupTests.cs`
**Context:** Each stream gets its own RAFT group for message replication. The meta-group assigns the replica set, then the stream RAFT group handles message proposals.
**Implementation:**
- `StreamReplicaGroup` wraps a `RaftNode` with stream-specific apply logic
- `ProposeMessage(subject, headers, payload)` → proposes to stream RAFT
- On commit: apply message to local FileStore
- Leadership transfer triggers mirror/source reconnection
---
### Task B11: Port RAFT + Cluster Go Tests + DB Update
**Tests to port:** ~85 from `raft_test.go`, ~358 from `jetstream_cluster_*_test.go`, ~47 from `jetstream_super_cluster_test.go`
**Batch DB update SQL script at end of task.**
---
## Track C: Protocol (Client, Consumer, JetStream API, Mirrors/Sources)
### Task C1: Client Adaptive Buffers and Slow Consumer Detection
**Files:**
- Modify: `src/NATS.Server/NatsClient.cs`
- Test: `tests/NATS.Server.Tests/ClientAdaptiveBufferTests.cs`
**Context:** NatsClient.cs already has adaptive read buffers (lines 252-274) and slow consumer detection (lines 808-850). The gaps are: write buffer pooling with flush coalescing, max control line enforcement (4096 bytes), and write timeout with partial flush recovery.
**Implementation:**
1. Write buffer pool: use `ArrayPool<byte>.Shared` for outbound buffers, coalesce multiple small writes
2. Max control line: in parser, reject control lines >4096 bytes with protocol error
3. Write timeout: `CancellationTokenSource` with configurable timeout on `FlushAsync`
**Tests:** Write coalescing under load, control line overflow rejection, write timeout recovery.
---
### Task C2: AckProcessor NAK/TERM/PROGRESS Support
**Files:**
- Modify: `src/NATS.Server/JetStream/Consumers/AckProcessor.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Consumers/AckProcessorNakTests.cs`
**Context:** AckProcessor currently only handles basic ACK. Go dispatches on ack type: ACK, NAK, TERM, PROGRESS. NAK schedules redelivery with backoff array.
**Go reference:** `consumer.go:2550-2650` (processAck), `consumer.go:2700-2750` (processNak)
**Implementation:**
- Parse ack payload to determine type (Go uses `-NAK`, `+NXT`, `+WPI`, `+TERM`)
- `ProcessNak(seq, delay?)` → schedule redelivery with optional delay
- `ProcessTerm(seq)` → remove from pending, do not redeliver
- `ProcessProgress(seq)` → reset ack deadline (work in progress)
- Thread-safe: replace Dictionary with ConcurrentDictionary or use locking
- Priority queue for expiry: `SortedSet<(DateTimeOffset deadline, ulong seq)>`
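The dispatch-by-token step can be sketched like this (Go; token spellings follow the ack bodies listed above, the classifier itself and its fallback behavior are illustrative):

```go
package main

import (
	"bytes"
	"fmt"
)

// ackType classifies an ack payload by its leading token so the
// processor can dispatch to the right handler.
func ackType(payload []byte) string {
	switch {
	case len(payload) == 0 || bytes.HasPrefix(payload, []byte("+ACK")):
		return "ack"
	case bytes.HasPrefix(payload, []byte("-NAK")):
		return "nak" // may carry an optional delay after the token
	case bytes.HasPrefix(payload, []byte("+WPI")):
		return "progress" // work in progress: reset the ack deadline
	case bytes.HasPrefix(payload, []byte("+NXT")):
		return "next"
	case bytes.HasPrefix(payload, []byte("+TERM")):
		return "term" // never redeliver
	default:
		return "ack" // assumption: unknown bodies treated as plain acks
	}
}

func main() {
	fmt.Println(ackType(nil), ackType([]byte("-NAK")), ackType([]byte("+TERM")))
}
```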
---
### Task C3: PushConsumer Delivery Dispatch
**Files:**
- Modify: `src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Consumers/PushConsumerDeliveryTests.cs`
**Context:** `PushConsumerEngine.Enqueue` queues frames, but no delivery mechanism sends them to the consumer's deliver subject. This is the critical missing piece.
**Go reference:** `consumer.go:2700-3500` (deliverMsg, sendMsg)
**Implementation:**
- Add `DeliverSubject` property from ConsumerConfig
- Background delivery loop: poll `PushFrames` channel, format as HMSG, send via `NatsClient.SendMessage`
- Headers: `Nats-Sequence`, `Nats-Time-Stamp`, `Nats-Subject` (original subject)
- Delivery counter tracking per message
- Flow control: pause delivery when client's pending bytes exceed threshold
---
### Task C4: Redelivery Tracker with Backoff Schedules
**Files:**
- Create: `src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Consumers/RedeliveryTrackerTests.cs`
**Context:** Track per-message redelivery count and apply configurable backoff arrays.
**Go reference:** `consumer.go:400-500` (pending map), `consumer.go:2800-2900` (backoff logic)
**Implementation:**
- `RedeliveryTracker(backoffMs[])`: configurable backoff schedule
- `Schedule(seq, deliveryCount)` → returns next delivery time based on backoff[min(count, len-1)]
- `GetDue()` → returns sequences ready for redelivery
- `MaxDeliveries(seq)` → true when delivery count exceeds max
---
### Task C5: Priority Group Pinning and Idle Heartbeats
**Files:**
- Create: `src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs`
- Modify: `src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs` (heartbeat timer)
- Test: `tests/NATS.Server.Tests/JetStream/Consumers/PriorityGroupTests.cs`
**Context:** Sticky consumer assignment within priority groups. Idle heartbeat timer sends empty messages when no data available.
**Go reference:** `consumer.go:500-600` (priority groups), `consumer.go:3100-3200` (heartbeat)
---
### Task C6: PullConsumer Timeout and Filter Compilation
**Files:**
- Modify: `src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Consumers/PullConsumerTimeoutTests.cs`
**Context:** Enforce `ExpiresMs` on long-running fetches. Compile filter subjects for efficient matching.
**Implementation:**
- `CancellationTokenSource` with ExpiresMs timeout on fetch
- Pre-compile filter subjects into `SubjectMatch` patterns for O(1) matching
- Skip non-matching subjects without counting against pending limits
---
### Task C7: JetStream API Leader Forwarding
**Files:**
- Modify: `src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs`
- Modify: `src/NATS.Server/JetStream/JetStreamService.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs`
**Context:** API requests at non-leader nodes must be forwarded to the current leader.
**Go reference:** `jetstream_api.go:200-300` (isLeader check, forward logic)
**Implementation:**
- Before handling any API request, check `_metaGroup.IsLeader()`
- If not leader: serialize request, publish to leader's reply subject, return leader's response
- Timeout on forwarding with error response
---
### Task C8: Stream Purge with Options
**Files:**
- Modify: `src/NATS.Server/JetStream/Api/StreamApiHandlers.cs`
- Modify: `src/NATS.Server/JetStream/StreamManager.cs`
- Test: `tests/NATS.Server.Tests/JetStream/Api/StreamPurgeOptionsTests.cs`
**Context:** Go supports: purge by subject filter, keep N messages, sequence-based purge.
**Go reference:** `jetstream_api.go:1200-1350` (handleStreamPurge)
**Implementation:**
- Parse purge request: `{ "filter": "orders.*", "seq": 100, "keep": 5 }`
- `StreamManager.PurgeEx(filter, seq, keep)` delegates to FileStore
- FileStore iterates blocks, deletes matching sequences
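The purge semantics can be illustrated against a toy message list (Go; exact-match filtering stands in for wildcard subject matching, and while the real API treats `seq` and `keep` as mutually exclusive, this sketch accepts both for brevity):

```go
package main

import "fmt"

type msg struct {
	seq     uint64
	subject string
}

// purgeSet returns the sequences to delete: messages matching filter
// (empty matches all) with seq below upToSeq (0 = no bound), retaining
// the newest `keep` matching messages.
func purgeSet(msgs []msg, filter string, upToSeq uint64, keep int) []uint64 {
	var matching []msg
	for _, m := range msgs {
		if (filter == "" || m.subject == filter) && (upToSeq == 0 || m.seq < upToSeq) {
			matching = append(matching, m)
		}
	}
	if keep > 0 && keep < len(matching) {
		matching = matching[:len(matching)-keep] // retain the newest keep
	} else if keep > 0 {
		matching = nil // fewer than keep matches: purge nothing
	}
	var purged []uint64
	for _, m := range matching {
		purged = append(purged, m.seq)
	}
	return purged
}

func main() {
	msgs := []msg{{1, "orders.new"}, {2, "orders.new"}, {3, "events"}, {4, "orders.new"}}
	fmt.Println(purgeSet(msgs, "orders.new", 0, 1)) // [1 2]
}
```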
---
### Task C9: Mirror Synchronization Loop
**Files:**
- Rewrite: `src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs` (23 → ~500+ lines)
- Test: `tests/NATS.Server.Tests/JetStream/MirrorSource/MirrorSyncTests.cs`
**Context:** Continuous pull from origin stream, apply messages locally. Maintain origin→current sequence alignment.
**Go reference:** `stream.go:500-800` (processMirrorMsgs, setupMirrorConsumer)
**Implementation:**
- Background task: create ephemeral pull consumer on origin stream
- Fetch batches of messages, apply to local store
- Track `LastOriginSequence` for catchup after restarts
- Retry with exponential backoff on failures
- Health reporting: lag (origin last seq - local last seq)
---
### Task C10: Source Coordination with Filtering
**Files:**
- Rewrite: `src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs` (37 → ~500+ lines)
- Test: `tests/NATS.Server.Tests/JetStream/MirrorSource/SourceFilterTests.cs`
**Context:** Source consumption with subject filtering, account isolation, and deduplication.
**Implementation:**
- `FilterSubject` support: only forward messages matching filter
- `SubjectTransformPrefix` applied before storing
- Deduplication via `Nats-Msg-Id` header with configurable window
- Multiple sources per stream supported
- Lag tracking per source
---
### Task C11: Port Protocol Go Tests + DB Update
**Tests to port:** ~43 client_test.go, ~134 jetstream_consumer_test.go, ~184 jetstream_test.go, ~30 mirror/source
**Batch DB update at end.**
---
## Track D: Networking (Gateway, Leaf Node, Routes)
### Task D1: Gateway Interest-Only Mode State Machine
**Files:**
- Modify: `src/NATS.Server/Gateways/GatewayConnection.cs`
- Create: `src/NATS.Server/Gateways/GatewayInterestTracker.cs`
- Test: `tests/NATS.Server.Tests/Gateways/GatewayInterestModeTests.cs`
**Context:** After initial flooding, gateways switch to sending only messages for subjects with known remote subscribers. Currently .NET forwards everything.
**Go reference:** `gateway.go:100-150` (InterestMode enum), `gateway.go:1500-1600` (switchToInterestOnlyMode)
**Implementation:**
- `GatewayInterestMode` enum: `Optimistic`, `Transitioning`, `InterestOnly`
- Per-account interest state tracking
- `outsie` equivalent: mode + no-interest set (optimistic) or SubList (interest-only)
- Switch threshold: after N unsubscribes, switch from Optimistic → InterestOnly
- `ShouldForward(account, subject)` → checks interest state before sending
**Tests:**
- Start in Optimistic mode, all messages forwarded
- After threshold unsubscribes, switch to InterestOnly
- In InterestOnly, only messages with RS+ subscribers forwarded
- Mode persists across reconnection
---
### Task D2: Route Pool Accounting per Account
**Files:**
- Modify: `src/NATS.Server/Routes/RouteManager.cs`
- Modify: `src/NATS.Server/Routes/RouteConnection.cs`
- Test: `tests/NATS.Server.Tests/Routes/RoutePoolAccountTests.cs`
**Context:** Go hashes account name to determine which pool connection carries its traffic.
**Go reference:** `route.go:533-545` (computeRoutePoolIdx)
**Implementation:**
- `ComputeRoutePoolIdx(poolSize, accountName)` → FNV-1a hash → mod poolSize
- Each route connection tagged with pool index
- Message dispatch routes through appropriate pool connection
- Account-specific dedicated routes bypass pool (optional)
---
### Task D3: Route S2 Compression
**Files:**
- Modify: `src/NATS.Server/Routes/RouteCompressionCodec.cs`
- Test: `tests/NATS.Server.Tests/Routes/RouteS2CompressionTests.cs`
**Context:** Replace Deflate with S2 (IronSnappy). Add compression level auto-tuning.
**Implementation:**
- Replace `DeflateStream` with `IronSnappy` calls
- `CompressionLevel` enum: `Off`, `Fast` (S2 default), `Better`, `Best`
- `Auto` mode: adjust based on RTT measurements
- Negotiation during route handshake
---
### Task D4: Gateway Reply Mapper Expansion
**Files:**
- Modify: `src/NATS.Server/Gateways/ReplyMapper.cs`
- Test: `tests/NATS.Server.Tests/Gateways/ReplyMapperFullTests.cs`
**Context:** Full `_GR_.{clusterId}.{hash}.{originalReply}` handling.
**Go reference:** `gateway.go:2000-2100` (replyMapper logic)
---
### Task D5: Leaf Node Solicited Connections and JetStream Domains
**Files:**
- Modify: `src/NATS.Server/LeafNodes/LeafNodeManager.cs`
- Modify: `src/NATS.Server/LeafNodes/LeafConnection.cs`
- Test: `tests/NATS.Server.Tests/LeafNodes/LeafSolicitedConnectionTests.cs`
**Context:** Outbound leaf connections with retry/reconnect. Forward JetStream domain headers.
**Implementation:**
- `ConnectSolicitedAsync(url, account)` → establish outbound leaf connection
- Retry with exponential backoff (1s, 2s, 4s, ..., max 60s)
- `LeafNodeOptions.JetStreamDomain` propagated in connection handshake
- Domain header forwarded in LMSG frames
---
### Task D6: Leaf Subject Filtering
**Files:**
- Modify: `src/NATS.Server/LeafNodes/LeafHubSpokeMapper.cs`
- Test: `tests/NATS.Server.Tests/LeafNodes/LeafSubjectFilterTests.cs`
**Context:** Export/import specific subjects on leaf connections. Currently only account mapping.
---
### Task D7: Port Networking Go Tests + DB Update
**Tests to port:** ~61 gateway_test.go, ~59 leafnode_test.go, ~39 routes_test.go
**Batch DB update at end.**
---
## Track E: Services (MQTT, Accounts, Config, WebSocket, Monitoring)
### Task E1: MQTT Binary Protocol Parser
**Files:**
- Rewrite: `src/NATS.Server/Mqtt/MqttProtocolParser.cs`
- Rewrite: `src/NATS.Server/Mqtt/MqttPacketReader.cs`
- Rewrite: `src/NATS.Server/Mqtt/MqttPacketWriter.cs`
- Test: `tests/NATS.Server.Tests/Mqtt/MqttBinaryProtocolTests.cs`
**Context:** Replace text-based MQTT parser with proper MQTT v3.1.1 binary encoding. Go's implementation processes binary frames directly.
**Go reference:** `mqtt.go:1000-1200` (parse CONNECT, SUBSCRIBE, PUBLISH packets)
**Implementation:**
- Fixed header: 1 byte (bits 7-4 packet type, bits 3-0 flags) + variable-length remaining length (1-4 bytes)
- CONNECT: protocol name, version, flags, keepalive, client ID, will topic/message, credentials
- SUBSCRIBE: packet ID, topic filter + QoS pairs
- PUBLISH: topic name, packet ID (QoS>0), payload
- SUBACK, PUBACK, PUBREC, PUBREL, PUBCOMP responses
---
### Task E2: MQTT Session Persistence with JetStream
**Files:**
- Modify: `src/NATS.Server/Mqtt/MqttListener.cs`
- Create: `src/NATS.Server/Mqtt/MqttSessionStore.cs`
- Test: `tests/NATS.Server.Tests/Mqtt/MqttSessionPersistenceTests.cs`
**Context:** Go stores MQTT sessions in a dedicated `$MQTT_sess` JetStream stream. Session state includes: subscriptions with QoS levels, pending publishes, will messages.
**Go reference:** `mqtt.go:253-300` (mqttSession struct)
**Implementation:**
- `MqttSessionStore`: wraps JetStream stream `$MQTT_sess_<account>`
- `SaveSession(clientId, state)` → store JSON session snapshot
- `LoadSession(clientId)` → restore on reconnect
- `DeleteSession(clientId)` → clean session on CONNECT with clean=true
- Flapper detection: track connect/disconnect timestamps, apply backoff
---
### Task E3: MQTT QoS and Retained Messages
**Files:**
- Modify: `src/NATS.Server/Mqtt/MqttConnection.cs`
- Create: `src/NATS.Server/Mqtt/MqttRetainedStore.cs`
- Test: `tests/NATS.Server.Tests/Mqtt/MqttQosTests.cs`
**Context:** QoS 1 (at-least-once) with PUBACK tracking. QoS 2 (exactly-once) with PUBREC/PUBREL/PUBCOMP. Retained messages stored per-account in JetStream.
**Implementation:**
- `_pendingPublish`: `ConcurrentDictionary<ushort, MqttPendingPublish>` keyed by packet ID
- QoS 1: track PUBLISH → wait PUBACK → remove from pending
- QoS 2: PUBLISH → PUBREC → PUBREL → PUBCOMP state machine
- Retained messages: `$MQTT_retained_<account>` stream, one message per topic (overwrite)
- MQTT wildcard translation: `+` → `*`, `#` → `>`, `/` → `.`
---
### Task E4: Account Import/Export with Cycle Detection
**Files:**
- Create: `src/NATS.Server/Auth/AccountImportExport.cs`
- Modify: `src/NATS.Server/Auth/Account.cs`
- Test: `tests/NATS.Server.Tests/Auth/AccountImportExportTests.cs`
**Context:** Service/stream exports with whitelist enforcement. Imports with weighted destination selection. Cycle detection prevents A→B→A import chains.
**Go reference:** `accounts.go:1500-2000` (addServiceImport, addStreamImport)
**Implementation:**
- `AddServiceExport(subject, accounts[])` → register export with optional account whitelist
- `AddServiceImport(from, subject, to)` → bind import, check cycle
- `DetectCycle(from, to, visited)` → DFS through import graph
- Weighted imports: `AddServiceImport(from, subject, to, weight)` → round-robin or weighted random
---
### Task E5: Account JetStream Limits
**Files:**
- Create: `src/NATS.Server/Auth/AccountLimits.cs`
- Modify: `src/NATS.Server/Auth/Account.cs`
- Test: `tests/NATS.Server.Tests/Auth/AccountLimitsTests.cs`
**Context:** Per-account limits on JetStream resources: max storage, max streams, max consumers.
**Implementation:**
- `AccountLimits`: MaxStorage, MaxStreams, MaxConsumers, MaxAckPending, MaxConnections
- `Account.TryReserveStream()` → checks against limits
- `Account.TryReserveConsumer()` → checks against limits
- `Account.TrackStorage(delta)` → updates storage usage atomically
- Evict oldest client when MaxConnections exceeded
---
### Task E6: System Account and $SYS Handling
**Files:**
- Modify: `src/NATS.Server/Auth/Account.cs`
- Modify: `src/NATS.Server/NatsServer.cs`
- Test: `tests/NATS.Server.Tests/Auth/SystemAccountTests.cs`
**Context:** The system account handles `$SYS.>` subjects for internal server-to-server communication.
**Implementation:**
- Designate one account as system (from config or JWT)
- Route `$SYS.>` subscriptions to system account's SubList
- Internal event system publishes to system account subjects
- Non-system accounts cannot subscribe to `$SYS.>`
---
### Task E7: Config Signal Handling (SIGHUP)
**Files:**
- Modify: `src/NATS.Server/Configuration/ConfigReloader.cs`
- Modify: `src/NATS.Server.Host/Program.cs`
- Test: `tests/NATS.Server.Tests/Configuration/SignalReloadTests.cs`
**Context:** On Unix, SIGHUP triggers config reload. Use .NET 10's `PosixSignalRegistration`.
**Implementation:**
- `PosixSignalRegistration.Create(PosixSignal.SIGHUP, handler)`
- Handler calls `ConfigReloader.ReloadAsync()` which diffs and applies changes
- Wire `ConfigReloader.ApplyDiff(diff)` to actually reload subsystems (currently validation-only)
---
### Task E8: Auth Change Propagation on Reload
**Files:**
- Modify: `src/NATS.Server/Configuration/ConfigReloader.cs`
- Modify: `src/NATS.Server/NatsServer.cs`
- Test: `tests/NATS.Server.Tests/Configuration/AuthReloadTests.cs`
**Context:** On config reload, disconnect clients whose credentials are no longer valid.
**Implementation:**
- After diff identifies auth changes, iterate connected clients
- Re-evaluate each client's auth against new config
- Disconnect clients that fail re-evaluation with `AUTH_EXPIRED` error
---
### Task E9: TLS Certificate Reload
**Files:**
- Modify: `src/NATS.Server/Configuration/ConfigReloader.cs`
- Test: `tests/NATS.Server.Tests/Configuration/TlsReloadTests.cs`
**Context:** Reload TLS certificates without restarting the server.
**Implementation:**
- On reload, if TLS config changed, load new certificate
- New connections use new certificate
- Existing connections continue with old certificate until reconnect
---
### Task E10: WebSocket Compression Negotiation
**Files:**
- Modify: `src/NATS.Server/WebSocket/WsUpgrade.cs`
- Modify: `src/NATS.Server/WebSocket/WsCompression.cs`
- Test: `tests/NATS.Server.Tests/WebSocket/WsCompressionNegotiationTests.cs`
**Context:** Full `permessage-deflate` parameter negotiation per RFC 7692.
**Implementation:**
- Parse `Sec-WebSocket-Extensions` header for deflate parameters
- `server_no_context_takeover`, `client_no_context_takeover`
- `server_max_window_bits`, `client_max_window_bits`
- Return negotiated parameters in upgrade response
---
### Task E11: WebSocket JWT Authentication
**Files:**
- Modify: `src/NATS.Server/WebSocket/WsUpgrade.cs`
- Test: `tests/NATS.Server.Tests/WebSocket/WsJwtAuthTests.cs`
**Context:** Extract JWT token from WebSocket upgrade request (cookie or query parameter) and authenticate.
**Implementation:**
- Check `Authorization` header, `jwt` cookie, or `?jwt=` query parameter
- Pass to `AuthService.Authenticate()` pipeline
- On failure, reject WebSocket upgrade with 401
---
### Task E12: Monitoring Connz Filtering and Sort
**Files:**
- Modify: `src/NATS.Server/Monitoring/ConnzHandler.cs`
- Test: `tests/NATS.Server.Tests/Monitoring/ConnzFilterTests.cs`
**Context:** `/connz?acc=ACCOUNT&sort=bytes_to&limit=10&offset=0`
**Implementation:**
- Parse query parameters: `acc`, `sort`, `limit`, `offset`, `state` (open/closed/all)
- Sort by: `cid`, `start`, `subs`, `pending`, `msgs_to`, `msgs_from`, `bytes_to`, `bytes_from`
- Account filtering: only return connections for specified account
- Closed connection ring buffer for disconnect history
---
### Task E13: Full System Event Payloads
**Files:**
- Modify: `src/NATS.Server/Events/EventTypes.cs`
- Test: `tests/NATS.Server.Tests/Events/EventPayloadTests.cs`
**Context:** Ensure all event DTOs have complete JSON fields matching Go's output.
**Go reference:** `events.go:100-300` (event struct fields)
---
### Task E14: Message Trace Propagation
**Files:**
- Rewrite: `src/NATS.Server/Internal/MessageTraceContext.cs` (22 → ~200+ lines)
- Modify: `src/NATS.Server/NatsClient.cs` (trace on publish/deliver)
- Test: `tests/NATS.Server.Tests/Internal/MessageTraceTests.cs`
**Context:** Trace messages through the full delivery pipeline: publish → route → gateway → leaf → deliver.
**Go reference:** `msgtrace.go` (799 lines)
**Implementation:**
- `MessageTraceContext`: collects trace events as message traverses the system
- Trace header: `Nats-Trace-Dest` specifies where to publish trace report
- On each hop: add trace entry with timestamp, node, action
- On delivery: publish collected trace to `Nats-Trace-Dest` subject
---
### Task E15: Port Services Go Tests + DB Update
**Tests to port:** ~59 mqtt_test.go, ~34 accounts_test.go, ~105 reload+opts, ~53 websocket, ~118 monitor/events/msgtrace
**Batch DB update at end of each sub-phase.**
---
## DB Update Script Template
At the end of each track, run a batch SQL update. Example for Track A:
```bash
sqlite3 docs/test_parity.db <<'SQL'
-- Track A: FileStore tests ported
UPDATE go_tests SET status='mapped', dotnet_test='RoundTrip_SimpleMessage', dotnet_file='FileStoreGoParityTests.cs',
notes='Ported from TestFileStoreBasic in filestore_test.go:42'
WHERE go_file='filestore_test.go' AND go_test='TestFileStoreBasic';
-- ... repeat for each ported test ...
-- Verify
SELECT status, COUNT(*) FROM go_tests GROUP BY status;
SQL
```
---
## Execution Summary
| Track | Tasks | Est. Tests | Dependencies | Priority |
|-------|-------|-----------|--------------|----------|
| A: Storage | A1-A6 | ~159 | None | Start immediately |
| D: Networking | D1-D7 | ~159 | None | Start immediately |
| E: Services | E1-E15 | ~369 | None | Start immediately |
| C: Protocol | C1-C11 | ~391 | Track A | After A merges |
| B: Consensus | B1-B11 | ~490 | Tracks A + C | After A+C merge |
| **Total** | **50 tasks** | **~1,568** | | |
**Success criteria:**
- All 50 tasks complete
- All new tests passing (`dotnet test` green)
- `test_parity.db` updated: mapped ratio 29% → ~70%+
- Each track committed and merged to main