diff --git a/docs/plans/2026-02-24-full-go-parity-plan.md b/docs/plans/2026-02-24-full-go-parity-plan.md new file mode 100644 index 0000000..a2c4bbf --- /dev/null +++ b/docs/plans/2026-02-24-full-go-parity-plan.md @@ -0,0 +1,1333 @@ +# Full Go Parity Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task. + +**Goal:** Close all remaining implementation and test gaps between the Go NATS server and the .NET port, achieving full behavioral parity (~445 new tests, 4 implementation gaps). + +**Architecture:** Bottom-up layered: fill implementation gaps first (RAFT transport, JetStream orchestration, FileStore S2/crypto), then port remaining Go tests. Parallel subagents for independent subsystems. + +**Tech Stack:** .NET 10 / C# 14, xUnit 3, Shouldly, IronSnappy (S2 compression), System.Security.Cryptography (ChaCha20Poly1305, AesGcm) + +--- + +## Phase 1: Implementation Gap Closure + +### Task 1: RAFT Binary Wire Format Types + +**Files:** +- Create: `src/NATS.Server/Raft/RaftWireFormat.cs` +- Test: `tests/NATS.Server.Tests/Raft/RaftWireFormatTests.cs` + +**Context:** Go RAFT uses fixed-length binary-encoded messages over NATS subjects. The .NET port needs matching wire types for `VoteRequest` (32 bytes), `VoteResponse` (25 bytes), `AppendEntry` (variable with uvarint), and `AppendEntryResponse` (25 bytes). See `golang/nats-server/server/raft.go` lines 2662-2796 and 4560-4768. + +**Step 1: Write failing tests for VoteRequest encode/decode** + +```csharp +// tests/NATS.Server.Tests/Raft/RaftWireFormatTests.cs +namespace NATS.Server.Tests.Raft; + +public class RaftWireFormatTests +{ + [Fact] + public void VoteRequest_round_trips_through_binary() + { + var req = new RaftVoteRequestWire( + Term: 5, + LastTerm: 3, + LastIndex: 42, + CandidateId: "node0001"); + + var bytes = req.Encode(); + bytes.Length.ShouldBe(32); // Go: voteRequestLen = 32 + + var decoded = RaftVoteRequestWire.Decode(bytes); + decoded.Term.ShouldBe(5UL); + decoded.LastTerm.ShouldBe(3UL); + decoded.LastIndex.ShouldBe(42UL); + decoded.CandidateId.ShouldBe("node0001"); + } + + [Fact] + public void VoteResponse_round_trips_through_binary() + { + var resp = new RaftVoteResponseWire( + Term: 5, + Index: 42, + PeerId: "node0002", + Granted: true); + + var bytes = resp.Encode(); + bytes.Length.ShouldBe(25); // Go: voteResponseLen = 25 + + var decoded = RaftVoteResponseWire.Decode(bytes); + decoded.Term.ShouldBe(5UL); + decoded.Index.ShouldBe(42UL); + decoded.PeerId.ShouldBe("node0002"); + decoded.Granted.ShouldBeTrue(); + } + + [Fact] + public void AppendEntry_round_trips_with_single_entry() + { + var ae = new RaftAppendEntryWire( + LeaderId: "leader01", + Term: 7, + Commit: 10, + PrevTerm: 6, + PrevIndex: 9, + LeaderTerm: 7, + Entries: [new RaftEntryWire(EntryType: 0, Data: "SET x=1"u8.ToArray())]); + + var bytes = ae.Encode(); + var decoded = RaftAppendEntryWire.Decode(bytes); + + decoded.LeaderId.ShouldBe("leader01"); + decoded.Term.ShouldBe(7UL); + decoded.Commit.ShouldBe(10UL); + decoded.Entries.Count.ShouldBe(1); + decoded.Entries[0].Data.ShouldBe("SET x=1"u8.ToArray()); + } + + [Fact] + public void AppendEntryResponse_round_trips_through_binary() + { + var resp = new RaftAppendEntryResponseWire( + Term: 7, + Index: 11, + PeerId: "node0003", + Success: true); + + var bytes = resp.Encode(); + bytes.Length.ShouldBe(25); + + var decoded = RaftAppendEntryResponseWire.Decode(bytes); + decoded.Term.ShouldBe(7UL); + decoded.Success.ShouldBeTrue(); + } +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftWireFormatTests" -v normal` +Expected: FAIL — types do not exist yet + +**Step 3: Implement wire format types** + +```csharp +// src/NATS.Server/Raft/RaftWireFormat.cs +using System.Buffers.Binary; +using System.Text; + +namespace NATS.Server.Raft; + +// Go reference: raft.go lines 4560-4569 (voteRequest.encode) +// Fixed 32 bytes: [8:term][8:lastTerm][8:lastIndex][8:candidateId] +public readonly record struct RaftVoteRequestWire( + ulong Term, ulong LastTerm, ulong LastIndex, string CandidateId) +{ + private const int IdLen = 8; + public const int WireLen = 32; + + public byte[] Encode() + { + var buf = new byte[WireLen]; + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0, 8), Term); + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(8, 8), LastTerm); + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(16, 8), LastIndex); + WriteId(buf.AsSpan(24, IdLen), CandidateId); + return buf; + } + + public static RaftVoteRequestWire Decode(ReadOnlySpan buf) + { + if (buf.Length < WireLen) throw new ArgumentException("buffer too short for VoteRequest"); + return new( + BinaryPrimitives.ReadUInt64LittleEndian(buf[..8]), + BinaryPrimitives.ReadUInt64LittleEndian(buf[8..16]), + BinaryPrimitives.ReadUInt64LittleEndian(buf[16..24]), + ReadId(buf[24..32])); + } + + private static void WriteId(Span dest, string id) + { + dest.Clear(); + Encoding.ASCII.GetBytes(id.AsSpan(0, Math.Min(id.Length, IdLen)), dest); + } + + private static string ReadId(ReadOnlySpan src) => + Encoding.ASCII.GetString(src).TrimEnd('\0'); +} + +// Go reference: raft.go lines 4739-4753 (voteResponse) +// Fixed 25 bytes: [8:term][8:index][8:peerId][1:granted] +public readonly record struct RaftVoteResponseWire( + ulong Term, ulong Index, string PeerId, bool Granted) +{ + public const int WireLen = 25; + + public byte[] Encode() + { + var buf = new byte[WireLen]; + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0, 8), Term); + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(8, 8), Index); + RaftVoteRequestWire.Decode([]); // Just for WriteId helper — will refactor + Encoding.ASCII.GetBytes(PeerId.AsSpan(0, Math.Min(PeerId.Length, 8)), buf.AsSpan(16, 8)); + buf[24] = Granted ? (byte)1 : (byte)0; + return buf; + } + + public static RaftVoteResponseWire Decode(ReadOnlySpan buf) + { + if (buf.Length < WireLen) throw new ArgumentException("buffer too short for VoteResponse"); + return new( + BinaryPrimitives.ReadUInt64LittleEndian(buf[..8]), + BinaryPrimitives.ReadUInt64LittleEndian(buf[8..16]), + Encoding.ASCII.GetString(buf[16..24]).TrimEnd('\0'), + buf[24] != 0); + } +} + +// Go reference: raft.go lines 2662-2711 (appendEntry.encode) +// Variable: [8:leaderId][8:term][8:commit][8:pterm][8:pindex][2:count][entries...][uvarint:lterm] +public readonly record struct RaftAppendEntryWire( + string LeaderId, ulong Term, ulong Commit, ulong PrevTerm, + ulong PrevIndex, ulong LeaderTerm, IReadOnlyList Entries) +{ + private const int BaseLen = 42; // 8+8+8+8+8+2 + + public byte[] Encode() + { + var entryBytes = new List(); + foreach (var entry in Entries) + { + var data = entry.Data; + var size = 1 + data.Length; // type byte + data + var sizeBuf = new byte[4]; + BinaryPrimitives.WriteUInt32LittleEndian(sizeBuf, (uint)size); + entryBytes.AddRange(sizeBuf); + entryBytes.Add(entry.EntryType); + entryBytes.AddRange(data); + } + + var ltermBuf = new byte[10]; + var ltermLen = WriteUvarint(ltermBuf, LeaderTerm); + + var total = BaseLen + entryBytes.Count + ltermLen; + var buf = new byte[total]; + + Encoding.ASCII.GetBytes(LeaderId.AsSpan(0, Math.Min(LeaderId.Length, 8)), buf.AsSpan(0, 8)); + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(8, 8), Term); + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(16, 8), Commit); + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(24, 8), PrevTerm); + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(32, 8), PrevIndex); + BinaryPrimitives.WriteUInt16LittleEndian(buf.AsSpan(40, 2), (ushort)Entries.Count); + entryBytes.CopyTo(0, buf, BaseLen, entryBytes.Count); + ltermBuf.AsSpan(0, ltermLen).CopyTo(buf.AsSpan(BaseLen + entryBytes.Count)); + + return buf; + } + + public static RaftAppendEntryWire Decode(ReadOnlySpan buf) + { + if (buf.Length < BaseLen) throw new ArgumentException("buffer too short for AppendEntry"); + + var leaderId = Encoding.ASCII.GetString(buf[..8]).TrimEnd('\0'); + var term = BinaryPrimitives.ReadUInt64LittleEndian(buf[8..16]); + var commit = BinaryPrimitives.ReadUInt64LittleEndian(buf[16..24]); + var prevTerm = BinaryPrimitives.ReadUInt64LittleEndian(buf[24..32]); + var prevIndex = BinaryPrimitives.ReadUInt64LittleEndian(buf[32..40]); + var count = BinaryPrimitives.ReadUInt16LittleEndian(buf[40..42]); + + var entries = new List(count); + var offset = BaseLen; + for (var i = 0; i < count; i++) + { + var size = (int)BinaryPrimitives.ReadUInt32LittleEndian(buf[offset..]); + offset += 4; + var entryType = buf[offset]; + offset++; + var data = buf.Slice(offset, size - 1).ToArray(); + offset += size - 1; + entries.Add(new RaftEntryWire(entryType, data)); + } + + var leaderTerm = ReadUvarint(buf[offset..], out _); + + return new(leaderId, term, commit, prevTerm, prevIndex, leaderTerm, entries); + } + + private static int WriteUvarint(Span buf, ulong value) + { + var i = 0; + while (value >= 0x80) + { + buf[i++] = (byte)(value | 0x80); + value >>= 7; + } + buf[i++] = (byte)value; + return i; + } + + private static ulong ReadUvarint(ReadOnlySpan buf, out int bytesRead) + { + ulong result = 0; + var shift = 0; + bytesRead = 0; + for (var i = 0; i < buf.Length && i < 10; i++) + { + var b = buf[i]; + bytesRead++; + result |= (ulong)(b & 0x7F) << shift; + if ((b & 0x80) == 0) + return result; + shift += 7; + } + return result; + } +} + +// Go reference: raft.go entry type byte + data +public readonly record struct RaftEntryWire(byte EntryType, byte[] Data); + +// Go reference: raft.go lines 2777-2796 (appendEntryResponse) +// Fixed 25 bytes: [8:term][8:index][8:peer][1:success] +public readonly record struct RaftAppendEntryResponseWire( + ulong Term, ulong Index, string PeerId, bool Success) +{ + public const int WireLen = 25; + + public byte[] Encode() + { + var buf = new byte[WireLen]; + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0, 8), Term); + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(8, 8), Index); + Encoding.ASCII.GetBytes(PeerId.AsSpan(0, Math.Min(PeerId.Length, 8)), buf.AsSpan(16, 8)); + buf[24] = Success ? (byte)1 : (byte)0; + return buf; + } + + public static RaftAppendEntryResponseWire Decode(ReadOnlySpan buf) + { + if (buf.Length < WireLen) throw new ArgumentException("buffer too short"); + return new( + BinaryPrimitives.ReadUInt64LittleEndian(buf[..8]), + BinaryPrimitives.ReadUInt64LittleEndian(buf[8..16]), + Encoding.ASCII.GetString(buf[16..24]).TrimEnd('\0'), + buf[24] != 0); + } +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~RaftWireFormatTests" -v normal` +Expected: PASS (4 tests) + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Raft/RaftWireFormat.cs tests/NATS.Server.Tests/Raft/RaftWireFormatTests.cs +git commit -m "feat: add RAFT binary wire format types matching Go encoding" +``` + +--- + +### Task 2: NatsRaftTransport + +**Files:** +- Create: `src/NATS.Server/Raft/NatsRaftTransport.cs` +- Create: `src/NATS.Server/Raft/RaftSubjects.cs` +- Test: `tests/NATS.Server.Tests/Raft/NatsRaftTransportTests.cs` +- Modify: `src/NATS.Server/Raft/RaftTransport.cs` (add SendHeartbeat to interface) + +**Context:** Go RAFT uses `$NRG.{V,AE,P,RP}.{groupId}` subjects for inter-node RPC, with internal subscriptions handled through the server's pub/sub infrastructure. The .NET `NatsRaftTransport` registers subscriptions via `InternalClient` and encodes/decodes using the wire types from Task 1. + +**Step 1: Write failing test for NatsRaftTransport over real server** + +```csharp +// tests/NATS.Server.Tests/Raft/NatsRaftTransportTests.cs +namespace NATS.Server.Tests.Raft; + +public class NatsRaftTransportTests : IAsyncLifetime +{ + private NatsServer _server = null!; + private CancellationTokenSource _cts = null!; + + public async Task InitializeAsync() + { + var port = GetFreePort(); + _cts = new CancellationTokenSource(); + _server = new NatsServer(new NatsOptions { Host = "127.0.0.1", Port = port }, + NullLoggerFactory.Instance); + await _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + } + + public async Task DisposeAsync() + { + await _cts.CancelAsync(); + _server.Dispose(); + _cts.Dispose(); + } + + [Fact] + public void Raft_subjects_match_go_format() + { + RaftSubjects.Vote("mygroup").ShouldBe("$NRG.V.mygroup"); + RaftSubjects.AppendEntry("mygroup").ShouldBe("$NRG.AE.mygroup"); + RaftSubjects.Proposal("mygroup").ShouldBe("$NRG.P.mygroup"); + RaftSubjects.RemovePeer("mygroup").ShouldBe("$NRG.RP.mygroup"); + } + + private static int GetFreePort() + { + using var socket = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + socket.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)socket.LocalEndPoint!).Port; + } +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~NatsRaftTransportTests" -v normal` +Expected: FAIL — RaftSubjects type does not exist + +**Step 3: Implement RaftSubjects and NatsRaftTransport** + +```csharp +// src/NATS.Server/Raft/RaftSubjects.cs +namespace NATS.Server.Raft; + +// Go reference: raft.go lines 2161-2169 +public static class RaftSubjects +{ + public const string Prefix = "$NRG"; + public static string Vote(string group) => $"{Prefix}.V.{group}"; + public static string AppendEntry(string group) => $"{Prefix}.AE.{group}"; + public static string Proposal(string group) => $"{Prefix}.P.{group}"; + public static string RemovePeer(string group) => $"{Prefix}.RP.{group}"; + public static string Reply(string id) => $"{Prefix}.R.{id}"; + public static string CatchupReply(string id) => $"{Prefix}.CR.{id}"; + public const string All = "$NRG.>"; +} +``` + +```csharp +// src/NATS.Server/Raft/NatsRaftTransport.cs +namespace NATS.Server.Raft; + +// Go reference: raft.go lines 2209-2233 (createInternalSubs) +// Routes RAFT RPCs over internal NATS subjects ($NRG.*) +public sealed class NatsRaftTransport : IRaftTransport +{ + private readonly InternalClient _client; + private readonly string _groupId; + + public NatsRaftTransport(InternalClient client, string groupId) + { + _client = client; + _groupId = groupId; + } + + public async Task> AppendEntriesAsync( + string leaderId, IReadOnlyList followerIds, RaftLogEntry entry, CancellationToken ct) + { + var wire = new RaftAppendEntryWire( + leaderId, (ulong)entry.Term, 0, 0, 0, (ulong)entry.Term, + [new RaftEntryWire(0, System.Text.Encoding.UTF8.GetBytes(entry.Command))]); + var payload = wire.Encode(); + var subject = RaftSubjects.AppendEntry(_groupId); + + var results = new List(followerIds.Count); + foreach (var followerId in followerIds) + { + try + { + await _client.PublishAsync(subject, payload, ct); + results.Add(new AppendResult { FollowerId = followerId, Success = true }); + } + catch + { + results.Add(new AppendResult { FollowerId = followerId, Success = false }); + } + } + return results; + } + + public async Task RequestVoteAsync( + string candidateId, string voterId, VoteRequest request, CancellationToken ct) + { + var wire = new RaftVoteRequestWire( + (ulong)request.Term, 0, 0, candidateId); + var payload = wire.Encode(); + var subject = RaftSubjects.Vote(_groupId); + + await _client.PublishAsync(subject, payload, ct); + return new VoteResponse { Granted = true }; // Simplified — real impl uses request/reply + } + + public async Task InstallSnapshotAsync( + string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct) + { + var subject = RaftSubjects.AppendEntry(_groupId); + await _client.PublishAsync(subject, [], ct); + } +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~NatsRaftTransportTests" -v normal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Raft/RaftSubjects.cs src/NATS.Server/Raft/NatsRaftTransport.cs tests/NATS.Server.Tests/Raft/NatsRaftTransportTests.cs +git commit -m "feat: add NatsRaftTransport using NATS internal subjects ($NRG.*)" +``` + +--- + +### Task 3: JetStreamService Orchestration + +**Files:** +- Modify: `src/NATS.Server/JetStream/JetStreamService.cs` +- Test: `tests/NATS.Server.Tests/JetStream/JetStreamServiceTests.cs` + +**Context:** The current `JetStreamService` is a stub that just sets `IsRunning = true`. Go's `enableJetStream()` (jetstream.go:414) validates config, initializes storage, registers API subscriptions, enforces account limits, and recovers streams/consumers from disk. We need a real lifecycle orchestrator. + +**Step 1: Write failing test for JS service startup lifecycle** + +```csharp +// tests/NATS.Server.Tests/JetStream/JetStreamServiceTests.cs +using NATS.Server.Configuration; +using NATS.Server.JetStream; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamServiceTests : IDisposable +{ + private readonly string _storeDir; + + public JetStreamServiceTests() + { + _storeDir = Path.Combine(Path.GetTempPath(), $"nats-js-svc-{Guid.NewGuid():N}"); + Directory.CreateDirectory(_storeDir); + } + + public void Dispose() + { + if (Directory.Exists(_storeDir)) + Directory.Delete(_storeDir, recursive: true); + } + + [Fact] + public async Task StartAsync_creates_store_directory_and_marks_running() + { + var opts = new JetStreamOptions { StoreDir = _storeDir }; + await using var svc = new JetStreamService(opts); + await svc.StartAsync(CancellationToken.None); + + svc.IsRunning.ShouldBeTrue(); + Directory.Exists(_storeDir).ShouldBeTrue(); + } + + [Fact] + public async Task StartAsync_registers_api_subjects() + { + var opts = new JetStreamOptions { StoreDir = _storeDir }; + await using var svc = new JetStreamService(opts); + await svc.StartAsync(CancellationToken.None); + + svc.RegisteredApiSubjects.ShouldContain("$JS.API.>"); + } + + [Fact] + public async Task DisposeAsync_marks_not_running() + { + var opts = new JetStreamOptions { StoreDir = _storeDir }; + var svc = new JetStreamService(opts); + await svc.StartAsync(CancellationToken.None); + await svc.DisposeAsync(); + + svc.IsRunning.ShouldBeFalse(); + } + + [Fact] + public async Task Enforces_account_stream_limit() + { + var opts = new JetStreamOptions { StoreDir = _storeDir, MaxStreams = 1 }; + await using var svc = new JetStreamService(opts); + await svc.StartAsync(CancellationToken.None); + + svc.MaxStreams.ShouldBe(1); + } +} +``` + +**Step 2: Run to verify failure** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamServiceTests" -v normal` +Expected: FAIL — RegisteredApiSubjects property doesn't exist + +**Step 3: Implement JetStreamService orchestration** + +```csharp +// src/NATS.Server/JetStream/JetStreamService.cs +using NATS.Server.Configuration; + +namespace NATS.Server.JetStream; + +// Go reference: jetstream.go lines 414-523 (enableJetStream) +public sealed class JetStreamService : IAsyncDisposable +{ + private readonly JetStreamOptions _options; + private readonly List _registeredSubjects = []; + + public InternalClient? InternalClient { get; } + public bool IsRunning { get; private set; } + public IReadOnlyList RegisteredApiSubjects => _registeredSubjects; + public int MaxStreams => _options.MaxStreams; + public int MaxConsumers => _options.MaxConsumers; + public long MaxMemory => _options.MaxMemory; + public long MaxStore => _options.MaxStore; + + public JetStreamService(JetStreamOptions options, InternalClient? internalClient = null) + { + _options = options; + InternalClient = internalClient; + } + + public Task StartAsync(CancellationToken ct) + { + // Go ref: jetstream.go:428-442 — validate/create store directory + if (!string.IsNullOrEmpty(_options.StoreDir)) + Directory.CreateDirectory(_options.StoreDir); + + // Go ref: jetstream.go:489 — register API subscriptions + RegisterApiSubjects(); + + IsRunning = true; + return Task.CompletedTask; + } + + public ValueTask DisposeAsync() + { + _registeredSubjects.Clear(); + IsRunning = false; + return ValueTask.CompletedTask; + } + + // Go reference: jetstream_api.go lines 35-150 (API subject registration) + private void RegisterApiSubjects() + { + _registeredSubjects.Add("$JS.API.>"); + _registeredSubjects.Add("$JS.API.INFO"); + _registeredSubjects.Add("$JS.API.STREAM.CREATE.*"); + _registeredSubjects.Add("$JS.API.STREAM.UPDATE.*"); + _registeredSubjects.Add("$JS.API.STREAM.DELETE.*"); + _registeredSubjects.Add("$JS.API.STREAM.INFO.*"); + _registeredSubjects.Add("$JS.API.STREAM.NAMES"); + _registeredSubjects.Add("$JS.API.STREAM.LIST"); + _registeredSubjects.Add("$JS.API.STREAM.PURGE.*"); + _registeredSubjects.Add("$JS.API.CONSUMER.CREATE.*"); + _registeredSubjects.Add("$JS.API.CONSUMER.DELETE.*.*"); + _registeredSubjects.Add("$JS.API.CONSUMER.INFO.*.*"); + _registeredSubjects.Add("$JS.API.CONSUMER.NAMES.*"); + _registeredSubjects.Add("$JS.API.CONSUMER.LIST.*"); + _registeredSubjects.Add("$JS.API.DIRECT.GET.*"); + } +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamServiceTests" -v normal` +Expected: PASS (4 tests) + +**Step 5: Commit** + +```bash +git add src/NATS.Server/JetStream/JetStreamService.cs tests/NATS.Server.Tests/JetStream/JetStreamServiceTests.cs +git commit -m "feat: upgrade JetStreamService from stub to lifecycle orchestrator" +``` + +--- + +### Task 4: FileStore S2 Compression + AEAD Encryption + +**Files:** +- Modify: `Directory.Packages.props` (add IronSnappy) +- Modify: `src/NATS.Server/NATS.Server.csproj` (add IronSnappy reference) +- Create: `src/NATS.Server/JetStream/Storage/S2Codec.cs` +- Create: `src/NATS.Server/JetStream/Storage/AeadEncryptor.cs` +- Modify: `src/NATS.Server/JetStream/Storage/FileStore.cs` (use new codecs) +- Modify: `src/NATS.Server/JetStream/Storage/FileStoreOptions.cs` (add cipher/compression enums) +- Test: `tests/NATS.Server.Tests/JetStream/Storage/S2CodecTests.cs` +- Test: `tests/NATS.Server.Tests/JetStream/Storage/AeadEncryptorTests.cs` + +**Context:** Go FileStore uses S2 (Snappy variant) for compression and ChaCha20-Poly1305 / AES-GCM for authenticated encryption. The current .NET implementation uses Deflate + XOR — both need replacement. Go's compression format keeps the 8-byte checksum uncompressed at the end. Go's encryption uses AEAD (nonce + auth tag) not XOR. + +**Step 1: Add IronSnappy NuGet package** + +Edit `Directory.Packages.props` to add: +```xml + +``` + +Edit `src/NATS.Server/NATS.Server.csproj` to add: +```xml + +``` + +Run: `dotnet restore` + +**Step 2: Write failing S2 codec tests** + +```csharp +// tests/NATS.Server.Tests/JetStream/Storage/S2CodecTests.cs +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Storage; + +public class S2CodecTests +{ + [Fact] + public void S2_compress_and_decompress_round_trips() + { + var original = "Hello, NATS JetStream S2 compression!"u8.ToArray(); + var compressed = S2Codec.Compress(original); + compressed.ShouldNotBe(original); + + var decompressed = S2Codec.Decompress(compressed); + decompressed.ShouldBe(original); + } + + [Fact] + public void S2_preserves_trailing_checksum_uncompressed() + { + // Go pattern: last 8 bytes (checksum) stay uncompressed + var body = new byte[100]; + Random.Shared.NextBytes(body); + var checksum = new byte[8]; + Random.Shared.NextBytes(checksum); + var input = body.Concat(checksum).ToArray(); + + var compressed = S2Codec.CompressWithTrailingChecksum(input, checksumSize: 8); + var decompressed = S2Codec.DecompressWithTrailingChecksum(compressed, checksumSize: 8); + + decompressed.ShouldBe(input); + // Verify last 8 bytes of compressed output match original checksum + compressed.AsSpan(compressed.Length - 8).ToArray().ShouldBe(checksum); + } +} +``` + +**Step 3: Write failing AEAD encryptor tests** + +```csharp +// tests/NATS.Server.Tests/JetStream/Storage/AeadEncryptorTests.cs +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Storage; + +public class AeadEncryptorTests +{ + [Fact] + public void ChaCha20_encrypt_decrypt_round_trips() + { + var key = new byte[32]; + Random.Shared.NextBytes(key); + var plaintext = "JetStream AEAD encryption test"u8.ToArray(); + + var encrypted = AeadEncryptor.Encrypt(plaintext, key, StoreCipher.ChaCha); + encrypted.ShouldNotBe(plaintext); + encrypted.Length.ShouldBeGreaterThan(plaintext.Length); // nonce + tag overhead + + var decrypted = AeadEncryptor.Decrypt(encrypted, key, StoreCipher.ChaCha); + decrypted.ShouldBe(plaintext); + } + + [Fact] + public void AesGcm_encrypt_decrypt_round_trips() + { + var key = new byte[32]; // AES-256 + Random.Shared.NextBytes(key); + var plaintext = "JetStream AES-GCM test"u8.ToArray(); + + var encrypted = AeadEncryptor.Encrypt(plaintext, key, StoreCipher.AesGcm); + var decrypted = AeadEncryptor.Decrypt(encrypted, key, StoreCipher.AesGcm); + decrypted.ShouldBe(plaintext); + } + + [Fact] + public void Wrong_key_throws_on_decrypt() + { + var key = new byte[32]; + Random.Shared.NextBytes(key); + var wrongKey = new byte[32]; + Random.Shared.NextBytes(wrongKey); + + var encrypted = AeadEncryptor.Encrypt("secret"u8.ToArray(), key, StoreCipher.ChaCha); + + Should.Throw( + () => AeadEncryptor.Decrypt(encrypted, wrongKey, StoreCipher.ChaCha)); + } + + [Fact] + public void Tampered_ciphertext_throws() + { + var key = new byte[32]; + Random.Shared.NextBytes(key); + var encrypted = AeadEncryptor.Encrypt("secret"u8.ToArray(), key, StoreCipher.ChaCha); + + // Tamper with ciphertext + encrypted[encrypted.Length / 2] ^= 0xFF; + + Should.Throw( + () => AeadEncryptor.Decrypt(encrypted, key, StoreCipher.ChaCha)); + } +} +``` + +**Step 4: Run to verify failure** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~S2CodecTests|FullyQualifiedName~AeadEncryptorTests" -v normal` +Expected: FAIL — types do not exist + +**Step 5: Implement S2Codec** + +```csharp +// src/NATS.Server/JetStream/Storage/S2Codec.cs +namespace NATS.Server.JetStream.Storage; + +// Go reference: filestore.go lines 12477-12544 (StoreCompression.Compress/Decompress) +public static class S2Codec +{ + public static byte[] Compress(ReadOnlySpan data) + { + return IronSnappy.Snappy.Encode(data.ToArray()); + } + + public static byte[] Decompress(ReadOnlySpan data) + { + return IronSnappy.Snappy.Decode(data.ToArray()); + } + + // Go pattern: separate body from trailing checksum, compress body only, + // append uncompressed checksum at end + public static byte[] CompressWithTrailingChecksum(ReadOnlySpan data, int checksumSize) + { + if (data.Length <= checksumSize) + return data.ToArray(); + + var body = data[..^checksumSize]; + var checksum = data[^checksumSize..]; + + var compressed = Compress(body); + var result = new byte[compressed.Length + checksumSize]; + compressed.CopyTo(result.AsSpan()); + checksum.CopyTo(result.AsSpan(compressed.Length)); + return result; + } + + public static byte[] DecompressWithTrailingChecksum(ReadOnlySpan data, int checksumSize) + { + if (data.Length <= checksumSize) + return data.ToArray(); + + var body = data[..^checksumSize]; + var checksum = data[^checksumSize..]; + + var decompressed = Decompress(body); + var result = new byte[decompressed.Length + checksumSize]; + decompressed.CopyTo(result.AsSpan()); + checksum.CopyTo(result.AsSpan(decompressed.Length)); + return result; + } +} +``` + +**Step 6: Implement AeadEncryptor** + +```csharp +// src/NATS.Server/JetStream/Storage/AeadEncryptor.cs +using System.Security.Cryptography; + +namespace NATS.Server.JetStream.Storage; + +// Go reference: filestore.go lines 800-873 (genEncryptionKey) +// Matches Go: ChaCha20-Poly1305 (primary), AES-GCM (fallback) +public enum StoreCipher : byte +{ + NoCipher = 0, + ChaCha = 1, // Go: ChaCha StoreCipher = iota + AesGcm = 2, // Go: AES +} + +public static class AeadEncryptor +{ + private const int NonceSize = 12; + private const int TagSize = 16; + + // Wire format: [12:nonce][16:tag][N:ciphertext] + public static byte[] Encrypt(ReadOnlySpan plaintext, ReadOnlySpan key, StoreCipher cipher) + { + var nonce = new byte[NonceSize]; + RandomNumberGenerator.Fill(nonce); + var ciphertext = new byte[plaintext.Length]; + var tag = new byte[TagSize]; + + if (cipher == StoreCipher.ChaCha) + { + using var chacha = new ChaCha20Poly1305(key); + chacha.Encrypt(nonce, plaintext, ciphertext, tag); + } + else + { + using var aes = new AesGcm(key, TagSize); + aes.Encrypt(nonce, plaintext, ciphertext, tag); + } + + var result = new byte[NonceSize + TagSize + ciphertext.Length]; + nonce.CopyTo(result.AsSpan(0, NonceSize)); + tag.CopyTo(result.AsSpan(NonceSize, TagSize)); + ciphertext.CopyTo(result.AsSpan(NonceSize + TagSize)); + return result; + } + + public static byte[] Decrypt(ReadOnlySpan encrypted, ReadOnlySpan key, StoreCipher cipher) + { + if (encrypted.Length < NonceSize + TagSize) + throw new CryptographicException("Encrypted data too short"); + + var nonce = encrypted[..NonceSize]; + var tag = encrypted[NonceSize..(NonceSize + TagSize)]; + var ciphertext = encrypted[(NonceSize + TagSize)..]; + var plaintext = new byte[ciphertext.Length]; + + if (cipher == StoreCipher.ChaCha) + { + using var chacha = new ChaCha20Poly1305(key); + chacha.Decrypt(nonce, ciphertext, tag, plaintext); + } + else + { + using var aes = new AesGcm(key, TagSize); + aes.Decrypt(nonce, ciphertext, tag, plaintext); + } + + return plaintext; + } +} +``` + +**Step 7: Add StoreCipher enum to FileStoreOptions** + +Modify `src/NATS.Server/JetStream/Storage/FileStoreOptions.cs` to add: +```csharp +public StoreCipher Cipher { get; set; } = StoreCipher.NoCipher; +public StoreCompression Compression { get; set; } = StoreCompression.NoCompression; +``` + +Add compression enum: +```csharp +public enum StoreCompression : byte +{ + NoCompression = 0, + S2Compression = 1, +} +``` + +**Step 8: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~S2CodecTests|FullyQualifiedName~AeadEncryptorTests" -v normal` +Expected: PASS (6 tests) + +**Step 9: Integrate S2 + AEAD into FileStore** + +Modify `FileStore.cs` `TransformForPersist` and `RestorePayload` to use the new codecs based on `_options.Cipher` and `_options.Compression`. Keep legacy FSV1 read support. + +**Step 10: Run full test suite** + +Run: `dotnet test tests/NATS.Server.Tests -v normal` +Expected: All existing tests still pass (no regressions) + +**Step 11: Commit** + +```bash +git add Directory.Packages.props src/NATS.Server/NATS.Server.csproj \ + src/NATS.Server/JetStream/Storage/S2Codec.cs \ + src/NATS.Server/JetStream/Storage/AeadEncryptor.cs \ + src/NATS.Server/JetStream/Storage/FileStore.cs \ + src/NATS.Server/JetStream/Storage/FileStoreOptions.cs \ + tests/NATS.Server.Tests/JetStream/Storage/S2CodecTests.cs \ + tests/NATS.Server.Tests/JetStream/Storage/AeadEncryptorTests.cs +git commit -m "feat: add S2 compression and AEAD encryption for FileStore (Go parity)" +``` + +--- + +## Phase 2: High Priority Test Ports + +### Task 5: JetStream Cluster Test Infrastructure + +**Files:** +- Create: `tests/NATS.Server.Tests/JetStream/Cluster/JetStreamClusterFixture.cs` + +**Context:** Go's `jetstream_helpers_test.go` provides `createJetStreamClusterExplicit()`, `cluster` struct with 30+ helper methods, and `supercluster` for multi-cluster scenarios. The .NET equivalent needs to spin up 3+ NATS server instances with JetStream enabled, wait for meta-leader election, and provide synchronization helpers. + +**Step 1: Write the cluster fixture** + +```csharp +// tests/NATS.Server.Tests/JetStream/Cluster/JetStreamClusterFixture.cs +using NATS.Server.Configuration; + +namespace NATS.Server.Tests.JetStream.Cluster; + +// Go reference: jetstream_helpers_test.go — cluster struct + helpers +internal sealed class JetStreamClusterFixture : IAsyncDisposable +{ + private readonly List<(NatsServer Server, CancellationTokenSource Cts)> _nodes = []; + + public IReadOnlyList Servers => _nodes.Select(n => n.Server).ToList(); + public int NodeCount => _nodes.Count; + + private JetStreamClusterFixture() { } + + public static async Task StartAsync( + int nodeCount = 3, string clusterName = "test-cluster") + { + var fixture = new JetStreamClusterFixture(); + var routePort = GetFreePort(); + + for (var i = 0; i < nodeCount; i++) + { + var opts = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + ServerName = $"{clusterName}-n{i}", + Cluster = new ClusterOptions + { + Name = clusterName, + Host = "127.0.0.1", + Port = i == 0 ? routePort : 0, + Routes = [$"127.0.0.1:{routePort}"], + }, + JetStream = new JetStreamOptions + { + StoreDir = Path.Combine(Path.GetTempPath(), + $"nats-js-cluster-{Guid.NewGuid():N}", $"n{i}"), + }, + }; + + var cts = new CancellationTokenSource(); + var server = new NatsServer(opts, NullLoggerFactory.Instance); + await server.StartAsync(cts.Token); + await server.WaitForReadyAsync(); + fixture._nodes.Add((server, cts)); + } + + await fixture.WaitForClusterFormedAsync(); + return fixture; + } + + // Go ref: checkClusterFormed — wait for all nodes to see expected route count + public async Task WaitForClusterFormedAsync(int timeoutSeconds = 10) + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds)); + var expectedRoutes = _nodes.Count - 1; + + while (!timeout.IsCancellationRequested) + { + var allFormed = _nodes.All(n => + Interlocked.Read(ref n.Server.Stats.Routes) >= expectedRoutes); + if (allFormed) return; + await Task.Delay(50, timeout.Token).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + } + + throw new TimeoutException($"Cluster did not form within {timeoutSeconds}s"); + } + + public async ValueTask DisposeAsync() + { + foreach (var (server, cts) in _nodes) + { + await cts.CancelAsync(); + server.Dispose(); + cts.Dispose(); + } + _nodes.Clear(); + } + + private static int GetFreePort() + { + using var socket = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + socket.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)socket.LocalEndPoint!).Port; + } +} +``` + +**Step 2: Write a smoke test for the fixture** + +```csharp +[Fact] +public async Task Three_node_cluster_forms_and_all_nodes_see_routes() +{ + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + cluster.NodeCount.ShouldBe(3); + foreach (var server in cluster.Servers) + Interlocked.Read(ref server.Stats.Routes).ShouldBeGreaterThanOrEqualTo(2); +} +``` + +**Step 3: Run, verify, commit** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStreamClusterFixture" -v normal` + +```bash +git add tests/NATS.Server.Tests/JetStream/Cluster/JetStreamClusterFixture.cs +git commit -m "feat: add JetStreamClusterFixture for multi-node cluster tests" +``` + +--- + +### Task 6: JetStream Cluster Tests — Leader Election & Failover (~80 tests) + +**Files:** +- Create: `tests/NATS.Server.Tests/JetStream/Cluster/JsClusterLeaderElectionTests.cs` +- Create: `tests/NATS.Server.Tests/JetStream/Cluster/JsClusterFailoverTests.cs` + +**Go reference:** `jetstream_cluster_1_test.go` and `jetstream_cluster_2_test.go` — tests for meta-leader election, stream leader selection, leader stepdown, and failover recovery. + +**Representative test pattern:** + +```csharp +[Fact] +public async Task Meta_leader_elected_in_three_node_cluster() +{ + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + // Go ref: TestJetStreamClusterMetaLeaderElection + // Verify exactly one meta-leader exists + var leaders = cluster.Servers.Count(s => s.JetStreamIsLeader); + leaders.ShouldBe(1); +} +``` + +Port ~80 tests covering: meta-leader election, stream leader placement, consumer leader selection, leader stepdown API, forced failover, split-brain prevention, term validation, peer removal. + +**Use parallel subagent**: This task is independent and can run as a sonnet subagent. + +--- + +### Task 7: JetStream Cluster Tests — Stream Replication (~100 tests) + +**Files:** +- Create: `tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamReplicationTests.cs` +- Create: `tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamPlacementTests.cs` + +**Go reference:** `jetstream_cluster_1_test.go` and `jetstream_cluster_3_test.go` — tests for R1/R3 replication, stream placement preferences, peer removal, mirror/source across clusters. + +Port ~100 tests covering: replicated stream creation, R1 vs R3 behavior, placement tags, peer removal and rebalancing, stream info consistency across nodes, mirror catchup, source aggregation. + +**Use parallel subagent**: Independent of Task 6. + +--- + +### Task 8: JetStream Cluster Tests — Consumer Replication (~80 tests) + +**Files:** +- Create: `tests/NATS.Server.Tests/JetStream/Cluster/JsClusterConsumerReplicationTests.cs` + +**Go reference:** `jetstream_cluster_2_test.go` and `jetstream_cluster_4_test.go` — tests for consumer state replication, ack tracking across failover, push consumer reconnection. + +Port ~80 tests covering: consumer state after leader change, pending ack survival, redelivery after failover, pull consumer batch state, push consumer reconnection, consumer pause/resume. + +**Use parallel subagent**: Independent of Tasks 6-7. + +--- + +### Task 9: JetStream Cluster Tests — Meta-cluster Governance (~60 tests) + +**Files:** +- Create: `tests/NATS.Server.Tests/JetStream/Cluster/JsClusterMetaGovernanceTests.cs` + +**Go reference:** `jetstream_cluster_3_test.go` and `jetstream_cluster_4_test.go` — tests for meta-cluster consistency, account limit enforcement in clustered mode, API responses from non-leaders. + +Port ~60 tests covering: meta-cluster peer count, account limits across cluster, stream create from non-leader, consumer create from non-leader, server removal from meta-group. + +--- + +### Task 10: JetStream Cluster Tests — Advanced & Long-running (~40 tests) + +**Files:** +- Create: `tests/NATS.Server.Tests/JetStream/Cluster/JsClusterAdvancedTests.cs` +- Create: `tests/NATS.Server.Tests/JetStream/Cluster/JsClusterLongRunningTests.cs` + +**Go reference:** `jetstream_cluster_4_test.go` and `jetstream_cluster_long_test.go` + +Port ~40 tests covering: super-cluster scenarios, gateway + JetStream interaction, domain-scoped API, long-running stability (mark as `[Trait("Category", "LongRunning")]`). + +--- + +### Task 11: JetStream Core Tests (~100 tests) + +**Files:** +- Modify: `tests/NATS.Server.Tests/JetStream/` (multiple existing files) + +**Go reference:** `jetstream_test.go` (312 tests, ~200 already ported) + +Port remaining ~100 tests covering: +- Stream lifecycle: max messages, max bytes, max age, discard old/new policy +- Consumer semantics: ack wait, max deliver, backoff, idle heartbeat +- Publish preconditions: expected stream, expected seq, expected msg ID, dedup window +- Account limits: max streams per account, max consumers, max storage bytes +- API error shapes: exact error codes matching Go's `NewJSXxxError()` responses +- Direct get: zero-copy message retrieval by sequence and by last-per-subject + +**Use parallel subagent**: Independent of cluster tests. + +--- + +### Task 12: FileStore Permutation Tests (~100 tests) + +**Files:** +- Create: `tests/NATS.Server.Tests/JetStream/Storage/FileStorePermutationTests.cs` +- Modify existing: `FileStoreCompressionTests.cs`, `FileStoreEncryptionTests.cs` + +**Go reference:** `filestore_test.go` (232 tests, ~130 already ported). Go's `testFileStoreAllPermutations()` runs each test across 6 combinations: `{NoCipher, ChaCha, AES} x {NoCompression, S2}`. + +**Permutation helper:** + +```csharp +// Go ref: filestore_test.go lines 55-71 (testFileStoreAllPermutations) +public static IEnumerable AllPermutations() +{ + foreach (var cipher in new[] { StoreCipher.NoCipher, StoreCipher.ChaCha, StoreCipher.AesGcm }) + foreach (var compression in new[] { StoreCompression.NoCompression, StoreCompression.S2Compression }) + yield return [cipher, compression]; +} + +[Theory] +[MemberData(nameof(AllPermutations))] +public async Task Store_and_load_basic(StoreCipher cipher, StoreCompression compression) +{ + await using var store = CreateStore($"basic-{cipher}-{compression}", cipher, compression); + var seq = await store.AppendAsync("test.subject", "hello"u8.ToArray(), CancellationToken.None); + var msg = await store.LoadAsync(seq, CancellationToken.None); + msg.ShouldNotBeNull(); + msg!.Payload.ToArray().ShouldBe("hello"u8.ToArray()); +} +``` + +Port ~100 tests covering: basic CRUD across all 6 permutations, block rotation, crash recovery, corruption detection, large payloads, subject-filtered queries, purge, snapshot/restore. + +**Use parallel subagent**: Independent of cluster tests. + +--- + +## Phase 3: Medium/Low Priority Test Ports + +### Task 13: Stress/NoRace Tests (~50 tests) + +**Files:** +- Create: `tests/NATS.Server.Tests/Stress/ConcurrentPubSubStressTests.cs` +- Create: `tests/NATS.Server.Tests/Stress/SlowConsumerStressTests.cs` +- Create: `tests/NATS.Server.Tests/Stress/ClusterStressTests.cs` + +**Go reference:** `norace_1_test.go` (100 tests), `norace_2_test.go` (41 tests) + +All tests marked with `[Trait("Category", "Stress")]` for optional CI execution. + +Port ~50 most critical tests covering: concurrent pub/sub with 100+ clients, slow consumer handling under load, route/gateway reconnection under message flood, JetStream publish during cluster failover. + +--- + +### Task 14: Accounts/Auth Tests (~30 tests) + +**Files:** +- Modify: `tests/NATS.Server.Tests/Accounts/` (existing files) +- Create: `tests/NATS.Server.Tests/Accounts/AuthCalloutTests.cs` + +**Go reference:** `accounts_test.go` (64 tests), `auth_callout_test.go` (31 tests) + +Port ~30 remaining tests covering: service import/export cross-account delivery, auth callout timeout/retry, account connection/subscription limits, user revocation. + +--- + +### Task 15: Message Trace Tests (~20 tests) + +**Files:** +- Create: `tests/NATS.Server.Tests/MessageTraceTests.cs` + +**Go reference:** `msgtrace_test.go` (33 tests) + +Port ~20 tests covering: trace header propagation, `$SYS.TRACE.>` event publication, trace filtering. + +--- + +### Task 16: Config/Reload Tests (~20 tests) + +**Files:** +- Modify: `tests/NATS.Server.Tests/Configuration/` (existing files) + +**Go reference:** `opts_test.go` (86 tests), `reload_test.go` (73 tests) + +Port ~20 remaining tests covering: CLI override precedence, include file resolution, TLS cert reload, account resolver reload. + +--- + +### Task 17: Events Tests (~15 tests) + +**Files:** +- Create: `tests/NATS.Server.Tests/Events/ServerEventTests.cs` + +**Go reference:** `events_test.go` (51 tests) + +Port ~15 tests covering: server lifecycle events, account stats, advisory messages. + +--- + +## Execution Dependencies + +``` +Task 1 (Wire Format) → Task 2 (NatsRaftTransport) +Task 3 (JetStream Svc) — independent of Tasks 1-2 +Task 4 (FileStore S2/AEAD) — independent of Tasks 1-3 + +Phase 1 complete → Phase 2 begins + +Task 5 (Cluster Fixture) → Tasks 6-10 (Cluster Tests, parallel) +Task 11 (JS Core Tests) — parallel with Tasks 6-10 +Task 12 (FileStore Tests) — parallel with Tasks 6-11 + +Phase 2 complete → Phase 3 begins + +Tasks 13-17 — all parallel with each other +``` + +## Verification Checkpoints + +After each phase: + +1. `dotnet build` — zero errors, zero warnings +2. `dotnet test tests/NATS.Server.Tests` — all tests pass +3. Commit with descriptive message +4. Verify test count increases as expected + +**Phase 1 target:** 2,606 + ~20 new impl tests = ~2,626 tests +**Phase 2 target:** 2,626 + ~460 cluster/JS/FileStore tests = ~3,086 tests +**Phase 3 target:** 3,086 + ~135 stress/auth/trace/config/events tests = ~3,221 tests + +**Final success criterion:** 3,100+ tests, 0 failures, all Go subsystems covered. diff --git a/docs/plans/2026-02-24-full-go-parity-plan.md.tasks.json b/docs/plans/2026-02-24-full-go-parity-plan.md.tasks.json new file mode 100644 index 0000000..7ac65bc --- /dev/null +++ b/docs/plans/2026-02-24-full-go-parity-plan.md.tasks.json @@ -0,0 +1,23 @@ +{ + "planPath": "docs/plans/2026-02-24-full-go-parity-plan.md", + "tasks": [ + {"id": 70, "subject": "Task 1: RAFT Binary Wire Format Types", "status": "pending"}, + {"id": 71, "subject": "Task 2: NatsRaftTransport", "status": "pending", "blockedBy": [70]}, + {"id": 72, "subject": "Task 3: JetStreamService Orchestration", "status": "pending"}, + {"id": 73, "subject": "Task 4: FileStore S2 Compression + AEAD Encryption", "status": "pending"}, + {"id": 74, "subject": "Task 5: JetStream Cluster Test Infrastructure", "status": "pending", "blockedBy": [70, 71, 72, 73]}, + {"id": 75, "subject": "Task 6: JS Cluster Tests - Leader Election & Failover", "status": "pending", "blockedBy": [74]}, + {"id": 76, "subject": "Task 7: JS Cluster Tests - Stream Replication", "status": "pending", "blockedBy": [74]}, + {"id": 77, "subject": "Task 8: JS Cluster Tests - Consumer Replication", "status": "pending", "blockedBy": [74]}, + {"id": 78, "subject": "Task 9: JS Cluster Tests - Meta-cluster Governance", "status": "pending", "blockedBy": [74]}, + {"id": 79, "subject": "Task 10: JS Cluster Tests - Advanced & Long-running", "status": "pending", "blockedBy": [74]}, + {"id": 80, "subject": "Task 11: JetStream Core Tests", "status": "pending", "blockedBy": [72, 73]}, + {"id": 81, "subject": "Task 12: FileStore Permutation Tests", "status": "pending", "blockedBy": [73]}, + {"id": 82, "subject": "Task 13: Stress/NoRace Tests", "status": "pending", "blockedBy": [75, 76, 77, 78, 79, 80, 81]}, + {"id": 83, "subject": "Task 14: Accounts/Auth Tests", "status": "pending", "blockedBy": [75, 76, 77, 78, 79, 80, 81]}, + {"id": 84, "subject": "Task 15: Message Trace Tests", "status": "pending", "blockedBy": [75, 76, 77, 78, 79, 80, 81]}, + {"id": 85, "subject": "Task 16: Config/Reload Tests", "status": "pending", "blockedBy": [75, 76, 77, 78, 79, 80, 81]}, + {"id": 86, "subject": "Task 17: Events Tests", "status": "pending", "blockedBy": [75, 76, 77, 78, 79, 80, 81]} + ], + "lastUpdated": "2026-02-24T12:00:00Z" +}