From 61824260df228f7c5c0f4d853e5366cf32298d6e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 19:45:53 -0500 Subject: [PATCH] feat: add detailed implementation plan for base NATS server port MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 10 tasks from scaffolding through integration tests, each with TDD steps, exact file paths, and complete code. Bottom-up order: SubList → Parser → Client → Server → Integration. --- docs/plans/2026-02-22-base-server-plan.md | 2721 +++++++++++++++++ .../2026-02-22-base-server-plan.md.tasks.json | 16 + 2 files changed, 2737 insertions(+) create mode 100644 docs/plans/2026-02-22-base-server-plan.md create mode 100644 docs/plans/2026-02-22-base-server-plan.md.tasks.json diff --git a/docs/plans/2026-02-22-base-server-plan.md b/docs/plans/2026-02-22-base-server-plan.md new file mode 100644 index 0000000..9007f32 --- /dev/null +++ b/docs/plans/2026-02-22-base-server-plan.md @@ -0,0 +1,2721 @@ +# Base NATS Server Port — Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task. + +**Goal:** Port the core NATS pub/sub server from Go to .NET 10 C# — supporting CONNECT/INFO handshake, PUB/SUB/UNSUB, wildcards (`*`, `>`), queue groups, PING/PONG, and HPUB/HMSG headers. + +**Architecture:** Bottom-up build: subject matching trie (SubList) → protocol parser (System.IO.Pipelines) → client connection handler → server orchestrator. Each layer is independently testable. The library (NATS.Server) is separate from the host console app (NATS.Server.Host). + +**Tech Stack:** .NET 10, C# 14, xUnit, System.IO.Pipelines, System.Text.Json + +--- + +### Task 0: Project Scaffolding + +**Files:** +- Create: `NatsDotNet.sln` +- Create: `src/NATS.Server/NATS.Server.csproj` +- Create: `src/NATS.Server.Host/NATS.Server.Host.csproj` +- Create: `tests/NATS.Server.Tests/NATS.Server.Tests.csproj` +- Create: `Directory.Build.props` + +**Step 1: Create the solution and projects** + +```bash +cd /Users/dohertj2/Desktop/natsdotnet + +# Create solution +dotnet new sln -n NatsDotNet + +# Create class library +dotnet new classlib -n NATS.Server -o src/NATS.Server -f net10.0 +rm src/NATS.Server/Class1.cs + +# Create console app +dotnet new console -n NATS.Server.Host -o src/NATS.Server.Host -f net10.0 + +# Create test project +dotnet new xunit -n NATS.Server.Tests -o tests/NATS.Server.Tests -f net10.0 + +# Add projects to solution +dotnet sln add src/NATS.Server/NATS.Server.csproj +dotnet sln add src/NATS.Server.Host/NATS.Server.Host.csproj +dotnet sln add tests/NATS.Server.Tests/NATS.Server.Tests.csproj + +# Add project references +dotnet add src/NATS.Server.Host/NATS.Server.Host.csproj reference src/NATS.Server/NATS.Server.csproj +dotnet add tests/NATS.Server.Tests/NATS.Server.Tests.csproj reference src/NATS.Server/NATS.Server.csproj +``` + +**Step 2: Create Directory.Build.props** + +```xml + + + net10.0 + preview + enable + enable + true + + +``` + +Remove the `` and `` and `` from each individual `.csproj` since they're now in `Directory.Build.props`. + +**Step 3: Create directory structure** + +```bash +mkdir -p src/NATS.Server/Protocol +mkdir -p src/NATS.Server/Subscriptions +``` + +**Step 4: Verify build** + +Run: `dotnet build NatsDotNet.sln` +Expected: Build succeeded with 0 errors. + +**Step 5: Commit** + +```bash +git add -A +git commit -m "feat: scaffold solution with NATS.Server library, host, and test projects" +``` + +--- + +### Task 1: Subscription Types and Subject Validation + +**Files:** +- Create: `src/NATS.Server/Subscriptions/Subscription.cs` +- Create: `src/NATS.Server/Subscriptions/SubListResult.cs` +- Create: `src/NATS.Server/Subscriptions/SubjectMatch.cs` +- Test: `tests/NATS.Server.Tests/SubjectMatchTests.cs` + +**Reference:** `golang/nats-server/server/sublist.go` lines 1159-1233 for subject validation + +**Step 1: Write failing tests for subject validation** + +```csharp +// tests/NATS.Server.Tests/SubjectMatchTests.cs +using NATS.Server.Subscriptions; + +namespace NATS.Server.Tests; + +public class SubjectMatchTests +{ + [Theory] + [InlineData("foo", true)] + [InlineData("foo.bar", true)] + [InlineData("foo.bar.baz", true)] + [InlineData("foo.*", true)] + [InlineData("foo.>", true)] + [InlineData(">", true)] + [InlineData("*", true)] + [InlineData("*.bar", true)] + [InlineData("foo.*.baz", true)] + [InlineData("", false)] + [InlineData("foo.", false)] + [InlineData(".foo", false)] + [InlineData("foo..bar", false)] + [InlineData("foo.>.bar", false)] // > must be last token + [InlineData("foo bar", false)] // no spaces + [InlineData("foo\tbar", false)] // no tabs + public void IsValidSubject(string subject, bool expected) + { + Assert.Equal(expected, SubjectMatch.IsValidSubject(subject)); + } + + [Theory] + [InlineData("foo", true)] + [InlineData("foo.bar.baz", true)] + [InlineData("foo.*", false)] + [InlineData("foo.>", false)] + [InlineData(">", false)] + [InlineData("*", false)] + public void IsValidPublishSubject(string subject, bool expected) + { + Assert.Equal(expected, SubjectMatch.IsValidPublishSubject(subject)); + } + + [Theory] + [InlineData("foo", "foo", true)] + [InlineData("foo", "bar", false)] + [InlineData("foo.bar", "foo.*", true)] + [InlineData("foo.bar", "*.bar", true)] + [InlineData("foo.bar", "*.*", true)] + [InlineData("foo.bar.baz", "foo.>", true)] + [InlineData("foo.bar.baz", ">", true)] + [InlineData("foo.bar", "foo.>", true)] + [InlineData("foo", "foo.>", false)] + [InlineData("foo.bar.baz", "foo.*", false)] + [InlineData("foo.bar", "foo.bar.>", false)] + public void MatchLiteral(string literal, string pattern, bool expected) + { + Assert.Equal(expected, SubjectMatch.MatchLiteral(literal, pattern)); + } +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~SubjectMatchTests" -v quiet` +Expected: FAIL — types don't exist yet. + +**Step 3: Implement Subscription, SubListResult, and SubjectMatch** + +```csharp +// src/NATS.Server/Subscriptions/Subscription.cs +namespace NATS.Server.Subscriptions; + +public sealed class Subscription +{ + public required string Subject { get; init; } + public string? Queue { get; init; } + public required string Sid { get; init; } + public long MessageCount; // Interlocked + public long MaxMessages; // 0 = unlimited +} +``` + +```csharp +// src/NATS.Server/Subscriptions/SubListResult.cs +namespace NATS.Server.Subscriptions; + +public sealed class SubListResult +{ + public static readonly SubListResult Empty = new([], []); + + public Subscription[] PlainSubs { get; } + public Subscription[][] QueueSubs { get; } // outer = groups, inner = members + + public SubListResult(Subscription[] plainSubs, Subscription[][] queueSubs) + { + PlainSubs = plainSubs; + QueueSubs = queueSubs; + } +} +``` + +```csharp +// src/NATS.Server/Subscriptions/SubjectMatch.cs +namespace NATS.Server.Subscriptions; + +public static class SubjectMatch +{ + public const char Pwc = '*'; // partial wildcard + public const char Fwc = '>'; // full wildcard + public const char Sep = '.'; // token separator + + public static bool IsValidSubject(string subject) + { + if (string.IsNullOrEmpty(subject)) + return false; + + bool sawFwc = false; + int start = 0; + + for (int i = 0; i <= subject.Length; i++) + { + if (i == subject.Length || subject[i] == Sep) + { + int tokenLen = i - start; + if (tokenLen == 0 || sawFwc) + return false; + + if (tokenLen == 1) + { + char c = subject[start]; + if (c == Fwc) + sawFwc = true; + else if (c is ' ' or '\t' or '\n' or '\r' or '\f') + return false; + } + else + { + for (int j = start; j < i; j++) + { + char c = subject[j]; + if (c is ' ' or '\t' or '\n' or '\r' or '\f') + return false; + } + } + + start = i + 1; + } + } + + return true; + } + + public static bool IsLiteral(string subject) + { + for (int i = 0; i < subject.Length; i++) + { + char c = subject[i]; + if (c is Pwc or Fwc) + { + bool atStart = i == 0 || subject[i - 1] == Sep; + bool atEnd = i + 1 == subject.Length || subject[i + 1] == Sep; + if (atStart && atEnd) + return false; + } + } + return true; + } + + public static bool IsValidPublishSubject(string subject) + { + return IsValidSubject(subject) && IsLiteral(subject); + } + + /// + /// Match a literal subject against a pattern that may contain wildcards. + /// + public static bool MatchLiteral(string literal, string pattern) + { + int li = 0, pi = 0; + + while (pi < pattern.Length) + { + // Get next pattern token + int pTokenStart = pi; + while (pi < pattern.Length && pattern[pi] != Sep) pi++; + int pTokenLen = pi - pTokenStart; + if (pi < pattern.Length) pi++; // skip separator + + // Full wildcard — matches everything remaining + if (pTokenLen == 1 && pattern[pTokenStart] == Fwc) + return li < literal.Length; // must have at least one token left + + // Get next literal token + if (li >= literal.Length) return false; + int lTokenStart = li; + while (li < literal.Length && literal[li] != Sep) li++; + int lTokenLen = li - lTokenStart; + if (li < literal.Length) li++; // skip separator + + // Partial wildcard — matches any single token + if (pTokenLen == 1 && pattern[pTokenStart] == Pwc) + continue; + + // Literal comparison + if (pTokenLen != lTokenLen) + return false; + if (string.Compare(literal, lTokenStart, pattern, pTokenStart, pTokenLen, StringComparison.Ordinal) != 0) + return false; + } + + return li >= literal.Length; // both exhausted + } +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~SubjectMatchTests" -v quiet` +Expected: All tests PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Subscriptions/ tests/NATS.Server.Tests/SubjectMatchTests.cs +git commit -m "feat: add Subscription types and subject validation with wildcard matching" +``` + +--- + +### Task 2: SubList Trie — Insert, Remove, Match + +**Files:** +- Create: `src/NATS.Server/Subscriptions/SubList.cs` +- Test: `tests/NATS.Server.Tests/SubListTests.cs` + +**Reference:** `golang/nats-server/server/sublist.go` — the core trie, matchLevel(), addNodeToResults(), cache logic + +**Step 1: Write failing tests for SubList** + +```csharp +// tests/NATS.Server.Tests/SubListTests.cs +using NATS.Server.Subscriptions; + +namespace NATS.Server.Tests; + +public class SubListTests +{ + private static Subscription MakeSub(string subject, string? queue = null, string sid = "1") + => new() { Subject = subject, Queue = queue, Sid = sid }; + + [Fact] + public void Insert_and_match_literal_subject() + { + var sl = new SubList(); + var sub = MakeSub("foo.bar"); + sl.Insert(sub); + + var r = sl.Match("foo.bar"); + Assert.Single(r.PlainSubs); + Assert.Same(sub, r.PlainSubs[0]); + Assert.Empty(r.QueueSubs); + } + + [Fact] + public void Match_returns_empty_for_no_match() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.bar")); + + var r = sl.Match("foo.baz"); + Assert.Empty(r.PlainSubs); + } + + [Fact] + public void Match_partial_wildcard() + { + var sl = new SubList(); + var sub = MakeSub("foo.*"); + sl.Insert(sub); + + Assert.Single(sl.Match("foo.bar").PlainSubs); + Assert.Single(sl.Match("foo.baz").PlainSubs); + Assert.Empty(sl.Match("foo.bar.baz").PlainSubs); + } + + [Fact] + public void Match_full_wildcard() + { + var sl = new SubList(); + var sub = MakeSub("foo.>"); + sl.Insert(sub); + + Assert.Single(sl.Match("foo.bar").PlainSubs); + Assert.Single(sl.Match("foo.bar.baz").PlainSubs); + Assert.Empty(sl.Match("foo").PlainSubs); + } + + [Fact] + public void Match_root_full_wildcard() + { + var sl = new SubList(); + sl.Insert(MakeSub(">")); + + Assert.Single(sl.Match("foo").PlainSubs); + Assert.Single(sl.Match("foo.bar").PlainSubs); + Assert.Single(sl.Match("foo.bar.baz").PlainSubs); + } + + [Fact] + public void Match_collects_multiple_subs() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.bar", sid: "1")); + sl.Insert(MakeSub("foo.*", sid: "2")); + sl.Insert(MakeSub("foo.>", sid: "3")); + sl.Insert(MakeSub(">", sid: "4")); + + var r = sl.Match("foo.bar"); + Assert.Equal(4, r.PlainSubs.Length); + } + + [Fact] + public void Remove_subscription() + { + var sl = new SubList(); + var sub = MakeSub("foo.bar"); + sl.Insert(sub); + Assert.Single(sl.Match("foo.bar").PlainSubs); + + sl.Remove(sub); + Assert.Empty(sl.Match("foo.bar").PlainSubs); + } + + [Fact] + public void Queue_group_subscriptions() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.bar", queue: "workers", sid: "1")); + sl.Insert(MakeSub("foo.bar", queue: "workers", sid: "2")); + sl.Insert(MakeSub("foo.bar", queue: "loggers", sid: "3")); + + var r = sl.Match("foo.bar"); + Assert.Empty(r.PlainSubs); + Assert.Equal(2, r.QueueSubs.Length); // 2 queue groups + } + + [Fact] + public void Count_tracks_subscriptions() + { + var sl = new SubList(); + Assert.Equal(0u, sl.Count); + + sl.Insert(MakeSub("foo", sid: "1")); + sl.Insert(MakeSub("bar", sid: "2")); + Assert.Equal(2u, sl.Count); + + sl.Remove(MakeSub("foo", sid: "1")); + // Remove by reference won't work — we need the same instance + } + + [Fact] + public void Count_tracks_with_same_instance() + { + var sl = new SubList(); + var sub = MakeSub("foo"); + sl.Insert(sub); + Assert.Equal(1u, sl.Count); + sl.Remove(sub); + Assert.Equal(0u, sl.Count); + } + + [Fact] + public void Cache_invalidation_on_insert() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.bar", sid: "1")); + + // Prime the cache + var r1 = sl.Match("foo.bar"); + Assert.Single(r1.PlainSubs); + + // Insert a wildcard that matches — cache should be invalidated + sl.Insert(MakeSub("foo.*", sid: "2")); + + var r2 = sl.Match("foo.bar"); + Assert.Equal(2, r2.PlainSubs.Length); + } + + [Fact] + public void Match_partial_wildcard_at_different_levels() + { + var sl = new SubList(); + sl.Insert(MakeSub("*.bar.baz", sid: "1")); + sl.Insert(MakeSub("foo.*.baz", sid: "2")); + sl.Insert(MakeSub("foo.bar.*", sid: "3")); + + var r = sl.Match("foo.bar.baz"); + Assert.Equal(3, r.PlainSubs.Length); + } +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~SubListTests" -v quiet` +Expected: FAIL — `SubList` class doesn't exist yet. + +**Step 3: Implement SubList** + +```csharp +// src/NATS.Server/Subscriptions/SubList.cs +namespace NATS.Server.Subscriptions; + +public sealed class SubList +{ + private const int CacheMax = 1024; + private const int CacheSweep = 256; + + private readonly ReaderWriterLockSlim _lock = new(); + private readonly TrieLevel _root = new(); + private Dictionary? _cache = new(); + private uint _count; + private ulong _genId; + + public uint Count + { + get + { + _lock.EnterReadLock(); + try { return _count; } + finally { _lock.ExitReadLock(); } + } + } + + public void Insert(Subscription sub) + { + var subject = sub.Subject; + _lock.EnterWriteLock(); + try + { + var level = _root; + TrieNode? node = null; + bool sawFwc = false; + + foreach (var token in new TokenEnumerator(subject)) + { + if (token.Length == 0 || sawFwc) + throw new ArgumentException("Invalid subject", nameof(sub)); + + if (token.Length == 1 && token[0] == SubjectMatch.Pwc) + { + node = level.Pwc ??= new TrieNode(); + } + else if (token.Length == 1 && token[0] == SubjectMatch.Fwc) + { + node = level.Fwc ??= new TrieNode(); + sawFwc = true; + } + else + { + var key = token.ToString(); + if (!level.Nodes.TryGetValue(key, out node)) + { + node = new TrieNode(); + level.Nodes[key] = node; + } + } + + node.Next ??= new TrieLevel(); + level = node.Next; + } + + if (node == null) + throw new ArgumentException("Invalid subject", nameof(sub)); + + if (sub.Queue == null) + { + node.PlainSubs.Add(sub); + } + else + { + if (!node.QueueSubs.TryGetValue(sub.Queue, out var qset)) + { + qset = []; + node.QueueSubs[sub.Queue] = qset; + } + qset.Add(sub); + } + + _count++; + _genId++; + _cache?.Clear(); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void Remove(Subscription sub) + { + _lock.EnterWriteLock(); + try + { + var level = _root; + TrieNode? node = null; + bool sawFwc = false; + + // Stack for pruning empty nodes + Span<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)> path = + stackalloc (TrieLevel, TrieNode, string, bool, bool)[32]; + // Can't stackalloc tuples with references — use a list instead + var pathList = new List<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)>(); + + foreach (var token in new TokenEnumerator(sub.Subject)) + { + if (token.Length == 0 || sawFwc) + return; + + bool isPwc = token.Length == 1 && token[0] == SubjectMatch.Pwc; + bool isFwc = token.Length == 1 && token[0] == SubjectMatch.Fwc; + + if (isPwc) + { + node = level.Pwc; + } + else if (isFwc) + { + node = level.Fwc; + sawFwc = true; + } + else + { + level.Nodes.TryGetValue(token.ToString(), out node); + } + + if (node == null) + return; // not found + + var tokenStr = token.ToString(); + pathList.Add((level, node, tokenStr, isPwc, isFwc)); + level = node.Next ?? new TrieLevel(); // shouldn't be null on valid path + } + + if (node == null) return; + + // Remove from node + bool removed; + if (sub.Queue == null) + { + removed = node.PlainSubs.Remove(sub); + } + else + { + removed = false; + if (node.QueueSubs.TryGetValue(sub.Queue, out var qset)) + { + removed = qset.Remove(sub); + if (qset.Count == 0) + node.QueueSubs.Remove(sub.Queue); + } + } + + if (!removed) return; + + _count--; + _genId++; + _cache?.Clear(); + + // Prune empty nodes (walk backwards) + for (int i = pathList.Count - 1; i >= 0; i--) + { + var (l, n, t, isPwc, isFwc) = pathList[i]; + if (n.IsEmpty) + { + if (isPwc) l.Pwc = null; + else if (isFwc) l.Fwc = null; + else l.Nodes.Remove(t); + } + } + } + finally + { + _lock.ExitWriteLock(); + } + } + + public SubListResult Match(string subject) + { + _lock.EnterReadLock(); + bool upgraded = false; + try + { + // Check cache + if (_cache != null && _cache.TryGetValue(subject, out var cached)) + return cached; + + // Cache miss — collect results + var plainSubs = new List(); + var queueSubs = new List>(); + + MatchLevel(_root, subject, 0, plainSubs, queueSubs); + + SubListResult result; + if (plainSubs.Count == 0 && queueSubs.Count == 0) + { + result = SubListResult.Empty; + } + else + { + result = new SubListResult( + plainSubs.ToArray(), + queueSubs.Select(q => q.ToArray()).ToArray() + ); + } + + // Upgrade to write lock for cache update + if (_cache != null) + { + _lock.ExitReadLock(); + _lock.EnterWriteLock(); + upgraded = true; + + // Re-check cache after lock upgrade + if (_cache.TryGetValue(subject, out cached)) + return cached; + + _cache[subject] = result; + + if (_cache.Count > CacheMax) + { + // Sweep: remove entries until at CacheSweep + var keys = _cache.Keys.Take(_cache.Count - CacheSweep).ToList(); + foreach (var key in keys) + _cache.Remove(key); + } + } + + return result; + } + finally + { + if (upgraded) + _lock.ExitWriteLock(); + else + _lock.ExitReadLock(); + } + } + + private static void MatchLevel(TrieLevel level, string subject, int startIndex, + List plainSubs, List> queueSubs) + { + int i = startIndex; + + while (i <= subject.Length) + { + // Find next token boundary + int tokenStart = i; + while (i < subject.Length && subject[i] != SubjectMatch.Sep) + i++; + var token = subject.AsSpan(tokenStart, i - tokenStart); + bool isLast = i >= subject.Length; + if (!isLast) i++; // skip separator + + // Full wildcard (>) at this level matches all remaining tokens + if (level.Fwc != null) + AddNodeToResults(level.Fwc, plainSubs, queueSubs); + + // Partial wildcard (*) — recurse with remaining tokens + if (level.Pwc != null) + { + if (isLast) + { + AddNodeToResults(level.Pwc, plainSubs, queueSubs); + } + else if (level.Pwc.Next != null) + { + MatchLevel(level.Pwc.Next, subject, i, plainSubs, queueSubs); + } + } + + // Literal match + if (level.Nodes.TryGetValue(token.ToString(), out var node)) + { + if (isLast) + { + AddNodeToResults(node, plainSubs, queueSubs); + } + else if (node.Next != null) + { + level = node.Next; + } + else + { + return; // no further levels + } + } + else + { + return; // no literal match + } + + if (isLast) + return; + } + } + + private static void AddNodeToResults(TrieNode node, + List plainSubs, List> queueSubs) + { + // Add plain subscriptions + foreach (var sub in node.PlainSubs) + plainSubs.Add(sub); + + // Add queue subscriptions grouped by queue name + foreach (var (queueName, subs) in node.QueueSubs) + { + if (subs.Count == 0) continue; + + // Find existing queue group or create new one + List? existing = null; + foreach (var qs in queueSubs) + { + if (qs.Count > 0 && qs[0].Queue == queueName) + { + existing = qs; + break; + } + } + if (existing == null) + { + existing = new List(); + queueSubs.Add(existing); + } + existing.AddRange(subs); + } + } + + /// Enumerates '.' separated tokens in a subject without allocating. + private ref struct TokenEnumerator + { + private ReadOnlySpan _remaining; + + public TokenEnumerator(string subject) + { + _remaining = subject.AsSpan(); + Current = default; + } + + public ReadOnlySpan Current { get; private set; } + + public TokenEnumerator GetEnumerator() => this; + + public bool MoveNext() + { + if (_remaining.IsEmpty) + return false; + + int sep = _remaining.IndexOf(SubjectMatch.Sep); + if (sep < 0) + { + Current = _remaining; + _remaining = default; + } + else + { + Current = _remaining[..sep]; + _remaining = _remaining[(sep + 1)..]; + } + return true; + } + } + + private sealed class TrieLevel + { + public readonly Dictionary Nodes = new(); + public TrieNode? Pwc; // partial wildcard (*) + public TrieNode? Fwc; // full wildcard (>) + } + + private sealed class TrieNode + { + public TrieLevel? Next; + public readonly HashSet PlainSubs = []; + public readonly Dictionary> QueueSubs = new(); + + public bool IsEmpty => PlainSubs.Count == 0 && QueueSubs.Count == 0 && + (Next == null || (Next.Nodes.Count == 0 && Next.Pwc == null && Next.Fwc == null)); + } +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~SubListTests" -v quiet` +Expected: All tests PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Subscriptions/SubList.cs tests/NATS.Server.Tests/SubListTests.cs +git commit -m "feat: implement SubList trie with wildcard matching and cache" +``` + +--- + +### Task 3: Protocol Constants and Types + +**Files:** +- Create: `src/NATS.Server/Protocol/NatsProtocol.cs` +- Create: `src/NATS.Server/NatsOptions.cs` + +**Reference:** `golang/nats-server/server/const.go`, `golang/nats-server/server/server.go` lines 109-166 (Info struct), `golang/nats-server/server/client.go` lines 661-690 (ClientOpts) + +**Step 1: Create protocol constants and DTO types** + +```csharp +// src/NATS.Server/Protocol/NatsProtocol.cs +using System.Text; +using System.Text.Json.Serialization; + +namespace NATS.Server.Protocol; + +public static class NatsProtocol +{ + public const int MaxControlLineSize = 4096; + public const int MaxPayloadSize = 1024 * 1024; // 1MB + public const int DefaultPort = 4222; + public const string Version = "0.1.0"; + public const int ProtoVersion = 1; + + // Pre-encoded protocol fragments + public static readonly byte[] CrLf = "\r\n"u8.ToArray(); + public static readonly byte[] PingBytes = "PING\r\n"u8.ToArray(); + public static readonly byte[] PongBytes = "PONG\r\n"u8.ToArray(); + public static readonly byte[] OkBytes = "+OK\r\n"u8.ToArray(); + public static readonly byte[] InfoPrefix = "INFO "u8.ToArray(); + public static readonly byte[] MsgPrefix = "MSG "u8.ToArray(); + public static readonly byte[] HmsgPrefix = "HMSG "u8.ToArray(); + public static readonly byte[] ErrPrefix = "-ERR "u8.ToArray(); +} + +public sealed class ServerInfo +{ + [JsonPropertyName("server_id")] + public required string ServerId { get; set; } + + [JsonPropertyName("server_name")] + public required string ServerName { get; set; } + + [JsonPropertyName("version")] + public required string Version { get; set; } + + [JsonPropertyName("proto")] + public int Proto { get; set; } = NatsProtocol.ProtoVersion; + + [JsonPropertyName("host")] + public required string Host { get; set; } + + [JsonPropertyName("port")] + public int Port { get; set; } + + [JsonPropertyName("headers")] + public bool Headers { get; set; } = true; + + [JsonPropertyName("max_payload")] + public int MaxPayload { get; set; } = NatsProtocol.MaxPayloadSize; + + [JsonPropertyName("client_id")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public ulong ClientId { get; set; } + + [JsonPropertyName("client_ip")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? ClientIp { get; set; } +} + +public sealed class ClientOptions +{ + [JsonPropertyName("verbose")] + public bool Verbose { get; set; } + + [JsonPropertyName("pedantic")] + public bool Pedantic { get; set; } + + [JsonPropertyName("echo")] + public bool Echo { get; set; } = true; + + [JsonPropertyName("name")] + public string? Name { get; set; } + + [JsonPropertyName("lang")] + public string? Lang { get; set; } + + [JsonPropertyName("version")] + public string? Version { get; set; } + + [JsonPropertyName("protocol")] + public int Protocol { get; set; } + + [JsonPropertyName("headers")] + public bool Headers { get; set; } + + [JsonPropertyName("no_responders")] + public bool NoResponders { get; set; } +} +``` + +```csharp +// src/NATS.Server/NatsOptions.cs +namespace NATS.Server; + +public sealed class NatsOptions +{ + public string Host { get; set; } = "0.0.0.0"; + public int Port { get; set; } = 4222; + public string? ServerName { get; set; } + public int MaxPayload { get; set; } = 1024 * 1024; // 1MB + public int MaxControlLine { get; set; } = 4096; + public int MaxConnections { get; set; } = 65536; + public TimeSpan PingInterval { get; set; } = TimeSpan.FromMinutes(2); + public int MaxPingsOut { get; set; } = 2; +} +``` + +**Step 2: Verify build** + +Run: `dotnet build NatsDotNet.sln` +Expected: Build succeeded. + +**Step 3: Commit** + +```bash +git add src/NATS.Server/Protocol/NatsProtocol.cs src/NATS.Server/NatsOptions.cs +git commit -m "feat: add protocol constants, ServerInfo, ClientOptions, and NatsOptions" +``` + +--- + +### Task 4: Protocol Parser + +**Files:** +- Create: `src/NATS.Server/Protocol/NatsParser.cs` +- Test: `tests/NATS.Server.Tests/ParserTests.cs` + +**Reference:** `golang/nats-server/server/parser.go` — state machine, argument parsing, payload reading + +**Step 1: Write failing parser tests** + +```csharp +// tests/NATS.Server.Tests/ParserTests.cs +using System.Buffers; +using System.IO.Pipelines; +using System.Text; +using NATS.Server.Protocol; + +namespace NATS.Server.Tests; + +public class ParserTests +{ + private static async Task> ParseAsync(string input) + { + var pipe = new Pipe(); + var commands = new List(); + + // Write input to pipe + var bytes = Encoding.ASCII.GetBytes(input); + await pipe.Writer.WriteAsync(bytes); + pipe.Writer.Complete(); + + // Parse from pipe + var parser = new NatsParser(maxPayload: NatsProtocol.MaxPayloadSize); + while (true) + { + var result = await pipe.Reader.ReadAsync(); + var buffer = result.Buffer; + + while (parser.TryParse(ref buffer, out var cmd)) + commands.Add(cmd); + + pipe.Reader.AdvanceTo(buffer.Start, buffer.End); + + if (result.IsCompleted) + break; + } + + return commands; + } + + [Fact] + public async Task Parse_PING() + { + var cmds = await ParseAsync("PING\r\n"); + Assert.Single(cmds); + Assert.Equal(CommandType.Ping, cmds[0].Type); + } + + [Fact] + public async Task Parse_PONG() + { + var cmds = await ParseAsync("PONG\r\n"); + Assert.Single(cmds); + Assert.Equal(CommandType.Pong, cmds[0].Type); + } + + [Fact] + public async Task Parse_CONNECT() + { + var cmds = await ParseAsync("CONNECT {\"verbose\":false,\"echo\":true}\r\n"); + Assert.Single(cmds); + Assert.Equal(CommandType.Connect, cmds[0].Type); + Assert.Contains("verbose", Encoding.ASCII.GetString(cmds[0].Payload.ToArray())); + } + + [Fact] + public async Task Parse_SUB_without_queue() + { + var cmds = await ParseAsync("SUB foo 1\r\n"); + Assert.Single(cmds); + Assert.Equal(CommandType.Sub, cmds[0].Type); + Assert.Equal("foo", cmds[0].Subject); + Assert.Null(cmds[0].Queue); + Assert.Equal("1", cmds[0].Sid); + } + + [Fact] + public async Task Parse_SUB_with_queue() + { + var cmds = await ParseAsync("SUB foo workers 1\r\n"); + Assert.Single(cmds); + Assert.Equal(CommandType.Sub, cmds[0].Type); + Assert.Equal("foo", cmds[0].Subject); + Assert.Equal("workers", cmds[0].Queue); + Assert.Equal("1", cmds[0].Sid); + } + + [Fact] + public async Task Parse_UNSUB() + { + var cmds = await ParseAsync("UNSUB 1\r\n"); + Assert.Single(cmds); + Assert.Equal(CommandType.Unsub, cmds[0].Type); + Assert.Equal("1", cmds[0].Sid); + Assert.Equal(-1, cmds[0].MaxMessages); + } + + [Fact] + public async Task Parse_UNSUB_with_max() + { + var cmds = await ParseAsync("UNSUB 1 5\r\n"); + Assert.Single(cmds); + Assert.Equal(CommandType.Unsub, cmds[0].Type); + Assert.Equal("1", cmds[0].Sid); + Assert.Equal(5, cmds[0].MaxMessages); + } + + [Fact] + public async Task Parse_PUB_with_payload() + { + var cmds = await ParseAsync("PUB foo 5\r\nHello\r\n"); + Assert.Single(cmds); + Assert.Equal(CommandType.Pub, cmds[0].Type); + Assert.Equal("foo", cmds[0].Subject); + Assert.Null(cmds[0].ReplyTo); + Assert.Equal("Hello", Encoding.ASCII.GetString(cmds[0].Payload.ToArray())); + } + + [Fact] + public async Task Parse_PUB_with_reply() + { + var cmds = await ParseAsync("PUB foo reply 5\r\nHello\r\n"); + Assert.Single(cmds); + Assert.Equal(CommandType.Pub, cmds[0].Type); + Assert.Equal("foo", cmds[0].Subject); + Assert.Equal("reply", cmds[0].ReplyTo); + Assert.Equal("Hello", Encoding.ASCII.GetString(cmds[0].Payload.ToArray())); + } + + [Fact] + public async Task Parse_multiple_commands() + { + var cmds = await ParseAsync("PING\r\nPONG\r\nSUB foo 1\r\n"); + Assert.Equal(3, cmds.Count); + Assert.Equal(CommandType.Ping, cmds[0].Type); + Assert.Equal(CommandType.Pong, cmds[1].Type); + Assert.Equal(CommandType.Sub, cmds[2].Type); + } + + [Fact] + public async Task Parse_PUB_zero_payload() + { + var cmds = await ParseAsync("PUB foo 0\r\n\r\n"); + Assert.Single(cmds); + Assert.Equal(CommandType.Pub, cmds[0].Type); + Assert.Empty(cmds[0].Payload.ToArray()); + } + + [Fact] + public async Task Parse_case_insensitive() + { + var cmds = await ParseAsync("ping\r\npub FOO 3\r\nabc\r\n"); + Assert.Equal(2, cmds.Count); + Assert.Equal(CommandType.Ping, cmds[0].Type); + Assert.Equal(CommandType.Pub, cmds[1].Type); + } + + [Fact] + public async Task Parse_HPUB() + { + // HPUB subject 12 17\r\nNATS/1.0\r\n\r\nHello\r\n + var header = "NATS/1.0\r\n\r\n"; + var payload = "Hello"; + var total = header.Length + payload.Length; + var cmds = await ParseAsync($"HPUB foo {header.Length} {total}\r\n{header}{payload}\r\n"); + Assert.Single(cmds); + Assert.Equal(CommandType.HPub, cmds[0].Type); + Assert.Equal("foo", cmds[0].Subject); + Assert.Equal(header.Length, cmds[0].HeaderSize); + } + + [Fact] + public async Task Parse_INFO() + { + var cmds = await ParseAsync("INFO {\"server_id\":\"test\"}\r\n"); + Assert.Single(cmds); + Assert.Equal(CommandType.Info, cmds[0].Type); + } +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~ParserTests" -v quiet` +Expected: FAIL — `NatsParser` doesn't exist. + +**Step 3: Implement NatsParser** + +```csharp +// src/NATS.Server/Protocol/NatsParser.cs +using System.Buffers; +using System.Text; + +namespace NATS.Server.Protocol; + +public enum CommandType +{ + Ping, + Pong, + Connect, + Info, + Pub, + HPub, + Sub, + Unsub, + Ok, + Err, +} + +public readonly struct ParsedCommand +{ + public CommandType Type { get; init; } + public string? Subject { get; init; } + public string? ReplyTo { get; init; } + public string? Queue { get; init; } + public string? Sid { get; init; } + public int MaxMessages { get; init; } + public int HeaderSize { get; init; } + public ReadOnlyMemory Payload { get; init; } + + public static ParsedCommand Simple(CommandType type) => new() { Type = type, MaxMessages = -1 }; +} + +public sealed class NatsParser +{ + private static readonly byte[] CrLf = "\r\n"u8.ToArray(); + private readonly int _maxPayload; + + // State for split-packet payload reading + private bool _awaitingPayload; + private int _expectedPayloadSize; + private string? _pendingSubject; + private string? _pendingReplyTo; + private int _pendingHeaderSize; + private CommandType _pendingType; + + public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize) + { + _maxPayload = maxPayload; + } + + public bool TryParse(ref ReadOnlySequence buffer, out ParsedCommand command) + { + command = default; + + if (_awaitingPayload) + return TryReadPayload(ref buffer, out command); + + // Look for \r\n to find control line + var reader = new SequenceReader(buffer); + if (!reader.TryReadTo(out ReadOnlySequence line, CrLf.AsSpan())) + return false; + + // Control line size check + if (line.Length > NatsProtocol.MaxControlLineSize) + throw new ProtocolViolationException("Maximum control line exceeded"); + + // Get line as contiguous span + Span lineSpan = stackalloc byte[(int)line.Length]; + line.CopyTo(lineSpan); + + // Identify command by first bytes + if (lineSpan.Length < 2) + { + if (lineSpan.Length == 1 && lineSpan[0] is (byte)'+') + { + // partial — need more data + return false; + } + throw new ProtocolViolationException("Unknown protocol operation"); + } + + byte b0 = (byte)(lineSpan[0] | 0x20); // lowercase + byte b1 = (byte)(lineSpan[1] | 0x20); + + switch (b0) + { + case (byte)'p': + if (b1 == (byte)'i') // PING + { + command = ParsedCommand.Simple(CommandType.Ping); + buffer = buffer.Slice(reader.Position); + return true; + } + if (b1 == (byte)'o') // PONG + { + command = ParsedCommand.Simple(CommandType.Pong); + buffer = buffer.Slice(reader.Position); + return true; + } + if (b1 == (byte)'u') // PUB + { + return ParsePub(lineSpan, ref buffer, reader.Position, out command); + } + break; + + case (byte)'h': + if (b1 == (byte)'p') // HPUB + { + return ParseHPub(lineSpan, ref buffer, reader.Position, out command); + } + break; + + case (byte)'s': + if (b1 == (byte)'u') // SUB + { + command = ParseSub(lineSpan); + buffer = buffer.Slice(reader.Position); + return true; + } + break; + + case (byte)'u': + if (b1 == (byte)'n') // UNSUB + { + command = ParseUnsub(lineSpan); + buffer = buffer.Slice(reader.Position); + return true; + } + break; + + case (byte)'c': + if (b1 == (byte)'o') // CONNECT + { + command = ParseConnect(lineSpan); + buffer = buffer.Slice(reader.Position); + return true; + } + break; + + case (byte)'i': + if (b1 == (byte)'n') // INFO + { + command = ParseInfo(lineSpan); + buffer = buffer.Slice(reader.Position); + return true; + } + break; + + case (byte)'+': // +OK + command = ParsedCommand.Simple(CommandType.Ok); + buffer = buffer.Slice(reader.Position); + return true; + + case (byte)'-': // -ERR + command = ParsedCommand.Simple(CommandType.Err); + buffer = buffer.Slice(reader.Position); + return true; + } + + throw new ProtocolViolationException($"Unknown protocol operation"); + } + + private bool ParsePub(Span line, ref ReadOnlySequence buffer, + SequencePosition afterLine, out ParsedCommand command) + { + command = default; + + // PUB subject [reply] size + // Skip "PUB " + var args = SplitArgs(line[4..]); + + string subject; + string? reply = null; + int size; + + if (args.Length == 2) + { + subject = Encoding.ASCII.GetString(args[0]); + size = ParseSize(args[1]); + } + else if (args.Length == 3) + { + subject = Encoding.ASCII.GetString(args[0]); + reply = Encoding.ASCII.GetString(args[1]); + size = ParseSize(args[2]); + } + else + { + throw new ProtocolViolationException("Invalid PUB arguments"); + } + + if (size < 0 || size > _maxPayload) + throw new ProtocolViolationException("Invalid payload size"); + + // Now read payload + \r\n + buffer = buffer.Slice(afterLine); + _awaitingPayload = true; + _expectedPayloadSize = size; + _pendingSubject = subject; + _pendingReplyTo = reply; + _pendingHeaderSize = -1; + _pendingType = CommandType.Pub; + + return TryReadPayload(ref buffer, out command); + } + + private bool ParseHPub(Span line, ref ReadOnlySequence buffer, + SequencePosition afterLine, out ParsedCommand command) + { + command = default; + + // HPUB subject [reply] hdr_size total_size + // Skip "HPUB " + var args = SplitArgs(line[5..]); + + string subject; + string? reply = null; + int hdrSize, totalSize; + + if (args.Length == 3) + { + subject = Encoding.ASCII.GetString(args[0]); + hdrSize = ParseSize(args[1]); + totalSize = ParseSize(args[2]); + } + else if (args.Length == 4) + { + subject = Encoding.ASCII.GetString(args[0]); + reply = Encoding.ASCII.GetString(args[1]); + hdrSize = ParseSize(args[2]); + totalSize = ParseSize(args[3]); + } + else + { + throw new ProtocolViolationException("Invalid HPUB arguments"); + } + + if (hdrSize < 0 || totalSize < 0 || hdrSize > totalSize || totalSize > _maxPayload) + throw new ProtocolViolationException("Invalid HPUB sizes"); + + buffer = buffer.Slice(afterLine); + _awaitingPayload = true; + _expectedPayloadSize = totalSize; + _pendingSubject = subject; + _pendingReplyTo = reply; + _pendingHeaderSize = hdrSize; + _pendingType = CommandType.HPub; + + return TryReadPayload(ref buffer, out command); + } + + private bool TryReadPayload(ref ReadOnlySequence buffer, out ParsedCommand command) + { + command = default; + + // Need: _expectedPayloadSize bytes + \r\n + long needed = _expectedPayloadSize + 2; // payload + \r\n + if (buffer.Length < needed) + return false; + + // Extract payload + var payloadSlice = buffer.Slice(0, _expectedPayloadSize); + var payload = new byte[_expectedPayloadSize]; + payloadSlice.CopyTo(payload); + + // Verify \r\n after payload + var trailer = buffer.Slice(_expectedPayloadSize, 2); + Span trailerBytes = stackalloc byte[2]; + trailer.CopyTo(trailerBytes); + if (trailerBytes[0] != (byte)'\r' || trailerBytes[1] != (byte)'\n') + throw new ProtocolViolationException("Expected \\r\\n after payload"); + + command = new ParsedCommand + { + Type = _pendingType, + Subject = _pendingSubject, + ReplyTo = _pendingReplyTo, + Payload = payload, + HeaderSize = _pendingHeaderSize, + MaxMessages = -1, + }; + + buffer = buffer.Slice(buffer.GetPosition(needed)); + _awaitingPayload = false; + return true; + } + + private static ParsedCommand ParseSub(Span line) + { + // SUB subject [queue] sid — skip "SUB " + var args = SplitArgs(line[4..]); + + return args.Length switch + { + 2 => new ParsedCommand + { + Type = CommandType.Sub, + Subject = Encoding.ASCII.GetString(args[0]), + Sid = Encoding.ASCII.GetString(args[1]), + MaxMessages = -1, + }, + 3 => new ParsedCommand + { + Type = CommandType.Sub, + Subject = Encoding.ASCII.GetString(args[0]), + Queue = Encoding.ASCII.GetString(args[1]), + Sid = Encoding.ASCII.GetString(args[2]), + MaxMessages = -1, + }, + _ => throw new ProtocolViolationException("Invalid SUB arguments"), + }; + } + + private static ParsedCommand ParseUnsub(Span line) + { + // UNSUB sid [max_msgs] — skip "UNSUB " + var args = SplitArgs(line[6..]); + + return args.Length switch + { + 1 => new ParsedCommand + { + Type = CommandType.Unsub, + Sid = Encoding.ASCII.GetString(args[0]), + MaxMessages = -1, + }, + 2 => new ParsedCommand + { + Type = CommandType.Unsub, + Sid = Encoding.ASCII.GetString(args[0]), + MaxMessages = ParseSize(args[1]), + }, + _ => throw new ProtocolViolationException("Invalid UNSUB arguments"), + }; + } + + private static ParsedCommand ParseConnect(Span line) + { + // CONNECT {json} — skip "CONNECT " + int spaceIdx = line.IndexOf((byte)' '); + if (spaceIdx < 0) + throw new ProtocolViolationException("Invalid CONNECT"); + + var json = line[(spaceIdx + 1)..]; + return new ParsedCommand + { + Type = CommandType.Connect, + Payload = json.ToArray(), + MaxMessages = -1, + }; + } + + private static ParsedCommand ParseInfo(Span line) + { + // INFO {json} — skip "INFO " + int spaceIdx = line.IndexOf((byte)' '); + if (spaceIdx < 0) + throw new ProtocolViolationException("Invalid INFO"); + + var json = line[(spaceIdx + 1)..]; + return new ParsedCommand + { + Type = CommandType.Info, + Payload = json.ToArray(), + MaxMessages = -1, + }; + } + + /// Parse a decimal integer from ASCII bytes. Returns -1 on error. + private static int ParseSize(Span data) + { + if (data.Length == 0 || data.Length > 9) + return -1; + int n = 0; + foreach (byte b in data) + { + if (b < (byte)'0' || b > (byte)'9') + return -1; + n = n * 10 + (b - '0'); + } + return n; + } + + /// Split by spaces/tabs into up to 6 arguments. Returns Span of Spans via ranges. + private static SpanArgs SplitArgs(Span data) + { + var result = new SpanArgs(); + int start = -1; + + for (int i = 0; i < data.Length; i++) + { + byte b = data[i]; + if (b is (byte)' ' or (byte)'\t') + { + if (start >= 0) + { + result.Add(data[start..i]); + start = -1; + } + } + else + { + if (start < 0) start = i; + } + } + if (start >= 0) + result.Add(data[start..]); + + return result; + } + + private ref struct SpanArgs + { + private Span _a0, _a1, _a2, _a3, _a4, _a5; + private int _count; + + public readonly int Length => _count; + + public void Add(Span arg) + { + switch (_count) + { + case 0: _a0 = arg; break; + case 1: _a1 = arg; break; + case 2: _a2 = arg; break; + case 3: _a3 = arg; break; + case 4: _a4 = arg; break; + case 5: _a5 = arg; break; + default: throw new ProtocolViolationException("Too many arguments"); + } + _count++; + } + + public readonly Span this[int index] => index switch + { + 0 => _a0, 1 => _a1, 2 => _a2, 3 => _a3, 4 => _a4, 5 => _a5, + _ => throw new IndexOutOfRangeException(), + }; + } +} + +public class ProtocolViolationException : Exception +{ + public ProtocolViolationException(string message) : base(message) { } +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~ParserTests" -v quiet` +Expected: All tests PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Protocol/NatsParser.cs tests/NATS.Server.Tests/ParserTests.cs +git commit -m "feat: implement NATS protocol parser with System.IO.Pipelines" +``` + +--- + +### Task 5: NatsClient — Connection Handler + +**Files:** +- Create: `src/NATS.Server/NatsClient.cs` +- Test: `tests/NATS.Server.Tests/ClientTests.cs` + +**Reference:** `golang/nats-server/server/client.go` — readLoop, processSub, processUnsub, processConnect, deliverMsg + +**Step 1: Write failing client tests** + +```csharp +// tests/NATS.Server.Tests/ClientTests.cs +using System.IO.Pipelines; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Text.Json; +using NATS.Server; +using NATS.Server.Protocol; + +namespace NATS.Server.Tests; + +public class ClientTests : IAsyncDisposable +{ + private readonly Socket _serverSocket; + private readonly Socket _clientSocket; + private readonly NatsClient _natsClient; + private readonly CancellationTokenSource _cts = new(); + + public ClientTests() + { + // Create connected socket pair via loopback + var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + listener.Listen(1); + var port = ((IPEndPoint)listener.LocalEndPoint!).Port; + + _clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + _clientSocket.Connect(IPAddress.Loopback, port); + _serverSocket = listener.Accept(); + listener.Dispose(); + + var serverInfo = new ServerInfo + { + ServerId = "test", + ServerName = "test", + Version = "0.1.0", + Host = "127.0.0.1", + Port = 4222, + }; + + _natsClient = new NatsClient(1, _serverSocket, new NatsOptions(), serverInfo); + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + _natsClient.Dispose(); + _clientSocket.Dispose(); + } + + [Fact] + public async Task Client_sends_INFO_on_start() + { + var runTask = _natsClient.RunAsync(_cts.Token); + + // Read from client socket — should get INFO + var buf = new byte[4096]; + var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None); + var response = Encoding.ASCII.GetString(buf, 0, n); + + Assert.StartsWith("INFO ", response); + Assert.Contains("server_id", response); + Assert.Contains("\r\n", response); + + await _cts.CancelAsync(); + } + + [Fact] + public async Task Client_responds_PONG_to_PING() + { + var runTask = _natsClient.RunAsync(_cts.Token); + + // Read INFO + var buf = new byte[4096]; + await _clientSocket.ReceiveAsync(buf, SocketFlags.None); + + // Send CONNECT then PING + await _clientSocket.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n")); + + // Read response — should get PONG + var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None); + var response = Encoding.ASCII.GetString(buf, 0, n); + + Assert.Contains("PONG\r\n", response); + + await _cts.CancelAsync(); + } +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~ClientTests" -v quiet` +Expected: FAIL — `NatsClient` doesn't exist. + +**Step 3: Implement NatsClient** + +```csharp +// src/NATS.Server/NatsClient.cs +using System.Buffers; +using System.IO.Pipelines; +using System.Net.Sockets; +using System.Text; +using System.Text.Json; +using NATS.Server.Protocol; +using NATS.Server.Subscriptions; + +namespace NATS.Server; + +public interface IMessageRouter +{ + void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory headers, + ReadOnlyMemory payload, NatsClient sender); + void RemoveClient(NatsClient client); +} + +public sealed class NatsClient : IDisposable +{ + private readonly Socket _socket; + private readonly NetworkStream _stream; + private readonly NatsOptions _options; + private readonly ServerInfo _serverInfo; + private readonly NatsParser _parser; + private readonly SemaphoreSlim _writeLock = new(1, 1); + private readonly Dictionary _subs = new(); + + public ulong Id { get; } + public ClientOptions? ClientOpts { get; private set; } + public IMessageRouter? Router { get; set; } + public bool ConnectReceived { get; private set; } + + // Stats + public long InMsgs; + public long OutMsgs; + public long InBytes; + public long OutBytes; + + public IReadOnlyDictionary Subscriptions => _subs; + + public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo) + { + Id = id; + _socket = socket; + _stream = new NetworkStream(socket, ownsSocket: false); + _options = options; + _serverInfo = serverInfo; + _parser = new NatsParser(options.MaxPayload); + } + + public async Task RunAsync(CancellationToken ct) + { + var pipe = new Pipe(); + try + { + // Send INFO + await SendInfoAsync(ct); + + // Start read pump and command processing in parallel + var fillTask = FillPipeAsync(pipe.Writer, ct); + var processTask = ProcessCommandsAsync(pipe.Reader, ct); + + await Task.WhenAny(fillTask, processTask); + } + catch (OperationCanceledException) { } + catch (Exception) { /* connection error — clean up */ } + finally + { + Router?.RemoveClient(this); + } + } + + private async Task FillPipeAsync(PipeWriter writer, CancellationToken ct) + { + try + { + while (!ct.IsCancellationRequested) + { + var memory = writer.GetMemory(4096); + int bytesRead = await _stream.ReadAsync(memory, ct); + if (bytesRead == 0) + break; + + writer.Advance(bytesRead); + var result = await writer.FlushAsync(ct); + if (result.IsCompleted) + break; + } + } + finally + { + await writer.CompleteAsync(); + } + } + + private async Task ProcessCommandsAsync(PipeReader reader, CancellationToken ct) + { + try + { + while (!ct.IsCancellationRequested) + { + var result = await reader.ReadAsync(ct); + var buffer = result.Buffer; + + while (_parser.TryParse(ref buffer, out var cmd)) + { + await DispatchCommandAsync(cmd, ct); + } + + reader.AdvanceTo(buffer.Start, buffer.End); + + if (result.IsCompleted) + break; + } + } + finally + { + await reader.CompleteAsync(); + } + } + + private async ValueTask DispatchCommandAsync(ParsedCommand cmd, CancellationToken ct) + { + switch (cmd.Type) + { + case CommandType.Connect: + ProcessConnect(cmd); + break; + + case CommandType.Ping: + await WriteAsync(NatsProtocol.PongBytes, ct); + break; + + case CommandType.Pong: + // Update RTT (placeholder) + break; + + case CommandType.Sub: + ProcessSub(cmd); + break; + + case CommandType.Unsub: + ProcessUnsub(cmd); + break; + + case CommandType.Pub: + case CommandType.HPub: + ProcessPub(cmd); + break; + } + } + + private void ProcessConnect(ParsedCommand cmd) + { + ClientOpts = JsonSerializer.Deserialize(cmd.Payload.Span) + ?? new ClientOptions(); + ConnectReceived = true; + } + + private void ProcessSub(ParsedCommand cmd) + { + var sub = new Subscription + { + Subject = cmd.Subject!, + Queue = cmd.Queue, + Sid = cmd.Sid!, + }; + + _subs[cmd.Sid!] = sub; + sub.Client = this; + + if (Router is ISubListAccess sl) + sl.SubList.Insert(sub); + } + + private void ProcessUnsub(ParsedCommand cmd) + { + if (!_subs.TryGetValue(cmd.Sid!, out var sub)) + return; + + if (cmd.MaxMessages > 0) + { + sub.MaxMessages = cmd.MaxMessages; + // Will be cleaned up when MessageCount reaches MaxMessages + return; + } + + _subs.Remove(cmd.Sid!); + + if (Router is ISubListAccess sl) + sl.SubList.Remove(sub); + } + + private void ProcessPub(ParsedCommand cmd) + { + Interlocked.Increment(ref InMsgs); + Interlocked.Add(ref InBytes, cmd.Payload.Length); + + ReadOnlyMemory headers = default; + ReadOnlyMemory payload = cmd.Payload; + + if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0) + { + headers = cmd.Payload[..cmd.HeaderSize]; + payload = cmd.Payload[cmd.HeaderSize..]; + } + + Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this); + } + + private async Task SendInfoAsync(CancellationToken ct) + { + var infoJson = JsonSerializer.Serialize(_serverInfo); + var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n"); + await WriteAsync(infoLine, ct); + } + + public async Task SendMessageAsync(string subject, string sid, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload, CancellationToken ct) + { + Interlocked.Increment(ref OutMsgs); + Interlocked.Add(ref OutBytes, payload.Length + headers.Length); + + byte[] line; + if (headers.Length > 0) + { + int totalSize = headers.Length + payload.Length; + line = Encoding.ASCII.GetBytes($"HMSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{headers.Length} {totalSize}\r\n"); + } + else + { + line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n"); + } + + await _writeLock.WaitAsync(ct); + try + { + await _stream.WriteAsync(line, ct); + if (headers.Length > 0) + await _stream.WriteAsync(headers, ct); + if (payload.Length > 0) + await _stream.WriteAsync(payload, ct); + await _stream.WriteAsync(NatsProtocol.CrLf, ct); + await _stream.FlushAsync(ct); + } + finally + { + _writeLock.Release(); + } + } + + private async Task WriteAsync(byte[] data, CancellationToken ct) + { + await _writeLock.WaitAsync(ct); + try + { + await _stream.WriteAsync(data, ct); + await _stream.FlushAsync(ct); + } + finally + { + _writeLock.Release(); + } + } + + public void RemoveAllSubscriptions(SubList subList) + { + foreach (var sub in _subs.Values) + subList.Remove(sub); + _subs.Clear(); + } + + public void Dispose() + { + _stream.Dispose(); + _socket.Dispose(); + _writeLock.Dispose(); + } +} + +public interface ISubListAccess +{ + SubList SubList { get; } +} +``` + +Update `Subscription.cs` to add the `Client` field: + +```csharp +// Add to Subscription.cs +public NatsClient? Client { get; set; } +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~ClientTests" -v quiet` +Expected: All tests PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ClientTests.cs src/NATS.Server/Subscriptions/Subscription.cs +git commit -m "feat: implement NatsClient connection handler with read/write pipeline" +``` + +--- + +### Task 6: NatsServer — Orchestrator + +**Files:** +- Create: `src/NATS.Server/NatsServer.cs` +- Test: `tests/NATS.Server.Tests/ServerTests.cs` + +**Reference:** `golang/nats-server/server/server.go` — NewServer, AcceptLoop, createClient, `golang/nats-server/server/client.go` — processMsgResults, deliverMsg + +**Step 1: Write failing server tests** + +```csharp +// tests/NATS.Server.Tests/ServerTests.cs +using System.Net; +using System.Net.Sockets; +using System.Text; +using NATS.Server; + +namespace NATS.Server.Tests; + +public class ServerTests : IAsyncDisposable +{ + private readonly NatsServer _server; + private readonly int _port; + private readonly CancellationTokenSource _cts = new(); + + public ServerTests() + { + // Use random port + _port = GetFreePort(); + _server = new NatsServer(new NatsOptions { Port = _port }); + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + _server.Dispose(); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + private async Task ConnectClientAsync() + { + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(IPAddress.Loopback, _port); + return sock; + } + + private static async Task ReadLineAsync(Socket sock, int bufSize = 4096) + { + var buf = new byte[bufSize]; + var n = await sock.ReceiveAsync(buf, SocketFlags.None); + return Encoding.ASCII.GetString(buf, 0, n); + } + + [Fact] + public async Task Server_accepts_connection_and_sends_INFO() + { + var serverTask = _server.StartAsync(_cts.Token); + await Task.Delay(100); // let server start + + using var client = await ConnectClientAsync(); + var response = await ReadLineAsync(client); + + Assert.StartsWith("INFO ", response); + await _cts.CancelAsync(); + } + + [Fact] + public async Task Server_basic_pubsub() + { + var serverTask = _server.StartAsync(_cts.Token); + await Task.Delay(100); + + using var pub = await ConnectClientAsync(); + using var sub = await ConnectClientAsync(); + + // Read INFO from both + await ReadLineAsync(pub); + await ReadLineAsync(sub); + + // CONNECT + SUB on subscriber + await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo 1\r\n")); + await Task.Delay(50); + + // CONNECT + PUB on publisher + await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo 5\r\nHello\r\n")); + await Task.Delay(100); + + // Read MSG from subscriber + var buf = new byte[4096]; + var n = await sub.ReceiveAsync(buf, SocketFlags.None); + var msg = Encoding.ASCII.GetString(buf, 0, n); + + Assert.Contains("MSG foo 1 5\r\nHello\r\n", msg); + await _cts.CancelAsync(); + } + + [Fact] + public async Task Server_wildcard_matching() + { + var serverTask = _server.StartAsync(_cts.Token); + await Task.Delay(100); + + using var pub = await ConnectClientAsync(); + using var sub = await ConnectClientAsync(); + + await ReadLineAsync(pub); + await ReadLineAsync(sub); + + await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\n")); + await Task.Delay(50); + + await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo.bar 5\r\nHello\r\n")); + await Task.Delay(100); + + var buf = new byte[4096]; + var n = await sub.ReceiveAsync(buf, SocketFlags.None); + var msg = Encoding.ASCII.GetString(buf, 0, n); + + Assert.Contains("MSG foo.bar 1 5\r\n", msg); + await _cts.CancelAsync(); + } +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~ServerTests" -v quiet` +Expected: FAIL — `NatsServer` doesn't exist. + +**Step 3: Implement NatsServer** + +```csharp +// src/NATS.Server/NatsServer.cs +using System.Collections.Concurrent; +using System.Net; +using System.Net.Sockets; +using NATS.Server.Protocol; +using NATS.Server.Subscriptions; + +namespace NATS.Server; + +public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable +{ + private readonly NatsOptions _options; + private readonly ConcurrentDictionary _clients = new(); + private readonly SubList _subList = new(); + private readonly ServerInfo _serverInfo; + private Socket? _listener; + private ulong _nextClientId; + + public SubList SubList => _subList; + + public NatsServer(NatsOptions options) + { + _options = options; + _serverInfo = new ServerInfo + { + ServerId = Guid.NewGuid().ToString("N")[..20].ToUpperInvariant(), + ServerName = options.ServerName ?? $"nats-dotnet-{Environment.MachineName}", + Version = NatsProtocol.Version, + Host = options.Host, + Port = options.Port, + MaxPayload = options.MaxPayload, + }; + } + + public async Task StartAsync(CancellationToken ct) + { + _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + _listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + _listener.Bind(new IPEndPoint( + _options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host), + _options.Port)); + _listener.Listen(128); + + try + { + while (!ct.IsCancellationRequested) + { + var socket = await _listener.AcceptAsync(ct); + var clientId = Interlocked.Increment(ref _nextClientId); + + var client = new NatsClient(clientId, socket, _options, _serverInfo); + client.Router = this; + _clients[clientId] = client; + + _ = RunClientAsync(client, ct); + } + } + catch (OperationCanceledException) { } + } + + private async Task RunClientAsync(NatsClient client, CancellationToken ct) + { + try + { + await client.RunAsync(ct); + } + catch (Exception) + { + // Client disconnected or errored + } + finally + { + RemoveClient(client); + } + } + + public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory headers, + ReadOnlyMemory payload, NatsClient sender) + { + var result = _subList.Match(subject); + + // Deliver to plain subscribers + foreach (var sub in result.PlainSubs) + { + if (sub.Client == null || sub.Client == sender && !(sender.ClientOpts?.Echo ?? true)) + continue; + + DeliverMessage(sub, subject, replyTo, headers, payload); + } + + // Deliver to one member of each queue group (round-robin) + foreach (var queueGroup in result.QueueSubs) + { + if (queueGroup.Length == 0) continue; + + // Simple round-robin — pick based on total delivered across group + var idx = Math.Abs((int)Interlocked.Increment(ref sender.OutMsgs)) % queueGroup.Length; + // Undo the OutMsgs increment — it'll be incremented properly in SendMessageAsync + Interlocked.Decrement(ref sender.OutMsgs); + + for (int attempt = 0; attempt < queueGroup.Length; attempt++) + { + var sub = queueGroup[(idx + attempt) % queueGroup.Length]; + if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true))) + { + DeliverMessage(sub, subject, replyTo, headers, payload); + break; + } + } + } + } + + private static void DeliverMessage(Subscription sub, string subject, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + var client = sub.Client; + if (client == null) return; + + // Check auto-unsub + var count = Interlocked.Increment(ref sub.MessageCount); + if (sub.MaxMessages > 0 && count > sub.MaxMessages) + return; + + // Fire and forget — deliver asynchronously + _ = client.SendMessageAsync(subject, sub.Sid, replyTo, headers, payload, CancellationToken.None); + } + + public void RemoveClient(NatsClient client) + { + _clients.TryRemove(client.Id, out _); + client.RemoveAllSubscriptions(_subList); + } + + public void Dispose() + { + _listener?.Dispose(); + foreach (var client in _clients.Values) + client.Dispose(); + } +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~ServerTests" -v quiet` +Expected: All tests PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/ServerTests.cs +git commit -m "feat: implement NatsServer orchestrator with accept loop and message routing" +``` + +--- + +### Task 7: Host Console Application + +**Files:** +- Modify: `src/NATS.Server.Host/Program.cs` + +**Reference:** `golang/nats-server/main.go` + +**Step 1: Implement Program.cs** + +```csharp +// src/NATS.Server.Host/Program.cs +using NATS.Server; + +var options = new NatsOptions(); + +// Simple CLI argument parsing +for (int i = 0; i < args.Length; i++) +{ + switch (args[i]) + { + case "-p" or "--port" when i + 1 < args.Length: + options.Port = int.Parse(args[++i]); + break; + case "-a" or "--addr" when i + 1 < args.Length: + options.Host = args[++i]; + break; + case "-n" or "--name" when i + 1 < args.Length: + options.ServerName = args[++i]; + break; + } +} + +var server = new NatsServer(options); + +var cts = new CancellationTokenSource(); +Console.CancelKeyPress += (_, e) => +{ + e.Cancel = true; + cts.Cancel(); +}; + +Console.WriteLine($"[NATS] Listening on {options.Host}:{options.Port}"); + +try +{ + await server.StartAsync(cts.Token); +} +catch (OperationCanceledException) { } + +Console.WriteLine("[NATS] Server stopped."); +``` + +**Step 2: Verify it builds and runs** + +Run: `dotnet build src/NATS.Server.Host/NATS.Server.Host.csproj` +Expected: Build succeeded. + +Run (quick smoke): `timeout 2 dotnet run --project src/NATS.Server.Host -- -p 14222 || true` +Expected: Prints "Listening on 0.0.0.0:14222" then exits after timeout. + +**Step 3: Commit** + +```bash +git add src/NATS.Server.Host/Program.cs +git commit -m "feat: add NATS.Server.Host console app with basic CLI arguments" +``` + +--- + +### Task 8: Integration Tests with NATS.Client.Core + +**Files:** +- Modify: `tests/NATS.Server.Tests/NATS.Server.Tests.csproj` (add NuGet ref) +- Create: `tests/NATS.Server.Tests/IntegrationTests.cs` + +**Step 1: Add NATS client NuGet package** + +```bash +dotnet add tests/NATS.Server.Tests/NATS.Server.Tests.csproj package NATS.Client.Core +``` + +**Step 2: Write integration tests** + +```csharp +// tests/NATS.Server.Tests/IntegrationTests.cs +using System.Net; +using System.Net.Sockets; +using System.Text; +using NATS.Client.Core; +using NATS.Server; + +namespace NATS.Server.Tests; + +public class IntegrationTests : IAsyncDisposable +{ + private readonly NatsServer _server; + private readonly int _port; + private readonly CancellationTokenSource _cts = new(); + private readonly Task _serverTask; + + public IntegrationTests() + { + _port = GetFreePort(); + _server = new NatsServer(new NatsOptions { Port = _port }); + _serverTask = _server.StartAsync(_cts.Token); + Thread.Sleep(200); // Let server start + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + _server.Dispose(); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + private NatsConnection CreateClient() + { + var opts = new NatsOpts { Url = $"nats://127.0.0.1:{_port}" }; + return new NatsConnection(opts); + } + + [Fact] + public async Task PubSub_basic() + { + await using var pub = CreateClient(); + await using var sub = CreateClient(); + + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + var received = new TaskCompletionSource(); + + var subscription = Task.Run(async () => + { + await foreach (var msg in sub.SubscribeAsync("test.subject")) + { + received.TrySetResult(msg.Data!); + break; + } + }); + + await Task.Delay(100); // let subscription register + + await pub.PublishAsync("test.subject", "Hello NATS!"); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal("Hello NATS!", result); + } + + [Fact] + public async Task PubSub_wildcard_star() + { + await using var pub = CreateClient(); + await using var sub = CreateClient(); + + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + var received = new TaskCompletionSource(); + + var subscription = Task.Run(async () => + { + await foreach (var msg in sub.SubscribeAsync("test.*")) + { + received.TrySetResult(msg.Subject); + break; + } + }); + + await Task.Delay(100); + await pub.PublishAsync("test.hello", "data"); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal("test.hello", result); + } + + [Fact] + public async Task PubSub_wildcard_gt() + { + await using var pub = CreateClient(); + await using var sub = CreateClient(); + + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + var received = new TaskCompletionSource(); + + var subscription = Task.Run(async () => + { + await foreach (var msg in sub.SubscribeAsync("test.>")) + { + received.TrySetResult(msg.Subject); + break; + } + }); + + await Task.Delay(100); + await pub.PublishAsync("test.foo.bar.baz", "data"); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal("test.foo.bar.baz", result); + } + + [Fact] + public async Task PubSub_fan_out() + { + await using var pub = CreateClient(); + await using var sub1 = CreateClient(); + await using var sub2 = CreateClient(); + + await pub.ConnectAsync(); + await sub1.ConnectAsync(); + await sub2.ConnectAsync(); + + var count1 = 0; + var count2 = 0; + var done = new TaskCompletionSource(); + + var s1 = Task.Run(async () => + { + await foreach (var msg in sub1.SubscribeAsync("fanout")) + { + Interlocked.Increment(ref count1); + if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 2) + done.TrySetResult(); + break; + } + }); + + var s2 = Task.Run(async () => + { + await foreach (var msg in sub2.SubscribeAsync("fanout")) + { + Interlocked.Increment(ref count2); + if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 2) + done.TrySetResult(); + break; + } + }); + + await Task.Delay(100); + await pub.PublishAsync("fanout", "hello"); + + await done.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal(1, count1); + Assert.Equal(1, count2); + } + + [Fact] + public async Task PingPong() + { + await using var client = CreateClient(); + await client.ConnectAsync(); + + // If we got here, the connection is alive and PING/PONG works + await client.PingAsync(); + } +} +``` + +**Step 3: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "ClassName~IntegrationTests" -v normal` +Expected: All tests PASS (may need debugging iterations). + +**Step 4: Commit** + +```bash +git add tests/NATS.Server.Tests/ +git commit -m "feat: add integration tests using NATS.Client.Core NuGet package" +``` + +--- + +### Task 9: Final Validation and Cleanup + +**Files:** +- Modify: `CLAUDE.md` (update build commands now that project exists) + +**Step 1: Run full test suite** + +Run: `dotnet test NatsDotNet.sln -v normal` +Expected: All tests PASS. + +**Step 2: Verify host app runs and accepts nats CLI connections** + +```bash +# Terminal 1: Start server +dotnet run --project src/NATS.Server.Host -- -p 14222 & + +# Terminal 2: Test with nats CLI (if available) +nats sub test -s nats://127.0.0.1:14222 & +nats pub test "Hello from nats CLI" -s nats://127.0.0.1:14222 +``` + +**Step 3: Update CLAUDE.md with actual verified commands** + +Update the Build & Test Commands section to match the real project structure. + +**Step 4: Commit** + +```bash +git add CLAUDE.md +git commit -m "docs: update CLAUDE.md with verified build and test commands" +``` diff --git a/docs/plans/2026-02-22-base-server-plan.md.tasks.json b/docs/plans/2026-02-22-base-server-plan.md.tasks.json new file mode 100644 index 0000000..7d5eae8 --- /dev/null +++ b/docs/plans/2026-02-22-base-server-plan.md.tasks.json @@ -0,0 +1,16 @@ +{ + "planPath": "docs/plans/2026-02-22-base-server-plan.md", + "tasks": [ + {"id": 7, "subject": "Task 0: Project Scaffolding", "status": "pending"}, + {"id": 8, "subject": "Task 1: Subscription Types and Subject Validation", "status": "pending", "blockedBy": [7]}, + {"id": 9, "subject": "Task 2: SubList Trie — Insert, Remove, Match", "status": "pending", "blockedBy": [8]}, + {"id": 10, "subject": "Task 3: Protocol Constants and Types", "status": "pending", "blockedBy": [7]}, + {"id": 11, "subject": "Task 4: Protocol Parser", "status": "pending", "blockedBy": [10]}, + {"id": 12, "subject": "Task 5: NatsClient — Connection Handler", "status": "pending", "blockedBy": [9, 11]}, + {"id": 13, "subject": "Task 6: NatsServer — Orchestrator", "status": "pending", "blockedBy": [12]}, + {"id": 14, "subject": "Task 7: Host Console Application", "status": "pending", "blockedBy": [13]}, + {"id": 15, "subject": "Task 8: Integration Tests with NATS.Client.Core", "status": "pending", "blockedBy": [13]}, + {"id": 16, "subject": "Task 9: Final Validation and Cleanup", "status": "pending", "blockedBy": [14, 15]} + ], + "lastUpdated": "2026-02-22T18:00:00Z" +}