diff --git a/docs/plans/2026-02-23-sections3-6-gaps-plan.md b/docs/plans/2026-02-23-sections3-6-gaps-plan.md new file mode 100644 index 0000000..b794753 --- /dev/null +++ b/docs/plans/2026-02-23-sections3-6-gaps-plan.md @@ -0,0 +1,2384 @@ +# Sections 3-6 Gaps Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task. + +**Goal:** Implement all remaining gaps in Protocol Parsing (section 3), Subscriptions & Subject Matching (section 4), Authentication & Authorization (section 5), and Configuration (section 6) to achieve golang parity. + +**Architecture:** Port Go reference implementations faithfully. Use `Interlocked` for atomic counters, `ReaderWriterLockSlim` for trie locking, `PeriodicTimer` for background sweeps. Each task is independently testable via xUnit/Shouldly. + +**Tech Stack:** .NET 10 / C# 14, xUnit 3, Shouldly, Serilog + +--- + +### Task 1: NatsOptions — Add New Configuration Fields + +**Files:** +- Modify: `src/NATS.Server/NatsOptions.cs:17` (after `MaxPingsOut`) + +**Step 1: Write the failing test** + +Create test file `tests/NATS.Server.Tests/NatsOptionsTests.cs`: + +```csharp +namespace NATS.Server.Tests; + +public class NatsOptionsTests +{ + [Fact] + public void Defaults_are_correct() + { + var opts = new NatsOptions(); + opts.MaxSubs.ShouldBe(0); + opts.MaxSubTokens.ShouldBe(0); + opts.Debug.ShouldBe(false); + opts.Trace.ShouldBe(false); + opts.LogFile.ShouldBeNull(); + opts.LogSizeLimit.ShouldBe(0L); + opts.Tags.ShouldBeNull(); + } +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~NatsOptionsTests.Defaults_are_correct" -v normal` +Expected: FAIL — properties don't exist. + +**Step 3: Write minimal implementation** + +Add to `NatsOptions.cs` after line 17 (`MaxPingsOut`): + +```csharp + public int MaxSubs { get; set; } // 0 = unlimited (per-connection) + public int MaxSubTokens { get; set; } // 0 = unlimited + public bool Debug { get; set; } + public bool Trace { get; set; } + public string? LogFile { get; set; } + public long LogSizeLimit { get; set; } + public Dictionary? Tags { get; set; } +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~NatsOptionsTests" -v normal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/NATS.Server/NatsOptions.cs tests/NATS.Server.Tests/NatsOptionsTests.cs +git commit -m "feat: add MaxSubs, MaxSubTokens, Debug, Trace, LogFile, LogSizeLimit, Tags to NatsOptions" +``` + +--- + +### Task 2: CLI Flags — Add -D/-V/-DV and Logging Options + +**Files:** +- Modify: `src/NATS.Server.Host/Program.cs:13-58` (switch statement) +- Modify: `src/NATS.Server.Host/NATS.Server.Host.csproj` (add Serilog.Sinks.File) +- Modify: `Directory.Packages.props` (add Serilog.Sinks.File version) + +**Step 1: Add Serilog.Sinks.File package** + +Add to `Directory.Packages.props` under Logging: +```xml + +``` + +Add to `NATS.Server.Host.csproj`: +```xml + +``` + +**Step 2: Add CLI flag cases to Program.cs** + +In the switch statement (after `--tlsverify` case at line 56), add: + +```csharp + case "-D" or "--debug": + options.Debug = true; + break; + case "-V" or "--trace": + options.Trace = true; + break; + case "-DV": + options.Debug = true; + options.Trace = true; + break; + case "-l" or "--log" when i + 1 < args.Length: + options.LogFile = args[++i]; + break; + case "--log_size_limit" when i + 1 < args.Length: + options.LogSizeLimit = long.Parse(args[++i]); + break; +``` + +**Step 3: Wire logging options into Serilog configuration** + +Replace the Serilog setup at lines 4-8 of `Program.cs` with: + +```csharp +// Parse options first (move var options above Serilog setup) +var options = new NatsOptions(); + +for (int i = 0; i < args.Length; i++) +{ + // ... existing switch ... +} + +// Configure Serilog based on options +var logConfig = new LoggerConfiguration() + .Enrich.FromLogContext() + .WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}"); + +if (options.Trace) + logConfig.MinimumLevel.Verbose(); +else if (options.Debug) + logConfig.MinimumLevel.Debug(); +else + logConfig.MinimumLevel.Information(); + +if (options.LogFile != null) +{ + logConfig.WriteTo.File( + options.LogFile, + fileSizeLimitBytes: options.LogSizeLimit > 0 ? options.LogSizeLimit : null, + rollOnFileSizeLimit: options.LogSizeLimit > 0); +} + +Log.Logger = logConfig.CreateLogger(); +``` + +**Step 4: Build to verify** + +Run: `dotnet build src/NATS.Server.Host` +Expected: Success + +**Step 5: Commit** + +```bash +git add Directory.Packages.props src/NATS.Server.Host/NATS.Server.Host.csproj src/NATS.Server.Host/Program.cs +git commit -m "feat: add -D/-V/-DV debug/trace CLI flags and file logging support" +``` + +--- + +### Task 3: SubjectMatch Utilities — TokenAt, NumTokens, SubjectsCollide, UTF-8 Validation + +**Files:** +- Modify: `src/NATS.Server/Subscriptions/SubjectMatch.cs` +- Modify: `tests/NATS.Server.Tests/SubjectMatchTests.cs` + +**Step 1: Write failing tests** + +Add to `SubjectMatchTests.cs`: + +```csharp + [Theory] + [InlineData("foo.bar.baz", 3)] + [InlineData("foo", 1)] + [InlineData("a.b.c.d.e", 5)] + [InlineData("", 0)] + public void NumTokens(string subject, int expected) + { + SubjectMatch.NumTokens(subject).ShouldBe(expected); + } + + [Theory] + [InlineData("foo.bar.baz", 0, "foo")] + [InlineData("foo.bar.baz", 1, "bar")] + [InlineData("foo.bar.baz", 2, "baz")] + [InlineData("foo", 0, "foo")] + [InlineData("foo.bar.baz", 5, "")] + public void TokenAt(string subject, int index, string expected) + { + SubjectMatch.TokenAt(subject, index).ToString().ShouldBe(expected); + } + + [Theory] + [InlineData("foo.bar", "foo.bar", true)] + [InlineData("foo.bar", "foo.baz", false)] + [InlineData("foo.*", "foo.bar", true)] + [InlineData("foo.*", "foo.>", true)] + [InlineData("foo.>", "foo.bar.baz", true)] + [InlineData(">", "foo.bar", true)] + [InlineData("foo.*", "bar.*", false)] + [InlineData("foo.*.baz", "foo.bar.*", true)] + [InlineData("*.bar", "foo.*", true)] + [InlineData("foo.*", "bar.>", false)] + public void SubjectsCollide(string subj1, string subj2, bool expected) + { + SubjectMatch.SubjectsCollide(subj1, subj2).ShouldBe(expected); + } + + [Theory] + [InlineData("foo\0bar", true, false)] + [InlineData("foo\0bar", false, true)] + [InlineData("foo.bar", true, true)] + public void IsValidSubject_checkRunes(string subject, bool checkRunes, bool expected) + { + SubjectMatch.IsValidSubject(subject, checkRunes).ShouldBe(expected); + } +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SubjectMatchTests" -v normal` +Expected: FAIL — methods don't exist. + +**Step 3: Implement the methods** + +Add to `SubjectMatch.cs`: + +```csharp + /// Count dot-delimited tokens. Empty string returns 0. + public static int NumTokens(string subject) + { + if (string.IsNullOrEmpty(subject)) + return 0; + int count = 1; + for (int i = 0; i < subject.Length; i++) + { + if (subject[i] == Sep) + count++; + } + return count; + } + + /// Return the 0-based nth token as a span. Returns empty if out of range. + public static ReadOnlySpan TokenAt(string subject, int index) + { + if (string.IsNullOrEmpty(subject)) + return default; + + var span = subject.AsSpan(); + int current = 0; + int start = 0; + for (int i = 0; i < span.Length; i++) + { + if (span[i] == Sep) + { + if (current == index) + return span[start..i]; + start = i + 1; + current++; + } + } + if (current == index) + return span[start..]; + return default; + } + + /// + /// Determines if two subject patterns (possibly containing wildcards) can both + /// match the same literal subject. Reference: Go sublist.go SubjectsCollide. + /// + public static bool SubjectsCollide(string subj1, string subj2) + { + if (subj1 == subj2) + return true; + + bool lit1 = IsLiteral(subj1); + bool lit2 = IsLiteral(subj2); + + // Both literal: must be equal (already checked) + if (lit1 && lit2) + return false; + + // One literal, one wildcard: check if wildcard matches the literal + if (lit1 && !lit2) + return MatchLiteral(subj1, subj2); + if (lit2 && !lit1) + return MatchLiteral(subj2, subj1); + + // Both have wildcards: token-by-token comparison + int n1 = NumTokens(subj1); + int n2 = NumTokens(subj2); + bool hasFwc1 = subj1.Contains('>'); + bool hasFwc2 = subj2.Contains('>'); + + if (!hasFwc1 && !hasFwc2 && n1 != n2) + return false; + if (n1 < n2 && !hasFwc1) + return false; + if (n2 < n1 && !hasFwc2) + return false; + + int stop = Math.Min(n1, n2); + for (int i = 0; i < stop; i++) + { + var t1 = TokenAt(subj1, i); + var t2 = TokenAt(subj2, i); + if (!TokensCanMatch(t1, t2)) + return false; + } + return true; + } + + private static bool TokensCanMatch(ReadOnlySpan t1, ReadOnlySpan t2) + { + if (t1.Length == 1 && (t1[0] == Pwc || t1[0] == Fwc)) + return true; + if (t2.Length == 1 && (t2[0] == Pwc || t2[0] == Fwc)) + return true; + return t1.SequenceEqual(t2); + } + + /// + /// Validates subject. When checkRunes is true, also rejects null bytes. + /// + public static bool IsValidSubject(string subject, bool checkRunes) + { + if (!IsValidSubject(subject)) + return false; + if (!checkRunes) + return true; + for (int i = 0; i < subject.Length; i++) + { + if (subject[i] == '\0') + return false; + } + return true; + } +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SubjectMatchTests" -v normal` +Expected: All PASS + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Subscriptions/SubjectMatch.cs tests/NATS.Server.Tests/SubjectMatchTests.cs +git commit -m "feat: add NumTokens, TokenAt, SubjectsCollide, UTF-8 validation to SubjectMatch" +``` + +--- + +### Task 4: SubList — Generation ID, Stats, and Utility Methods + +This is the largest task. It modifies `SubList.cs` heavily and adds new types. + +**Files:** +- Modify: `src/NATS.Server/Subscriptions/SubList.cs` +- Create: `src/NATS.Server/Subscriptions/SubListStats.cs` +- Modify: `tests/NATS.Server.Tests/SubListTests.cs` + +**Step 1: Create SubListStats** + +Create `src/NATS.Server/Subscriptions/SubListStats.cs`: + +```csharp +namespace NATS.Server.Subscriptions; + +public sealed class SubListStats +{ + public uint NumSubs { get; init; } + public uint NumCache { get; init; } + public ulong NumInserts { get; init; } + public ulong NumRemoves { get; init; } + public ulong NumMatches { get; init; } + public double CacheHitRate { get; init; } + public uint MaxFanout { get; init; } + public double AvgFanout { get; init; } +} +``` + +**Step 2: Write failing tests for all new methods** + +Add to `SubListTests.cs`: + +```csharp + [Fact] + public void Stats_returns_correct_values() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.bar", sid: "1")); + sl.Insert(MakeSub("foo.baz", sid: "2")); + sl.Match("foo.bar"); + sl.Match("foo.bar"); // cache hit + + var stats = sl.Stats(); + stats.NumSubs.ShouldBe(2u); + stats.NumInserts.ShouldBe(2ul); + stats.NumMatches.ShouldBe(2ul); + stats.CacheHitRate.ShouldBeGreaterThan(0.0); + } + + [Fact] + public void HasInterest_returns_true_when_subscribers_exist() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.bar")); + sl.HasInterest("foo.bar").ShouldBeTrue(); + sl.HasInterest("foo.baz").ShouldBeFalse(); + } + + [Fact] + public void HasInterest_with_wildcards() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.*")); + sl.HasInterest("foo.bar").ShouldBeTrue(); + sl.HasInterest("bar.baz").ShouldBeFalse(); + } + + [Fact] + public void NumInterest_counts_subscribers() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.bar", sid: "1")); + sl.Insert(MakeSub("foo.*", sid: "2")); + sl.Insert(MakeSub("foo.bar", queue: "q1", sid: "3")); + + var (np, nq) = sl.NumInterest("foo.bar"); + np.ShouldBe(2); // foo.bar + foo.* + nq.ShouldBe(1); // queue sub + } + + [Fact] + public void RemoveBatch_removes_all() + { + var sl = new SubList(); + var sub1 = MakeSub("foo.bar", sid: "1"); + var sub2 = MakeSub("foo.baz", sid: "2"); + var sub3 = MakeSub("bar.qux", sid: "3"); + sl.Insert(sub1); + sl.Insert(sub2); + sl.Insert(sub3); + sl.Count.ShouldBe(3u); + + sl.RemoveBatch([sub1, sub2]); + sl.Count.ShouldBe(1u); + sl.Match("foo.bar").PlainSubs.ShouldBeEmpty(); + sl.Match("bar.qux").PlainSubs.ShouldHaveSingleItem(); + } + + [Fact] + public void All_returns_every_subscription() + { + var sl = new SubList(); + var sub1 = MakeSub("foo.bar", sid: "1"); + var sub2 = MakeSub("foo.*", sid: "2"); + var sub3 = MakeSub("bar.>", queue: "q", sid: "3"); + sl.Insert(sub1); + sl.Insert(sub2); + sl.Insert(sub3); + + var all = sl.All(); + all.Count.ShouldBe(3); + all.ShouldContain(sub1); + all.ShouldContain(sub2); + all.ShouldContain(sub3); + } + + [Fact] + public void ReverseMatch_finds_patterns_matching_literal() + { + var sl = new SubList(); + var sub1 = MakeSub("foo.bar", sid: "1"); + var sub2 = MakeSub("foo.*", sid: "2"); + var sub3 = MakeSub("foo.>", sid: "3"); + var sub4 = MakeSub("bar.baz", sid: "4"); + sl.Insert(sub1); + sl.Insert(sub2); + sl.Insert(sub3); + sl.Insert(sub4); + + var result = sl.ReverseMatch("foo.bar"); + result.PlainSubs.Length.ShouldBe(3); // foo.bar, foo.*, foo.> + result.PlainSubs.ShouldContain(sub1); + result.PlainSubs.ShouldContain(sub2); + result.PlainSubs.ShouldContain(sub3); + } + + [Fact] + public void Generation_ID_invalidates_cache() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.bar", sid: "1")); + + // Prime cache + var r1 = sl.Match("foo.bar"); + r1.PlainSubs.Length.ShouldBe(1); + + // Insert another sub (bumps generation) + sl.Insert(MakeSub("foo.bar", sid: "2")); + + // Cache should be invalidated by generation mismatch + var r2 = sl.Match("foo.bar"); + r2.PlainSubs.Length.ShouldBe(2); + } +``` + +**Step 3: Run tests to verify they fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SubListTests" -v normal` +Expected: FAIL — methods don't exist. + +**Step 4: Implement SubList changes** + +Replace `SubList.cs` with the following major modifications: + +1. **Add fields** at the top of the class (after `_cache`): + +```csharp + private long _generation; + private ulong _matches; + private ulong _cacheHits; + private ulong _inserts; + private ulong _removes; +``` + +2. **Add generation to cache result** — wrap cached results: + +```csharp + private readonly record struct CachedResult(SubListResult Result, long Generation); + // Change _cache type: + private Dictionary? _cache = new(StringComparer.Ordinal); +``` + +3. **Modify `Insert()`** — bump generation, track inserts, remove `AddToCache`: + +After `_count++;` (line 92): +```csharp + _inserts++; + Interlocked.Increment(ref _generation); + // No need for per-key AddToCache — generation invalidation handles it +``` + +Remove the `AddToCache(subject, sub);` call. + +4. **Modify `Remove()`** — bump generation, track removes, remove `RemoveFromCache`: + +After `_count--;` (line 166): +```csharp + _removes++; + Interlocked.Increment(ref _generation); + // No need for per-key RemoveFromCache — generation invalidation handles it +``` + +Remove the `RemoveFromCache(sub.Subject);` call. + +5. **Modify `Match()`** — use generation-based cache, track stats: + +```csharp + public SubListResult Match(string subject) + { + Interlocked.Increment(ref _matches); + + var currentGen = Interlocked.Read(ref _generation); + + // Check cache under read lock first. + _lock.EnterReadLock(); + try + { + if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen) + { + Interlocked.Increment(ref _cacheHits); + return cached.Result; + } + } + finally + { + _lock.ExitReadLock(); + } + + // Cache miss — tokenize and match under write lock. + var tokens = Tokenize(subject); + if (tokens == null) + return SubListResult.Empty; + + _lock.EnterWriteLock(); + try + { + currentGen = Interlocked.Read(ref _generation); + + // Re-check cache after acquiring write lock. + if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen) + { + Interlocked.Increment(ref _cacheHits); + return cached.Result; + } + + var plainSubs = new List(); + var queueSubs = new List>(); + MatchLevel(_root, tokens, 0, plainSubs, queueSubs); + + SubListResult result; + if (plainSubs.Count == 0 && queueSubs.Count == 0) + { + result = SubListResult.Empty; + } + else + { + var queueSubsArr = new Subscription[queueSubs.Count][]; + for (int i = 0; i < queueSubs.Count; i++) + queueSubsArr[i] = queueSubs[i].ToArray(); + result = new SubListResult(plainSubs.ToArray(), queueSubsArr); + } + + if (_cache != null) + { + _cache[subject] = new CachedResult(result, currentGen); + if (_cache.Count > CacheMax) + { + var keys = _cache.Keys.Take(_cache.Count - CacheSweep).ToList(); + foreach (var key in keys) + _cache.Remove(key); + } + } + + return result; + } + finally + { + _lock.ExitWriteLock(); + } + } +``` + +6. **Add Stats():** + +```csharp + public SubListStats Stats() + { + _lock.EnterReadLock(); + uint numSubs, numCache; + ulong inserts, removes; + try + { + numSubs = _count; + numCache = (uint)(_cache?.Count ?? 0); + inserts = _inserts; + removes = _removes; + } + finally + { + _lock.ExitReadLock(); + } + + var matches = Interlocked.Read(ref _matches); + var cacheHits = Interlocked.Read(ref _cacheHits); + var hitRate = matches > 0 ? (double)cacheHits / matches : 0.0; + + uint maxFanout = 0; + long totalFanout = 0; + int cacheEntries = 0; + + _lock.EnterReadLock(); + try + { + if (_cache != null) + { + foreach (var (_, entry) in _cache) + { + var r = entry.Result; + var f = r.PlainSubs.Length + r.QueueSubs.Length; + totalFanout += f; + if (f > maxFanout) maxFanout = (uint)f; + cacheEntries++; + } + } + } + finally + { + _lock.ExitReadLock(); + } + + return new SubListStats + { + NumSubs = numSubs, + NumCache = numCache, + NumInserts = inserts, + NumRemoves = removes, + NumMatches = matches, + CacheHitRate = hitRate, + MaxFanout = maxFanout, + AvgFanout = cacheEntries > 0 ? (double)totalFanout / cacheEntries : 0.0, + }; + } +``` + +7. **Add HasInterest():** + +```csharp + public bool HasInterest(string subject) + { + // Check cache first + var currentGen = Interlocked.Read(ref _generation); + _lock.EnterReadLock(); + try + { + if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen) + { + var r = cached.Result; + return r.PlainSubs.Length > 0 || r.QueueSubs.Length > 0; + } + } + finally + { + _lock.ExitReadLock(); + } + + // Walk the trie — short-circuit on first hit + var tokens = Tokenize(subject); + if (tokens == null) return false; + + _lock.EnterReadLock(); + try + { + return HasInterestLevel(_root, tokens, 0); + } + finally + { + _lock.ExitReadLock(); + } + } + + private static bool HasInterestLevel(TrieLevel? level, string[] tokens, int tokenIndex) + { + TrieNode? pwc = null; + TrieNode? node = null; + + for (int i = tokenIndex; i < tokens.Length; i++) + { + if (level == null) return false; + if (level.Fwc != null && NodeHasInterest(level.Fwc)) return true; + + pwc = level.Pwc; + if (pwc != null && HasInterestLevel(pwc.Next, tokens, i + 1)) return true; + + node = null; + if (level.Nodes.TryGetValue(tokens[i], out var found)) + { + node = found; + level = node.Next; + } + else + { + level = null; + } + } + + if (node != null && NodeHasInterest(node)) return true; + if (pwc != null && NodeHasInterest(pwc)) return true; + return false; + } + + private static bool NodeHasInterest(TrieNode node) + { + return node.PlainSubs.Count > 0 || node.QueueSubs.Count > 0; + } +``` + +8. **Add NumInterest():** + +```csharp + public (int plainCount, int queueCount) NumInterest(string subject) + { + var tokens = Tokenize(subject); + if (tokens == null) return (0, 0); + + _lock.EnterReadLock(); + try + { + int np = 0, nq = 0; + CountInterestLevel(_root, tokens, 0, ref np, ref nq); + return (np, nq); + } + finally + { + _lock.ExitReadLock(); + } + } + + private static void CountInterestLevel(TrieLevel? level, string[] tokens, int tokenIndex, + ref int np, ref int nq) + { + TrieNode? pwc = null; + TrieNode? node = null; + + for (int i = tokenIndex; i < tokens.Length; i++) + { + if (level == null) return; + if (level.Fwc != null) AddNodeCounts(level.Fwc, ref np, ref nq); + + pwc = level.Pwc; + if (pwc != null) CountInterestLevel(pwc.Next, tokens, i + 1, ref np, ref nq); + + node = null; + if (level.Nodes.TryGetValue(tokens[i], out var found)) + { + node = found; + level = node.Next; + } + else + { + level = null; + } + } + + if (node != null) AddNodeCounts(node, ref np, ref nq); + if (pwc != null) AddNodeCounts(pwc, ref np, ref nq); + } + + private static void AddNodeCounts(TrieNode node, ref int np, ref int nq) + { + np += node.PlainSubs.Count; + foreach (var (_, qset) in node.QueueSubs) + nq += qset.Count; + } +``` + +9. **Add RemoveBatch():** + +```csharp + public void RemoveBatch(IEnumerable subs) + { + _lock.EnterWriteLock(); + try + { + var wasEnabled = _cache != null; + _cache = null; // Disable cache for bulk operation + + foreach (var sub in subs) + RemoveInternal(sub); + + Interlocked.Increment(ref _generation); + + if (wasEnabled) + _cache = new Dictionary(StringComparer.Ordinal); + } + finally + { + _lock.ExitWriteLock(); + } + } +``` + +This requires extracting the core remove logic from `Remove()` into a private `RemoveInternal()` that doesn't acquire the lock or bump generation. + +10. **Add All():** + +```csharp + public IReadOnlyList All() + { + var subs = new List(); + _lock.EnterReadLock(); + try + { + CollectAllSubs(_root, subs); + } + finally + { + _lock.ExitReadLock(); + } + return subs; + } + + private static void CollectAllSubs(TrieLevel level, List subs) + { + foreach (var (_, node) in level.Nodes) + { + foreach (var sub in node.PlainSubs) + subs.Add(sub); + foreach (var (_, qset) in node.QueueSubs) + foreach (var sub in qset) + subs.Add(sub); + if (node.Next != null) + CollectAllSubs(node.Next, subs); + } + if (level.Pwc != null) + { + foreach (var sub in level.Pwc.PlainSubs) + subs.Add(sub); + foreach (var (_, qset) in level.Pwc.QueueSubs) + foreach (var sub in qset) + subs.Add(sub); + if (level.Pwc.Next != null) + CollectAllSubs(level.Pwc.Next, subs); + } + if (level.Fwc != null) + { + foreach (var sub in level.Fwc.PlainSubs) + subs.Add(sub); + foreach (var (_, qset) in level.Fwc.QueueSubs) + foreach (var sub in qset) + subs.Add(sub); + if (level.Fwc.Next != null) + CollectAllSubs(level.Fwc.Next, subs); + } + } +``` + +11. **Add ReverseMatch():** + +```csharp + public SubListResult ReverseMatch(string subject) + { + var tokens = Tokenize(subject); + if (tokens == null) + return SubListResult.Empty; + + _lock.EnterReadLock(); + try + { + var plainSubs = new List(); + var queueSubs = new List>(); + ReverseMatchLevel(_root, tokens, 0, plainSubs, queueSubs); + + if (plainSubs.Count == 0 && queueSubs.Count == 0) + return SubListResult.Empty; + + var queueSubsArr = new Subscription[queueSubs.Count][]; + for (int i = 0; i < queueSubs.Count; i++) + queueSubsArr[i] = queueSubs[i].ToArray(); + return new SubListResult(plainSubs.ToArray(), queueSubsArr); + } + finally + { + _lock.ExitReadLock(); + } + } + + private static void ReverseMatchLevel(TrieLevel? level, string[] tokens, int tokenIndex, + List plainSubs, List> queueSubs) + { + if (level == null || tokenIndex >= tokens.Length) + return; + + var token = tokens[tokenIndex]; + bool isLast = tokenIndex == tokens.Length - 1; + + // Full wildcard in input: collect ALL nodes at this level and below + if (token == ">") + { + CollectAllNodes(level, plainSubs, queueSubs); + return; + } + + // Partial wildcard in input: fan out to all literal children + if (token == "*") + { + foreach (var (_, node) in level.Nodes) + { + if (isLast) + AddNodeToResults(node, plainSubs, queueSubs); + else + ReverseMatchLevel(node.Next, tokens, tokenIndex + 1, plainSubs, queueSubs); + } + } + else + { + // Literal token: match literal node + if (level.Nodes.TryGetValue(token, out var node)) + { + if (isLast) + AddNodeToResults(node, plainSubs, queueSubs); + else + ReverseMatchLevel(node.Next, tokens, tokenIndex + 1, plainSubs, queueSubs); + } + } + + // Also check stored * and > at this level (they match any input token) + if (level.Pwc != null) + { + if (isLast) + AddNodeToResults(level.Pwc, plainSubs, queueSubs); + else + ReverseMatchLevel(level.Pwc.Next, tokens, tokenIndex + 1, plainSubs, queueSubs); + } + if (level.Fwc != null) + { + AddNodeToResults(level.Fwc, plainSubs, queueSubs); + } + } + + private static void CollectAllNodes(TrieLevel level, List plainSubs, + List> queueSubs) + { + foreach (var (_, node) in level.Nodes) + { + AddNodeToResults(node, plainSubs, queueSubs); + if (node.Next != null) + CollectAllNodes(node.Next, plainSubs, queueSubs); + } + if (level.Pwc != null) + { + AddNodeToResults(level.Pwc, plainSubs, queueSubs); + if (level.Pwc.Next != null) + CollectAllNodes(level.Pwc.Next, plainSubs, queueSubs); + } + if (level.Fwc != null) + { + AddNodeToResults(level.Fwc, plainSubs, queueSubs); + if (level.Fwc.Next != null) + CollectAllNodes(level.Fwc.Next, plainSubs, queueSubs); + } + } +``` + +12. **Remove AddToCache/RemoveFromCache/AddSubToResult** — these are no longer needed with generation-based invalidation. + +**Step 5: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SubListTests" -v normal` +Expected: All PASS + +**Step 6: Commit** + +```bash +git add src/NATS.Server/Subscriptions/SubList.cs src/NATS.Server/Subscriptions/SubListStats.cs tests/NATS.Server.Tests/SubListTests.cs +git commit -m "feat: add generation-based cache, Stats, HasInterest, NumInterest, RemoveBatch, All, ReverseMatch to SubList" +``` + +--- + +### Task 5: NatsHeaderParser — MIME Header Parsing + +**Files:** +- Create: `src/NATS.Server/Protocol/NatsHeaderParser.cs` +- Create: `tests/NATS.Server.Tests/NatsHeaderParserTests.cs` + +**Step 1: Write failing tests** + +```csharp +using NATS.Server.Protocol; + +namespace NATS.Server.Tests; + +public class NatsHeaderParserTests +{ + [Fact] + public void Parse_status_line_only() + { + var input = "NATS/1.0 503\r\n\r\n"u8; + var result = NatsHeaderParser.Parse(input); + result.Status.ShouldBe(503); + result.Description.ShouldBeEmpty(); + result.Headers.ShouldBeEmpty(); + } + + [Fact] + public void Parse_status_with_description() + { + var input = "NATS/1.0 503 No Responders\r\n\r\n"u8; + var result = NatsHeaderParser.Parse(input); + result.Status.ShouldBe(503); + result.Description.ShouldBe("No Responders"); + } + + [Fact] + public void Parse_headers_with_values() + { + var input = "NATS/1.0\r\nFoo: bar\r\nBaz: qux\r\n\r\n"u8; + var result = NatsHeaderParser.Parse(input); + result.Status.ShouldBe(0); + result.Headers["Foo"].ShouldBe(["bar"]); + result.Headers["Baz"].ShouldBe(["qux"]); + } + + [Fact] + public void Parse_multi_value_header() + { + var input = "NATS/1.0\r\nX-Tag: a\r\nX-Tag: b\r\n\r\n"u8; + var result = NatsHeaderParser.Parse(input); + result.Headers["X-Tag"].ShouldBe(["a", "b"]); + } + + [Fact] + public void Parse_invalid_returns_defaults() + { + var input = "GARBAGE\r\n\r\n"u8; + var result = NatsHeaderParser.Parse(input); + result.Status.ShouldBe(-1); + } +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~NatsHeaderParserTests" -v normal` +Expected: FAIL — type doesn't exist. + +**Step 3: Implement NatsHeaderParser** + +Create `src/NATS.Server/Protocol/NatsHeaderParser.cs`: + +```csharp +using System.Text; + +namespace NATS.Server.Protocol; + +public readonly struct NatsHeaders +{ + public int Status { get; init; } + public string Description { get; init; } + public Dictionary Headers { get; init; } + + public static readonly NatsHeaders Invalid = new() { Status = -1, Description = string.Empty, Headers = new() }; +} + +public static class NatsHeaderParser +{ + private static readonly byte[] CrLf = "\r\n"u8.ToArray(); + private static readonly byte[] Prefix = "NATS/1.0"u8.ToArray(); + + public static NatsHeaders Parse(ReadOnlySpan data) + { + if (data.Length < Prefix.Length) + return NatsHeaders.Invalid; + + // Check NATS/1.0 prefix + if (!data[..Prefix.Length].SequenceEqual(Prefix)) + return NatsHeaders.Invalid; + + int pos = Prefix.Length; + int status = 0; + string description = string.Empty; + + // Parse status line: NATS/1.0[ status[ description]]\r\n + int lineEnd = data[pos..].IndexOf(CrLf); + if (lineEnd < 0) + return NatsHeaders.Invalid; + + var statusLine = data[pos..(pos + lineEnd)]; + pos += lineEnd + 2; // skip \r\n + + if (statusLine.Length > 0) + { + // Skip leading space + int si = 0; + while (si < statusLine.Length && statusLine[si] == (byte)' ') + si++; + + // Parse status code + int numStart = si; + while (si < statusLine.Length && statusLine[si] >= (byte)'0' && statusLine[si] <= (byte)'9') + si++; + + if (si > numStart) + { + status = int.Parse(Encoding.ASCII.GetString(statusLine[numStart..si])); + + // Parse description (rest after space) + while (si < statusLine.Length && statusLine[si] == (byte)' ') + si++; + if (si < statusLine.Length) + description = Encoding.ASCII.GetString(statusLine[si..]); + } + } + + // Parse key-value headers until empty line (\r\n) + var headers = new Dictionary>(StringComparer.OrdinalIgnoreCase); + while (pos < data.Length) + { + var remaining = data[pos..]; + // Check for empty line (end of headers) + if (remaining.Length >= 2 && remaining[0] == (byte)'\r' && remaining[1] == (byte)'\n') + break; + + lineEnd = remaining.IndexOf(CrLf); + if (lineEnd < 0) + break; + + var headerLine = remaining[..lineEnd]; + pos += lineEnd + 2; + + // Split on first ':' + int colon = headerLine.IndexOf((byte)':'); + if (colon < 0) + continue; + + var key = Encoding.ASCII.GetString(headerLine[..colon]).Trim(); + var value = Encoding.ASCII.GetString(headerLine[(colon + 1)..]).Trim(); + + if (!headers.TryGetValue(key, out var values)) + { + values = []; + headers[key] = values; + } + values.Add(value); + } + + var result = new Dictionary(headers.Count, StringComparer.OrdinalIgnoreCase); + foreach (var (k, v) in headers) + result[k] = v.ToArray(); + + return new NatsHeaders + { + Status = status, + Description = description, + Headers = result, + }; + } +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~NatsHeaderParserTests" -v normal` +Expected: All PASS + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Protocol/NatsHeaderParser.cs tests/NATS.Server.Tests/NatsHeaderParserTests.cs +git commit -m "feat: add NatsHeaderParser for MIME header parsing" +``` + +--- + +### Task 6: PermissionLruCache — 128-Entry LRU Permission Cache + +**Files:** +- Create: `src/NATS.Server/Auth/PermissionLruCache.cs` +- Create: `tests/NATS.Server.Tests/PermissionLruCacheTests.cs` +- Modify: `src/NATS.Server/Auth/ClientPermissions.cs` + +**Step 1: Write failing tests** + +```csharp +using NATS.Server.Auth; + +namespace NATS.Server.Tests; + +public class PermissionLruCacheTests +{ + [Fact] + public void Get_returns_none_for_unknown_key() + { + var cache = new PermissionLruCache(128); + cache.TryGet("foo", out _).ShouldBeFalse(); + } + + [Fact] + public void Set_and_get_returns_value() + { + var cache = new PermissionLruCache(128); + cache.Set("foo", true); + cache.TryGet("foo", out var v).ShouldBeTrue(); + v.ShouldBeTrue(); + } + + [Fact] + public void Evicts_oldest_when_full() + { + var cache = new PermissionLruCache(3); + cache.Set("a", true); + cache.Set("b", true); + cache.Set("c", true); + cache.Set("d", true); // evicts "a" + + cache.TryGet("a", out _).ShouldBeFalse(); + cache.TryGet("b", out _).ShouldBeTrue(); + cache.TryGet("d", out _).ShouldBeTrue(); + } + + [Fact] + public void Get_promotes_to_front() + { + var cache = new PermissionLruCache(3); + cache.Set("a", true); + cache.Set("b", true); + cache.Set("c", true); + + // Access "a" to promote it + cache.TryGet("a", out _); + + cache.Set("d", true); // should evict "b" (oldest untouched) + cache.TryGet("a", out _).ShouldBeTrue(); + cache.TryGet("b", out _).ShouldBeFalse(); + } +} +``` + +**Step 2: Run to verify fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PermissionLruCacheTests" -v normal` + +**Step 3: Implement** + +Create `src/NATS.Server/Auth/PermissionLruCache.cs`: + +```csharp +namespace NATS.Server.Auth; + +/// +/// Fixed-capacity LRU cache for permission results. +/// Lock-protected (per-client, low contention). +/// Reference: Go client.go maxPermCacheSize=128. +/// +public sealed class PermissionLruCache +{ + private readonly int _capacity; + private readonly Dictionary> _map; + private readonly LinkedList<(string Key, bool Value)> _list = new(); + private readonly object _lock = new(); + + public PermissionLruCache(int capacity = 128) + { + _capacity = capacity; + _map = new Dictionary>(capacity, StringComparer.Ordinal); + } + + public bool TryGet(string key, out bool value) + { + lock (_lock) + { + if (_map.TryGetValue(key, out var node)) + { + value = node.Value.Value; + _list.Remove(node); + _list.AddFirst(node); + return true; + } + value = default; + return false; + } + } + + public void Set(string key, bool value) + { + lock (_lock) + { + if (_map.TryGetValue(key, out var existing)) + { + _list.Remove(existing); + existing.Value = (key, value); + _list.AddFirst(existing); + return; + } + + if (_map.Count >= _capacity) + { + var last = _list.Last!; + _map.Remove(last.Value.Key); + _list.RemoveLast(); + } + + var node = new LinkedListNode<(string Key, bool Value)>((key, value)); + _list.AddFirst(node); + _map[key] = node; + } + } +} +``` + +**Step 4: Wire into ClientPermissions — replace ConcurrentDictionary** + +In `ClientPermissions.cs`: +- Replace `private readonly ConcurrentDictionary _pubCache = new(StringComparer.Ordinal);` + with `private readonly PermissionLruCache _pubCache = new(128);` +- Change `IsPublishAllowed`: + +```csharp + public bool IsPublishAllowed(string subject) + { + if (_publish == null) + return true; + + if (_pubCache.TryGet(subject, out var cached)) + return cached; + + var allowed = _publish.IsAllowed(subject); + _pubCache.Set(subject, allowed); + return allowed; + } +``` + +**Step 5: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PermissionLruCacheTests" -v normal` +Expected: All PASS + +**Step 6: Commit** + +```bash +git add src/NATS.Server/Auth/PermissionLruCache.cs src/NATS.Server/Auth/ClientPermissions.cs tests/NATS.Server.Tests/PermissionLruCacheTests.cs +git commit -m "feat: add PermissionLruCache (128-entry LRU) and wire into ClientPermissions" +``` + +--- + +### Task 7: Account Limits and AccountConfig + +**Files:** +- Modify: `src/NATS.Server/Auth/Account.cs` +- Create: `src/NATS.Server/Auth/AccountConfig.cs` +- Modify: `src/NATS.Server/NatsOptions.cs` +- Modify: `src/NATS.Server/NatsServer.cs` +- Modify: `src/NATS.Server/Auth/AuthService.cs` +- Create: `tests/NATS.Server.Tests/AccountTests.cs` + +**Step 1: Write failing tests** + +Create `tests/NATS.Server.Tests/AccountTests.cs`: + +```csharp +using NATS.Server.Auth; + +namespace NATS.Server.Tests; + +public class AccountTests +{ + [Fact] + public void Account_enforces_max_connections() + { + var acc = new Account("test") { MaxConnections = 2 }; + acc.AddClient(1).ShouldBeTrue(); + acc.AddClient(2).ShouldBeTrue(); + acc.AddClient(3).ShouldBeFalse(); // exceeds limit + acc.ClientCount.ShouldBe(2); + } + + [Fact] + public void Account_unlimited_connections_when_zero() + { + var acc = new Account("test") { MaxConnections = 0 }; + acc.AddClient(1).ShouldBeTrue(); + acc.AddClient(2).ShouldBeTrue(); + } + + [Fact] + public void Account_enforces_max_subscriptions() + { + var acc = new Account("test") { MaxSubscriptions = 2 }; + acc.IncrementSubscriptions().ShouldBeTrue(); + acc.IncrementSubscriptions().ShouldBeTrue(); + acc.IncrementSubscriptions().ShouldBeFalse(); + } + + [Fact] + public void Account_decrement_subscriptions() + { + var acc = new Account("test") { MaxSubscriptions = 1 }; + acc.IncrementSubscriptions().ShouldBeTrue(); + acc.DecrementSubscriptions(); + acc.IncrementSubscriptions().ShouldBeTrue(); // slot freed + } +} +``` + +**Step 2: Implement Account changes** + +Modify `Account.cs`: + +```csharp +using System.Collections.Concurrent; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Auth; + +public sealed class Account : IDisposable +{ + public const string GlobalAccountName = "$G"; + + public string Name { get; } + public SubList SubList { get; } = new(); + public Permissions? DefaultPermissions { get; set; } + public int MaxConnections { get; set; } // 0 = unlimited + public int MaxSubscriptions { get; set; } // 0 = unlimited + + private readonly ConcurrentDictionary _clients = new(); + private int _subscriptionCount; + + public Account(string name) + { + Name = name; + } + + public int ClientCount => _clients.Count; + public int SubscriptionCount => Volatile.Read(ref _subscriptionCount); + + /// Returns false if max connections exceeded. + public bool AddClient(ulong clientId) + { + if (MaxConnections > 0 && _clients.Count >= MaxConnections) + return false; + _clients[clientId] = 0; + return true; + } + + public void RemoveClient(ulong clientId) => _clients.TryRemove(clientId, out _); + + public bool IncrementSubscriptions() + { + if (MaxSubscriptions > 0 && Volatile.Read(ref _subscriptionCount) >= MaxSubscriptions) + return false; + Interlocked.Increment(ref _subscriptionCount); + return true; + } + + public void DecrementSubscriptions() + { + Interlocked.Decrement(ref _subscriptionCount); + } + + public void Dispose() => SubList.Dispose(); +} +``` + +**Step 3: Create AccountConfig** + +Create `src/NATS.Server/Auth/AccountConfig.cs`: + +```csharp +namespace NATS.Server.Auth; + +public sealed class AccountConfig +{ + public int MaxConnections { get; init; } // 0 = unlimited + public int MaxSubscriptions { get; init; } // 0 = unlimited + public Permissions? DefaultPermissions { get; init; } +} +``` + +**Step 4: Add Accounts to NatsOptions** + +Add to `NatsOptions.cs` after `NoAuthUser`: + +```csharp + // Account configuration + public Dictionary? Accounts { get; set; } +``` + +**Step 5: Wire account limits in NatsServer.GetOrCreateAccount** + +Modify `GetOrCreateAccount` in `NatsServer.cs`: + +```csharp + public Account GetOrCreateAccount(string name) + { + return _accounts.GetOrAdd(name, n => + { + var acc = new Account(n); + if (_options.Accounts != null && _options.Accounts.TryGetValue(n, out var config)) + { + acc.MaxConnections = config.MaxConnections; + acc.MaxSubscriptions = config.MaxSubscriptions; + acc.DefaultPermissions = config.DefaultPermissions; + } + return acc; + }); + } +``` + +**Step 6: Update Account.AddClient callers to check return value** + +In `NatsClient.ProcessConnectAsync()`, after `Account.AddClient(Id)`: + +```csharp + if (!Account.AddClient(Id)) + { + Account = null; + await SendErrAndCloseAsync("maximum connections for account exceeded", + ClientClosedReason.AuthenticationViolation); + return; + } +``` + +Do the same for the no-auth path below. + +**Step 7: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~AccountTests" -v normal` +Expected: All PASS + +**Step 8: Commit** + +```bash +git add src/NATS.Server/Auth/Account.cs src/NATS.Server/Auth/AccountConfig.cs src/NATS.Server/NatsOptions.cs src/NATS.Server/NatsServer.cs src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/AccountTests.cs +git commit -m "feat: add per-account connection/subscription limits with AccountConfig" +``` + +--- + +### Task 8: MaxSubs Enforcement, Subscribe Deny Queue, Delivery-Time Deny + +**Files:** +- Modify: `src/NATS.Server/NatsClient.cs` (ProcessSub for MaxSubs + account sub limits) +- Modify: `src/NATS.Server/NatsServer.cs` (DeliverMessage for deny filtering + auto-unsub cleanup) +- Modify: `src/NATS.Server/Auth/ClientPermissions.cs` (IsDeliveryAllowed, queue deny) +- Modify: `src/NATS.Server/Protocol/NatsProtocol.cs` (add error string) + +**Step 1: Add error constant** + +In `NatsProtocol.cs`, add: +```csharp + public const string ErrMaxSubscriptionsExceeded = "Maximum Subscriptions Exceeded"; +``` + +**Step 2: Enforce MaxSubs in ProcessSub** + +In `NatsClient.ProcessSub()`, after the permission check and before creating the subscription: + +```csharp + // Per-connection subscription limit + if (_options.MaxSubs > 0 && _subs.Count >= _options.MaxSubs) + { + _logger.LogDebug("Client {ClientId} max subscriptions exceeded", Id); + _ = SendErrAndCloseAsync(NatsProtocol.ErrMaxSubscriptionsExceeded, + ClientClosedReason.MaxSubscriptionsExceeded); + return; + } + + // Per-account subscription limit + if (Account != null && !Account.IncrementSubscriptions()) + { + _logger.LogDebug("Client {ClientId} account subscription limit exceeded", Id); + _ = SendErrAndCloseAsync(NatsProtocol.ErrMaxSubscriptionsExceeded, + ClientClosedReason.MaxSubscriptionsExceeded); + return; + } +``` + +**Step 3: Add IsDeliveryAllowed to ClientPermissions** + +In `ClientPermissions.cs`, add: + +```csharp + /// + /// Checks whether a message on this subject should be delivered to this client. + /// Evaluates the subscribe deny list (msg delivery filter). + /// + public bool IsDeliveryAllowed(string subject) + { + if (_subscribe == null) + return true; + return _subscribe.IsDeliveryAllowed(subject); + } +``` + +Add to `PermissionSet`: + +```csharp + /// Checks deny list only — for delivery-time filtering. + public bool IsDeliveryAllowed(string subject) + { + if (_deny == null) + return true; + var result = _deny.Match(subject); + return result.PlainSubs.Length == 0 && result.QueueSubs.Length == 0; + } +``` + +**Step 4: Fix IsSubscribeAllowed to check queue against deny list** + +```csharp + public bool IsSubscribeAllowed(string subject, string? queue = null) + { + if (_subscribe == null) + return true; + + if (!_subscribe.IsAllowed(subject)) + return false; + + // Check queue group against subscribe deny list + if (queue != null && _subscribe.IsDenied(queue)) + return false; + + return true; + } +``` + +Add `IsDenied` to `PermissionSet`: + +```csharp + public bool IsDenied(string subject) + { + if (_deny == null) return false; + var result = _deny.Match(subject); + return result.PlainSubs.Length > 0 || result.QueueSubs.Length > 0; + } +``` + +**Step 5: Update DeliverMessage for deny filtering + auto-unsub cleanup** + +In `NatsServer.DeliverMessage()`, replace the method: + +```csharp + private void DeliverMessage(Subscription sub, string subject, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + var client = sub.Client; + if (client == null) return; + + // Check auto-unsub + var count = Interlocked.Increment(ref sub.MessageCount); + if (sub.MaxMessages > 0 && count > sub.MaxMessages) + { + // Clean up exhausted subscription from trie and client tracking + var subList = client.Account?.SubList ?? _globalAccount.SubList; + subList.Remove(sub); + client.RemoveSubscription(sub.Sid); + return; + } + + // Deny-list delivery filter + if (client.Permissions?.IsDeliveryAllowed(subject) == false) + return; + + client.SendMessage(subject, sub.Sid, replyTo, headers, payload); + } +``` + +**Step 6: Add RemoveSubscription to NatsClient** + +Add public method to `NatsClient`: + +```csharp + public void RemoveSubscription(string sid) + { + if (_subs.Remove(sid)) + Account?.DecrementSubscriptions(); + } +``` + +**Step 7: Also decrement account subs in ProcessUnsub** + +In `ProcessUnsub`, after `_subs.Remove(cmd.Sid!);`: +```csharp + Account?.DecrementSubscriptions(); +``` + +**Step 8: Also add Permissions property to NatsClient (public accessor)** + +```csharp + public ClientPermissions? Permissions => _permissions; +``` + +**Step 9: Run all tests** + +Run: `dotnet test tests/NATS.Server.Tests -v normal` +Expected: All PASS + +**Step 10: Commit** + +```bash +git add src/NATS.Server/NatsClient.cs src/NATS.Server/NatsServer.cs src/NATS.Server/Auth/ClientPermissions.cs src/NATS.Server/Protocol/NatsProtocol.cs +git commit -m "feat: add MaxSubs enforcement, delivery-time deny filtering, auto-unsub cleanup" +``` + +--- + +### Task 9: Response Permissions (Reply Tracking) + +**Files:** +- Create: `src/NATS.Server/Auth/ResponseTracker.cs` +- Modify: `src/NATS.Server/Auth/ClientPermissions.cs` +- Modify: `src/NATS.Server/NatsClient.cs` +- Create: `tests/NATS.Server.Tests/ResponseTrackerTests.cs` + +**Step 1: Write failing tests** + +```csharp +using NATS.Server.Auth; + +namespace NATS.Server.Tests; + +public class ResponseTrackerTests +{ + [Fact] + public void Allows_reply_subject_after_registration() + { + var tracker = new ResponseTracker(maxMsgs: 1, expires: TimeSpan.FromMinutes(5)); + tracker.RegisterReply("_INBOX.abc123"); + tracker.IsReplyAllowed("_INBOX.abc123").ShouldBeTrue(); + } + + [Fact] + public void Denies_unknown_reply_subject() + { + var tracker = new ResponseTracker(maxMsgs: 1, expires: TimeSpan.FromMinutes(5)); + tracker.IsReplyAllowed("_INBOX.unknown").ShouldBeFalse(); + } + + [Fact] + public void Enforces_max_messages() + { + var tracker = new ResponseTracker(maxMsgs: 2, expires: TimeSpan.FromMinutes(5)); + tracker.RegisterReply("_INBOX.abc"); + tracker.IsReplyAllowed("_INBOX.abc").ShouldBeTrue(); + tracker.IsReplyAllowed("_INBOX.abc").ShouldBeTrue(); + tracker.IsReplyAllowed("_INBOX.abc").ShouldBeFalse(); // exceeded + } + + [Fact] + public void Enforces_expiry() + { + var tracker = new ResponseTracker(maxMsgs: 0, expires: TimeSpan.FromMilliseconds(1)); + tracker.RegisterReply("_INBOX.abc"); + Thread.Sleep(50); + tracker.IsReplyAllowed("_INBOX.abc").ShouldBeFalse(); + } + + [Fact] + public void Prune_removes_expired() + { + var tracker = new ResponseTracker(maxMsgs: 0, expires: TimeSpan.FromMilliseconds(1)); + tracker.RegisterReply("_INBOX.a"); + tracker.RegisterReply("_INBOX.b"); + Thread.Sleep(50); + tracker.Prune(); + tracker.Count.ShouldBe(0); + } +} +``` + +**Step 2: Implement ResponseTracker** + +Create `src/NATS.Server/Auth/ResponseTracker.cs`: + +```csharp +namespace NATS.Server.Auth; + +/// +/// Tracks reply subjects that a client is temporarily allowed to publish to. +/// Reference: Go client.go resp struct, setResponsePermissionIfNeeded. +/// +public sealed class ResponseTracker +{ + private readonly int _maxMsgs; // 0 = unlimited + private readonly TimeSpan _expires; // TimeSpan.Zero = no TTL + private readonly Dictionary _replies = new(StringComparer.Ordinal); + private readonly object _lock = new(); + + public ResponseTracker(int maxMsgs, TimeSpan expires) + { + _maxMsgs = maxMsgs; + _expires = expires; + } + + public int Count + { + get { lock (_lock) return _replies.Count; } + } + + public void RegisterReply(string replySubject) + { + lock (_lock) + { + _replies[replySubject] = (DateTime.UtcNow, 0); + } + } + + /// + /// Returns true if the reply subject is allowed and increments the count. + /// Returns false if unknown, expired, or count exhausted. + /// + public bool IsReplyAllowed(string subject) + { + lock (_lock) + { + if (!_replies.TryGetValue(subject, out var entry)) + return false; + + // Check expiry + if (_expires > TimeSpan.Zero && DateTime.UtcNow - entry.RegisteredAt > _expires) + { + _replies.Remove(subject); + return false; + } + + // Check max messages + var newCount = entry.Count + 1; + if (_maxMsgs > 0 && newCount > _maxMsgs) + { + _replies.Remove(subject); + return false; + } + + _replies[subject] = (entry.RegisteredAt, newCount); + return true; + } + } + + public void Prune() + { + lock (_lock) + { + if (_expires <= TimeSpan.Zero && _maxMsgs <= 0) + return; + + var now = DateTime.UtcNow; + var toRemove = new List(); + foreach (var (key, entry) in _replies) + { + if (_expires > TimeSpan.Zero && now - entry.RegisteredAt > _expires) + toRemove.Add(key); + else if (_maxMsgs > 0 && entry.Count >= _maxMsgs) + toRemove.Add(key); + } + foreach (var key in toRemove) + _replies.Remove(key); + } + } +} +``` + +**Step 3: Wire into ClientPermissions.Build** + +In `ClientPermissions.cs`, add field and modify constructor/build: + +```csharp + private readonly ResponseTracker? _responseTracker; + + private ClientPermissions(PermissionSet? publish, PermissionSet? subscribe, ResponseTracker? responseTracker) + { + _publish = publish; + _subscribe = subscribe; + _responseTracker = responseTracker; + } + + public static ClientPermissions? Build(Permissions? permissions) + { + if (permissions == null) + return null; + + var pub = PermissionSet.Build(permissions.Publish); + var sub = PermissionSet.Build(permissions.Subscribe); + ResponseTracker? responseTracker = null; + if (permissions.Response != null) + responseTracker = new ResponseTracker(permissions.Response.MaxMsgs, permissions.Response.Expires); + + if (pub == null && sub == null && responseTracker == null) + return null; + + return new ClientPermissions(pub, sub, responseTracker); + } + + public ResponseTracker? ResponseTracker => _responseTracker; +``` + +**Step 4: Modify IsPublishAllowed to check response tracker** + +```csharp + public bool IsPublishAllowed(string subject) + { + if (_publish == null) + return true; + + if (_pubCache.TryGet(subject, out var cached)) + return cached; + + var allowed = _publish.IsAllowed(subject); + + // If denied but response tracking is enabled, check reply table + if (!allowed && _responseTracker != null) + { + if (_responseTracker.IsReplyAllowed(subject)) + return true; // Don't cache dynamic reply permissions + } + + _pubCache.Set(subject, allowed); + return allowed; + } +``` + +**Step 5: Register reply subjects in NatsServer.DeliverMessage** + +After `client.SendMessage(...)` in `DeliverMessage()`: + +```csharp + // Track reply subject for response permissions + if (replyTo != null && client.Permissions?.ResponseTracker != null) + { + if (client.Permissions.IsPublishAllowed(replyTo) == false) + client.Permissions.ResponseTracker.RegisterReply(replyTo); + } +``` + +**Step 6: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ResponseTrackerTests" -v normal` +Expected: All PASS + +**Step 7: Commit** + +```bash +git add src/NATS.Server/Auth/ResponseTracker.cs src/NATS.Server/Auth/ClientPermissions.cs src/NATS.Server/NatsServer.cs src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ResponseTrackerTests.cs +git commit -m "feat: add response permission tracking for dynamic reply subject authorization" +``` + +--- + +### Task 10: Auth Expiry Enforcement + +**Files:** +- Modify: `src/NATS.Server/NatsClient.cs` +- Modify: `src/NATS.Server/ClientClosedReason.cs` + +**Step 1: Add AuthenticationExpired close reason** + +In `ClientClosedReason.cs`, add after `NoRespondersRequiresHeaders`: +```csharp + AuthenticationExpired, +``` + +Add to the extension method: +```csharp + ClientClosedReason.AuthenticationExpired => "Authentication Expired", +``` + +**Step 2: Implement expiry timer in ProcessConnectAsync** + +In `NatsClient.ProcessConnectAsync()`, after `_flags.SetFlag(ClientFlags.ConnectProcessFinished);`, add: + +```csharp + // Start auth expiry timer if needed + if (_authService.IsAuthRequired && result?.Expiry is { } expiry) + { + var remaining = expiry - DateTimeOffset.UtcNow; + if (remaining > TimeSpan.Zero) + { + _ = Task.Run(async () => + { + try + { + await Task.Delay(remaining, _clientCts!.Token); + _logger.LogDebug("Client {ClientId} authentication expired", Id); + await SendErrAndCloseAsync("Authentication Expired", + ClientClosedReason.AuthenticationExpired); + } + catch (OperationCanceledException) { } + }, _clientCts!.Token); + } + else + { + await SendErrAndCloseAsync("Authentication Expired", + ClientClosedReason.AuthenticationExpired); + return; + } + } +``` + +Note: we need to capture `result` from the auth block so it's accessible. Move the `AuthResult? result = null` declaration to before the auth-required check and store it. + +**Step 3: Run all tests** + +Run: `dotnet test tests/NATS.Server.Tests -v normal` +Expected: All PASS + +**Step 4: Commit** + +```bash +git add src/NATS.Server/NatsClient.cs src/NATS.Server/ClientClosedReason.cs +git commit -m "feat: add auth expiry enforcement timer" +``` + +--- + +### Task 11: INFO Serialization Caching + +**Files:** +- Modify: `src/NATS.Server/NatsServer.cs` +- Modify: `src/NATS.Server/NatsClient.cs` + +**Step 1: Add cached INFO bytes to NatsServer** + +In `NatsServer`, add field: +```csharp + private byte[] _cachedInfoLine = []; +``` + +In `StartAsync()`, after `_serverInfo.Port = actualPort;` (line 294), add: +```csharp + BuildCachedInfo(); +``` + +Also call `BuildCachedInfo()` at the end of the constructor (after `_serverInfo` is fully populated, line 263): +```csharp + BuildCachedInfo(); +``` + +Add the method: +```csharp + private void BuildCachedInfo() + { + var infoJson = System.Text.Json.JsonSerializer.Serialize(_serverInfo); + _cachedInfoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n"); + } + + public byte[] CachedInfoLine => _cachedInfoLine; +``` + +**Step 2: Modify NatsClient.SendInfo to use cached bytes** + +In `NatsClient.cs`, replace `SendInfo()` method: + +```csharp + private void SendInfo() + { + if (Router is NatsServer server) + { + QueueOutbound(server.CachedInfoLine); + } + else + { + // Fallback for tests or non-server contexts + var infoJson = System.Text.Json.JsonSerializer.Serialize(_serverInfo); + var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n"); + QueueOutbound(infoLine); + } + } +``` + +For NKey connections, the per-connection INFO (with nonce) is already built in `AcceptClientAsync` and passed via `_serverInfo` constructor param — that path serializes fresh per connection which is correct. + +**Step 3: Run all tests** + +Run: `dotnet test tests/NATS.Server.Tests -v normal` +Expected: All PASS + +**Step 4: Commit** + +```bash +git add src/NATS.Server/NatsServer.cs src/NATS.Server/NatsClient.cs +git commit -m "feat: cache INFO serialization — build once at startup instead of per-connection" +``` + +--- + +### Task 12: Protocol Tracing + +**Files:** +- Modify: `src/NATS.Server/Protocol/NatsParser.cs` +- Modify: `src/NATS.Server/NatsClient.cs` + +**Step 1: Add trace logging to NatsParser** + +Add `ILogger?` parameter to NatsParser constructor: + +```csharp + private readonly ILogger? _logger; + + public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize, ILogger? logger = null) + { + _maxPayload = maxPayload; + _logger = logger; + } +``` + +Add trace method: + +```csharp + private void TraceInOp(string op, ReadOnlySpan arg = default) + { + if (_logger == null || !_logger.IsEnabled(LogLevel.Trace)) + return; + if (arg.IsEmpty) + _logger.LogTrace("<<- {Op}", op); + else + _logger.LogTrace("<<- {Op} {Arg}", op, Encoding.ASCII.GetString(arg)); + } +``` + +Add `using Microsoft.Extensions.Logging;` at top. + +Call `TraceInOp` after each command dispatch in `TryParse`: +- After PING: `TraceInOp("PING");` +- After PONG: `TraceInOp("PONG");` +- After PUB return: `TraceInOp("PUB", argsSpan);` (inside ParsePub before return) +- After HPUB: `TraceInOp("HPUB", argsSpan);` +- After SUB: `TraceInOp("SUB", argsSpan);` +- After UNSUB: `TraceInOp("UNSUB", argsSpan);` +- After CONNECT: `TraceInOp("CONNECT");` +- After INFO: `TraceInOp("INFO");` +- After +OK: `TraceInOp("+OK");` +- After -ERR: `TraceInOp("-ERR");` + +**Step 2: Pass logger to NatsParser in NatsClient constructor** + +In `NatsClient` constructor, change: +```csharp + _parser = new NatsParser(options.MaxPayload, options.Trace ? logger : null); +``` + +**Step 3: Run all tests** + +Run: `dotnet test tests/NATS.Server.Tests -v normal` +Expected: All PASS (parser tests create NatsParser without logger, which is fine — logger is optional) + +**Step 4: Commit** + +```bash +git add src/NATS.Server/Protocol/NatsParser.cs src/NATS.Server/NatsClient.cs +git commit -m "feat: add protocol tracing (<<- op arg) at LogLevel.Trace" +``` + +--- + +### Task 13: MSG/HMSG Construction Optimization + +**Files:** +- Modify: `src/NATS.Server/NatsClient.cs` (SendMessage method) + +**Step 1: Optimize SendMessage to use Span-based construction** + +Replace the `SendMessage` method body with buffer-based construction: + +```csharp + public void SendMessage(string subject, string sid, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + Interlocked.Increment(ref OutMsgs); + Interlocked.Add(ref OutBytes, payload.Length + headers.Length); + Interlocked.Increment(ref _serverStats.OutMsgs); + Interlocked.Add(ref _serverStats.OutBytes, payload.Length + headers.Length); + + // Estimate control line size: "MSG subject sid [reply] size\r\n" or HMSG variant + // Max: "HMSG " + subject + " " + sid + " " + reply + " " + hdrSize + " " + totalSize + "\r\n" + var estimatedLineSize = 5 + subject.Length + 1 + sid.Length + 1 + + (replyTo != null ? replyTo.Length + 1 : 0) + 20 + 2; // 20 for numbers + \r\n + + var totalPayloadLen = headers.Length + payload.Length; + var totalLen = estimatedLineSize + totalPayloadLen + 2; // trailing \r\n + var buffer = new byte[totalLen]; + var span = buffer.AsSpan(); + int pos = 0; + + // Write prefix + if (headers.Length > 0) + { + "HMSG "u8.CopyTo(span); + pos = 5; + } + else + { + "MSG "u8.CopyTo(span); + pos = 4; + } + + // Subject + pos += Encoding.ASCII.GetBytes(subject, span[pos..]); + span[pos++] = (byte)' '; + + // SID + pos += Encoding.ASCII.GetBytes(sid, span[pos..]); + span[pos++] = (byte)' '; + + // Reply-to + if (replyTo != null) + { + pos += Encoding.ASCII.GetBytes(replyTo, span[pos..]); + span[pos++] = (byte)' '; + } + + // Sizes + if (headers.Length > 0) + { + int totalSize = headers.Length + payload.Length; + headers.Length.TryFormat(span[pos..], out int written); + pos += written; + span[pos++] = (byte)' '; + totalSize.TryFormat(span[pos..], out written); + pos += written; + } + else + { + payload.Length.TryFormat(span[pos..], out int written); + pos += written; + } + + // CRLF + span[pos++] = (byte)'\r'; + span[pos++] = (byte)'\n'; + + // Headers + payload + trailing CRLF + if (headers.Length > 0) + { + headers.Span.CopyTo(span[pos..]); + pos += headers.Length; + } + if (payload.Length > 0) + { + payload.Span.CopyTo(span[pos..]); + pos += payload.Length; + } + span[pos++] = (byte)'\r'; + span[pos++] = (byte)'\n'; + + QueueOutbound(buffer.AsMemory(0, pos)); + } +``` + +**Step 2: Run all tests** + +Run: `dotnet test tests/NATS.Server.Tests -v normal` +Expected: All PASS + +**Step 3: Commit** + +```bash +git add src/NATS.Server/NatsClient.cs +git commit -m "feat: optimize MSG/HMSG construction using Span-based buffer writes" +``` + +--- + +### Task 14: Verify, Run Full Test Suite, Update differences.md + +**Files:** +- Run: full test suite +- Modify: `differences.md` (sections 3-6) + +**Step 1: Build entire solution** + +Run: `dotnet build` +Expected: Success with no errors + +**Step 2: Run all tests** + +Run: `dotnet test -v normal` +Expected: All PASS + +**Step 3: Update differences.md sections 3-6** + +Update each table row where a feature has been implemented, changing `N` to `Y` and updating the Notes column. Key changes: + +- Section 3: INFO serialization → "Y (cached at startup)", protocol tracing → "Y", MIME header parsing → "Y", MSG/HMSG construction → "Y (Span-based)" +- Section 4: Stats → "Y", HasInterest → "Y", NumInterest → "Y", ReverseMatch → "Y", RemoveBatch → "Y", All → "Y", atomic generation ID → "Y", SubjectsCollide → "Y", tokenAt/numTokens → "Y", UTF-8 validation → "Y", per-account subscription limits → "Y", permission caching → "Y (128-entry LRU)" +- Section 5: Deny enforcement at delivery → "Y", permission caching → "Y", queue deny → "Y", response permissions → "Y", per-account connection limits → "Y", per-account subscription limits → "Y", auth expiry → "Y", auto-unsub cleanup → "Y", multi-account user resolution → "Y" +- Section 6: -D/-V/-DV → "Y", MaxSubs → "Y", MaxSubTokens → "Y", Debug/Trace → "Y", LogFile → "Y", Tags → "Y" +- Summary section: remove items that are now implemented + +**Step 4: Commit** + +```bash +git add differences.md +git commit -m "docs: update differences.md sections 3-6 to reflect implemented features" +``` + +--- + +## Parallelization Strategy + +The following tasks can be safely executed in parallel (they touch different files): + +**Batch 1 (all independent, different files):** +- Task 1: NatsOptions fields +- Task 3: SubjectMatch utilities +- Task 5: NatsHeaderParser +- Task 6: PermissionLruCache + +**Batch 2 (depend on Batch 1 outputs):** +- Task 2: CLI flags (depends on Task 1 for NatsOptions fields) +- Task 4: SubList overhaul (independent) +- Task 7: Account limits (depends on Task 1 for NatsOptions.Accounts) + +**Batch 3 (depend on Batch 2):** +- Task 8: MaxSubs/deny/delivery (depends on Tasks 6, 7) +- Task 9: Response permissions (depends on Task 6) + +**Batch 4 (depend on earlier tasks):** +- Task 10: Auth expiry (light, independent) +- Task 11: INFO caching (independent) +- Task 12: Protocol tracing (depends on Task 1 for Trace option) +- Task 13: MSG optimization (independent) + +**Final:** +- Task 14: Verify and update docs (depends on all) diff --git a/docs/plans/2026-02-23-sections3-6-gaps-plan.md.tasks.json b/docs/plans/2026-02-23-sections3-6-gaps-plan.md.tasks.json new file mode 100644 index 0000000..52f04ae --- /dev/null +++ b/docs/plans/2026-02-23-sections3-6-gaps-plan.md.tasks.json @@ -0,0 +1,20 @@ +{ + "planPath": "docs/plans/2026-02-23-sections3-6-gaps-plan.md", + "tasks": [ + {"id": 6, "subject": "Task 1: NatsOptions — Add New Configuration Fields", "status": "pending"}, + {"id": 7, "subject": "Task 2: CLI Flags — Add -D/-V/-DV and Logging Options", "status": "pending", "blockedBy": [6]}, + {"id": 8, "subject": "Task 3: SubjectMatch Utilities", "status": "pending"}, + {"id": 9, "subject": "Task 4: SubList — Generation ID, Stats, and Utility Methods", "status": "pending"}, + {"id": 10, "subject": "Task 5: NatsHeaderParser — MIME Header Parsing", "status": "pending"}, + {"id": 11, "subject": "Task 6: PermissionLruCache — 128-Entry LRU", "status": "pending"}, + {"id": 12, "subject": "Task 7: Account Limits and AccountConfig", "status": "pending", "blockedBy": [6]}, + {"id": 13, "subject": "Task 8: MaxSubs Enforcement, Subscribe Deny Queue, Delivery-Time Deny", "status": "pending", "blockedBy": [11, 12]}, + {"id": 14, "subject": "Task 9: Response Permissions (Reply Tracking)", "status": "pending", "blockedBy": [11, 13]}, + {"id": 15, "subject": "Task 10: Auth Expiry Enforcement", "status": "pending"}, + {"id": 16, "subject": "Task 11: INFO Serialization Caching", "status": "pending"}, + {"id": 17, "subject": "Task 12: Protocol Tracing", "status": "pending", "blockedBy": [6]}, + {"id": 18, "subject": "Task 13: MSG/HMSG Construction Optimization", "status": "pending"}, + {"id": 19, "subject": "Task 14: Verify, Run Full Test Suite, Update differences.md", "status": "pending", "blockedBy": [13, 14, 15, 16, 17, 18]} + ], + "lastUpdated": "2026-02-23T00:00:00Z" +}