diff --git a/Documentation/Subscriptions/SubList.md b/Documentation/Subscriptions/SubList.md index f035c30..5c72161 100644 --- a/Documentation/Subscriptions/SubList.md +++ b/Documentation/Subscriptions/SubList.md @@ -1,6 +1,6 @@ # SubList -`SubList` is the subscription routing trie. Every published message triggers a `Match()` call to find all interested subscribers. `SubList` stores subscriptions indexed by their subject tokens and returns a `SubListResult` containing both plain subscribers and queue groups. +`SubList` is the subscription routing trie for the core server. Every publish path calls `Match()` to find the local plain subscribers and queue groups interested in a subject. The type also tracks remote route and gateway interest so clustering code can answer `HasRemoteInterest(...)` and `MatchRemote(...)` queries without a second routing structure. Go reference: `golang/nats-server/server/sublist.go` @@ -8,32 +8,30 @@ Go reference: `golang/nats-server/server/sublist.go` ## Thread Safety -`SubList` uses a `ReaderWriterLockSlim` (`_lock`) with the following locking discipline: +`SubList` uses a single `ReaderWriterLockSlim` (`_lock`) to protect trie mutation, remote-interest bookkeeping, and cache state. | Operation | Lock | |-----------|------| -| `Count` read | Read lock | -| `Match()` — cache hit | Read lock only | -| `Match()` — cache miss | Write lock (to update cache) | -| `Insert()` | Write lock | -| `Remove()` | Write lock | +| Cache hit in `Match()` | Read lock | +| Cache miss in `Match()` | Write lock | +| `Insert()` / `Remove()` / `RemoveBatch()` | Write lock | +| Remote-interest mutation (`ApplyRemoteSub`, `UpdateRemoteQSub`, cleanup) | Write lock | +| Read-only queries (`Count`, `HasRemoteInterest`, `MatchRemote`, `Stats`) | Read lock | -Cache misses in `Match()` require a write lock because the cache must be updated after the trie traversal. 
To avoid a race between the read-lock check and the write-lock update, `Match()` uses double-checked locking: after acquiring the write lock, it checks the cache again before doing trie work. +`Match()` uses generation-based double-checked locking. It first checks the cache under a read lock, then retries under the write lock before traversing the trie and updating the cache. --- ## Trie Structure -The trie is built from two private classes, `TrieLevel` and `TrieNode`, nested inside `SubList`. - -### `TrieLevel` and `TrieNode` +The trie is built from `TrieLevel` and `TrieNode`: ```csharp private sealed class TrieLevel { public readonly Dictionary<string, TrieNode> Nodes = new(StringComparer.Ordinal); - public TrieNode? Pwc; // partial wildcard (*) - public TrieNode? Fwc; // full wildcard (>) + public TrieNode? Pwc; + public TrieNode? Fwc; } private sealed class TrieNode @@ -41,202 +39,149 @@ private sealed class TrieNode public TrieLevel? Next; public readonly HashSet<Subscription> PlainSubs = []; public readonly Dictionary<string, HashSet<Subscription>> QueueSubs = new(StringComparer.Ordinal); - - public bool IsEmpty => PlainSubs.Count == 0 && QueueSubs.Count == 0 && - (Next == null || (Next.Nodes.Count == 0 && Next.Pwc == null && Next.Fwc == null)); + public bool PackedListEnabled; } ``` -Each level in the trie represents one token position in a subject. A `TrieLevel` holds: +- `Nodes` stores literal-token edges by exact token string. +- `Pwc` stores the `*` edge for the current token position. +- `Fwc` stores the `>` edge for the current token position. +- `PlainSubs` stores non-queue subscriptions attached to the terminal node. +- `QueueSubs` groups queue subscriptions by queue name at the terminal node. -- `Nodes` — a dictionary keyed by literal token string, mapping to the `TrieNode` for that token. Uses `StringComparer.Ordinal` for performance. -- `Pwc` — the node for the `*` wildcard at this level, or `null` if no `*` subscriptions exist at this depth. 
-- `Fwc` — the node for the `>` wildcard at this level, or `null` if no `>` subscriptions exist at this depth. +The root of the trie is `_root`, a `TrieLevel` with no parent node. -A `TrieNode` sits at the boundary between two levels. It holds the subscriptions registered for subjects whose last token leads to this node: +--- -- `PlainSubs` — a `HashSet` of plain (non-queue) subscribers. -- `QueueSubs` — a dictionary from queue name to the set of members in that queue group. Uses `StringComparer.Ordinal`. -- `Next` — the next `TrieLevel` for deeper token positions. `null` for leaf nodes. -- `IsEmpty` — `true` when the node and all its descendants have no subscriptions. Used during `Remove()` to prune dead branches. +## Token Traversal -The trie root is a `TrieLevel` (`_root`) with no parent node. +`TokenEnumerator` walks a subject string token-by-token using `ReadOnlySpan` slices, so traversal itself does not allocate. Literal-token insert and remove paths use `TryGetLiteralNode(...)` plus `SubjectMatch.TokenEquals(...)` to reuse the existing trie key string when the token is already present, instead of calling `token.ToString()` on every hop. -### `TokenEnumerator` +That keeps literal-subject maintenance allocation-lean while preserving the current `Dictionary` storage model. -`TokenEnumerator` is a `ref struct` that splits a subject string by `.` without allocating. It operates on a `ReadOnlySpan` derived from the original string. +--- + +## Local Subscription Operations + +### Insert + +`Insert(Subscription sub)` walks the trie one token at a time: + +- `*` follows or creates `Pwc` +- `>` follows or creates `Fwc` and terminates further token traversal +- literal tokens follow or create `Nodes[token]` + +The terminal node stores the subscription in either `PlainSubs` or the appropriate queue-group bucket in `QueueSubs`. Every successful insert increments `_generation`, which invalidates cached match results. 
+ +### Remove + +`Remove(Subscription sub)` and `RemoveBatch(IEnumerable)` walk the same subject path, remove the subscription from the terminal node, and then prune empty trie nodes on the way back out. Removing a subscription also bumps `_generation`, so any stale cached result is ignored on the next lookup. + +--- + +## Remote Interest Bookkeeping + +Remote route and gateway subscriptions are stored separately from the local trie in `_remoteSubs`: ```csharp -private ref struct TokenEnumerator -{ - private ReadOnlySpan _remaining; - - public TokenEnumerator(string subject) - { - _remaining = subject.AsSpan(); - Current = default; - } - - public ReadOnlySpan Current { get; private set; } - - public TokenEnumerator GetEnumerator() => this; - - public bool MoveNext() - { - if (_remaining.IsEmpty) - return false; - - int sep = _remaining.IndexOf(SubjectMatch.Sep); - if (sep < 0) - { - Current = _remaining; - _remaining = default; - } - else - { - Current = _remaining[..sep]; - _remaining = _remaining[(sep + 1)..]; - } - return true; - } -} +private readonly Dictionary _remoteSubs = []; ``` -`TokenEnumerator` implements the `foreach` pattern directly (via `GetEnumerator()` returning `this`), so it can be used in `foreach` loops without boxing. `Insert()` uses it during trie traversal to avoid string allocations per token. - ---- - -## Insert - -`Insert(Subscription sub)` adds a subscription to the trie under a write lock. - -The method walks the trie one token at a time using `TokenEnumerator`. For each token: -- If the token is `*`, it creates or follows `level.Pwc`. -- If the token is `>`, it creates or follows `level.Fwc` and sets `sawFwc = true` to reject further tokens. -- Otherwise it creates or follows `level.Nodes[token]`. - -At each step, `node.Next` is created if absent, and `level` advances to `node.Next`. - -After all tokens are consumed, the subscription is added to the terminal node: -- Plain subscription: `node.PlainSubs.Add(sub)`. 
-- Queue subscription: `node.QueueSubs[sub.Queue].Add(sub)`, creating the inner `HashSet` if this is the first member of that group. - -`_count` is incremented and `AddToCache` is called to update any cached results that would now include this subscription. - ---- - -## Remove - -`Remove(Subscription sub)` removes a subscription from the trie under a write lock. - -The method walks the trie along the subscription's subject, recording the path as a `List<(TrieLevel, TrieNode, string token, bool isPwc, bool isFwc)>`. If any node along the path is missing, the method returns without error (the subscription was never inserted). - -After locating the terminal node, the subscription is removed from `PlainSubs` or from the appropriate `QueueSubs` group. If the queue group becomes empty, its entry is removed from the dictionary. - -If removal succeeds: -- `_count` is decremented. -- `RemoveFromCache` is called to invalidate affected cache entries. -- The path list is walked backwards. At each step, if `node.IsEmpty` is `true`, the node is removed from its parent level (`Pwc = null`, `Fwc = null`, or `Nodes.Remove(token)`). This prunes dead branches so the trie does not accumulate empty nodes over time. - ---- - -## Match - -`Match(string subject)` is called for every published message. It returns a `SubListResult` containing all matching plain and queue subscriptions. - -### Cache check and fallback +`RoutedSubKey` is a compact value key: ```csharp -public SubListResult Match(string subject) -{ - // Check cache under read lock first. - _lock.EnterReadLock(); - try - { - if (_cache != null && _cache.TryGetValue(subject, out var cached)) - return cached; - } - finally - { - _lock.ExitReadLock(); - } - - // Cache miss -- tokenize and match under write lock (needed for cache update). - var tokens = Tokenize(subject); - if (tokens == null) - return SubListResult.Empty; - - _lock.EnterWriteLock(); - try - { - // Re-check cache after acquiring write lock. 
- if (_cache != null && _cache.TryGetValue(subject, out var cached)) - return cached; - - var plainSubs = new List(); - var queueSubs = new List>(); - - MatchLevel(_root, tokens, 0, plainSubs, queueSubs); - ... - if (_cache != null) - { - _cache[subject] = result; - if (_cache.Count > CacheMax) { /* sweep */ } - } - return result; - } - finally { _lock.ExitWriteLock(); } -} +internal readonly record struct RoutedSubKey( + string RouteId, + string Account, + string Subject, + string? Queue); ``` -On a read-lock cache hit, `Match()` returns immediately with no trie traversal. On a miss, `Tokenize()` splits the subject before acquiring the write lock (subjects with empty tokens return `SubListResult.Empty` immediately). The write lock is then taken and the cache is checked again before invoking `MatchLevel`. +This replaces the earlier `"route|account|subject|queue"` composite string model. The change removes repeated string concatenation, `Split('|')`, and runtime reparsing in remote cleanup paths. -### `MatchLevel` traversal +Remote-interest APIs: -`MatchLevel` is a recursive method that descends the trie matching tokens against the subject array. At each level, for each remaining token position: +- `ApplyRemoteSub(...)` inserts or removes a `RemoteSubscription` +- `UpdateRemoteQSub(...)` updates queue weight for an existing remote queue subscription +- `RemoveRemoteSubs(routeId)` removes all remote interest for a disconnected route +- `RemoveRemoteSubsForAccount(routeId, account)` removes only one route/account slice +- `HasRemoteInterest(account, subject)` answers whether any remote subscription matches +- `MatchRemote(account, subject)` returns the expanded weighted remote matches -1. If `level.Fwc` is set, all subscriptions from that node are added to the result. The `>` wildcard matches all remaining tokens, so no further recursion is needed for this branch. -2. 
If `level.Pwc` is set, `MatchLevel` recurses with the next token index and `pwc.Next` as the new level. This handles `*` matching the current token. -3. A literal dictionary lookup on `level.Nodes[tokens[i]]` advances the level pointer for the next iteration. +Cleanup paths collect matching `RoutedSubKey` values into a reusable per-thread list and then remove them, avoiding `_remoteSubs.ToArray()` snapshots on every sweep. -After all tokens are consumed, subscriptions from the final literal node and the final `*` position (if present at the last level) are added to the result. The `*` case at the last token requires explicit handling because the loop exits before the recursive call for `*` can execute. +--- -`AddNodeToResults` flattens a node's `PlainSubs` into the accumulator list and merges its `QueueSubs` groups into the queue accumulator, combining groups by name across multiple matching nodes. +## Match Pipeline + +`Match(string subject)` is the hot path. + +1. Increment `_matches` +2. Read the current `_generation` +3. Try the cache under a read lock +4. On cache miss, tokenize the subject and retry under the write lock +5. Traverse the trie and build a `SubListResult` +6. Cache the result with the generation that produced it + +Cached entries are stored as: + +```csharp +private readonly record struct CachedResult(SubListResult Result, long Generation); +``` + +A cache entry is valid only if its stored generation matches the current `_generation`. Any local or remote-interest mutation increments `_generation`, so stale entries are ignored automatically. + +### Match Builder + +Cache misses use a reusable per-thread `MatchBuilder` instead of allocating fresh nested `List>` structures on every traversal. 
The builder: + +- reuses a `List` for plain subscribers +- reuses queue-group lists across matches +- merges queue matches by queue name during traversal +- materializes the public `SubListResult` arrays only once at the end + +This keeps the public contract unchanged while removing temporary match-building churn from the publish path. + +### Intentional Remaining Allocations + +The current implementation still allocates in two places by design: + +- the tokenized `string[]` produced by `Tokenize(subject)` on cache misses +- the final `Subscription[]` and `Subscription[][]` arrays stored in `SubListResult` + +Those allocations are part of the current public result shape and cache model. --- ## Cache Strategy -The cache is a `Dictionary` keyed by the literal published subject. All operations use `StringComparer.Ordinal`. +The cache is a `Dictionary` keyed by literal publish subject with `StringComparer.Ordinal`. -**Size limits:** The cache holds at most `CacheMax` (1024) entries. When `_cache.Count` exceeds this, a sweep removes entries until the count reaches `CacheSweep` (256). The sweep takes the first `count - 256` keys from the dictionary — no LRU ordering is maintained. +- `CacheMax = 1024` +- `CacheSweep = 256` -**`AddToCache`** is called from `Insert()` to keep cached results consistent after adding a subscription: -- For a literal subscription subject, `AddToCache` does a direct lookup. If the exact key is in the cache, it creates a new `SubListResult` with the subscription appended and replaces the cached entry. -- For a wildcard subscription subject, `AddToCache` scans all cached keys and updates any entry whose key is matched by `SubjectMatch.MatchLiteral(key, subject)`. +When the cache grows past `CacheMax`, `SubListCacheSweeper` schedules a sweep that removes enough keys to return to the target size. The sweep is intentionally simple; it is not LRU. 
-**`RemoveFromCache`** is called from `Remove()` to invalidate cached results after removing a subscription: -- For a literal subscription subject, `RemoveFromCache` removes the exact cache key. -- For a wildcard subscription subject, `RemoveFromCache` removes all cached keys matched by the pattern. Because it is difficult to reconstruct the correct result without a full trie traversal, invalidation is preferred over update. - -The asymmetry between `AddToCache` (updates in place) and `RemoveFromCache` (invalidates) avoids a second trie traversal on removal at the cost of a cache miss on the next `Match()` for those keys. +The cache stores fully materialized `SubListResult` instances because publish callers need stable array-based results immediately after lookup. --- -## Disposal +## Statistics and Monitoring -`SubList` implements `IDisposable`. `Dispose()` releases the `ReaderWriterLockSlim`: +`Stats()` exposes: -```csharp -public void Dispose() => _lock.Dispose(); -``` +- subscription count +- cache entry count +- insert/remove/match counts +- cache hit rate +- fanout statistics derived from cached results -`SubList` instances are owned by `NatsServer` and disposed during server shutdown. +These counters are used by tests and monitoring code to validate routing behavior and cache effectiveness. --- ## Related Documentation -- [Subscriptions Overview](../Subscriptions/Overview.md) +- [Overview](Overview.md) - + diff --git a/src/NATS.Server/Subscriptions/RoutedSubKey.cs b/src/NATS.Server/Subscriptions/RoutedSubKey.cs new file mode 100644 index 0000000..6ff55bf --- /dev/null +++ b/src/NATS.Server/Subscriptions/RoutedSubKey.cs @@ -0,0 +1,7 @@ +namespace NATS.Server.Subscriptions; + +internal readonly record struct RoutedSubKey(string RouteId, string Account, string Subject, string? 
Queue) +{ + public static RoutedSubKey FromRemoteSubscription(RemoteSubscription sub) + => new(sub.RouteId, sub.Account, sub.Subject, sub.Queue); +} diff --git a/src/NATS.Server/Subscriptions/SubList.cs b/src/NATS.Server/Subscriptions/SubList.cs index ae5af7e..099d85d 100644 --- a/src/NATS.Server/Subscriptions/SubList.cs +++ b/src/NATS.Server/Subscriptions/SubList.cs @@ -12,11 +12,15 @@ public sealed class SubList : IDisposable { private const int CacheMax = 1024; private const int CacheSweep = 256; + [ThreadStatic] + private static MatchBuilder? s_matchBuilder; + [ThreadStatic] + private static List? s_remoteSubRemovalKeys; private readonly ReaderWriterLockSlim _lock = new(); private readonly TrieLevel _root = new(); private readonly SubListCacheSweeper _sweeper = new(); - private readonly Dictionary _remoteSubs = new(StringComparer.Ordinal); + private readonly Dictionary _remoteSubs = []; private Dictionary? _cache = new(StringComparer.Ordinal); private uint _count; private volatile bool _disposed; @@ -31,8 +35,6 @@ public sealed class SubList : IDisposable private readonly Dictionary>> _queueRemoveNotifications = new(StringComparer.Ordinal); private readonly record struct CachedResult(SubListResult Result, long Generation); - internal readonly record struct RoutedSubKeyInfo(string RouteId, string Account, string Subject, string? Queue); - public event Action? 
InterestChanged; public SubList() @@ -178,7 +180,7 @@ public sealed class SubList : IDisposable _lock.EnterWriteLock(); try { - var key = BuildRoutedSubKey(sub.RouteId, sub.Account, sub.Subject, sub.Queue); + var key = RoutedSubKey.FromRemoteSubscription(sub); var changed = false; if (sub.IsRemoval) { @@ -223,7 +225,7 @@ public sealed class SubList : IDisposable _lock.EnterWriteLock(); try { - var key = BuildRoutedSubKey(sub.RouteId, sub.Account, sub.Subject, sub.Queue); + var key = RoutedSubKey.FromRemoteSubscription(sub); if (!_remoteSubs.TryGetValue(key, out var existing)) return; @@ -240,51 +242,36 @@ public sealed class SubList : IDisposable } } - internal static string BuildRoutedSubKey(string routeId, string account, string subject, string? queue) - => $"{routeId}|{account}|{subject}|{queue}"; - - internal static string? GetAccNameFromRoutedSubKey(string routedSubKey) - => GetRoutedSubKeyInfo(routedSubKey)?.Account; - - internal static RoutedSubKeyInfo? GetRoutedSubKeyInfo(string routedSubKey) - { - if (string.IsNullOrWhiteSpace(routedSubKey)) - return null; - - var parts = routedSubKey.Split('|'); - if (parts.Length != 4) - return null; - - if (parts[0].Length == 0 || parts[1].Length == 0 || parts[2].Length == 0) - return null; - - var queue = parts[3].Length == 0 ? 
null : parts[3]; - return new RoutedSubKeyInfo(parts[0], parts[1], parts[2], queue); - } - public int RemoveRemoteSubs(string routeId) { _lock.EnterWriteLock(); try { + var removalKeys = RentRemoteSubRemovalKeys(); var removed = 0; - foreach (var kvp in _remoteSubs.ToArray()) + foreach (var (key, _) in _remoteSubs) { - var info = GetRoutedSubKeyInfo(kvp.Key); - if (info == null || !string.Equals(info.Value.RouteId, routeId, StringComparison.Ordinal)) + if (!string.Equals(key.RouteId, routeId, StringComparison.Ordinal)) continue; - if (_remoteSubs.Remove(kvp.Key)) + removalKeys.Add(key); + } + + foreach (var key in removalKeys) + { + if (_remoteSubs.Remove(key, out var removedSub)) { removed++; InterestChanged?.Invoke(new InterestChange( InterestChangeKind.RemoteRemoved, - kvp.Value.Subject, - kvp.Value.Queue, - kvp.Value.Account)); + removedSub.Subject, + removedSub.Queue, + removedSub.Account)); } } + removalKeys.Clear(); + if (removed > 0) Interlocked.Increment(ref _generation); @@ -301,30 +288,34 @@ public sealed class SubList : IDisposable _lock.EnterWriteLock(); try { + var removalKeys = RentRemoteSubRemovalKeys(); var removed = 0; - foreach (var kvp in _remoteSubs.ToArray()) + foreach (var (key, _) in _remoteSubs) { - var info = GetRoutedSubKeyInfo(kvp.Key); - if (info == null) - continue; - - if (!string.Equals(info.Value.RouteId, routeId, StringComparison.Ordinal) - || !string.Equals(info.Value.Account, account, StringComparison.Ordinal)) + if (!string.Equals(key.RouteId, routeId, StringComparison.Ordinal) + || !string.Equals(key.Account, account, StringComparison.Ordinal)) { continue; } - if (_remoteSubs.Remove(kvp.Key)) + removalKeys.Add(key); + } + + foreach (var key in removalKeys) + { + if (_remoteSubs.Remove(key, out var removedSub)) { removed++; InterestChanged?.Invoke(new InterestChange( InterestChangeKind.RemoteRemoved, - kvp.Value.Subject, - kvp.Value.Queue, - kvp.Value.Account)); + removedSub.Subject, + removedSub.Queue, + removedSub.Account)); } 
} + removalKeys.Clear(); + if (removed > 0) Interlocked.Increment(ref _generation); @@ -391,9 +382,9 @@ public sealed class SubList : IDisposable } else { - var key = token.ToString(); - if (!level.Nodes.TryGetValue(key, out node)) + if (!TryGetLiteralNode(level, token, out _, out node)) { + var key = token.ToString(); node = new TrieNode(); level.Nodes[key] = node; } @@ -503,13 +494,20 @@ public sealed class SubList : IDisposable } else { - level.Nodes.TryGetValue(token.ToString(), out node); + if (!TryGetLiteralNode(level, token, out var existingToken, out node)) + return false; + + pathList.Add((level, node, existingToken, isPwc: false, isFwc: false)); + if (node.Next == null) + return false; // corrupted trie state + level = node.Next; + continue; } if (node == null) return false; // not found - var tokenStr = token.ToString(); + var tokenStr = isPwc ? "*" : ">"; pathList.Add((level, node, tokenStr, isPwc, isFwc)); if (node.Next == null) return false; // corrupted trie state @@ -587,22 +585,9 @@ public sealed class SubList : IDisposable return cached.Result; } - var plainSubs = new List(); - var queueSubs = new List>(); - MatchLevel(_root, tokens, 0, plainSubs, queueSubs); - - SubListResult result; - if (plainSubs.Count == 0 && queueSubs.Count == 0) - { - result = SubListResult.Empty; - } - else - { - var queueSubsArr = new Subscription[queueSubs.Count][]; - for (int i = 0; i < queueSubs.Count; i++) - queueSubsArr[i] = queueSubs[i].ToArray(); - result = new SubListResult(plainSubs.ToArray(), queueSubsArr); - } + var builder = RentMatchBuilder(); + MatchLevel(_root, tokens, 0, builder); + var result = builder.ToResult(); if (_cache != null) { @@ -681,6 +666,37 @@ public sealed class SubList : IDisposable return removed; } + private static bool TryGetLiteralNode(TrieLevel level, ReadOnlySpan token, out string existingToken, out TrieNode node) + { + foreach (var (candidate, existingNode) in level.Nodes) + { + if (!SubjectMatch.TokenEquals(token, candidate)) + 
continue; + + existingToken = candidate; + node = existingNode; + return true; + } + + existingToken = string.Empty; + node = null!; + return false; + } + + private static MatchBuilder RentMatchBuilder() + { + var builder = s_matchBuilder ??= new MatchBuilder(); + builder.Reset(); + return builder; + } + + private static List RentRemoteSubRemovalKeys() + { + var keys = s_remoteSubRemovalKeys ??= []; + keys.Clear(); + return keys; + } + private bool HasExactQueueInterestNoLock(string subject, string queue) { var subs = new List(); @@ -827,6 +843,42 @@ public sealed class SubList : IDisposable AddNodeToResults(pwc, plainSubs, queueSubs); } + private static void MatchLevel(TrieLevel? level, string[] tokens, int tokenIndex, MatchBuilder builder) + { + TrieNode? pwc = null; + TrieNode? node = null; + + for (int i = tokenIndex; i < tokens.Length; i++) + { + if (level == null) + return; + + if (level.Fwc != null) + AddNodeToResults(level.Fwc, builder); + + pwc = level.Pwc; + if (pwc != null) + MatchLevel(pwc.Next, tokens, i + 1, builder); + + node = null; + if (level.Nodes.TryGetValue(tokens[i], out var found)) + { + node = found; + level = node.Next; + } + else + { + level = null; + } + } + + if (node != null) + AddNodeToResults(node, builder); + + if (pwc != null) + AddNodeToResults(pwc, builder); + } + private static void AddNodeToResults(TrieNode node, List plainSubs, List> queueSubs) { @@ -858,6 +910,19 @@ public sealed class SubList : IDisposable } } + private static void AddNodeToResults(TrieNode node, MatchBuilder builder) + { + builder.PlainSubs.AddRange(node.PlainSubs); + + foreach (var (queueName, subs) in node.QueueSubs) + { + if (subs.Count == 0) + continue; + + builder.AddQueueGroup(queueName, subs); + } + } + public SubListStats Stats() { _lock.EnterReadLock(); @@ -1373,4 +1438,47 @@ public sealed class SubList : IDisposable public bool IsEmpty => PlainSubs.Count == 0 && QueueSubs.Count == 0 && (Next == null || (Next.Nodes.Count == 0 && Next.Pwc == null && 
Next.Fwc == null)); } + + private sealed class MatchBuilder + { + private readonly Dictionary _queueIndexes = new(StringComparer.Ordinal); + private readonly List> _queueGroups = []; + private int _queueGroupCount; + + public List PlainSubs { get; } = []; + + public void Reset() + { + PlainSubs.Clear(); + _queueIndexes.Clear(); + for (var i = 0; i < _queueGroupCount; i++) + _queueGroups[i].Clear(); + _queueGroupCount = 0; + } + + public void AddQueueGroup(string queueName, HashSet subs) + { + if (!_queueIndexes.TryGetValue(queueName, out var index)) + { + index = _queueGroupCount++; + _queueIndexes[queueName] = index; + if (index == _queueGroups.Count) + _queueGroups.Add([]); + } + + _queueGroups[index].AddRange(subs); + } + + public SubListResult ToResult() + { + if (PlainSubs.Count == 0 && _queueGroupCount == 0) + return SubListResult.Empty; + + var queueSubsArr = new Subscription[_queueGroupCount][]; + for (var i = 0; i < _queueGroupCount; i++) + queueSubsArr[i] = _queueGroups[i].ToArray(); + + return new SubListResult(PlainSubs.ToArray(), queueSubsArr); + } + } } diff --git a/src/NATS.Server/Subscriptions/SubjectMatch.cs b/src/NATS.Server/Subscriptions/SubjectMatch.cs index a3df07f..69c28f3 100644 --- a/src/NATS.Server/Subscriptions/SubjectMatch.cs +++ b/src/NATS.Server/Subscriptions/SubjectMatch.cs @@ -249,6 +249,9 @@ public static class SubjectMatch return tokens.Count == test.Count; } + internal static bool TokenEquals(ReadOnlySpan token, string candidate) + => token.SequenceEqual(candidate); + private static bool TokensCanMatch(ReadOnlySpan t1, ReadOnlySpan t2) { if (t1.Length == 1 && (t1[0] == Pwc || t1[0] == Fwc)) diff --git a/tests/NATS.Server.Benchmark.Tests/CorePubSub/SubListMatchBenchmarks.cs b/tests/NATS.Server.Benchmark.Tests/CorePubSub/SubListMatchBenchmarks.cs new file mode 100644 index 0000000..f99febb --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/CorePubSub/SubListMatchBenchmarks.cs @@ -0,0 +1,112 @@ +using System.Diagnostics; +using 
NATS.Server.Benchmark.Tests.Harness; +using NATS.Server.Subscriptions; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.CorePubSub; + +public class SubListMatchBenchmarks(ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public void SubListExactMatch_128Subjects() + { + using var subList = new SubList(); + for (var i = 0; i < 128; i++) + subList.Insert(new Subscription { Subject = $"bench.exact.{i}", Sid = i.ToString() }); + + var (result, allocatedBytes) = Measure("SubList Exact Match (128 subjects)", "DotNet", "bench.exact.64".Length, 250_000, () => + { + _ = subList.Match("bench.exact.64"); + }); + + BenchmarkResultWriter.WriteSingle(output, result); + WriteAllocationSummary(allocatedBytes, result.TotalMessages); + } + + [Fact] + [Trait("Category", "Benchmark")] + public void SubListWildcardMatch_FanIn() + { + using var subList = new SubList(); + subList.Insert(new Subscription { Subject = "orders.created", Sid = "1" }); + subList.Insert(new Subscription { Subject = "orders.*", Sid = "2" }); + subList.Insert(new Subscription { Subject = "orders.>", Sid = "3" }); + subList.Insert(new Subscription { Subject = "orders.created.us", Sid = "4" }); + + var (result, allocatedBytes) = Measure("SubList Wildcard Match", "DotNet", "orders.created".Length, 250_000, () => + { + _ = subList.Match("orders.created"); + }); + + BenchmarkResultWriter.WriteSingle(output, result); + WriteAllocationSummary(allocatedBytes, result.TotalMessages); + } + + [Fact] + [Trait("Category", "Benchmark")] + public void SubListQueueMatch_MergedGroups() + { + using var subList = new SubList(); + subList.Insert(new Subscription { Subject = "jobs.run", Queue = "workers", Sid = "1" }); + subList.Insert(new Subscription { Subject = "jobs.*", Queue = "workers", Sid = "2" }); + subList.Insert(new Subscription { Subject = "jobs.>", Queue = "audit", Sid = "3" }); + + var (result, allocatedBytes) = Measure("SubList Queue Match", "DotNet", "jobs.run".Length, 
250_000, () =>
+        {
+            _ = subList.Match("jobs.run");
+        });
+
+        BenchmarkResultWriter.WriteSingle(output, result);
+        WriteAllocationSummary(allocatedBytes, result.TotalMessages);
+    }
+
+    [Fact]
+    [Trait("Category", "Benchmark")]
+    public void SubListRemoteInterest_WildcardLookup()
+    {
+        using var subList = new SubList();
+        for (var i = 0; i < 64; i++)
+            subList.ApplyRemoteSub(new RemoteSubscription($"remote.{i}.*", null, $"r{i}", "A"));
+
+        var (result, allocatedBytes) = Measure("SubList Remote Interest", "DotNet", "remote.42.created".Length, 250_000, () =>
+        {
+            _ = subList.HasRemoteInterest("A", "remote.42.created");
+        });
+
+        BenchmarkResultWriter.WriteSingle(output, result);
+        WriteAllocationSummary(allocatedBytes, result.TotalMessages);
+    }
+
+    private static (BenchmarkResult Result, long AllocatedBytes) Measure(string name, string serverType, int bytesPerOperation, int iterations, Action operation)
+    {
+        GC.Collect();
+        GC.WaitForPendingFinalizers();
+        GC.Collect();
+
+        for (var i = 0; i < 1_000; i++)
+            operation();
+
+        var before = GC.GetAllocatedBytesForCurrentThread();
+        var sw = Stopwatch.StartNew();
+        for (var i = 0; i < iterations; i++)
+            operation();
+        sw.Stop();
+        var allocatedBytes = GC.GetAllocatedBytesForCurrentThread() - before;
+
+        return (new BenchmarkResult
+        {
+            Name = name,
+            ServerType = serverType,
+            TotalMessages = iterations,
+            TotalBytes = (long)iterations * bytesPerOperation,
+            Duration = sw.Elapsed,
+        }, allocatedBytes);
+    }
+
+    private void WriteAllocationSummary(long allocatedBytes, long iterations)
+    {
+        output.WriteLine($"Allocated: {allocatedBytes:N0} B total | {allocatedBytes / (double)iterations:F2} B/op");
+        output.WriteLine("");
+    }
+}
diff --git a/tests/NATS.Server.Clustering.Tests/Routes/RouteRemoteSubCleanupParityBatch2Tests.cs b/tests/NATS.Server.Clustering.Tests/Routes/RouteRemoteSubCleanupParityBatch2Tests.cs
index 684a69c..f20649a 100644
--- a/tests/NATS.Server.Clustering.Tests/Routes/RouteRemoteSubCleanupParityBatch2Tests.cs
+++ b/tests/NATS.Server.Clustering.Tests/Routes/RouteRemoteSubCleanupParityBatch2Tests.cs
@@ -10,21 +10,14 @@ namespace NATS.Server.Clustering.Tests.Routes;
 public class RouteRemoteSubCleanupParityBatch2Tests
 {
     [Fact]
-    public void Routed_sub_key_helpers_parse_account_and_queue_fields()
+    public void Routed_sub_key_exposes_route_account_subject_and_queue_fields()
     {
-        var key = SubList.BuildRoutedSubKey("R1", "A", "orders.*", "q1");
+        var key = RoutedSubKey.FromRemoteSubscription(new RemoteSubscription("orders.*", "q1", "R1", "A"));
-
-        SubList.GetAccNameFromRoutedSubKey(key).ShouldBe("A");
-
-        var info = SubList.GetRoutedSubKeyInfo(key);
-        info.ShouldNotBeNull();
-        info.Value.RouteId.ShouldBe("R1");
-        info.Value.Account.ShouldBe("A");
-        info.Value.Subject.ShouldBe("orders.*");
-        info.Value.Queue.ShouldBe("q1");
-
-        SubList.GetRoutedSubKeyInfo("invalid").ShouldBeNull();
-        SubList.GetAccNameFromRoutedSubKey("invalid").ShouldBeNull();
+        key.RouteId.ShouldBe("R1");
+        key.Account.ShouldBe("A");
+        key.Subject.ShouldBe("orders.*");
+        key.Queue.ShouldBe("q1");
     }
 
     [Fact]
@@ -47,6 +40,23 @@ public class RouteRemoteSubCleanupParityBatch2Tests
         sl.HasRemoteInterest("B", "orders.created").ShouldBeTrue();
     }
 
+    [Fact]
+    public void Applying_same_remote_subscription_twice_is_idempotent_for_interest_tracking()
+    {
+        using var sl = new SubList();
+        var changes = new List();
+        sl.InterestChanged += changes.Add;
+
+        var sub = new RemoteSubscription("orders.*", "workers", "r1", "A");
+
+        sl.ApplyRemoteSub(sub);
+        sl.ApplyRemoteSub(sub);
+
+        sl.HasRemoteInterest("A", "orders.created").ShouldBeTrue();
+        sl.MatchRemote("A", "orders.created").Count.ShouldBe(1);
+        changes.Count(change => change.Kind == InterestChangeKind.RemoteAdded).ShouldBe(1);
+    }
+
     [Fact]
     public async Task Route_disconnect_cleans_remote_interest_without_explicit_rs_minus()
     {
diff --git a/tests/NATS.Server.Clustering.Tests/Routes/RouteSubscriptionTests.cs b/tests/NATS.Server.Clustering.Tests/Routes/RouteSubscriptionTests.cs
index 051de09..c572c8e 100644
--- a/tests/NATS.Server.Clustering.Tests/Routes/RouteSubscriptionTests.cs
+++ b/tests/NATS.Server.Clustering.Tests/Routes/RouteSubscriptionTests.cs
@@ -208,6 +208,50 @@ public class RouteSubscriptionTests
         }
     }
 
+    [Fact]
+    public async Task Removing_one_subject_keeps_other_remote_interest_intact()
+    {
+        var cluster = Guid.NewGuid().ToString("N");
+        var a = await StartServerAsync(MakeClusterOpts(cluster));
+        var b = await StartServerAsync(MakeClusterOpts(cluster, a.Server.ClusterListen!));
+
+        try
+        {
+            await WaitForRouteFormation(a.Server, b.Server);
+
+            await using var nc = new NatsConnection(new NatsOpts
+            {
+                Url = $"nats://127.0.0.1:{a.Server.Port}",
+            });
+            await nc.ConnectAsync();
+
+            await using var sub1 = await nc.SubscribeCoreAsync("multi.one");
+            await using var sub2 = await nc.SubscribeCoreAsync("multi.two");
+            await nc.PingAsync();
+
+            await WaitForCondition(() => b.Server.HasRemoteInterest("multi.one") && b.Server.HasRemoteInterest("multi.two"));
+            b.Server.HasRemoteInterest("multi.one").ShouldBeTrue();
+            b.Server.HasRemoteInterest("multi.two").ShouldBeTrue();
+
+            await sub1.DisposeAsync();
+            await nc.PingAsync();
+
+            await WaitForCondition(() => !b.Server.HasRemoteInterest("multi.one"));
+            b.Server.HasRemoteInterest("multi.one").ShouldBeFalse();
+            b.Server.HasRemoteInterest("multi.two").ShouldBeTrue();
+
+            await sub2.DisposeAsync();
+            await nc.PingAsync();
+
+            await WaitForCondition(() => !b.Server.HasRemoteInterest("multi.two"));
+            b.Server.HasRemoteInterest("multi.two").ShouldBeFalse();
+        }
+        finally
+        {
+            await DisposeServers(a, b);
+        }
+    }
+
     // Go: RS+ wire protocol parsing (low-level)
     [Fact]
     public async Task RSplus_frame_registers_remote_interest_via_wire()
diff --git a/tests/NATS.Server.Core.Tests/Subscriptions/SubListAllocationGuardTests.cs b/tests/NATS.Server.Core.Tests/Subscriptions/SubListAllocationGuardTests.cs
new file mode 100644
index 0000000..f00306d
--- /dev/null
+++ b/tests/NATS.Server.Core.Tests/Subscriptions/SubListAllocationGuardTests.cs
@@ -0,0 +1,64 @@
+using System.Reflection;
+using NATS.Server.Subscriptions;
+
+namespace NATS.Server.Core.Tests;
+
+public class SubListAllocationGuardTests
+{
+    [Fact]
+    public void Remote_subscription_dictionary_uses_dedicated_routed_sub_key_type()
+    {
+        var field = typeof(SubList).GetField("_remoteSubs", BindingFlags.Instance | BindingFlags.NonPublic);
+
+        field.ShouldNotBeNull();
+        field.FieldType.IsGenericType.ShouldBeTrue();
+        field.FieldType.GetGenericArguments()[0].Name.ShouldBe("RoutedSubKey");
+    }
+
+    [Fact]
+    public void Has_remote_interest_supports_exact_and_wildcard_subjects_per_account()
+    {
+        using var sl = new SubList();
+        sl.ApplyRemoteSub(new RemoteSubscription("orders.created", null, "r1", "A"));
+        sl.ApplyRemoteSub(new RemoteSubscription("orders.*", null, "r2", "A"));
+        sl.ApplyRemoteSub(new RemoteSubscription("payments.*", null, "r3", "B"));
+
+        sl.HasRemoteInterest("A", "orders.created").ShouldBeTrue();
+        sl.HasRemoteInterest("A", "orders.updated").ShouldBeTrue();
+        sl.HasRemoteInterest("A", "payments.created").ShouldBeFalse();
+        sl.HasRemoteInterest("B", "payments.posted").ShouldBeTrue();
+        sl.HasRemoteInterest("B", "orders.created").ShouldBeFalse();
+    }
+
+    [Fact]
+    public void Match_remote_reflects_queue_weight_updates_for_existing_remote_queue_sub()
+    {
+        using var sl = new SubList();
+        var sub = new RemoteSubscription("orders.*", "workers", "r1", "A", QueueWeight: 1);
+        sl.ApplyRemoteSub(sub);
+
+        sl.MatchRemote("A", "orders.created").Count.ShouldBe(1);
+
+        sl.UpdateRemoteQSub(sub with { QueueWeight = 4 });
+
+        var matches = sl.MatchRemote("A", "orders.created");
+        matches.Count.ShouldBe(4);
+        matches.ShouldAllBe(match => match.Queue == "workers");
+    }
+
+    [Fact]
+    public void Match_merges_queue_groups_from_multiple_matching_nodes_by_queue_name()
+    {
+        using var sl = new SubList();
+        sl.Insert(new Subscription { Subject = "orders.created", Queue = "workers", Sid = "1" });
+        sl.Insert(new Subscription { Subject = "orders.*", Queue = "workers", Sid = "2" });
+        sl.Insert(new Subscription { Subject = "orders.>", Queue = "audit", Sid = "3" });
+
+        var result = sl.Match("orders.created");
+
+        result.PlainSubs.ShouldBeEmpty();
+        result.QueueSubs.Length.ShouldBe(2);
+        result.QueueSubs.Single(group => group[0].Queue == "workers").Length.ShouldBe(2);
+        result.QueueSubs.Single(group => group[0].Queue == "audit").Length.ShouldBe(1);
+    }
+}
diff --git a/tests/NATS.Server.Core.Tests/Subscriptions/SubListGoParityTests.cs b/tests/NATS.Server.Core.Tests/Subscriptions/SubListGoParityTests.cs
index ff8ccdf..022ad6a 100644
--- a/tests/NATS.Server.Core.Tests/Subscriptions/SubListGoParityTests.cs
+++ b/tests/NATS.Server.Core.Tests/Subscriptions/SubListGoParityTests.cs
@@ -199,6 +199,33 @@ public class SubListGoParityTests
         sl.Match("foo.bar").PlainSubs.Length.ShouldBe(3);
     }
 
+    [Fact]
+    public void Cache_generation_bump_rebuilds_match_result_after_insert_and_remove()
+    {
+        var sl = new SubList();
+        var exact = MakeSub("foo.bar", sid: "1");
+        var wildcard = MakeSub("foo.*", sid: "2");
+
+        sl.Insert(exact);
+
+        var first = sl.Match("foo.bar");
+        var second = sl.Match("foo.bar");
+        ReferenceEquals(first, second).ShouldBeTrue();
+        first.PlainSubs.Select(sub => sub.Sid).ShouldBe(["1"]);
+
+        sl.Insert(wildcard);
+
+        var afterInsert = sl.Match("foo.bar");
+        ReferenceEquals(afterInsert, first).ShouldBeFalse();
+        afterInsert.PlainSubs.Select(sub => sub.Sid).OrderBy(x => x).ToArray().ShouldBe(["1", "2"]);
+
+        sl.Remove(wildcard);
+
+        var afterRemove = sl.Match("foo.bar");
+        ReferenceEquals(afterRemove, afterInsert).ShouldBeFalse();
+        afterRemove.PlainSubs.Select(sub => sub.Sid).ShouldBe(["1"]);
+    }
+
     ///
     /// Empty result is a shared singleton — two calls that yield no matches return
     /// the same object reference.