diff --git a/src/NATS.Server/Subscriptions/SubList.cs b/src/NATS.Server/Subscriptions/SubList.cs index 047bef2..dd6b9ef 100644 --- a/src/NATS.Server/Subscriptions/SubList.cs +++ b/src/NATS.Server/Subscriptions/SubList.cs @@ -13,9 +13,16 @@ public sealed class SubList : IDisposable private readonly ReaderWriterLockSlim _lock = new(); private readonly TrieLevel _root = new(); - private Dictionary? _cache = new(StringComparer.Ordinal); + private Dictionary? _cache = new(StringComparer.Ordinal); private uint _count; private volatile bool _disposed; + private long _generation; + private ulong _matches; + private ulong _cacheHits; + private ulong _inserts; + private ulong _removes; + + private readonly record struct CachedResult(SubListResult Result, long Generation); public void Dispose() { @@ -90,7 +97,8 @@ public sealed class SubList : IDisposable } _count++; - AddToCache(subject, sub); + _inserts++; + Interlocked.Increment(ref _generation); } finally { @@ -104,78 +112,10 @@ public sealed class SubList : IDisposable _lock.EnterWriteLock(); try { - var level = _root; - TrieNode? node = null; - bool sawFwc = false; - - var pathList = new List<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)>(); - - foreach (var token in new TokenEnumerator(sub.Subject)) + if (RemoveInternal(sub)) { - if (token.Length == 0 || sawFwc) - return; - - bool isPwc = token.Length == 1 && token[0] == SubjectMatch.Pwc; - bool isFwc = token.Length == 1 && token[0] == SubjectMatch.Fwc; - - if (isPwc) - { - node = level.Pwc; - } - else if (isFwc) - { - node = level.Fwc; - sawFwc = true; - } - else - { - level.Nodes.TryGetValue(token.ToString(), out node); - } - - if (node == null) - return; // not found - - var tokenStr = token.ToString(); - pathList.Add((level, node, tokenStr, isPwc, isFwc)); - if (node.Next == null) - return; // corrupted trie state - level = node.Next; - } - - if (node == null) return; - - // Remove from node - bool removed; - if (sub.Queue == null) - { - removed = node.PlainSubs.Remove(sub); - } - else - { - removed = false; - if (node.QueueSubs.TryGetValue(sub.Queue, out var qset)) - { - removed = qset.Remove(sub); - if (qset.Count == 0) - node.QueueSubs.Remove(sub.Queue); - } - } - - if (!removed) return; - - _count--; - RemoveFromCache(sub.Subject); - - // Prune empty nodes (walk backwards) - for (int i = pathList.Count - 1; i >= 0; i--) - { - var (l, n, t, isPwc, isFwc) = pathList[i]; - if (n.IsEmpty) - { - if (isPwc) l.Pwc = null; - else if (isFwc) l.Fwc = null; - else l.Nodes.Remove(t); - } + _removes++; + Interlocked.Increment(ref _generation); } } finally @@ -184,22 +124,107 @@ public sealed class SubList : IDisposable } } + /// + /// Core remove logic without lock acquisition or generation bumping. + /// Assumes write lock is held. Returns true if a subscription was actually removed. + /// + private bool RemoveInternal(Subscription sub) + { + var level = _root; + TrieNode? node = null; + bool sawFwc = false; + + var pathList = new List<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)>(); + + foreach (var token in new TokenEnumerator(sub.Subject)) + { + if (token.Length == 0 || sawFwc) + return false; + + bool isPwc = token.Length == 1 && token[0] == SubjectMatch.Pwc; + bool isFwc = token.Length == 1 && token[0] == SubjectMatch.Fwc; + + if (isPwc) + { + node = level.Pwc; + } + else if (isFwc) + { + node = level.Fwc; + sawFwc = true; + } + else + { + level.Nodes.TryGetValue(token.ToString(), out node); + } + + if (node == null) + return false; // not found + + var tokenStr = token.ToString(); + pathList.Add((level, node, tokenStr, isPwc, isFwc)); + if (node.Next == null) + return false; // corrupted trie state + level = node.Next; + } + + if (node == null) return false; + + // Remove from node + bool removed; + if (sub.Queue == null) + { + removed = node.PlainSubs.Remove(sub); + } + else + { + removed = false; + if (node.QueueSubs.TryGetValue(sub.Queue, out var qset)) + { + removed = qset.Remove(sub); + if (qset.Count == 0) + node.QueueSubs.Remove(sub.Queue); + } + } + + if (!removed) return false; + + _count--; + + // Prune empty nodes (walk backwards) + for (int i = pathList.Count - 1; i >= 0; i--) + { + var (l, n, t, isPwc, isFwc) = pathList[i]; + if (n.IsEmpty) + { + if (isPwc) l.Pwc = null; + else if (isFwc) l.Fwc = null; + else l.Nodes.Remove(t); + } + } + + return true; + } + public SubListResult Match(string subject) { - // Check cache under read lock first. + Interlocked.Increment(ref _matches); + var currentGen = Interlocked.Read(ref _generation); + _lock.EnterReadLock(); try { - if (_cache != null && _cache.TryGetValue(subject, out var cached)) - return cached; + if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen) + { + Interlocked.Increment(ref _cacheHits); + return cached.Result; + } } finally { _lock.ExitReadLock(); } - // Cache miss -- tokenize and match under write lock (needed for cache update). - // Tokenize the subject. var tokens = Tokenize(subject); if (tokens == null) return SubListResult.Empty; @@ -207,13 +232,15 @@ public sealed class SubList : IDisposable _lock.EnterWriteLock(); try { - // Re-check cache after acquiring write lock. - if (_cache != null && _cache.TryGetValue(subject, out var cached)) - return cached; + currentGen = Interlocked.Read(ref _generation); + if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen) + { + Interlocked.Increment(ref _cacheHits); + return cached.Result; + } var plainSubs = new List(); var queueSubs = new List>(); - MatchLevel(_root, tokens, 0, plainSubs, queueSubs); SubListResult result; @@ -226,19 +253,14 @@ public sealed class SubList : IDisposable var queueSubsArr = new Subscription[queueSubs.Count][]; for (int i = 0; i < queueSubs.Count; i++) queueSubsArr[i] = queueSubs[i].ToArray(); - - result = new SubListResult( - plainSubs.ToArray(), - queueSubsArr); + result = new SubListResult(plainSubs.ToArray(), queueSubsArr); } if (_cache != null) { - _cache[subject] = result; - + _cache[subject] = new CachedResult(result, currentGen); if (_cache.Count > CacheMax) { - // Sweep: remove entries until at CacheSweep count. var keys = _cache.Keys.Take(_cache.Count - CacheSweep).ToList(); foreach (var key in keys) _cache.Remove(key); @@ -356,119 +378,352 @@ public sealed class SubList : IDisposable } } - /// - /// Adds a subscription to matching cache entries. - /// Assumes write lock is held. - /// - private void AddToCache(string subject, Subscription sub) + public SubListStats Stats() { - if (_cache == null) - return; - - // If literal subject, we can do a direct lookup. - if (SubjectMatch.IsLiteral(subject)) + _lock.EnterReadLock(); + uint numSubs, numCache; + ulong inserts, removes; + try { - if (_cache.TryGetValue(subject, out var r)) - { - _cache[subject] = AddSubToResult(r, sub); - } - return; + numSubs = _count; + numCache = (uint)(_cache?.Count ?? 0); + inserts = _inserts; + removes = _removes; + } + finally + { + _lock.ExitReadLock(); } - // Wildcard subscription -- check all cached keys. - var keysToUpdate = new List<(string key, SubListResult result)>(); - foreach (var (key, r) in _cache) + var matches = Interlocked.Read(ref _matches); + var cacheHits = Interlocked.Read(ref _cacheHits); + var hitRate = matches > 0 ? (double)cacheHits / matches : 0.0; + + uint maxFanout = 0; + long totalFanout = 0; + int cacheEntries = 0; + + _lock.EnterReadLock(); + try { - if (SubjectMatch.MatchLiteral(key, subject)) + if (_cache != null) { - keysToUpdate.Add((key, r)); + foreach (var (_, entry) in _cache) + { + var r = entry.Result; + var f = r.PlainSubs.Length + r.QueueSubs.Length; + totalFanout += f; + if (f > maxFanout) maxFanout = (uint)f; + cacheEntries++; + } } } - foreach (var (key, r) in keysToUpdate) + finally { - _cache[key] = AddSubToResult(r, sub); + _lock.ExitReadLock(); + } + + return new SubListStats + { + NumSubs = numSubs, + NumCache = numCache, + NumInserts = inserts, + NumRemoves = removes, + NumMatches = matches, + CacheHitRate = hitRate, + MaxFanout = maxFanout, + AvgFanout = cacheEntries > 0 ? (double)totalFanout / cacheEntries : 0.0, + }; + } + + public bool HasInterest(string subject) + { + var currentGen = Interlocked.Read(ref _generation); + _lock.EnterReadLock(); + try + { + if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen) + { + var r = cached.Result; + return r.PlainSubs.Length > 0 || r.QueueSubs.Length > 0; + } + } + finally + { + _lock.ExitReadLock(); + } + + var tokens = Tokenize(subject); + if (tokens == null) return false; + + _lock.EnterReadLock(); + try + { + return HasInterestLevel(_root, tokens, 0); + } + finally + { + _lock.ExitReadLock(); } } - /// - /// Removes cache entries that match the given subject. - /// Assumes write lock is held. - /// - private void RemoveFromCache(string subject) + public (int plainCount, int queueCount) NumInterest(string subject) { - if (_cache == null) - return; + var tokens = Tokenize(subject); + if (tokens == null) return (0, 0); - // If literal subject, we can do a direct removal. - if (SubjectMatch.IsLiteral(subject)) + _lock.EnterReadLock(); + try { - _cache.Remove(subject); - return; + int np = 0, nq = 0; + CountInterestLevel(_root, tokens, 0, ref np, ref nq); + return (np, nq); } - - // Wildcard subscription -- remove all matching cached keys. - var keysToRemove = new List(); - foreach (var key in _cache.Keys) + finally { - if (SubjectMatch.MatchLiteral(key, subject)) - { - keysToRemove.Add(key); - } - } - foreach (var key in keysToRemove) - { - _cache.Remove(key); + _lock.ExitReadLock(); } } - /// - /// Creates a new result with the given subscription added. - /// - private static SubListResult AddSubToResult(SubListResult result, Subscription sub) + public void RemoveBatch(IEnumerable subs) { - if (sub.Queue == null) + _lock.EnterWriteLock(); + try { - var newPlain = new Subscription[result.PlainSubs.Length + 1]; - result.PlainSubs.CopyTo(newPlain, 0); - newPlain[^1] = sub; - return new SubListResult(newPlain, result.QueueSubs); + var wasEnabled = _cache != null; + _cache = null; + + foreach (var sub in subs) + RemoveInternal(sub); + + Interlocked.Increment(ref _generation); + + if (wasEnabled) + _cache = new Dictionary(StringComparer.Ordinal); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public IReadOnlyList All() + { + var subs = new List(); + _lock.EnterReadLock(); + try + { + CollectAllSubs(_root, subs); + } + finally + { + _lock.ExitReadLock(); + } + return subs; + } + + public SubListResult ReverseMatch(string subject) + { + var tokens = Tokenize(subject); + if (tokens == null) + return SubListResult.Empty; + + _lock.EnterReadLock(); + try + { + var plainSubs = new List(); + var queueSubs = new List>(); + ReverseMatchLevel(_root, tokens, 0, plainSubs, queueSubs); + + if (plainSubs.Count == 0 && queueSubs.Count == 0) + return SubListResult.Empty; + + var queueSubsArr = new Subscription[queueSubs.Count][]; + for (int i = 0; i < queueSubs.Count; i++) + queueSubsArr[i] = queueSubs[i].ToArray(); + return new SubListResult(plainSubs.ToArray(), queueSubsArr); + } + finally + { + _lock.ExitReadLock(); + } + } + + private static bool HasInterestLevel(TrieLevel? level, string[] tokens, int tokenIndex) + { + TrieNode? pwc = null; + TrieNode? node = null; + + for (int i = tokenIndex; i < tokens.Length; i++) + { + if (level == null) return false; + if (level.Fwc != null && NodeHasInterest(level.Fwc)) return true; + + pwc = level.Pwc; + if (pwc != null && HasInterestLevel(pwc.Next, tokens, i + 1)) return true; + + node = null; + if (level.Nodes.TryGetValue(tokens[i], out var found)) + { + node = found; + level = node.Next; + } + else + { + level = null; + } + } + + if (node != null && NodeHasInterest(node)) return true; + if (pwc != null && NodeHasInterest(pwc)) return true; + return false; + } + + private static bool NodeHasInterest(TrieNode node) + { + return node.PlainSubs.Count > 0 || node.QueueSubs.Count > 0; + } + + private static void CountInterestLevel(TrieLevel? level, string[] tokens, int tokenIndex, + ref int np, ref int nq) + { + TrieNode? pwc = null; + TrieNode? node = null; + + for (int i = tokenIndex; i < tokens.Length; i++) + { + if (level == null) return; + if (level.Fwc != null) AddNodeCounts(level.Fwc, ref np, ref nq); + + pwc = level.Pwc; + if (pwc != null) CountInterestLevel(pwc.Next, tokens, i + 1, ref np, ref nq); + + node = null; + if (level.Nodes.TryGetValue(tokens[i], out var found)) + { + node = found; + level = node.Next; + } + else + { + level = null; + } + } + + if (node != null) AddNodeCounts(node, ref np, ref nq); + if (pwc != null) AddNodeCounts(pwc, ref np, ref nq); + } + + private static void AddNodeCounts(TrieNode node, ref int np, ref int nq) + { + np += node.PlainSubs.Count; + foreach (var (_, qset) in node.QueueSubs) + nq += qset.Count; + } + + private static void CollectAllSubs(TrieLevel level, List subs) + { + foreach (var (_, node) in level.Nodes) + { + foreach (var sub in node.PlainSubs) + subs.Add(sub); + foreach (var (_, qset) in node.QueueSubs) + foreach (var sub in qset) + subs.Add(sub); + if (node.Next != null) + CollectAllSubs(node.Next, subs); + } + if (level.Pwc != null) + { + foreach (var sub in level.Pwc.PlainSubs) + subs.Add(sub); + foreach (var (_, qset) in level.Pwc.QueueSubs) + foreach (var sub in qset) + subs.Add(sub); + if (level.Pwc.Next != null) + CollectAllSubs(level.Pwc.Next, subs); + } + if (level.Fwc != null) + { + foreach (var sub in level.Fwc.PlainSubs) + subs.Add(sub); + foreach (var (_, qset) in level.Fwc.QueueSubs) + foreach (var sub in qset) + subs.Add(sub); + if (level.Fwc.Next != null) + CollectAllSubs(level.Fwc.Next, subs); + } + } + + private static void ReverseMatchLevel(TrieLevel? level, string[] tokens, int tokenIndex, + List plainSubs, List> queueSubs) + { + if (level == null || tokenIndex >= tokens.Length) + return; + + var token = tokens[tokenIndex]; + bool isLast = tokenIndex == tokens.Length - 1; + + if (token == ">") + { + CollectAllNodes(level, plainSubs, queueSubs); + return; + } + + if (token == "*") + { + foreach (var (_, node) in level.Nodes) + { + if (isLast) + AddNodeToResults(node, plainSubs, queueSubs); + else + ReverseMatchLevel(node.Next, tokens, tokenIndex + 1, plainSubs, queueSubs); + } } else { - // Find existing queue group - var queueSubs = result.QueueSubs; - int slot = -1; - for (int i = 0; i < queueSubs.Length; i++) + if (level.Nodes.TryGetValue(token, out var node)) { - if (queueSubs[i].Length > 0 && queueSubs[i][0].Queue == sub.Queue) - { - slot = i; - break; - } - } - - // Deep copy queue subs - var newQueueSubs = new Subscription[queueSubs.Length + (slot < 0 ? 1 : 0)][]; - for (int i = 0; i < queueSubs.Length; i++) - { - if (i == slot) - { - var newGroup = new Subscription[queueSubs[i].Length + 1]; - queueSubs[i].CopyTo(newGroup, 0); - newGroup[^1] = sub; - newQueueSubs[i] = newGroup; - } + if (isLast) + AddNodeToResults(node, plainSubs, queueSubs); else - { - newQueueSubs[i] = (Subscription[])queueSubs[i].Clone(); - } - } - if (slot < 0) - { - newQueueSubs[^1] = [sub]; + ReverseMatchLevel(node.Next, tokens, tokenIndex + 1, plainSubs, queueSubs); } + } - return new SubListResult(result.PlainSubs, newQueueSubs); + if (level.Pwc != null) + { + if (isLast) + AddNodeToResults(level.Pwc, plainSubs, queueSubs); + else + ReverseMatchLevel(level.Pwc.Next, tokens, tokenIndex + 1, plainSubs, queueSubs); + } + if (level.Fwc != null) + { + AddNodeToResults(level.Fwc, plainSubs, queueSubs); + } + } + + private static void CollectAllNodes(TrieLevel level, List plainSubs, + List> queueSubs) + { + foreach (var (_, node) in level.Nodes) + { + AddNodeToResults(node, plainSubs, queueSubs); + if (node.Next != null) + CollectAllNodes(node.Next, plainSubs, queueSubs); + } + if (level.Pwc != null) + { + AddNodeToResults(level.Pwc, plainSubs, queueSubs); + if (level.Pwc.Next != null) + CollectAllNodes(level.Pwc.Next, plainSubs, queueSubs); + } + if (level.Fwc != null) + { + AddNodeToResults(level.Fwc, plainSubs, queueSubs); + if (level.Fwc.Next != null) + CollectAllNodes(level.Fwc.Next, plainSubs, queueSubs); } } diff --git a/src/NATS.Server/Subscriptions/SubListStats.cs b/src/NATS.Server/Subscriptions/SubListStats.cs new file mode 100644 index 0000000..76e31eb --- /dev/null +++ b/src/NATS.Server/Subscriptions/SubListStats.cs @@ -0,0 +1,13 @@ +namespace NATS.Server.Subscriptions; + +public sealed class SubListStats +{ + public uint NumSubs { get; init; } + public uint NumCache { get; init; } + public ulong NumInserts { get; init; } + public ulong NumRemoves { get; init; } + public ulong NumMatches { get; init; } + public double CacheHitRate { get; init; } + public uint MaxFanout { get; init; } + public double AvgFanout { get; init; } +} diff --git a/tests/NATS.Server.Tests/SubListTests.cs b/tests/NATS.Server.Tests/SubListTests.cs index 51d1544..8f3eaf9 100644 --- a/tests/NATS.Server.Tests/SubListTests.cs +++ b/tests/NATS.Server.Tests/SubListTests.cs @@ -156,4 +156,125 @@ public class SubListTests var r = sl.Match("foo.bar.baz"); r.PlainSubs.Length.ShouldBe(3); } + + [Fact] + public void Stats_returns_correct_values() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.bar", sid: "1")); + sl.Insert(MakeSub("foo.baz", sid: "2")); + sl.Match("foo.bar"); + sl.Match("foo.bar"); // cache hit + + var stats = sl.Stats(); + stats.NumSubs.ShouldBe(2u); + stats.NumInserts.ShouldBe(2ul); + stats.NumMatches.ShouldBe(2ul); + stats.CacheHitRate.ShouldBeGreaterThan(0.0); + } + + [Fact] + public void HasInterest_returns_true_when_subscribers_exist() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.bar")); + sl.HasInterest("foo.bar").ShouldBeTrue(); + sl.HasInterest("foo.baz").ShouldBeFalse(); + } + + [Fact] + public void HasInterest_with_wildcards() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.*")); + sl.HasInterest("foo.bar").ShouldBeTrue(); + sl.HasInterest("bar.baz").ShouldBeFalse(); + } + + [Fact] + public void NumInterest_counts_subscribers() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.bar", sid: "1")); + sl.Insert(MakeSub("foo.*", sid: "2")); + sl.Insert(MakeSub("foo.bar", queue: "q1", sid: "3")); + + var (np, nq) = sl.NumInterest("foo.bar"); + np.ShouldBe(2); // foo.bar + foo.* + nq.ShouldBe(1); // queue sub + } + + [Fact] + public void RemoveBatch_removes_all() + { + var sl = new SubList(); + var sub1 = MakeSub("foo.bar", sid: "1"); + var sub2 = MakeSub("foo.baz", sid: "2"); + var sub3 = MakeSub("bar.qux", sid: "3"); + sl.Insert(sub1); + sl.Insert(sub2); + sl.Insert(sub3); + sl.Count.ShouldBe(3u); + + sl.RemoveBatch([sub1, sub2]); + sl.Count.ShouldBe(1u); + sl.Match("foo.bar").PlainSubs.ShouldBeEmpty(); + sl.Match("bar.qux").PlainSubs.ShouldHaveSingleItem(); + } + + [Fact] + public void All_returns_every_subscription() + { + var sl = new SubList(); + var sub1 = MakeSub("foo.bar", sid: "1"); + var sub2 = MakeSub("foo.*", sid: "2"); + var sub3 = MakeSub("bar.>", queue: "q", sid: "3"); + sl.Insert(sub1); + sl.Insert(sub2); + sl.Insert(sub3); + + var all = sl.All(); + all.Count.ShouldBe(3); + all.ShouldContain(sub1); + all.ShouldContain(sub2); + all.ShouldContain(sub3); + } + + [Fact] + public void ReverseMatch_finds_patterns_matching_literal() + { + var sl = new SubList(); + var sub1 = MakeSub("foo.bar", sid: "1"); + var sub2 = MakeSub("foo.*", sid: "2"); + var sub3 = MakeSub("foo.>", sid: "3"); + var sub4 = MakeSub("bar.baz", sid: "4"); + sl.Insert(sub1); + sl.Insert(sub2); + sl.Insert(sub3); + sl.Insert(sub4); + + var result = sl.ReverseMatch("foo.bar"); + result.PlainSubs.Length.ShouldBe(3); // foo.bar, foo.*, foo.> + result.PlainSubs.ShouldContain(sub1); + result.PlainSubs.ShouldContain(sub2); + result.PlainSubs.ShouldContain(sub3); + } + + [Fact] + public void Generation_ID_invalidates_cache() + { + var sl = new SubList(); + sl.Insert(MakeSub("foo.bar", sid: "1")); + + // Prime cache + var r1 = sl.Match("foo.bar"); + r1.PlainSubs.Length.ShouldBe(1); + + // Insert another sub (bumps generation) + sl.Insert(MakeSub("foo.bar", sid: "2")); + + // Cache should be invalidated by generation mismatch + var r2 = sl.Match("foo.bar"); + r2.PlainSubs.Length.ShouldBe(2); + } }