# Base NATS Server Port: Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.

**Goal:** Port the core NATS pub/sub server from Go to .NET 10 C#, supporting CONNECT/INFO handshake, PUB/SUB/UNSUB, wildcards (`*`, `>`), queue groups, PING/PONG, and HPUB/HMSG headers.

**Architecture:** Bottom-up build: subject matching trie (SubList) → protocol parser (System.IO.Pipelines) → client connection handler → server orchestrator. Each layer is independently testable. The library (NATS.Server) is separate from the host console app (NATS.Server.Host).

**Tech Stack:** .NET 10, C# 14, xUnit, System.IO.Pipelines, System.Text.Json

---

### Task 0: Project Scaffolding

**Files:**
- Create: `NatsDotNet.sln`
- Create: `src/NATS.Server/NATS.Server.csproj`
- Create: `src/NATS.Server.Host/NATS.Server.Host.csproj`
- Create: `tests/NATS.Server.Tests/NATS.Server.Tests.csproj`
- Create: `Directory.Build.props`

**Step 1: Create the solution and projects**

```bash
cd /Users/dohertj2/Desktop/natsdotnet

# Create solution
# (.NET 10 SDKs default to the .slnx format; --format sln keeps the NatsDotNet.sln name used below)
dotnet new sln -n NatsDotNet --format sln

# Create class library
dotnet new classlib -n NATS.Server -o src/NATS.Server -f net10.0
rm src/NATS.Server/Class1.cs

# Create console app
dotnet new console -n NATS.Server.Host -o src/NATS.Server.Host -f net10.0

# Create test project
dotnet new xunit -n NATS.Server.Tests -o tests/NATS.Server.Tests -f net10.0

# Add projects to solution
dotnet sln add src/NATS.Server/NATS.Server.csproj
dotnet sln add src/NATS.Server.Host/NATS.Server.Host.csproj
dotnet sln add tests/NATS.Server.Tests/NATS.Server.Tests.csproj

# Add project references
dotnet add src/NATS.Server.Host/NATS.Server.Host.csproj reference src/NATS.Server/NATS.Server.csproj
dotnet add tests/NATS.Server.Tests/NATS.Server.Tests.csproj reference src/NATS.Server/NATS.Server.csproj
```

**Step 2: Create Directory.Build.props**

```xml
<Project>
  <PropertyGroup>
    <TargetFramework>net10.0</TargetFramework>
    <LangVersion>preview</LangVersion>
    <Nullable>enable</Nullable>
    <ImplicitUsings>enable</ImplicitUsings>
    <TreatWarningsAsErrors>true</TreatWarningsAsErrors>
  </PropertyGroup>
</Project>
```

Remove the `<TargetFramework>` and `<Nullable>` and `<ImplicitUsings>` from each
individual `.csproj` since they're now in `Directory.Build.props`.

**Step 3: Create directory structure**

```bash
mkdir -p src/NATS.Server/Protocol
mkdir -p src/NATS.Server/Subscriptions
```

**Step 4: Verify build**

Run: `dotnet build NatsDotNet.sln`

Expected: Build succeeded with 0 errors.

**Step 5: Commit**

```bash
git add -A
git commit -m "feat: scaffold solution with NATS.Server library, host, and test projects"
```

---

### Task 1: Subscription Types and Subject Validation

**Files:**
- Create: `src/NATS.Server/Subscriptions/Subscription.cs`
- Create: `src/NATS.Server/Subscriptions/SubListResult.cs`
- Create: `src/NATS.Server/Subscriptions/SubjectMatch.cs`
- Test: `tests/NATS.Server.Tests/SubjectMatchTests.cs`

**Reference:** `golang/nats-server/server/sublist.go` lines 1159-1233 for subject validation

**Step 1: Write failing tests for subject validation**

```csharp
// tests/NATS.Server.Tests/SubjectMatchTests.cs
using NATS.Server.Subscriptions;

namespace NATS.Server.Tests;

public class SubjectMatchTests
{
    [Theory]
    [InlineData("foo", true)]
    [InlineData("foo.bar", true)]
    [InlineData("foo.bar.baz", true)]
    [InlineData("foo.*", true)]
    [InlineData("foo.>", true)]
    [InlineData(">", true)]
    [InlineData("*", true)]
    [InlineData("*.bar", true)]
    [InlineData("foo.*.baz", true)]
    [InlineData("", false)]
    [InlineData("foo.", false)]
    [InlineData(".foo", false)]
    [InlineData("foo..bar", false)]
    [InlineData("foo.>.bar", false)]  // > must be last token
    [InlineData("foo bar", false)]    // no spaces
    [InlineData("foo\tbar", false)]   // no tabs
    public void IsValidSubject(string subject, bool expected)
    {
        Assert.Equal(expected, SubjectMatch.IsValidSubject(subject));
    }

    [Theory]
    [InlineData("foo", true)]
    [InlineData("foo.bar.baz", true)]
    [InlineData("foo.*", false)]
    [InlineData("foo.>", false)]
    [InlineData(">", false)]
    [InlineData("*", false)]
    public void IsValidPublishSubject(string subject, bool expected)
    {
        Assert.Equal(expected, SubjectMatch.IsValidPublishSubject(subject));
    }

    [Theory]
    [InlineData("foo", "foo", true)]
    [InlineData("foo", "bar", false)]
    [InlineData("foo.bar", "foo.*", true)]
    [InlineData("foo.bar", "*.bar", true)]
    [InlineData("foo.bar", "*.*", true)]
    [InlineData("foo.bar.baz", "foo.>", true)]
    [InlineData("foo.bar.baz", ">", true)]
    [InlineData("foo.bar", "foo.>", true)]
    [InlineData("foo", "foo.>", false)]
    [InlineData("foo.bar.baz", "foo.*", false)]
    [InlineData("foo.bar", "foo.bar.>", false)]
    public void MatchLiteral(string literal, string pattern, bool expected)
    {
        Assert.Equal(expected, SubjectMatch.MatchLiteral(literal, pattern));
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SubjectMatchTests" -v quiet`

Expected: FAIL (types don't exist yet).

**Step 3: Implement Subscription, SubListResult, and SubjectMatch**

```csharp
// src/NATS.Server/Subscriptions/Subscription.cs
namespace NATS.Server.Subscriptions;

public sealed class Subscription
{
    public required string Subject { get; init; }
    public string?
        Queue { get; init; }
    public required string Sid { get; init; }
    public long MessageCount;  // Interlocked
    public long MaxMessages;   // 0 = unlimited
}
```

```csharp
// src/NATS.Server/Subscriptions/SubListResult.cs
namespace NATS.Server.Subscriptions;

public sealed class SubListResult
{
    public static readonly SubListResult Empty = new([], []);

    public Subscription[] PlainSubs { get; }
    public Subscription[][] QueueSubs { get; }  // outer = groups, inner = members

    public SubListResult(Subscription[] plainSubs, Subscription[][] queueSubs)
    {
        PlainSubs = plainSubs;
        QueueSubs = queueSubs;
    }
}
```

```csharp
// src/NATS.Server/Subscriptions/SubjectMatch.cs
namespace NATS.Server.Subscriptions;

public static class SubjectMatch
{
    public const char Pwc = '*';  // partial wildcard
    public const char Fwc = '>';  // full wildcard
    public const char Sep = '.';  // token separator

    public static bool IsValidSubject(string subject)
    {
        if (string.IsNullOrEmpty(subject))
            return false;

        bool sawFwc = false;
        int start = 0;

        for (int i = 0; i <= subject.Length; i++)
        {
            if (i == subject.Length || subject[i] == Sep)
            {
                int tokenLen = i - start;
                if (tokenLen == 0 || sawFwc)
                    return false;

                if (tokenLen == 1)
                {
                    char c = subject[start];
                    if (c == Fwc)
                        sawFwc = true;
                    else if (c is ' ' or '\t' or '\n' or '\r' or '\f')
                        return false;
                }
                else
                {
                    for (int j = start; j < i; j++)
                    {
                        char c = subject[j];
                        if (c is ' ' or '\t' or '\n' or '\r' or '\f')
                            return false;
                    }
                }

                start = i + 1;
            }
        }

        return true;
    }

    public static bool IsLiteral(string subject)
    {
        for (int i = 0; i < subject.Length; i++)
        {
            char c = subject[i];
            if (c is Pwc or Fwc)
            {
                bool atStart = i == 0 || subject[i - 1] == Sep;
                bool atEnd = i + 1 == subject.Length || subject[i + 1] == Sep;
                if (atStart && atEnd)
                    return false;
            }
        }
        return true;
    }

    public static bool IsValidPublishSubject(string subject)
    {
        return IsValidSubject(subject) && IsLiteral(subject);
    }

    /// <summary>
    /// Match a literal subject against a pattern that may contain wildcards.
    /// </summary>
    public static bool MatchLiteral(string literal, string pattern)
    {
        int li = 0, pi = 0;

        while (pi < pattern.Length)
        {
            // Get next pattern token
            int pTokenStart = pi;
            while (pi < pattern.Length && pattern[pi] != Sep)
                pi++;
            int pTokenLen = pi - pTokenStart;
            if (pi < pattern.Length)
                pi++; // skip separator

            // Full wildcard: matches everything remaining
            if (pTokenLen == 1 && pattern[pTokenStart] == Fwc)
                return li < literal.Length; // must have at least one token left

            // Get next literal token
            if (li >= literal.Length)
                return false;
            int lTokenStart = li;
            while (li < literal.Length && literal[li] != Sep)
                li++;
            int lTokenLen = li - lTokenStart;
            if (li < literal.Length)
                li++; // skip separator

            // Partial wildcard: matches any single token
            if (pTokenLen == 1 && pattern[pTokenStart] == Pwc)
                continue;

            // Literal comparison
            if (pTokenLen != lTokenLen)
                return false;
            if (string.Compare(literal, lTokenStart, pattern, pTokenStart, pTokenLen, StringComparison.Ordinal) != 0)
                return false;
        }

        return li >= literal.Length; // both exhausted
    }
}
```

**Step 4: Run tests to verify they pass**

Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SubjectMatchTests" -v quiet`

Expected: All tests PASS.

**Step 5: Commit**

```bash
git add src/NATS.Server/Subscriptions/ tests/NATS.Server.Tests/SubjectMatchTests.cs
git commit -m "feat: add Subscription types and subject validation with wildcard matching"
```

---

### Task 2: SubList Trie — Insert, Remove, Match

**Files:**
- Create: `src/NATS.Server/Subscriptions/SubList.cs`
- Test: `tests/NATS.Server.Tests/SubListTests.cs`

**Reference:** `golang/nats-server/server/sublist.go`: the core trie, matchLevel(), addNodeToResults(), cache logic

**Step 1: Write failing tests for SubList**

```csharp
// tests/NATS.Server.Tests/SubListTests.cs
using NATS.Server.Subscriptions;

namespace NATS.Server.Tests;

public class SubListTests
{
    private static Subscription MakeSub(string subject, string?
        queue = null, string sid = "1")
        => new() { Subject = subject, Queue = queue, Sid = sid };

    [Fact]
    public void Insert_and_match_literal_subject()
    {
        var sl = new SubList();
        var sub = MakeSub("foo.bar");
        sl.Insert(sub);

        var r = sl.Match("foo.bar");
        Assert.Single(r.PlainSubs);
        Assert.Same(sub, r.PlainSubs[0]);
        Assert.Empty(r.QueueSubs);
    }

    [Fact]
    public void Match_returns_empty_for_no_match()
    {
        var sl = new SubList();
        sl.Insert(MakeSub("foo.bar"));

        var r = sl.Match("foo.baz");
        Assert.Empty(r.PlainSubs);
    }

    [Fact]
    public void Match_partial_wildcard()
    {
        var sl = new SubList();
        var sub = MakeSub("foo.*");
        sl.Insert(sub);

        Assert.Single(sl.Match("foo.bar").PlainSubs);
        Assert.Single(sl.Match("foo.baz").PlainSubs);
        Assert.Empty(sl.Match("foo.bar.baz").PlainSubs);
    }

    [Fact]
    public void Match_full_wildcard()
    {
        var sl = new SubList();
        var sub = MakeSub("foo.>");
        sl.Insert(sub);

        Assert.Single(sl.Match("foo.bar").PlainSubs);
        Assert.Single(sl.Match("foo.bar.baz").PlainSubs);
        Assert.Empty(sl.Match("foo").PlainSubs);
    }

    [Fact]
    public void Match_root_full_wildcard()
    {
        var sl = new SubList();
        sl.Insert(MakeSub(">"));

        Assert.Single(sl.Match("foo").PlainSubs);
        Assert.Single(sl.Match("foo.bar").PlainSubs);
        Assert.Single(sl.Match("foo.bar.baz").PlainSubs);
    }

    [Fact]
    public void Match_collects_multiple_subs()
    {
        var sl = new SubList();
        sl.Insert(MakeSub("foo.bar", sid: "1"));
        sl.Insert(MakeSub("foo.*", sid: "2"));
        sl.Insert(MakeSub("foo.>", sid: "3"));
        sl.Insert(MakeSub(">", sid: "4"));

        var r = sl.Match("foo.bar");
        Assert.Equal(4, r.PlainSubs.Length);
    }

    [Fact]
    public void Remove_subscription()
    {
        var sl = new SubList();
        var sub = MakeSub("foo.bar");
        sl.Insert(sub);
        Assert.Single(sl.Match("foo.bar").PlainSubs);

        sl.Remove(sub);
        Assert.Empty(sl.Match("foo.bar").PlainSubs);
    }

    [Fact]
    public void Queue_group_subscriptions()
    {
        var sl = new SubList();
        sl.Insert(MakeSub("foo.bar", queue: "workers", sid: "1"));
        sl.Insert(MakeSub("foo.bar", queue: "workers", sid: "2"));
        sl.Insert(MakeSub("foo.bar", queue: "loggers", sid: "3"));

        var r = sl.Match("foo.bar");
        Assert.Empty(r.PlainSubs);
        Assert.Equal(2, r.QueueSubs.Length); // 2 queue groups
    }

    [Fact]
    public void Count_tracks_subscriptions()
    {
        var sl = new SubList();
        Assert.Equal(0u, sl.Count);

        sl.Insert(MakeSub("foo", sid: "1"));
        sl.Insert(MakeSub("bar", sid: "2"));
        Assert.Equal(2u, sl.Count);

        // Remove works by reference: a new, equal-looking instance is not found,
        // so the count is unchanged. Removal needs the same instance.
        sl.Remove(MakeSub("foo", sid: "1"));
        Assert.Equal(2u, sl.Count);
    }

    [Fact]
    public void Count_tracks_with_same_instance()
    {
        var sl = new SubList();
        var sub = MakeSub("foo");
        sl.Insert(sub);
        Assert.Equal(1u, sl.Count);

        sl.Remove(sub);
        Assert.Equal(0u, sl.Count);
    }

    [Fact]
    public void Cache_invalidation_on_insert()
    {
        var sl = new SubList();
        sl.Insert(MakeSub("foo.bar", sid: "1"));

        // Prime the cache
        var r1 = sl.Match("foo.bar");
        Assert.Single(r1.PlainSubs);

        // Insert a wildcard that matches; the cache should be invalidated
        sl.Insert(MakeSub("foo.*", sid: "2"));

        var r2 = sl.Match("foo.bar");
        Assert.Equal(2, r2.PlainSubs.Length);
    }

    [Fact]
    public void Match_partial_wildcard_at_different_levels()
    {
        var sl = new SubList();
        sl.Insert(MakeSub("*.bar.baz", sid: "1"));
        sl.Insert(MakeSub("foo.*.baz", sid: "2"));
        sl.Insert(MakeSub("foo.bar.*", sid: "3"));

        var r = sl.Match("foo.bar.baz");
        Assert.Equal(3, r.PlainSubs.Length);
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SubListTests" -v quiet`

Expected: FAIL (`SubList` class doesn't exist yet).

**Step 3: Implement SubList**

```csharp
// src/NATS.Server/Subscriptions/SubList.cs
namespace NATS.Server.Subscriptions;

public sealed class SubList
{
    private const int CacheMax = 1024;
    private const int CacheSweep = 256;

    private readonly ReaderWriterLockSlim _lock = new();
    private readonly TrieLevel _root = new();
    private Dictionary<string, SubListResult>?
        _cache = new();
    private uint _count;
    private ulong _genId;

    public uint Count
    {
        get
        {
            _lock.EnterReadLock();
            try { return _count; }
            finally { _lock.ExitReadLock(); }
        }
    }

    public void Insert(Subscription sub)
    {
        var subject = sub.Subject;

        _lock.EnterWriteLock();
        try
        {
            var level = _root;
            TrieNode? node = null;
            bool sawFwc = false;

            foreach (var token in new TokenEnumerator(subject))
            {
                if (token.Length == 0 || sawFwc)
                    throw new ArgumentException("Invalid subject", nameof(sub));

                if (token.Length == 1 && token[0] == SubjectMatch.Pwc)
                {
                    node = level.Pwc ??= new TrieNode();
                }
                else if (token.Length == 1 && token[0] == SubjectMatch.Fwc)
                {
                    node = level.Fwc ??= new TrieNode();
                    sawFwc = true;
                }
                else
                {
                    var key = token.ToString();
                    if (!level.Nodes.TryGetValue(key, out node))
                    {
                        node = new TrieNode();
                        level.Nodes[key] = node;
                    }
                }

                node.Next ??= new TrieLevel();
                level = node.Next;
            }

            if (node == null)
                throw new ArgumentException("Invalid subject", nameof(sub));

            if (sub.Queue == null)
            {
                node.PlainSubs.Add(sub);
            }
            else
            {
                if (!node.QueueSubs.TryGetValue(sub.Queue, out var qset))
                {
                    qset = [];
                    node.QueueSubs[sub.Queue] = qset;
                }
                qset.Add(sub);
            }

            _count++;
            _genId++;
            _cache?.Clear();
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    public void Remove(Subscription sub)
    {
        _lock.EnterWriteLock();
        try
        {
            var level = _root;
            TrieNode?
                node = null;
            bool sawFwc = false;

            // Path taken through the trie, kept for pruning empty nodes afterwards.
            // A List is used because tuples holding references cannot be stackalloc'ed.
            var pathList = new List<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)>();

            foreach (var token in new TokenEnumerator(sub.Subject))
            {
                if (token.Length == 0 || sawFwc)
                    return;

                bool isPwc = token.Length == 1 && token[0] == SubjectMatch.Pwc;
                bool isFwc = token.Length == 1 && token[0] == SubjectMatch.Fwc;

                if (isPwc)
                {
                    node = level.Pwc;
                }
                else if (isFwc)
                {
                    node = level.Fwc;
                    sawFwc = true;
                }
                else
                {
                    level.Nodes.TryGetValue(token.ToString(), out node);
                }

                if (node == null)
                    return; // not found

                var tokenStr = token.ToString();
                pathList.Add((level, node, tokenStr, isPwc, isFwc));
                level = node.Next ?? new TrieLevel(); // shouldn't be null on valid path
            }

            if (node == null) return;

            // Remove from node
            bool removed;
            if (sub.Queue == null)
            {
                removed = node.PlainSubs.Remove(sub);
            }
            else
            {
                removed = false;
                if (node.QueueSubs.TryGetValue(sub.Queue, out var qset))
                {
                    removed = qset.Remove(sub);
                    if (qset.Count == 0)
                        node.QueueSubs.Remove(sub.Queue);
                }
            }

            if (!removed) return;

            _count--;
            _genId++;
            _cache?.Clear();

            // Prune empty nodes (walk backwards)
            for (int i = pathList.Count - 1; i >= 0; i--)
            {
                var (l, n, t, isPwc, isFwc) = pathList[i];
                if (n.IsEmpty)
                {
                    if (isPwc) l.Pwc = null;
                    else if (isFwc) l.Fwc = null;
                    else l.Nodes.Remove(t);
                }
            }
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    public SubListResult Match(string subject)
    {
        _lock.EnterReadLock();
        bool upgraded = false;
        try
        {
            // Check cache
            if (_cache != null && _cache.TryGetValue(subject, out var cached))
                return cached;

            // Cache miss: collect results
            var plainSubs = new List<Subscription>();
            var queueSubs = new List<List<Subscription>>();

            MatchLevel(_root, subject, 0, plainSubs, queueSubs);

            SubListResult result;
            if (plainSubs.Count == 0 && queueSubs.Count == 0)
            {
                result =
                    SubListResult.Empty;
            }
            else
            {
                result = new SubListResult(
                    plainSubs.ToArray(),
                    queueSubs.Select(q => q.ToArray()).ToArray()
                );
            }

            // Upgrade to write lock for cache update
            if (_cache != null)
            {
                // Snapshot the generation while the read lock is still held, so a result
                // that goes stale while the lock is released is not written to the cache.
                ulong gen = _genId;

                _lock.ExitReadLock();
                _lock.EnterWriteLock();
                upgraded = true;

                // Re-check cache after lock upgrade
                if (_cache.TryGetValue(subject, out cached))
                    return cached;

                if (gen == _genId)
                {
                    _cache[subject] = result;

                    if (_cache.Count > CacheMax)
                    {
                        // Sweep: remove entries until at CacheSweep
                        var keys = _cache.Keys.Take(_cache.Count - CacheSweep).ToList();
                        foreach (var key in keys)
                            _cache.Remove(key);
                    }
                }
            }

            return result;
        }
        finally
        {
            if (upgraded)
                _lock.ExitWriteLock();
            else
                _lock.ExitReadLock();
        }
    }

    private static void MatchLevel(TrieLevel level, string subject, int startIndex,
        List<Subscription> plainSubs, List<List<Subscription>> queueSubs)
    {
        int i = startIndex;

        while (i <= subject.Length)
        {
            // Find next token boundary
            int tokenStart = i;
            while (i < subject.Length && subject[i] != SubjectMatch.Sep)
                i++;
            var token = subject.AsSpan(tokenStart, i - tokenStart);
            bool isLast = i >= subject.Length;
            if (!isLast) i++; // skip separator

            // Full wildcard (>) at this level matches all remaining tokens
            if (level.Fwc != null)
                AddNodeToResults(level.Fwc, plainSubs, queueSubs);

            // Partial wildcard (*): recurse with remaining tokens
            if (level.Pwc != null)
            {
                if (isLast)
                {
                    AddNodeToResults(level.Pwc, plainSubs, queueSubs);
                }
                else if (level.Pwc.Next != null)
                {
                    MatchLevel(level.Pwc.Next, subject, i, plainSubs, queueSubs);
                }
            }

            // Literal match
            if (level.Nodes.TryGetValue(token.ToString(), out var node))
            {
                if (isLast)
                {
                    AddNodeToResults(node, plainSubs, queueSubs);
                }
                else if (node.Next != null)
                {
                    level = node.Next;
                }
                else
                {
                    return; // no further levels
                }
            }
            else
            {
                return; // no literal match
            }

            if (isLast) return;
        }
    }

    private static void AddNodeToResults(TrieNode node,
        List<Subscription> plainSubs, List<List<Subscription>> queueSubs)
    {
        // Add plain subscriptions
        foreach (var sub in node.PlainSubs)
            plainSubs.Add(sub);

        // Add queue subscriptions grouped by queue name
        foreach (var (queueName,
                 subs) in node.QueueSubs)
        {
            if (subs.Count == 0) continue;

            // Find existing queue group or create new one
            List<Subscription>? existing = null;
            foreach (var qs in queueSubs)
            {
                if (qs.Count > 0 && qs[0].Queue == queueName)
                {
                    existing = qs;
                    break;
                }
            }

            if (existing == null)
            {
                existing = new List<Subscription>();
                queueSubs.Add(existing);
            }

            existing.AddRange(subs);
        }
    }

    /// <summary>Enumerates '.' separated tokens in a subject without allocating.</summary>
    private ref struct TokenEnumerator
    {
        private ReadOnlySpan<char> _remaining;

        public TokenEnumerator(string subject)
        {
            _remaining = subject.AsSpan();
            Current = default;
        }

        public ReadOnlySpan<char> Current { get; private set; }

        public TokenEnumerator GetEnumerator() => this;

        public bool MoveNext()
        {
            if (_remaining.IsEmpty)
                return false;

            int sep = _remaining.IndexOf(SubjectMatch.Sep);
            if (sep < 0)
            {
                Current = _remaining;
                _remaining = default;
            }
            else
            {
                Current = _remaining[..sep];
                _remaining = _remaining[(sep + 1)..];
            }
            return true;
        }
    }

    private sealed class TrieLevel
    {
        public readonly Dictionary<string, TrieNode> Nodes = new();
        public TrieNode? Pwc; // partial wildcard (*)
        public TrieNode? Fwc; // full wildcard (>)
    }

    private sealed class TrieNode
    {
        public TrieLevel? Next;
        public readonly HashSet<Subscription> PlainSubs = [];
        public readonly Dictionary<string, HashSet<Subscription>> QueueSubs = new();

        public bool IsEmpty => PlainSubs.Count == 0 && QueueSubs.Count == 0 &&
            (Next == null || (Next.Nodes.Count == 0 && Next.Pwc == null && Next.Fwc == null));
    }
}
```

**Step 4: Run tests to verify they pass**

Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SubListTests" -v quiet`

Expected: All tests PASS.
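
For orientation only, here is a minimal sketch of how a later layer might consume a match result shaped like `SubListResult`: every plain subscriber receives the message, and one member of each queue group is chosen. `PickRecipients` is a hypothetical helper, subscribers are represented by plain sid strings instead of `Subscription` objects, and random selection within a group is just one possible policy; none of this is specified by the plan above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the plan's files).
// Picks the recipients for one published message:
// all plain subscribers, plus exactly one member of each non-empty queue group.
static List<string> PickRecipients(string[] plainSubs, string[][] queueSubs, Random rng)
{
    var recipients = new List<string>(plainSubs);
    foreach (var group in queueSubs)
    {
        if (group.Length == 0) continue;               // defensive: skip empty groups
        recipients.Add(group[rng.Next(group.Length)]); // assumption: uniform random pick
    }
    return recipients;
}

// Shape mirrors SubListResult: outer array = groups, inner array = members.
string[] plain = ["1", "2"];
string[][] groups = [["w1", "w2"], ["log1"]];

var picked = PickRecipients(plain, groups, new Random());
Console.WriteLine(string.Join(", ", picked)); // always 4 entries: "1", "2", one worker, "log1"
```

Keeping the selection outside `SubList` means the trie only answers "who matches", while the delivery policy can change without touching the cache.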

**Step 5: Commit**

```bash
git add src/NATS.Server/Subscriptions/SubList.cs tests/NATS.Server.Tests/SubListTests.cs
git commit -m "feat: implement SubList trie with wildcard matching and cache"
```

---

### Task 3: Protocol Constants and Types

**Files:**
- Create: `src/NATS.Server/Protocol/NatsProtocol.cs`
- Create: `src/NATS.Server/NatsOptions.cs`

**Reference:** `golang/nats-server/server/const.go`, `golang/nats-server/server/server.go` lines 109-166 (Info struct), `golang/nats-server/server/client.go` lines 661-690 (ClientOpts)

**Step 1: Create protocol constants and DTO types**

```csharp
// src/NATS.Server/Protocol/NatsProtocol.cs
using System.Text;
using System.Text.Json.Serialization;

namespace NATS.Server.Protocol;

public static class NatsProtocol
{
    public const int MaxControlLineSize = 4096;
    public const int MaxPayloadSize = 1024 * 1024; // 1MB
    public const int DefaultPort = 4222;
    public const string Version = "0.1.0";
    public const int ProtoVersion = 1;

    // Pre-encoded protocol fragments
    public static readonly byte[] CrLf = "\r\n"u8.ToArray();
    public static readonly byte[] PingBytes = "PING\r\n"u8.ToArray();
    public static readonly byte[] PongBytes = "PONG\r\n"u8.ToArray();
    public static readonly byte[] OkBytes = "+OK\r\n"u8.ToArray();
    public static readonly byte[] InfoPrefix = "INFO "u8.ToArray();
    public static readonly byte[] MsgPrefix = "MSG "u8.ToArray();
    public static readonly byte[] HmsgPrefix = "HMSG "u8.ToArray();
    public static readonly byte[] ErrPrefix = "-ERR "u8.ToArray();
}

public sealed class ServerInfo
{
    [JsonPropertyName("server_id")]
    public required string ServerId { get; set; }

    [JsonPropertyName("server_name")]
    public required string ServerName { get; set; }

    [JsonPropertyName("version")]
    public required string Version { get; set; }

    [JsonPropertyName("proto")]
    public int Proto { get; set; } = NatsProtocol.ProtoVersion;

    [JsonPropertyName("host")]
    public required string Host { get; set; }

    [JsonPropertyName("port")]
    public int Port { get; set; }

    [JsonPropertyName("headers")]
    public bool Headers { get; set; } = true;

    [JsonPropertyName("max_payload")]
    public int MaxPayload { get; set; } = NatsProtocol.MaxPayloadSize;

    [JsonPropertyName("client_id")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public ulong ClientId { get; set; }

    [JsonPropertyName("client_ip")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? ClientIp { get; set; }
}

public sealed class ClientOptions
{
    [JsonPropertyName("verbose")]
    public bool Verbose { get; set; }

    [JsonPropertyName("pedantic")]
    public bool Pedantic { get; set; }

    [JsonPropertyName("echo")]
    public bool Echo { get; set; } = true;

    [JsonPropertyName("name")]
    public string? Name { get; set; }

    [JsonPropertyName("lang")]
    public string? Lang { get; set; }

    [JsonPropertyName("version")]
    public string? Version { get; set; }

    [JsonPropertyName("protocol")]
    public int Protocol { get; set; }

    [JsonPropertyName("headers")]
    public bool Headers { get; set; }

    [JsonPropertyName("no_responders")]
    public bool NoResponders { get; set; }
}
```

```csharp
// src/NATS.Server/NatsOptions.cs
namespace NATS.Server;

public sealed class NatsOptions
{
    public string Host { get; set; } = "0.0.0.0";
    public int Port { get; set; } = 4222;
    public string? ServerName { get; set; }
    public int MaxPayload { get; set; } = 1024 * 1024; // 1MB
    public int MaxControlLine { get; set; } = 4096;
    public int MaxConnections { get; set; } = 65536;
    public TimeSpan PingInterval { get; set; } = TimeSpan.FromMinutes(2);
    public int MaxPingsOut { get; set; } = 2;
}
```

**Step 2: Verify build**

Run: `dotnet build NatsDotNet.sln`

Expected: Build succeeded.
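
As a rough illustration of how pre-encoded fragments like `MsgPrefix` and `CrLf` might be used by a later layer, the sketch below assembles one `MSG` frame in the NATS wire format `MSG <subject> <sid> [reply-to] <#bytes>\r\n<payload>\r\n`. `BuildMsgFrame` is a hypothetical helper, not one of the files above, and it keeps its own local copies of the two fragments so that it runs standalone.

```csharp
#nullable enable
using System;
using System.Text;

// Hypothetical helper (not part of the plan's files): builds a single MSG frame.
static byte[] BuildMsgFrame(string subject, string sid, string? replyTo, ReadOnlySpan<byte> payload)
{
    // Local stand-ins for NatsProtocol.MsgPrefix and NatsProtocol.CrLf.
    ReadOnlySpan<byte> msgPrefix = "MSG "u8;
    ReadOnlySpan<byte> crlf = "\r\n"u8;

    // The control line arguments are ASCII text; the payload is copied as raw bytes.
    byte[] args = Encoding.ASCII.GetBytes(replyTo is null
        ? $"{subject} {sid} {payload.Length}"
        : $"{subject} {sid} {replyTo} {payload.Length}");

    var frame = new byte[msgPrefix.Length + args.Length + crlf.Length + payload.Length + crlf.Length];
    Span<byte> dest = frame;

    msgPrefix.CopyTo(dest);     dest = dest[msgPrefix.Length..];
    args.AsSpan().CopyTo(dest); dest = dest[args.Length..];
    crlf.CopyTo(dest);          dest = dest[crlf.Length..];
    payload.CopyTo(dest);       dest = dest[payload.Length..];
    crlf.CopyTo(dest);

    return frame;
}

byte[] frameBytes = BuildMsgFrame("foo.bar", "1", null, "Hello"u8);
Console.Write(Encoding.ASCII.GetString(frameBytes)); // MSG foo.bar 1 5\r\nHello\r\n
```

A real connection handler would more likely write these pieces straight into a `PipeWriter` rather than allocate a new array per message; the array here only keeps the example easy to inspect.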

**Step 3: Commit**

```bash
git add src/NATS.Server/Protocol/NatsProtocol.cs src/NATS.Server/NatsOptions.cs
git commit -m "feat: add protocol constants, ServerInfo, ClientOptions, and NatsOptions"
```

---

### Task 4: Protocol Parser

**Files:**
- Create: `src/NATS.Server/Protocol/NatsParser.cs`
- Test: `tests/NATS.Server.Tests/ParserTests.cs`

**Reference:** `golang/nats-server/server/parser.go`: state machine, argument parsing, payload reading

**Step 1: Write failing parser tests**

```csharp
// tests/NATS.Server.Tests/ParserTests.cs
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using NATS.Server.Protocol;

namespace NATS.Server.Tests;

public class ParserTests
{
    private static async Task<List<ParsedCommand>> ParseAsync(string input)
    {
        var pipe = new Pipe();
        var commands = new List<ParsedCommand>();

        // Write input to pipe
        var bytes = Encoding.ASCII.GetBytes(input);
        await pipe.Writer.WriteAsync(bytes);
        pipe.Writer.Complete();

        // Parse from pipe
        var parser = new NatsParser(maxPayload: NatsProtocol.MaxPayloadSize);
        while (true)
        {
            var result = await pipe.Reader.ReadAsync();
            var buffer = result.Buffer;

            while (parser.TryParse(ref buffer, out var cmd))
                commands.Add(cmd);

            pipe.Reader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
                break;
        }

        return commands;
    }

    [Fact]
    public async Task Parse_PING()
    {
        var cmds = await ParseAsync("PING\r\n");
        Assert.Single(cmds);
        Assert.Equal(CommandType.Ping, cmds[0].Type);
    }

    [Fact]
    public async Task Parse_PONG()
    {
        var cmds = await ParseAsync("PONG\r\n");
        Assert.Single(cmds);
        Assert.Equal(CommandType.Pong, cmds[0].Type);
    }

    [Fact]
    public async Task Parse_CONNECT()
    {
        var cmds = await ParseAsync("CONNECT {\"verbose\":false,\"echo\":true}\r\n");
        Assert.Single(cmds);
        Assert.Equal(CommandType.Connect, cmds[0].Type);
        Assert.Contains("verbose", Encoding.ASCII.GetString(cmds[0].Payload.ToArray()));
    }

    [Fact]
    public async Task Parse_SUB_without_queue()
    {
        var cmds = await ParseAsync("SUB foo 1\r\n");
        Assert.Single(cmds);
        Assert.Equal(CommandType.Sub,
            cmds[0].Type);
        Assert.Equal("foo", cmds[0].Subject);
        Assert.Null(cmds[0].Queue);
        Assert.Equal("1", cmds[0].Sid);
    }

    [Fact]
    public async Task Parse_SUB_with_queue()
    {
        var cmds = await ParseAsync("SUB foo workers 1\r\n");
        Assert.Single(cmds);
        Assert.Equal(CommandType.Sub, cmds[0].Type);
        Assert.Equal("foo", cmds[0].Subject);
        Assert.Equal("workers", cmds[0].Queue);
        Assert.Equal("1", cmds[0].Sid);
    }

    [Fact]
    public async Task Parse_UNSUB()
    {
        var cmds = await ParseAsync("UNSUB 1\r\n");
        Assert.Single(cmds);
        Assert.Equal(CommandType.Unsub, cmds[0].Type);
        Assert.Equal("1", cmds[0].Sid);
        Assert.Equal(-1, cmds[0].MaxMessages);
    }

    [Fact]
    public async Task Parse_UNSUB_with_max()
    {
        var cmds = await ParseAsync("UNSUB 1 5\r\n");
        Assert.Single(cmds);
        Assert.Equal(CommandType.Unsub, cmds[0].Type);
        Assert.Equal("1", cmds[0].Sid);
        Assert.Equal(5, cmds[0].MaxMessages);
    }

    [Fact]
    public async Task Parse_PUB_with_payload()
    {
        var cmds = await ParseAsync("PUB foo 5\r\nHello\r\n");
        Assert.Single(cmds);
        Assert.Equal(CommandType.Pub, cmds[0].Type);
        Assert.Equal("foo", cmds[0].Subject);
        Assert.Null(cmds[0].ReplyTo);
        Assert.Equal("Hello", Encoding.ASCII.GetString(cmds[0].Payload.ToArray()));
    }

    [Fact]
    public async Task Parse_PUB_with_reply()
    {
        var cmds = await ParseAsync("PUB foo reply 5\r\nHello\r\n");
        Assert.Single(cmds);
        Assert.Equal(CommandType.Pub, cmds[0].Type);
        Assert.Equal("foo", cmds[0].Subject);
        Assert.Equal("reply", cmds[0].ReplyTo);
        Assert.Equal("Hello", Encoding.ASCII.GetString(cmds[0].Payload.ToArray()));
    }

    [Fact]
    public async Task Parse_multiple_commands()
    {
        var cmds = await ParseAsync("PING\r\nPONG\r\nSUB foo 1\r\n");
        Assert.Equal(3, cmds.Count);
        Assert.Equal(CommandType.Ping, cmds[0].Type);
        Assert.Equal(CommandType.Pong, cmds[1].Type);
        Assert.Equal(CommandType.Sub, cmds[2].Type);
    }

    [Fact]
    public async Task Parse_PUB_zero_payload()
    {
        var cmds = await ParseAsync("PUB foo 0\r\n\r\n");
        Assert.Single(cmds);
        Assert.Equal(CommandType.Pub, cmds[0].Type);
        Assert.Empty(cmds[0].Payload.ToArray());
    }

    [Fact]
    public async Task Parse_case_insensitive()
    {
        var cmds = await ParseAsync("ping\r\npub FOO 3\r\nabc\r\n");
        Assert.Equal(2, cmds.Count);
        Assert.Equal(CommandType.Ping, cmds[0].Type);
        Assert.Equal(CommandType.Pub, cmds[1].Type);
    }

    [Fact]
    public async Task Parse_HPUB()
    {
        // HPUB subject 12 17\r\nNATS/1.0\r\n\r\nHello\r\n
        var header = "NATS/1.0\r\n\r\n";
        var payload = "Hello";
        var total = header.Length + payload.Length;
        var cmds = await ParseAsync($"HPUB foo {header.Length} {total}\r\n{header}{payload}\r\n");
        Assert.Single(cmds);
        Assert.Equal(CommandType.HPub, cmds[0].Type);
        Assert.Equal("foo", cmds[0].Subject);
        Assert.Equal(header.Length, cmds[0].HeaderSize);
    }

    [Fact]
    public async Task Parse_INFO()
    {
        var cmds = await ParseAsync("INFO {\"server_id\":\"test\"}\r\n");
        Assert.Single(cmds);
        Assert.Equal(CommandType.Info, cmds[0].Type);
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ParserTests" -v quiet`

Expected: FAIL (`NatsParser` doesn't exist).

**Step 3: Implement NatsParser**

```csharp
// src/NATS.Server/Protocol/NatsParser.cs
using System.Buffers;
using System.Text;

namespace NATS.Server.Protocol;

public enum CommandType
{
    Ping,
    Pong,
    Connect,
    Info,
    Pub,
    HPub,
    Sub,
    Unsub,
    Ok,
    Err,
}

public readonly struct ParsedCommand
{
    public CommandType Type { get; init; }
    public string? Subject { get; init; }
    public string? ReplyTo { get; init; }
    public string? Queue { get; init; }
    public string?
        Sid { get; init; }
    public int MaxMessages { get; init; }
    public int HeaderSize { get; init; }
    public ReadOnlyMemory<byte> Payload { get; init; }

    public static ParsedCommand Simple(CommandType type) =>
        new() { Type = type, MaxMessages = -1 };
}

public sealed class NatsParser
{
    private static readonly byte[] CrLf = "\r\n"u8.ToArray();

    private readonly int _maxPayload;

    // State for split-packet payload reading
    private bool _awaitingPayload;
    private int _expectedPayloadSize;
    private string? _pendingSubject;
    private string? _pendingReplyTo;
    private int _pendingHeaderSize;
    private CommandType _pendingType;

    public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize)
    {
        _maxPayload = maxPayload;
    }

    public bool TryParse(ref ReadOnlySequence<byte> buffer, out ParsedCommand command)
    {
        command = default;

        if (_awaitingPayload)
            return TryReadPayload(ref buffer, out command);

        // Look for \r\n to find control line
        var reader = new SequenceReader<byte>(buffer);
        if (!reader.TryReadTo(out ReadOnlySequence<byte> line, CrLf.AsSpan()))
            return false;

        // Control line size check
        if (line.Length > NatsProtocol.MaxControlLineSize)
            throw new ProtocolViolationException("Maximum control line exceeded");

        // Get line as contiguous span
        Span<byte> lineSpan = stackalloc byte[(int)line.Length];
        line.CopyTo(lineSpan);

        // Identify command by first bytes
        if (lineSpan.Length < 2)
        {
            if (lineSpan.Length == 1 && lineSpan[0] is (byte)'+')
            {
                // partial: need more data
                return false;
            }
            throw new ProtocolViolationException("Unknown protocol operation");
        }

        byte b0 = (byte)(lineSpan[0] | 0x20); // lowercase
        byte b1 = (byte)(lineSpan[1] | 0x20);

        switch (b0)
        {
            case (byte)'p':
                if (b1 == (byte)'i') // PING
                {
                    command = ParsedCommand.Simple(CommandType.Ping);
                    buffer = buffer.Slice(reader.Position);
                    return true;
                }
                if (b1 == (byte)'o') // PONG
                {
                    command = ParsedCommand.Simple(CommandType.Pong);
                    buffer = buffer.Slice(reader.Position);
                    return true;
                }
                if (b1 == (byte)'u') // PUB
                {
                    return ParsePub(lineSpan, ref buffer, reader.Position, out command);
                }
                break;

            case (byte)'h':
                if (b1 == (byte)'p') // HPUB
                {
                    return ParseHPub(lineSpan, ref buffer, reader.Position, out command);
                }
                break;

            case (byte)'s':
                if (b1 == (byte)'u') // SUB
                {
                    command = ParseSub(lineSpan);
                    buffer = buffer.Slice(reader.Position);
                    return true;
                }
                break;

            case (byte)'u':
                if (b1 == (byte)'n') // UNSUB
                {
                    command = ParseUnsub(lineSpan);
                    buffer = buffer.Slice(reader.Position);
                    return true;
                }
                break;

            case (byte)'c':
                if (b1 == (byte)'o') // CONNECT
                {
                    command = ParseConnect(lineSpan);
                    buffer = buffer.Slice(reader.Position);
                    return true;
                }
                break;

            case (byte)'i':
                if (b1 == (byte)'n') // INFO
                {
                    command = ParseInfo(lineSpan);
                    buffer = buffer.Slice(reader.Position);
                    return true;
                }
                break;

            case (byte)'+': // +OK
                command = ParsedCommand.Simple(CommandType.Ok);
                buffer = buffer.Slice(reader.Position);
                return true;

            case (byte)'-': // -ERR
                command = ParsedCommand.Simple(CommandType.Err);
                buffer = buffer.Slice(reader.Position);
                return true;
        }

        throw new ProtocolViolationException("Unknown protocol operation");
    }

    private bool ParsePub(Span<byte> line, ref ReadOnlySequence<byte> buffer,
        SequencePosition afterLine, out ParsedCommand command)
    {
        command = default;

        // PUB subject [reply] size
        // Skip "PUB "
        var args = SplitArgs(line[4..]);

        string subject;
        string?
reply = null; int size; if (args.Length == 2) { subject = Encoding.ASCII.GetString(args[0]); size = ParseSize(args[1]); } else if (args.Length == 3) { subject = Encoding.ASCII.GetString(args[0]); reply = Encoding.ASCII.GetString(args[1]); size = ParseSize(args[2]); } else { throw new ProtocolViolationException("Invalid PUB arguments"); } if (size < 0 || size > _maxPayload) throw new ProtocolViolationException("Invalid payload size"); // Now read payload + \r\n buffer = buffer.Slice(afterLine); _awaitingPayload = true; _expectedPayloadSize = size; _pendingSubject = subject; _pendingReplyTo = reply; _pendingHeaderSize = -1; _pendingType = CommandType.Pub; return TryReadPayload(ref buffer, out command); } private bool ParseHPub(Span line, ref ReadOnlySequence buffer, SequencePosition afterLine, out ParsedCommand command) { command = default; // HPUB subject [reply] hdr_size total_size // Skip "HPUB " var args = SplitArgs(line[5..]); string subject; string? reply = null; int hdrSize, totalSize; if (args.Length == 3) { subject = Encoding.ASCII.GetString(args[0]); hdrSize = ParseSize(args[1]); totalSize = ParseSize(args[2]); } else if (args.Length == 4) { subject = Encoding.ASCII.GetString(args[0]); reply = Encoding.ASCII.GetString(args[1]); hdrSize = ParseSize(args[2]); totalSize = ParseSize(args[3]); } else { throw new ProtocolViolationException("Invalid HPUB arguments"); } if (hdrSize < 0 || totalSize < 0 || hdrSize > totalSize || totalSize > _maxPayload) throw new ProtocolViolationException("Invalid HPUB sizes"); buffer = buffer.Slice(afterLine); _awaitingPayload = true; _expectedPayloadSize = totalSize; _pendingSubject = subject; _pendingReplyTo = reply; _pendingHeaderSize = hdrSize; _pendingType = CommandType.HPub; return TryReadPayload(ref buffer, out command); } private bool TryReadPayload(ref ReadOnlySequence buffer, out ParsedCommand command) { command = default; // Need: _expectedPayloadSize bytes + \r\n long needed = _expectedPayloadSize + 2; // payload + 
\r\n if (buffer.Length < needed) return false; // Extract payload var payloadSlice = buffer.Slice(0, _expectedPayloadSize); var payload = new byte[_expectedPayloadSize]; payloadSlice.CopyTo(payload); // Verify \r\n after payload var trailer = buffer.Slice(_expectedPayloadSize, 2); Span<byte> trailerBytes = stackalloc byte[2]; trailer.CopyTo(trailerBytes); if (trailerBytes[0] != (byte)'\r' || trailerBytes[1] != (byte)'\n') throw new ProtocolViolationException("Expected \\r\\n after payload"); command = new ParsedCommand { Type = _pendingType, Subject = _pendingSubject, ReplyTo = _pendingReplyTo, Payload = payload, HeaderSize = _pendingHeaderSize, MaxMessages = -1, }; buffer = buffer.Slice(buffer.GetPosition(needed)); _awaitingPayload = false; return true; } private static ParsedCommand ParseSub(Span<byte> line) { // SUB subject [queue] sid; skip "SUB " var args = SplitArgs(line[4..]); return args.Length switch { 2 => new ParsedCommand { Type = CommandType.Sub, Subject = Encoding.ASCII.GetString(args[0]), Sid = Encoding.ASCII.GetString(args[1]), MaxMessages = -1, }, 3 => new ParsedCommand { Type = CommandType.Sub, Subject = Encoding.ASCII.GetString(args[0]), Queue = Encoding.ASCII.GetString(args[1]), Sid = Encoding.ASCII.GetString(args[2]), MaxMessages = -1, }, _ => throw new ProtocolViolationException("Invalid SUB arguments"), }; } private static ParsedCommand ParseUnsub(Span<byte> line) { // UNSUB sid [max_msgs]; skip "UNSUB " var args = SplitArgs(line[6..]); return args.Length switch { 1 => new ParsedCommand { Type = CommandType.Unsub, Sid = Encoding.ASCII.GetString(args[0]), MaxMessages = -1, }, 2 => new ParsedCommand { Type = CommandType.Unsub, Sid = Encoding.ASCII.GetString(args[0]), MaxMessages = ParseSize(args[1]), }, _ => throw new ProtocolViolationException("Invalid UNSUB arguments"), }; } private static ParsedCommand ParseConnect(Span<byte> line) { // CONNECT {json}; skip "CONNECT " int spaceIdx = line.IndexOf((byte)' '); if (spaceIdx < 0) throw new 
ProtocolViolationException("Invalid CONNECT"); var json = line[(spaceIdx + 1)..]; return new ParsedCommand { Type = CommandType.Connect, Payload = json.ToArray(), MaxMessages = -1, }; } private static ParsedCommand ParseInfo(Span<byte> line) { // INFO {json}; skip "INFO " int spaceIdx = line.IndexOf((byte)' '); if (spaceIdx < 0) throw new ProtocolViolationException("Invalid INFO"); var json = line[(spaceIdx + 1)..]; return new ParsedCommand { Type = CommandType.Info, Payload = json.ToArray(), MaxMessages = -1, }; } /// <summary>Parse a decimal integer from ASCII bytes. Returns -1 on error.</summary> private static int ParseSize(Span<byte> data) { if (data.Length == 0 || data.Length > 9) return -1; int n = 0; foreach (byte b in data) { if (b < (byte)'0' || b > (byte)'9') return -1; n = n * 10 + (b - '0'); } return n; } /// <summary>Split on spaces/tabs into at most 6 arguments, returned as a SpanArgs of slices over the input.</summary> private static SpanArgs SplitArgs(Span<byte> data) { var result = new SpanArgs(); int start = -1; for (int i = 0; i < data.Length; i++) { byte b = data[i]; if (b is (byte)' ' or (byte)'\t') { if (start >= 0) { result.Add(data[start..i]); start = -1; } } else { if (start < 0) start = i; } } if (start >= 0) result.Add(data[start..]); return result; } private ref struct SpanArgs { private Span<byte> _a0, _a1, _a2, _a3, _a4, _a5; private int _count; public readonly int Length => _count; public void Add(Span<byte> arg) { switch (_count) { case 0: _a0 = arg; break; case 1: _a1 = arg; break; case 2: _a2 = arg; break; case 3: _a3 = arg; break; case 4: _a4 = arg; break; case 5: _a5 = arg; break; default: throw new ProtocolViolationException("Too many arguments"); } _count++; } public readonly Span<byte> this[int index] => index switch { 0 => _a0, 1 => _a1, 2 => _a2, 3 => _a3, 4 => _a4, 5 => _a5, _ => throw new IndexOutOfRangeException(), }; } } public class ProtocolViolationException : Exception { public ProtocolViolationException(string message) : base(message) { } } ``` **Step 4: Run tests to verify they pass** Run: `dotnet test 
tests/NATS.Server.Tests --filter "FullyQualifiedName~ParserTests" -v quiet` Expected: All tests PASS. **Step 5: Commit** ```bash git add src/NATS.Server/Protocol/NatsParser.cs tests/NATS.Server.Tests/ParserTests.cs git commit -m "feat: implement NATS protocol parser with System.IO.Pipelines" ``` --- ### Task 5: NatsClient Connection Handler **Files:** - Create: `src/NATS.Server/NatsClient.cs` - Test: `tests/NATS.Server.Tests/ClientTests.cs` **Reference:** `golang/nats-server/server/client.go` (readLoop, processSub, processUnsub, processConnect, deliverMsg) **Step 1: Write failing client tests** ```csharp // tests/NATS.Server.Tests/ClientTests.cs using System.IO.Pipelines; using System.Net; using System.Net.Sockets; using System.Text; using System.Text.Json; using NATS.Server; using NATS.Server.Protocol; namespace NATS.Server.Tests; public class ClientTests : IAsyncDisposable { private readonly Socket _serverSocket; private readonly Socket _clientSocket; private readonly NatsClient _natsClient; private readonly CancellationTokenSource _cts = new(); public ClientTests() { // Create connected socket pair via loopback var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); listener.Listen(1); var port = ((IPEndPoint)listener.LocalEndPoint!).Port; _clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); _clientSocket.Connect(IPAddress.Loopback, port); _serverSocket = listener.Accept(); listener.Dispose(); var serverInfo = new ServerInfo { ServerId = "test", ServerName = "test", Version = "0.1.0", Host = "127.0.0.1", Port = 4222, }; _natsClient = new NatsClient(1, _serverSocket, new NatsOptions(), serverInfo); } public async ValueTask DisposeAsync() { await _cts.CancelAsync(); _natsClient.Dispose(); _clientSocket.Dispose(); } [Fact] public async Task Client_sends_INFO_on_start() { var runTask = _natsClient.RunAsync(_cts.Token); // Read from client socket 
(should get INFO) var buf = new byte[4096]; var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None); var response = Encoding.ASCII.GetString(buf, 0, n); Assert.StartsWith("INFO ", response); Assert.Contains("server_id", response); Assert.Contains("\r\n", response); await _cts.CancelAsync(); } [Fact] public async Task Client_responds_PONG_to_PING() { var runTask = _natsClient.RunAsync(_cts.Token); // Read INFO var buf = new byte[4096]; await _clientSocket.ReceiveAsync(buf, SocketFlags.None); // Send CONNECT then PING await _clientSocket.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n")); // Read response (should get PONG) var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None); var response = Encoding.ASCII.GetString(buf, 0, n); Assert.Contains("PONG\r\n", response); await _cts.CancelAsync(); } } ``` **Step 2: Run tests to verify they fail** Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClientTests" -v quiet` Expected: FAIL, because `NatsClient` doesn't exist yet. **Step 3: Implement NatsClient** ```csharp // src/NATS.Server/NatsClient.cs using System.Buffers; using System.IO.Pipelines; using System.Net.Sockets; using System.Text; using System.Text.Json; using NATS.Server.Protocol; using NATS.Server.Subscriptions; namespace NATS.Server; public interface IMessageRouter { void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, NatsClient sender); void RemoveClient(NatsClient client); } public sealed class NatsClient : IDisposable { private readonly Socket _socket; private readonly NetworkStream _stream; private readonly NatsOptions _options; private readonly ServerInfo _serverInfo; private readonly NatsParser _parser; private readonly SemaphoreSlim _writeLock = new(1, 1); private readonly Dictionary<string, Subscription> _subs = new(); public ulong Id { get; } public ClientOptions? ClientOpts { get; private set; } public IMessageRouter? 
Router { get; set; } public bool ConnectReceived { get; private set; } // Stats public long InMsgs; public long OutMsgs; public long InBytes; public long OutBytes; public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs; public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo) { Id = id; _socket = socket; _stream = new NetworkStream(socket, ownsSocket: false); _options = options; _serverInfo = serverInfo; _parser = new NatsParser(options.MaxPayload); } public async Task RunAsync(CancellationToken ct) { var pipe = new Pipe(); try { // Send INFO await SendInfoAsync(ct); // Start read pump and command processing in parallel var fillTask = FillPipeAsync(pipe.Writer, ct); var processTask = ProcessCommandsAsync(pipe.Reader, ct); await Task.WhenAny(fillTask, processTask); } catch (OperationCanceledException) { } catch (Exception) { /* connection error: clean up */ } finally { Router?.RemoveClient(this); } } private async Task FillPipeAsync(PipeWriter writer, CancellationToken ct) { try { while (!ct.IsCancellationRequested) { var memory = writer.GetMemory(4096); int bytesRead = await _stream.ReadAsync(memory, ct); if (bytesRead == 0) break; writer.Advance(bytesRead); var result = await writer.FlushAsync(ct); if (result.IsCompleted) break; } } finally { await writer.CompleteAsync(); } } private async Task ProcessCommandsAsync(PipeReader reader, CancellationToken ct) { try { while (!ct.IsCancellationRequested) { var result = await reader.ReadAsync(ct); var buffer = result.Buffer; while (_parser.TryParse(ref buffer, out var cmd)) { await DispatchCommandAsync(cmd, ct); } reader.AdvanceTo(buffer.Start, buffer.End); if (result.IsCompleted) break; } } finally { await reader.CompleteAsync(); } } private async ValueTask DispatchCommandAsync(ParsedCommand cmd, CancellationToken ct) { switch (cmd.Type) { case CommandType.Connect: ProcessConnect(cmd); break; case CommandType.Ping: await WriteAsync(NatsProtocol.PongBytes, ct); break; case CommandType.Pong: // 
Update RTT (placeholder) break; case CommandType.Sub: ProcessSub(cmd); break; case CommandType.Unsub: ProcessUnsub(cmd); break; case CommandType.Pub: case CommandType.HPub: ProcessPub(cmd); break; } } private void ProcessConnect(ParsedCommand cmd) { ClientOpts = JsonSerializer.Deserialize<ClientOptions>(cmd.Payload.Span) ?? new ClientOptions(); ConnectReceived = true; } private void ProcessSub(ParsedCommand cmd) { var sub = new Subscription { Subject = cmd.Subject!, Queue = cmd.Queue, Sid = cmd.Sid!, }; _subs[cmd.Sid!] = sub; sub.Client = this; if (Router is ISubListAccess sl) sl.SubList.Insert(sub); } private void ProcessUnsub(ParsedCommand cmd) { if (!_subs.TryGetValue(cmd.Sid!, out var sub)) return; if (cmd.MaxMessages > 0) { sub.MaxMessages = cmd.MaxMessages; // Will be cleaned up when MessageCount reaches MaxMessages return; } _subs.Remove(cmd.Sid!); if (Router is ISubListAccess sl) sl.SubList.Remove(sub); } private void ProcessPub(ParsedCommand cmd) { Interlocked.Increment(ref InMsgs); Interlocked.Add(ref InBytes, cmd.Payload.Length); ReadOnlyMemory<byte> headers = default; ReadOnlyMemory<byte> payload = cmd.Payload; if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0) { headers = cmd.Payload[..cmd.HeaderSize]; payload = cmd.Payload[cmd.HeaderSize..]; } Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this); } private async Task SendInfoAsync(CancellationToken ct) { var infoJson = JsonSerializer.Serialize(_serverInfo); var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n"); await WriteAsync(infoLine, ct); } public async Task SendMessageAsync(string subject, string sid, string? replyTo, ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, CancellationToken ct) { Interlocked.Increment(ref OutMsgs); Interlocked.Add(ref OutBytes, payload.Length + headers.Length); byte[] line; if (headers.Length > 0) { int totalSize = headers.Length + payload.Length; line = Encoding.ASCII.GetBytes($"HMSG {subject} {sid} {(replyTo != null ? 
replyTo + " " : "")}{headers.Length} {totalSize}\r\n"); } else { line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n"); } await _writeLock.WaitAsync(ct); try { await _stream.WriteAsync(line, ct); if (headers.Length > 0) await _stream.WriteAsync(headers, ct); if (payload.Length > 0) await _stream.WriteAsync(payload, ct); await _stream.WriteAsync(NatsProtocol.CrLf, ct); await _stream.FlushAsync(ct); } finally { _writeLock.Release(); } } private async Task WriteAsync(byte[] data, CancellationToken ct) { await _writeLock.WaitAsync(ct); try { await _stream.WriteAsync(data, ct); await _stream.FlushAsync(ct); } finally { _writeLock.Release(); } } public void RemoveAllSubscriptions(SubList subList) { foreach (var sub in _subs.Values) subList.Remove(sub); _subs.Clear(); } public void Dispose() { _stream.Dispose(); _socket.Dispose(); _writeLock.Dispose(); } } public interface ISubListAccess { SubList SubList { get; } } ``` Update `Subscription.cs` to add the `Client` property: ```csharp // Add to Subscription.cs public NatsClient? Client { get; set; } ``` **Step 4: Run tests to verify they pass** Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ClientTests" -v quiet` Expected: All tests PASS. 
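The control line that `SendMessageAsync` writes before the payload can be checked in isolation. The following is a minimal sketch, not part of the plan: `MsgFraming` and `BuildControlLine` are hypothetical names, and the logic simply mirrors the two interpolated strings above (`MSG subject sid [reply] size` and `HMSG subject sid [reply] hdr_size total_size`).

```csharp
#nullable enable
using System.Text;

public static class MsgFraming
{
    // Hypothetical helper (an assumption, not in the plan): builds the
    // MSG/HMSG control line that precedes the header and payload bytes.
    public static byte[] BuildControlLine(
        string subject, string sid, string? replyTo, int headerLength, int payloadLength)
    {
        var sb = new StringBuilder();
        sb.Append(headerLength > 0 ? "HMSG " : "MSG ");
        sb.Append(subject).Append(' ').Append(sid).Append(' ');

        // The reply subject is optional and sits between sid and the sizes.
        if (replyTo != null)
            sb.Append(replyTo).Append(' ');

        if (headerLength > 0)
            // HMSG carries the header size, then headers + payload combined.
            sb.Append(headerLength).Append(' ').Append(headerLength + payloadLength);
        else
            sb.Append(payloadLength);

        sb.Append("\r\n");
        return Encoding.ASCII.GetBytes(sb.ToString());
    }
}
```

Keeping the framing in a pure function like this would let a unit test assert on the exact bytes without opening a socket; whether that is worth the extra type is a design choice left to the implementer.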
**Step 5: Commit** ```bash git add src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ClientTests.cs src/NATS.Server/Subscriptions/Subscription.cs git commit -m "feat: implement NatsClient connection handler with read/write pipeline" ``` --- ### Task 6: NatsServer Orchestrator **Files:** - Create: `src/NATS.Server/NatsServer.cs` - Test: `tests/NATS.Server.Tests/ServerTests.cs` **Reference:** `golang/nats-server/server/server.go` (NewServer, AcceptLoop, createClient), `golang/nats-server/server/client.go` (processMsgResults, deliverMsg) **Step 1: Write failing server tests** ```csharp // tests/NATS.Server.Tests/ServerTests.cs using System.Net; using System.Net.Sockets; using System.Text; using NATS.Server; namespace NATS.Server.Tests; public class ServerTests : IAsyncDisposable { private readonly NatsServer _server; private readonly int _port; private readonly CancellationTokenSource _cts = new(); public ServerTests() { // Use random port _port = GetFreePort(); _server = new NatsServer(new NatsOptions { Port = _port }); } public async ValueTask DisposeAsync() { await _cts.CancelAsync(); _server.Dispose(); } private static int GetFreePort() { using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); return ((IPEndPoint)sock.LocalEndPoint!).Port; } private async Task<Socket> ConnectClientAsync() { var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await sock.ConnectAsync(IPAddress.Loopback, _port); return sock; } private static async Task<string> ReadLineAsync(Socket sock, int bufSize = 4096) { var buf = new byte[bufSize]; var n = await sock.ReceiveAsync(buf, SocketFlags.None); return Encoding.ASCII.GetString(buf, 0, n); } [Fact] public async Task Server_accepts_connection_and_sends_INFO() { var serverTask = _server.StartAsync(_cts.Token); await Task.Delay(100); // let server start using var client = await ConnectClientAsync(); var response = await 
ReadLineAsync(client); Assert.StartsWith("INFO ", response); await _cts.CancelAsync(); } [Fact] public async Task Server_basic_pubsub() { var serverTask = _server.StartAsync(_cts.Token); await Task.Delay(100); using var pub = await ConnectClientAsync(); using var sub = await ConnectClientAsync(); // Read INFO from both await ReadLineAsync(pub); await ReadLineAsync(sub); // CONNECT + SUB on subscriber await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo 1\r\n")); await Task.Delay(50); // CONNECT + PUB on publisher await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo 5\r\nHello\r\n")); await Task.Delay(100); // Read MSG from subscriber var buf = new byte[4096]; var n = await sub.ReceiveAsync(buf, SocketFlags.None); var msg = Encoding.ASCII.GetString(buf, 0, n); Assert.Contains("MSG foo 1 5\r\nHello\r\n", msg); await _cts.CancelAsync(); } [Fact] public async Task Server_wildcard_matching() { var serverTask = _server.StartAsync(_cts.Token); await Task.Delay(100); using var pub = await ConnectClientAsync(); using var sub = await ConnectClientAsync(); await ReadLineAsync(pub); await ReadLineAsync(sub); await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\n")); await Task.Delay(50); await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo.bar 5\r\nHello\r\n")); await Task.Delay(100); var buf = new byte[4096]; var n = await sub.ReceiveAsync(buf, SocketFlags.None); var msg = Encoding.ASCII.GetString(buf, 0, n); Assert.Contains("MSG foo.bar 1 5\r\n", msg); await _cts.CancelAsync(); } } ``` **Step 2: Run tests to verify they fail** Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ServerTests" -v quiet` Expected: FAIL, because `NatsServer` doesn't exist yet. 
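The server tests above read with a single `ReceiveAsync` call after a fixed delay. TCP is a byte stream, so one read is not guaranteed to return a complete `MSG` frame; the server in this plan also writes the control line, payload, and trailing `\r\n` as separate stream writes. If the tests turn out to be flaky, one option is a helper that keeps reading until the expected text arrives. This is a sketch under assumptions: `SocketTestHelpers` and `ReadUntilAsync` are hypothetical names, not part of the plan.

```csharp
using System;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class SocketTestHelpers
{
    // Hypothetical test helper: accumulates ASCII text from the socket until
    // `expected` appears, the peer closes, or the timeout cancels the read.
    public static async Task<string> ReadUntilAsync(Socket sock, string expected, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        var sb = new StringBuilder();
        var buf = new byte[4096];

        while (!sb.ToString().Contains(expected, StringComparison.Ordinal))
        {
            // Throws OperationCanceledException if the timeout fires first.
            int n = await sock.ReceiveAsync(buf.AsMemory(), SocketFlags.None, cts.Token);
            if (n == 0)
                break; // peer closed the connection

            sb.Append(Encoding.ASCII.GetString(buf, 0, n));
        }

        return sb.ToString();
    }
}
```

A test could then replace the final `Task.Delay` plus `ReceiveAsync` pair with `await SocketTestHelpers.ReadUntilAsync(sub, "Hello\r\n", TimeSpan.FromSeconds(5))` and assert on the returned string.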
**Step 3: Implement NatsServer** ```csharp // src/NATS.Server/NatsServer.cs using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; using NATS.Server.Protocol; using NATS.Server.Subscriptions; namespace NATS.Server; public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { private readonly NatsOptions _options; private readonly ConcurrentDictionary<ulong, NatsClient> _clients = new(); private readonly SubList _subList = new(); private readonly ServerInfo _serverInfo; private Socket? _listener; private ulong _nextClientId; public SubList SubList => _subList; public NatsServer(NatsOptions options) { _options = options; _serverInfo = new ServerInfo { ServerId = Guid.NewGuid().ToString("N")[..20].ToUpperInvariant(), ServerName = options.ServerName ?? $"nats-dotnet-{Environment.MachineName}", Version = NatsProtocol.Version, Host = options.Host, Port = options.Port, MaxPayload = options.MaxPayload, }; } public async Task StartAsync(CancellationToken ct) { _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); _listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); _listener.Bind(new IPEndPoint( _options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host), _options.Port)); _listener.Listen(128); try { while (!ct.IsCancellationRequested) { var socket = await _listener.AcceptAsync(ct); var clientId = Interlocked.Increment(ref _nextClientId); var client = new NatsClient(clientId, socket, _options, _serverInfo); client.Router = this; _clients[clientId] = client; _ = RunClientAsync(client, ct); } } catch (OperationCanceledException) { } } private async Task RunClientAsync(NatsClient client, CancellationToken ct) { try { await client.RunAsync(ct); } catch (Exception) { // Client disconnected or errored } finally { RemoveClient(client); } } public void ProcessMessage(string subject, string? 
replyTo, ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, NatsClient sender) { var result = _subList.Match(subject); // Deliver to plain subscribers foreach (var sub in result.PlainSubs) { if (sub.Client == null || (sub.Client == sender && !(sender.ClientOpts?.Echo ?? true))) continue; DeliverMessage(sub, subject, replyTo, headers, payload); } // Deliver to one member of each queue group (round-robin) foreach (var queueGroup in result.QueueSubs) { if (queueGroup.Length == 0) continue; /* Simple round-robin: advance a server-wide cursor; the uint cast keeps the index non-negative after the counter wraps */ var idx = (int)(unchecked((uint)Interlocked.Increment(ref _queueCursor)) % (uint)queueGroup.Length); for (int attempt = 0; attempt < queueGroup.Length; attempt++) { var sub = queueGroup[(idx + attempt) % queueGroup.Length]; if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true))) { DeliverMessage(sub, subject, replyTo, headers, payload); break; } } } } private int _queueCursor; /* round-robin cursor shared by all queue groups */ private static void DeliverMessage(Subscription sub, string subject, string? replyTo, ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload) { var client = sub.Client; if (client == null) return; // Check auto-unsub var count = Interlocked.Increment(ref sub.MessageCount); if (sub.MaxMessages > 0 && count > sub.MaxMessages) return; // Fire and forget: deliver asynchronously _ = client.SendMessageAsync(subject, sub.Sid, replyTo, headers, payload, CancellationToken.None); } public void RemoveClient(NatsClient client) { _clients.TryRemove(client.Id, out _); client.RemoveAllSubscriptions(_subList); } public void Dispose() { _listener?.Dispose(); foreach (var client in _clients.Values) client.Dispose(); } } ``` **Step 4: Run tests to verify they pass** Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ServerTests" -v quiet` Expected: All tests PASS. 
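The auto-unsubscribe check in `DeliverMessage` (increment the delivered count, then skip delivery once it exceeds `MaxMessages`) can be illustrated on its own. The sketch below is an assumption-labeled stand-in: `AutoUnsubCounter` is a hypothetical type, not the plan's `Subscription`, and it treats any non-positive limit as "no limit", matching the `MaxMessages = -1` default the parser uses.

```csharp
using System.Threading;

public sealed class AutoUnsubCounter
{
    private long _delivered;

    // Maximum deliveries allowed; zero or negative means unlimited
    // (assumption mirroring the parser's MaxMessages = -1 default).
    public long MaxMessages { get; set; } = -1;

    // Counts one delivery attempt and reports whether it is within the limit.
    public bool TryConsume()
    {
        long count = Interlocked.Increment(ref _delivered);
        return MaxMessages <= 0 || count <= MaxMessages;
    }
}
```

With `MaxMessages = 2`, the first two calls to `TryConsume()` return `true` and every later call returns `false`. The sketch covers only the counting; removing the exhausted subscription from the client and the `SubList` is a separate step that the code above does not show.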
**Step 5: Commit** ```bash git add src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/ServerTests.cs git commit -m "feat: implement NatsServer orchestrator with accept loop and message routing" ``` --- ### Task 7: Host Console Application **Files:** - Modify: `src/NATS.Server.Host/Program.cs` **Reference:** `golang/nats-server/main.go` **Step 1: Implement Program.cs** ```csharp // src/NATS.Server.Host/Program.cs using NATS.Server; var options = new NatsOptions(); // Simple CLI argument parsing for (int i = 0; i < args.Length; i++) { switch (args[i]) { case "-p" or "--port" when i + 1 < args.Length: options.Port = int.Parse(args[++i]); break; case "-a" or "--addr" when i + 1 < args.Length: options.Host = args[++i]; break; case "-n" or "--name" when i + 1 < args.Length: options.ServerName = args[++i]; break; } } var server = new NatsServer(options); var cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; Console.WriteLine($"[NATS] Listening on {options.Host}:{options.Port}"); try { await server.StartAsync(cts.Token); } catch (OperationCanceledException) { } Console.WriteLine("[NATS] Server stopped."); ``` **Step 2: Verify it builds and runs** Run: `dotnet build src/NATS.Server.Host/NATS.Server.Host.csproj` Expected: Build succeeded. Run (quick smoke): `timeout 2 dotnet run --project src/NATS.Server.Host -- -p 14222 || true` Expected: Prints "Listening on 0.0.0.0:14222" then exits after timeout. 
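The argument loop in `Program.cs` calls `int.Parse`, which throws `FormatException` on input such as `-p abc`. If friendlier handling is wanted, a validating helper is one option. This is a minimal sketch under assumptions: `CliArgs.TryParsePort` is a hypothetical name, and restricting the range to 1-65535 is a choice made here, not something the plan specifies.

```csharp
using System.Globalization;

public static class CliArgs
{
    // Hypothetical helper: returns true only when `text` consists solely of
    // decimal digits and the value lies in the TCP port range 1-65535.
    public static bool TryParsePort(string text, out int port)
    {
        return int.TryParse(text, NumberStyles.None, CultureInfo.InvariantCulture, out port)
            && port is >= 1 and <= 65535;
    }
}
```

The `-p` case could then print a usage message and exit when `TryParsePort` returns `false` instead of letting the exception escape.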
**Step 3: Commit** ```bash git add src/NATS.Server.Host/Program.cs git commit -m "feat: add NATS.Server.Host console app with basic CLI arguments" ``` --- ### Task 8: Integration Tests with NATS.Client.Core **Files:** - Modify: `tests/NATS.Server.Tests/NATS.Server.Tests.csproj` (add NuGet ref) - Create: `tests/NATS.Server.Tests/IntegrationTests.cs` **Step 1: Add NATS client NuGet package** ```bash dotnet add tests/NATS.Server.Tests/NATS.Server.Tests.csproj package NATS.Client.Core ``` **Step 2: Write integration tests** ```csharp // tests/NATS.Server.Tests/IntegrationTests.cs using System.Net; using System.Net.Sockets; using System.Text; using NATS.Client.Core; using NATS.Server; namespace NATS.Server.Tests; public class IntegrationTests : IAsyncDisposable { private readonly NatsServer _server; private readonly int _port; private readonly CancellationTokenSource _cts = new(); private readonly Task _serverTask; public IntegrationTests() { _port = GetFreePort(); _server = new NatsServer(new NatsOptions { Port = _port }); _serverTask = _server.StartAsync(_cts.Token); Thread.Sleep(200); // Let server start } public async ValueTask DisposeAsync() { await _cts.CancelAsync(); _server.Dispose(); } private static int GetFreePort() { using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); return ((IPEndPoint)sock.LocalEndPoint!).Port; } private NatsConnection CreateClient() { var opts = new NatsOpts { Url = $"nats://127.0.0.1:{_port}" }; return new NatsConnection(opts); } [Fact] public async Task PubSub_basic() { await using var pub = CreateClient(); await using var sub = CreateClient(); await pub.ConnectAsync(); await sub.ConnectAsync(); var received = new TaskCompletionSource<string>(); var subscription = Task.Run(async () => { await foreach (var msg in sub.SubscribeAsync<string>("test.subject")) { received.TrySetResult(msg.Data!); break; } }); await Task.Delay(100); // let subscription register await 
pub.PublishAsync("test.subject", "Hello NATS!"); var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.Equal("Hello NATS!", result); } [Fact] public async Task PubSub_wildcard_star() { await using var pub = CreateClient(); await using var sub = CreateClient(); await pub.ConnectAsync(); await sub.ConnectAsync(); var received = new TaskCompletionSource<string>(); var subscription = Task.Run(async () => { await foreach (var msg in sub.SubscribeAsync<string>("test.*")) { received.TrySetResult(msg.Subject); break; } }); await Task.Delay(100); await pub.PublishAsync("test.hello", "data"); var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.Equal("test.hello", result); } [Fact] public async Task PubSub_wildcard_gt() { await using var pub = CreateClient(); await using var sub = CreateClient(); await pub.ConnectAsync(); await sub.ConnectAsync(); var received = new TaskCompletionSource<string>(); var subscription = Task.Run(async () => { await foreach (var msg in sub.SubscribeAsync<string>("test.>")) { received.TrySetResult(msg.Subject); break; } }); await Task.Delay(100); await pub.PublishAsync("test.foo.bar.baz", "data"); var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.Equal("test.foo.bar.baz", result); } [Fact] public async Task PubSub_fan_out() { await using var pub = CreateClient(); await using var sub1 = CreateClient(); await using var sub2 = CreateClient(); await pub.ConnectAsync(); await sub1.ConnectAsync(); await sub2.ConnectAsync(); var count1 = 0; var count2 = 0; var done = new TaskCompletionSource(); var s1 = Task.Run(async () => { await foreach (var msg in sub1.SubscribeAsync<string>("fanout")) { Interlocked.Increment(ref count1); if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 2) done.TrySetResult(); break; } }); var s2 = Task.Run(async () => { await foreach (var msg in sub2.SubscribeAsync<string>("fanout")) { Interlocked.Increment(ref count2); if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 2) 
done.TrySetResult(); break; } }); await Task.Delay(100); await pub.PublishAsync("fanout", "hello"); await done.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.Equal(1, count1); Assert.Equal(1, count2); } [Fact] public async Task PingPong() { await using var client = CreateClient(); await client.ConnectAsync(); // If we got here, the connection is alive and PING/PONG works await client.PingAsync(); } } ``` **Step 3: Run tests** Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~IntegrationTests" -v normal` Expected: All tests PASS (this may take a few debugging iterations). **Step 4: Commit** ```bash git add tests/NATS.Server.Tests/ git commit -m "feat: add integration tests using NATS.Client.Core NuGet package" ``` --- ### Task 9: Final Validation and Cleanup **Files:** - Modify: `CLAUDE.md` (update build commands now that project exists) **Step 1: Run full test suite** Run: `dotnet test NatsDotNet.sln -v normal` Expected: All tests PASS. **Step 2: Verify host app runs and accepts nats CLI connections** ```bash # Terminal 1: Start server dotnet run --project src/NATS.Server.Host -- -p 14222 & # Terminal 2: Test with nats CLI (if available) nats sub test -s nats://127.0.0.1:14222 & nats pub test "Hello from nats CLI" -s nats://127.0.0.1:14222 ``` **Step 3: Update CLAUDE.md with actual verified commands** Update the Build & Test Commands section to match the real project structure. **Step 4: Commit** ```bash git add CLAUDE.md git commit -m "docs: update CLAUDE.md with verified build and test commands" ```