From d1f22255d750eca29aded0a92a69a84979ee12c2 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 10:08:50 -0400 Subject: [PATCH] docs: record SubList allocation strategy --- Documentation/Subscriptions/SubList.md | 285 +++++++----------- .../CorePubSub/SubListMatchBenchmarks.cs | 112 +++++++ .../NATS.Server.Benchmark.Tests.csproj | 4 + 3 files changed, 231 insertions(+), 170 deletions(-) create mode 100644 tests/NATS.Server.Benchmark.Tests/CorePubSub/SubListMatchBenchmarks.cs diff --git a/Documentation/Subscriptions/SubList.md b/Documentation/Subscriptions/SubList.md index f035c30..5c72161 100644 --- a/Documentation/Subscriptions/SubList.md +++ b/Documentation/Subscriptions/SubList.md @@ -1,6 +1,6 @@ # SubList -`SubList` is the subscription routing trie. Every published message triggers a `Match()` call to find all interested subscribers. `SubList` stores subscriptions indexed by their subject tokens and returns a `SubListResult` containing both plain subscribers and queue groups. +`SubList` is the subscription routing trie for the core server. Every publish path calls `Match()` to find the local plain subscribers and queue groups interested in a subject. The type also tracks remote route and gateway interest so clustering code can answer `HasRemoteInterest(...)` and `MatchRemote(...)` queries without a second routing structure. Go reference: `golang/nats-server/server/sublist.go` @@ -8,32 +8,30 @@ Go reference: `golang/nats-server/server/sublist.go` ## Thread Safety -`SubList` uses a `ReaderWriterLockSlim` (`_lock`) with the following locking discipline: +`SubList` uses a single `ReaderWriterLockSlim` (`_lock`) to protect trie mutation, remote-interest bookkeeping, and cache state. 
| Operation | Lock | |-----------|------| -| `Count` read | Read lock | -| `Match()` — cache hit | Read lock only | -| `Match()` — cache miss | Write lock (to update cache) | -| `Insert()` | Write lock | -| `Remove()` | Write lock | +| Cache hit in `Match()` | Read lock | +| Cache miss in `Match()` | Write lock | +| `Insert()` / `Remove()` / `RemoveBatch()` | Write lock | +| Remote-interest mutation (`ApplyRemoteSub`, `UpdateRemoteQSub`, cleanup) | Write lock | +| Read-only queries (`Count`, `HasRemoteInterest`, `MatchRemote`, `Stats`) | Read lock | -Cache misses in `Match()` require a write lock because the cache must be updated after the trie traversal. To avoid a race between the read-lock check and the write-lock update, `Match()` uses double-checked locking: after acquiring the write lock, it checks the cache again before doing trie work. +`Match()` uses generation-based double-checked locking. It first checks the cache under a read lock, then retries under the write lock before traversing the trie and updating the cache. --- ## Trie Structure -The trie is built from two private classes, `TrieLevel` and `TrieNode`, nested inside `SubList`. - -### `TrieLevel` and `TrieNode` +The trie is built from `TrieLevel` and `TrieNode`: ```csharp private sealed class TrieLevel { public readonly Dictionary<string, TrieNode> Nodes = new(StringComparer.Ordinal); - public TrieNode? Pwc; // partial wildcard (*) - public TrieNode? Fwc; // full wildcard (>) + public TrieNode? Pwc; + public TrieNode? Fwc; } private sealed class TrieNode
@@ -41,202 +39,149 @@ private sealed class TrieNode
 public TrieLevel? Next; public readonly HashSet<Subscription> PlainSubs = []; public readonly Dictionary<string, HashSet<Subscription>> QueueSubs = new(StringComparer.Ordinal); - - public bool IsEmpty => PlainSubs.Count == 0 && QueueSubs.Count == 0 && - (Next == null || (Next.Nodes.Count == 0 && Next.Pwc == null && Next.Fwc == null)); + public bool PackedListEnabled; } ``` -Each level in the trie represents one token position in a subject. 
A `TrieLevel` holds: +- `Nodes` stores literal-token edges by exact token string. +- `Pwc` stores the `*` edge for the current token position. +- `Fwc` stores the `>` edge for the current token position. +- `PlainSubs` stores non-queue subscriptions attached to the terminal node. +- `QueueSubs` groups queue subscriptions by queue name at the terminal node. -- `Nodes` — a dictionary keyed by literal token string, mapping to the `TrieNode` for that token. Uses `StringComparer.Ordinal` for performance. -- `Pwc` — the node for the `*` wildcard at this level, or `null` if no `*` subscriptions exist at this depth. -- `Fwc` — the node for the `>` wildcard at this level, or `null` if no `>` subscriptions exist at this depth. +The root of the trie is `_root`, a `TrieLevel` with no parent node. -A `TrieNode` sits at the boundary between two levels. It holds the subscriptions registered for subjects whose last token leads to this node: +--- -- `PlainSubs` — a `HashSet` of plain (non-queue) subscribers. -- `QueueSubs` — a dictionary from queue name to the set of members in that queue group. Uses `StringComparer.Ordinal`. -- `Next` — the next `TrieLevel` for deeper token positions. `null` for leaf nodes. -- `IsEmpty` — `true` when the node and all its descendants have no subscriptions. Used during `Remove()` to prune dead branches. +## Token Traversal -The trie root is a `TrieLevel` (`_root`) with no parent node. +`TokenEnumerator` walks a subject string token-by-token using `ReadOnlySpan` slices, so traversal itself does not allocate. Literal-token insert and remove paths use `TryGetLiteralNode(...)` plus `SubjectMatch.TokenEquals(...)` to reuse the existing trie key string when the token is already present, instead of calling `token.ToString()` on every hop. -### `TokenEnumerator` +That keeps literal-subject maintenance allocation-lean while preserving the current `Dictionary` storage model. 
-`TokenEnumerator` is a `ref struct` that splits a subject string by `.` without allocating. It operates on a `ReadOnlySpan` derived from the original string. +--- + +## Local Subscription Operations + +### Insert + +`Insert(Subscription sub)` walks the trie one token at a time: + +- `*` follows or creates `Pwc` +- `>` follows or creates `Fwc` and terminates further token traversal +- literal tokens follow or create `Nodes[token]` + +The terminal node stores the subscription in either `PlainSubs` or the appropriate queue-group bucket in `QueueSubs`. Every successful insert increments `_generation`, which invalidates cached match results. + +### Remove + +`Remove(Subscription sub)` and `RemoveBatch(IEnumerable)` walk the same subject path, remove the subscription from the terminal node, and then prune empty trie nodes on the way back out. Removing a subscription also bumps `_generation`, so any stale cached result is ignored on the next lookup. + +--- + +## Remote Interest Bookkeeping + +Remote route and gateway subscriptions are stored separately from the local trie in `_remoteSubs`: ```csharp -private ref struct TokenEnumerator -{ - private ReadOnlySpan _remaining; - - public TokenEnumerator(string subject) - { - _remaining = subject.AsSpan(); - Current = default; - } - - public ReadOnlySpan Current { get; private set; } - - public TokenEnumerator GetEnumerator() => this; - - public bool MoveNext() - { - if (_remaining.IsEmpty) - return false; - - int sep = _remaining.IndexOf(SubjectMatch.Sep); - if (sep < 0) - { - Current = _remaining; - _remaining = default; - } - else - { - Current = _remaining[..sep]; - _remaining = _remaining[(sep + 1)..]; - } - return true; - } -} +private readonly Dictionary _remoteSubs = []; ``` -`TokenEnumerator` implements the `foreach` pattern directly (via `GetEnumerator()` returning `this`), so it can be used in `foreach` loops without boxing. `Insert()` uses it during trie traversal to avoid string allocations per token. 
- ---- - -## Insert - -`Insert(Subscription sub)` adds a subscription to the trie under a write lock. - -The method walks the trie one token at a time using `TokenEnumerator`. For each token: -- If the token is `*`, it creates or follows `level.Pwc`. -- If the token is `>`, it creates or follows `level.Fwc` and sets `sawFwc = true` to reject further tokens. -- Otherwise it creates or follows `level.Nodes[token]`. - -At each step, `node.Next` is created if absent, and `level` advances to `node.Next`. - -After all tokens are consumed, the subscription is added to the terminal node: -- Plain subscription: `node.PlainSubs.Add(sub)`. -- Queue subscription: `node.QueueSubs[sub.Queue].Add(sub)`, creating the inner `HashSet` if this is the first member of that group. - -`_count` is incremented and `AddToCache` is called to update any cached results that would now include this subscription. - ---- - -## Remove - -`Remove(Subscription sub)` removes a subscription from the trie under a write lock. - -The method walks the trie along the subscription's subject, recording the path as a `List<(TrieLevel, TrieNode, string token, bool isPwc, bool isFwc)>`. If any node along the path is missing, the method returns without error (the subscription was never inserted). - -After locating the terminal node, the subscription is removed from `PlainSubs` or from the appropriate `QueueSubs` group. If the queue group becomes empty, its entry is removed from the dictionary. - -If removal succeeds: -- `_count` is decremented. -- `RemoveFromCache` is called to invalidate affected cache entries. -- The path list is walked backwards. At each step, if `node.IsEmpty` is `true`, the node is removed from its parent level (`Pwc = null`, `Fwc = null`, or `Nodes.Remove(token)`). This prunes dead branches so the trie does not accumulate empty nodes over time. - ---- - -## Match - -`Match(string subject)` is called for every published message. 
It returns a `SubListResult` containing all matching plain and queue subscriptions. - -### Cache check and fallback +`RoutedSubKey` is a compact value key: ```csharp -public SubListResult Match(string subject) -{ - // Check cache under read lock first. - _lock.EnterReadLock(); - try - { - if (_cache != null && _cache.TryGetValue(subject, out var cached)) - return cached; - } - finally - { - _lock.ExitReadLock(); - } - - // Cache miss -- tokenize and match under write lock (needed for cache update). - var tokens = Tokenize(subject); - if (tokens == null) - return SubListResult.Empty; - - _lock.EnterWriteLock(); - try - { - // Re-check cache after acquiring write lock. - if (_cache != null && _cache.TryGetValue(subject, out var cached)) - return cached; - - var plainSubs = new List(); - var queueSubs = new List>(); - - MatchLevel(_root, tokens, 0, plainSubs, queueSubs); - ... - if (_cache != null) - { - _cache[subject] = result; - if (_cache.Count > CacheMax) { /* sweep */ } - } - return result; - } - finally { _lock.ExitWriteLock(); } -} +internal readonly record struct RoutedSubKey( + string RouteId, + string Account, + string Subject, + string? Queue); ``` -On a read-lock cache hit, `Match()` returns immediately with no trie traversal. On a miss, `Tokenize()` splits the subject before acquiring the write lock (subjects with empty tokens return `SubListResult.Empty` immediately). The write lock is then taken and the cache is checked again before invoking `MatchLevel`. +This replaces the earlier `"route|account|subject|queue"` composite string model. The change removes repeated string concatenation, `Split('|')`, and runtime reparsing in remote cleanup paths. -### `MatchLevel` traversal +Remote-interest APIs: -`MatchLevel` is a recursive method that descends the trie matching tokens against the subject array. 
At each level, for each remaining token position: +- `ApplyRemoteSub(...)` inserts or removes a `RemoteSubscription` +- `UpdateRemoteQSub(...)` updates queue weight for an existing remote queue subscription +- `RemoveRemoteSubs(routeId)` removes all remote interest for a disconnected route +- `RemoveRemoteSubsForAccount(routeId, account)` removes only one route/account slice +- `HasRemoteInterest(account, subject)` answers whether any remote subscription matches +- `MatchRemote(account, subject)` returns the expanded weighted remote matches -1. If `level.Fwc` is set, all subscriptions from that node are added to the result. The `>` wildcard matches all remaining tokens, so no further recursion is needed for this branch. -2. If `level.Pwc` is set, `MatchLevel` recurses with the next token index and `pwc.Next` as the new level. This handles `*` matching the current token. -3. A literal dictionary lookup on `level.Nodes[tokens[i]]` advances the level pointer for the next iteration. +Cleanup paths collect matching `RoutedSubKey` values into a reusable per-thread list and then remove them, avoiding `_remoteSubs.ToArray()` snapshots on every sweep. -After all tokens are consumed, subscriptions from the final literal node and the final `*` position (if present at the last level) are added to the result. The `*` case at the last token requires explicit handling because the loop exits before the recursive call for `*` can execute. +--- -`AddNodeToResults` flattens a node's `PlainSubs` into the accumulator list and merges its `QueueSubs` groups into the queue accumulator, combining groups by name across multiple matching nodes. +## Match Pipeline + +`Match(string subject)` is the hot path. + +1. Increment `_matches` +2. Read the current `_generation` +3. Try the cache under a read lock +4. On cache miss, tokenize the subject and retry under the write lock +5. Traverse the trie and build a `SubListResult` +6. 
Cache the result with the generation that produced it + +Cached entries are stored as: + +```csharp +private readonly record struct CachedResult(SubListResult Result, long Generation); +``` + +A cache entry is valid only if its stored generation matches the current `_generation`. Any local or remote-interest mutation increments `_generation`, so stale entries are ignored automatically. + +### Match Builder + +Cache misses use a reusable per-thread `MatchBuilder` instead of allocating fresh nested `List>` structures on every traversal. The builder: + +- reuses a `List` for plain subscribers +- reuses queue-group lists across matches +- merges queue matches by queue name during traversal +- materializes the public `SubListResult` arrays only once at the end + +This keeps the public contract unchanged while removing temporary match-building churn from the publish path. + +### Intentional Remaining Allocations + +The current implementation still allocates in two places by design: + +- the tokenized `string[]` produced by `Tokenize(subject)` on cache misses +- the final `Subscription[]` and `Subscription[][]` arrays stored in `SubListResult` + +Those allocations are part of the current public result shape and cache model. --- ## Cache Strategy -The cache is a `Dictionary` keyed by the literal published subject. All operations use `StringComparer.Ordinal`. +The cache is a `Dictionary` keyed by literal publish subject with `StringComparer.Ordinal`. -**Size limits:** The cache holds at most `CacheMax` (1024) entries. When `_cache.Count` exceeds this, a sweep removes entries until the count reaches `CacheSweep` (256). The sweep takes the first `count - 256` keys from the dictionary — no LRU ordering is maintained. +- `CacheMax = 1024` +- `CacheSweep = 256` -**`AddToCache`** is called from `Insert()` to keep cached results consistent after adding a subscription: -- For a literal subscription subject, `AddToCache` does a direct lookup. 
If the exact key is in the cache, it creates a new `SubListResult` with the subscription appended and replaces the cached entry. -- For a wildcard subscription subject, `AddToCache` scans all cached keys and updates any entry whose key is matched by `SubjectMatch.MatchLiteral(key, subject)`. +When the cache grows past `CacheMax`, `SubListCacheSweeper` schedules a sweep that removes enough keys to return to the target size. The sweep is intentionally simple; it is not LRU. -**`RemoveFromCache`** is called from `Remove()` to invalidate cached results after removing a subscription: -- For a literal subscription subject, `RemoveFromCache` removes the exact cache key. -- For a wildcard subscription subject, `RemoveFromCache` removes all cached keys matched by the pattern. Because it is difficult to reconstruct the correct result without a full trie traversal, invalidation is preferred over update. - -The asymmetry between `AddToCache` (updates in place) and `RemoveFromCache` (invalidates) avoids a second trie traversal on removal at the cost of a cache miss on the next `Match()` for those keys. +The cache stores fully materialized `SubListResult` instances because publish callers need stable array-based results immediately after lookup. --- -## Disposal +## Statistics and Monitoring -`SubList` implements `IDisposable`. `Dispose()` releases the `ReaderWriterLockSlim`: +`Stats()` exposes: -```csharp -public void Dispose() => _lock.Dispose(); -``` +- subscription count +- cache entry count +- insert/remove/match counts +- cache hit rate +- fanout statistics derived from cached results -`SubList` instances are owned by `NatsServer` and disposed during server shutdown. +These counters are used by tests and monitoring code to validate routing behavior and cache effectiveness. 
--- ## Related Documentation -- [Subscriptions Overview](../Subscriptions/Overview.md) +- [Overview](Overview.md) - + diff --git a/tests/NATS.Server.Benchmark.Tests/CorePubSub/SubListMatchBenchmarks.cs b/tests/NATS.Server.Benchmark.Tests/CorePubSub/SubListMatchBenchmarks.cs new file mode 100644 index 0000000..f99febb --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/CorePubSub/SubListMatchBenchmarks.cs @@ -0,0 +1,112 @@ +using System.Diagnostics; +using NATS.Server.Benchmark.Tests.Harness; +using NATS.Server.Subscriptions; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.CorePubSub; + +public class SubListMatchBenchmarks(ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public void SubListExactMatch_128Subjects() + { + using var subList = new SubList(); + for (var i = 0; i < 128; i++) + subList.Insert(new Subscription { Subject = $"bench.exact.{i}", Sid = i.ToString() }); + + var (result, allocatedBytes) = Measure("SubList Exact Match (128 subjects)", "DotNet", "bench.exact.64".Length, 250_000, () => + { + _ = subList.Match("bench.exact.64"); + }); + + BenchmarkResultWriter.WriteSingle(output, result); + WriteAllocationSummary(allocatedBytes, result.TotalMessages); + } + + [Fact] + [Trait("Category", "Benchmark")] + public void SubListWildcardMatch_FanIn() + { + using var subList = new SubList(); + subList.Insert(new Subscription { Subject = "orders.created", Sid = "1" }); + subList.Insert(new Subscription { Subject = "orders.*", Sid = "2" }); + subList.Insert(new Subscription { Subject = "orders.>", Sid = "3" }); + subList.Insert(new Subscription { Subject = "orders.created.us", Sid = "4" }); + + var (result, allocatedBytes) = Measure("SubList Wildcard Match", "DotNet", "orders.created".Length, 250_000, () => + { + _ = subList.Match("orders.created"); + }); + + BenchmarkResultWriter.WriteSingle(output, result); + WriteAllocationSummary(allocatedBytes, result.TotalMessages); + } + + [Fact] + [Trait("Category", 
"Benchmark")] + public void SubListQueueMatch_MergedGroups() + { + using var subList = new SubList(); + subList.Insert(new Subscription { Subject = "jobs.run", Queue = "workers", Sid = "1" }); + subList.Insert(new Subscription { Subject = "jobs.*", Queue = "workers", Sid = "2" }); + subList.Insert(new Subscription { Subject = "jobs.>", Queue = "audit", Sid = "3" }); + + var (result, allocatedBytes) = Measure("SubList Queue Match", "DotNet", "jobs.run".Length, 250_000, () => + { + _ = subList.Match("jobs.run"); + }); + + BenchmarkResultWriter.WriteSingle(output, result); + WriteAllocationSummary(allocatedBytes, result.TotalMessages); + } + + [Fact] + [Trait("Category", "Benchmark")] + public void SubListRemoteInterest_WildcardLookup() + { + using var subList = new SubList(); + for (var i = 0; i < 64; i++) + subList.ApplyRemoteSub(new RemoteSubscription($"remote.{i}.*", null, $"r{i}", "A")); + + var (result, allocatedBytes) = Measure("SubList Remote Interest", "DotNet", "remote.42.created".Length, 250_000, () => + { + _ = subList.HasRemoteInterest("A", "remote.42.created"); + }); + + BenchmarkResultWriter.WriteSingle(output, result); + WriteAllocationSummary(allocatedBytes, result.TotalMessages); + } + + private static (BenchmarkResult Result, long AllocatedBytes) Measure(string name, string serverType, int bytesPerOperation, int iterations, Action operation) + { + GC.Collect(); + GC.WaitForPendingFinalizers(); + GC.Collect(); + + for (var i = 0; i < 1_000; i++) + operation(); + + var before = GC.GetAllocatedBytesForCurrentThread(); + var sw = Stopwatch.StartNew(); + for (var i = 0; i < iterations; i++) + operation(); + sw.Stop(); + var allocatedBytes = GC.GetAllocatedBytesForCurrentThread() - before; + + return (new BenchmarkResult + { + Name = name, + ServerType = serverType, + TotalMessages = iterations, + TotalBytes = (long)iterations * bytesPerOperation, + Duration = sw.Elapsed, + }, allocatedBytes); + } + + private void WriteAllocationSummary(long 
allocatedBytes, long iterations) + { + output.WriteLine($"Allocated: {allocatedBytes:N0} B total | {allocatedBytes / (double)iterations:F2} B/op"); + output.WriteLine(""); + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj b/tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj index 6290932..4655016 100644 --- a/tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj +++ b/tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj @@ -23,4 +23,8 @@ + + + +