diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs new file mode 100644 index 0000000..c910a2b --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs @@ -0,0 +1,1664 @@ +// Copyright 2016-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/sublist.go in the NATS server Go source. + +using System.Text; + +namespace ZB.MOM.NatsNet.Server.Internal.DataStructures; + +/// +/// A trie-based routing mechanism for NATS subject distribution. +/// Matches published subjects to interested subscribers, supporting +/// * (single-token) and > (full) wildcards. +/// Mirrors Go's Sublist. +/// +public sealed class SubscriptionIndex +{ + // Wildcard and separator constants. + internal const char Pwc = '*'; + internal const string Pwcs = "*"; + internal const char Fwc = '>'; + internal const string Fwcs = ">"; + internal const string Tsep = "."; + internal const char Btsep = '.'; + + // Error singletons. + public static readonly Exception ErrInvalidSubject = new ArgumentException("sublist: invalid subject"); + public static readonly Exception ErrNotFound = new KeyNotFoundException("sublist: no matches found"); + public static readonly Exception ErrNilChan = new ArgumentNullException("sublist: nil channel"); + public static readonly Exception ErrAlreadyRegistered = new InvalidOperationException("sublist: notification already registered"); + + // Cache limits. + internal const int SlCacheMax = 1024; + internal const int SlCacheSweep = 256; + // Threshold for creating a fast plist for Match. + internal const int PlistMin = 256; + + // Shared empty result singleton. + internal static readonly SubscriptionIndexResult EmptyResult = new(); + + // ------------------------------------------------------------------------- + // Fields + // ------------------------------------------------------------------------- + + private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.NoRecursion); + private long _genId; + private long _matches; + private long _cacheHits; + private long _inserts; + private long _removes; + private SublistLevel _root; + private Dictionary? _cache; + private int _ccSweep; + private NotifyMaps? _notify; + private uint _count; + + // ------------------------------------------------------------------------- + // Construction + // ------------------------------------------------------------------------- + + private SubscriptionIndex(bool enableCache) + { + _root = new SublistLevel(); + _cache = enableCache ? new Dictionary() : null; + } + + /// Creates a new SubscriptionIndex with caching controlled by . + public static SubscriptionIndex NewSublist(bool enableCache) => new(enableCache); + + /// Creates a new SubscriptionIndex with caching enabled. + public static SubscriptionIndex NewSublistWithCache() => new(true); + + /// Creates a new SubscriptionIndex with caching disabled. + public static SubscriptionIndex NewSublistNoCache() => new(false); + + // ------------------------------------------------------------------------- + // Public API — Cache + // ------------------------------------------------------------------------- + + /// Returns whether caching is enabled. + public bool CacheEnabled() + { + _lock.EnterReadLock(); + try { return _cache != null; } + finally { _lock.ExitReadLock(); } + } + + /// Returns the number of cached result entries. + public int CacheCount() + { + _lock.EnterReadLock(); + try { return _cache?.Count ?? 0; } + finally { _lock.ExitReadLock(); } + } + + // ------------------------------------------------------------------------- + // Public API — Insert / Remove + // ------------------------------------------------------------------------- + + /// Inserts a subscription into the index. Mirrors Sublist.Insert. + public Exception? Insert(Subscription sub) + { + var subject = Encoding.ASCII.GetString(sub.Subject); + + _lock.EnterWriteLock(); + try + { + bool sfwc = false, haswc = false, isnew = false; + SublistNode? n = null; + var l = _root; + + var start = 0; + for (var i = 0; i <= subject.Length; i++) + { + if (i < subject.Length && subject[i] != Btsep) + continue; + + var tokenLen = i - start; + if (tokenLen == 0 || sfwc) + return ErrInvalidSubject; + + var t = subject.Substring(start, tokenLen); + + if (tokenLen > 1) + { + if (!l.Nodes.TryGetValue(t, out n)) + { + n = new SublistNode(); + l.Nodes[t] = n; + } + } + else + { + switch (t[0]) + { + case Pwc: + n = l.Pwc; + haswc = true; + if (n == null) + { + n = new SublistNode(); + l.Pwc = n; + } + break; + case Fwc: + n = l.Fwc; + haswc = true; + sfwc = true; + if (n == null) + { + n = new SublistNode(); + l.Fwc = n; + } + break; + default: + if (!l.Nodes.TryGetValue(t, out n)) + { + n = new SublistNode(); + l.Nodes[t] = n; + } + break; + } + } + + n.Next ??= new SublistLevel(); + l = n.Next; + + start = i + 1; + } + + if (n == null) + return ErrInvalidSubject; + + if (sub.Queue == null || sub.Queue.Length == 0) + { + n.PSubs[sub] = default; + isnew = n.PSubs.Count == 1; + if (n.PList != null) + { + n.PList.Add(sub); + } + else if (n.PSubs.Count > PlistMin) + { + n.PList = new List(n.PSubs.Count); + foreach (var psub in n.PSubs.Keys) + n.PList.Add(psub); + } + } + else + { + n.QSubs ??= new Dictionary>(); + var qname = Encoding.ASCII.GetString(sub.Queue); + if (!n.QSubs.TryGetValue(qname, out var subs)) + { + subs = new Dictionary(); + n.QSubs[qname] = subs; + isnew = true; + } + subs[sub] = 0; + } + + _count++; + _inserts++; + + AddToCache(subject, sub); + Interlocked.Increment(ref _genId); + + if (_notify != null && isnew && !haswc && _notify.Insert.Count > 0) + ChkForInsertNotification(subject, sub.Queue != null ? Encoding.ASCII.GetString(sub.Queue) : string.Empty); + } + finally + { + _lock.ExitWriteLock(); + } + + return null; + } + + /// Removes a single subscription. Mirrors Sublist.Remove. + public Exception? Remove(Subscription sub) => RemoveInternal(sub, true, true); + + /// Removes a batch of subscriptions. Mirrors Sublist.RemoveBatch. + public Exception? RemoveBatch(IReadOnlyList subs) + { + if (subs.Count == 0) return null; + + _lock.EnterWriteLock(); + try + { + var wasEnabled = _cache != null; + _cache = null; + + Exception? err = null; + foreach (var sub in subs) + { + var lerr = RemoveInternal(sub, false, false); + err ??= lerr; + } + + Interlocked.Increment(ref _genId); + if (wasEnabled) + _cache = new Dictionary(); + + return err; + } + finally + { + _lock.ExitWriteLock(); + } + } + + /// Invalidates cache for a remote queue sub weight change. + public void UpdateRemoteQSub(Subscription sub) + { + _lock.EnterWriteLock(); + try + { + RemoveFromCache(Encoding.ASCII.GetString(sub.Subject)); + Interlocked.Increment(ref _genId); + } + finally + { + _lock.ExitWriteLock(); + } + } + + // ------------------------------------------------------------------------- + // Public API — Match / HasInterest / NumInterest + // ------------------------------------------------------------------------- + + /// + /// Matches all subscriptions for a literal subject. + /// Returns a result containing plain and queue subscribers. + /// Mirrors Sublist.Match. + /// + public SubscriptionIndexResult Match(string subject) => MatchInternal(subject, true, false); + + /// Matches using a byte[] subject. Mirrors Sublist.MatchBytes. + public SubscriptionIndexResult MatchBytes(byte[] subject) => MatchInternal(Encoding.ASCII.GetString(subject), true, true); + + /// Returns true if there is any interest in the literal subject. + public bool HasInterest(string subject) => HasInterestInternal(subject, true, out _, out _); + + /// Returns counts of plain and queue subscribers for the literal subject. + public (int np, int nq) NumInterest(string subject) + { + HasInterestInternal(subject, true, out var np, out var nq); + return (np, nq); + } + + // ------------------------------------------------------------------------- + // Public API — ReverseMatch + // ------------------------------------------------------------------------- + + /// + /// For a given subject (which may contain wildcards), returns all subscriptions + /// that would match that subject. Mirrors Sublist.ReverseMatch. + /// + public SubscriptionIndexResult ReverseMatch(string subject) + { + var tokens = TokenizeIntoArray(subject); + var result = new SubscriptionIndexResult(); + + _lock.EnterReadLock(); + try + { + ReverseMatchLevel(_root, tokens.AsSpan(), null, result); + if (result.PSubs.Count == 0 && result.QSubs.Count == 0) + result = EmptyResult; + } + finally + { + _lock.ExitReadLock(); + } + return result; + } + + // ------------------------------------------------------------------------- + // Public API — Enumerate + // ------------------------------------------------------------------------- + + /// Returns the subscription count. + public uint Count() + { + _lock.EnterReadLock(); + try { return _count; } + finally { _lock.ExitReadLock(); } + } + + /// Collects all subscriptions into . Mirrors Sublist.All. + public void All(List subs) + { + _lock.EnterReadLock(); + try { CollectAllSubs(_root, subs); } + finally { _lock.ExitReadLock(); } + } + + /// Collects local client subscriptions. Mirrors Sublist.localSubs. + public void LocalSubs(List subs, bool includeLeafHubs) + { + _lock.EnterReadLock(); + try { CollectLocalSubs(_root, subs, includeLeafHubs); } + finally { _lock.ExitReadLock(); } + } + + /// Returns the generation ID (incremented on every mutation). Thread-safe. + public long GenId() => Interlocked.Read(ref _genId); + + // ------------------------------------------------------------------------- + // Public API — Stats + // ------------------------------------------------------------------------- + + /// Returns statistics for the current state. Mirrors Sublist.Stats. + public SublistStats Stats() + { + var st = new SublistStats(); + + _lock.EnterReadLock(); + var cache = _cache; + var cc = _cache?.Count ?? 0; + st.NumSubs = _count; + st.NumInserts = (ulong)Interlocked.Read(ref _inserts); + st.NumRemoves = (ulong)Interlocked.Read(ref _removes); + _lock.ExitReadLock(); + + st.NumCache = (uint)cc; + st.NumMatches = (ulong)Interlocked.Read(ref _matches); + st.CacheHitsRaw = (ulong)Interlocked.Read(ref _cacheHits); + if (st.NumMatches > 0) + st.CacheHitRate = (double)st.CacheHitsRaw / st.NumMatches; + + if (cache != null) + { + int tot = 0, max = 0, clen = 0; + _lock.EnterReadLock(); + try + { + foreach (var r in _cache!.Values) + { + clen++; + var l = r.PSubs.Count + r.QSubs.Count; + tot += l; + if (l > max) max = l; + } + } + finally + { + _lock.ExitReadLock(); + } + st.TotFanout = tot; + st.CacheCnt = clen; + st.MaxFanout = (uint)max; + if (tot > 0) + st.AvgFanout = (double)tot / clen; + } + + return st; + } + + // ------------------------------------------------------------------------- + // Public API — Notifications + // ------------------------------------------------------------------------- + + /// Registers a notification for interest changes on a literal subject. + public Exception? RegisterNotification(string subject, Action notify) => + RegisterNotificationInternal(subject, string.Empty, notify); + + /// Registers a notification for queue interest changes. + public Exception? RegisterQueueNotification(string subject, string queue, Action notify) => + RegisterNotificationInternal(subject, queue, notify); + + /// Clears a notification. + public bool ClearNotification(string subject, Action notify) => + ClearNotificationInternal(subject, string.Empty, notify); + + /// Clears a queue notification. + public bool ClearQueueNotification(string subject, string queue, Action notify) => + ClearNotificationInternal(subject, queue, notify); + + // ------------------------------------------------------------------------- + // Internal: numLevels (for testing) + // ------------------------------------------------------------------------- + + /// Returns the maximum depth of the trie. Used in tests. + internal int NumLevels() => VisitLevel(_root, 0); + + // ------------------------------------------------------------------------- + // Private: Remove internals + // ------------------------------------------------------------------------- + + private Exception? RemoveInternal(Subscription sub, bool shouldLock, bool doCacheUpdates) + { + var subject = Encoding.ASCII.GetString(sub.Subject); + + if (shouldLock) _lock.EnterWriteLock(); + try + { + bool sfwc = false, haswc = false; + SublistNode? n = null; + var l = _root; + + var lnts = new LevelNodeToken[32]; + var levelCount = 0; + + var start = 0; + for (var i = 0; i <= subject.Length; i++) + { + if (i < subject.Length && subject[i] != Btsep) + continue; + + var tokenLen = i - start; + if (tokenLen == 0 || sfwc) + return ErrInvalidSubject; + + if (l == null!) + return ErrNotFound; + + var t = subject.Substring(start, tokenLen); + + if (tokenLen > 1) + { + l.Nodes.TryGetValue(t, out n); + } + else + { + switch (t[0]) + { + case Pwc: + n = l.Pwc; + haswc = true; + break; + case Fwc: + n = l.Fwc; + haswc = true; + sfwc = true; + break; + default: + l.Nodes.TryGetValue(t, out n); + break; + } + } + + if (n != null) + { + if (levelCount < lnts.Length) + lnts[levelCount++] = new LevelNodeToken(l, n, t); + l = n.Next!; + } + else + { + l = null!; + } + + start = i + 1; + } + + var (removed, last) = RemoveFromNode(n, sub); + if (!removed) + return ErrNotFound; + + _count--; + _removes++; + + for (var i = levelCount - 1; i >= 0; i--) + { + ref var lnt = ref lnts[i]; + if (lnt.Node.IsEmpty()) + lnt.Level.PruneNode(lnt.Node, lnt.Token); + } + + if (doCacheUpdates) + { + RemoveFromCache(subject); + Interlocked.Increment(ref _genId); + } + + if (_notify != null && last && !haswc && _notify.Remove.Count > 0) + ChkForRemoveNotification(subject, sub.Queue != null ? Encoding.ASCII.GetString(sub.Queue) : string.Empty); + + return null; + } + finally + { + if (shouldLock) _lock.ExitWriteLock(); + } + } + + private static (bool found, bool last) RemoveFromNode(SublistNode? n, Subscription sub) + { + if (n == null) return (false, true); + + if (sub.Queue == null || sub.Queue.Length == 0) + { + var found = n.PSubs.Remove(sub); + if (found && n.PList != null) + n.PList = null; // Will be re-populated on Match if needed. + return (found, n.PSubs.Count == 0); + } + + // Queue subscription. + var qname = Encoding.ASCII.GetString(sub.Queue); + if (n.QSubs == null || !n.QSubs.TryGetValue(qname, out var qsub)) + return (false, false); + + var removed = qsub.Remove(sub); + bool last = false; + if (qsub.Count == 0) + { + last = true; + n.QSubs.Remove(qname); + } + return (removed, last); + } + + // ------------------------------------------------------------------------- + // Private: Match internals + // ------------------------------------------------------------------------- + + private SubscriptionIndexResult MatchInternal(string subject, bool doLock, bool doCopyOnCache) + { + Interlocked.Increment(ref _matches); + + // Check cache first. + if (doLock) _lock.EnterReadLock(); + var cacheEnabled = _cache != null; + SubscriptionIndexResult? cached = null; + _cache?.TryGetValue(subject, out cached); + if (doLock) _lock.ExitReadLock(); + + if (cached != null) + { + Interlocked.Increment(ref _cacheHits); + return cached; + } + + // Tokenize. + var tokens = TokenizeForMatch(subject); + if (tokens == null) + return EmptyResult; + + var result = new SubscriptionIndexResult(); + + int n; + if (doLock) + { + if (cacheEnabled) + _lock.EnterWriteLock(); + else + _lock.EnterReadLock(); + } + + try + { + MatchLevel(_root, tokens, result); + + if (result.PSubs.Count == 0 && result.QSubs.Count == 0) + result = EmptyResult; + + if (cacheEnabled) + { + _cache![subject] = result; + n = _cache.Count; + } + else + { + n = 0; + } + } + finally + { + if (doLock) + { + if (cacheEnabled) _lock.ExitWriteLock(); + else _lock.ExitReadLock(); + } + } + + // Reduce cache if over limit. + if (cacheEnabled && n > SlCacheMax && Interlocked.CompareExchange(ref _ccSweep, 1, 0) == 0) + Task.Run(ReduceCacheCount); + + return result; + } + + internal SubscriptionIndexResult MatchNoLock(string subject) => MatchInternal(subject, false, false); + + private bool HasInterestInternal(string subject, bool doLock, out int np, out int nq) + { + np = 0; + nq = 0; + + // Check cache first. + if (doLock) _lock.EnterReadLock(); + bool matched = false; + if (_cache != null && _cache.TryGetValue(subject, out var cached)) + { + np = cached.PSubs.Count; + foreach (var qsub in cached.QSubs) + nq += qsub.Count; + matched = cached.PSubs.Count + cached.QSubs.Count > 0; + } + if (doLock) _lock.ExitReadLock(); + + if (matched) + { + Interlocked.Increment(ref _cacheHits); + return true; + } + + var tokens = TokenizeForMatch(subject); + if (tokens == null) return false; + + if (doLock) _lock.EnterReadLock(); + try + { + return MatchLevelForAny(_root, tokens.AsSpan(), ref np, ref nq); + } + finally + { + if (doLock) _lock.ExitReadLock(); + } + } + + // ------------------------------------------------------------------------- + // Private: Cache management + // ------------------------------------------------------------------------- + + private void AddToCache(string subject, Subscription sub) + { + if (_cache == null) return; + + if (SubjectIsLiteral(subject)) + { + if (_cache.TryGetValue(subject, out var r)) + _cache[subject] = r.AddSubToResult(sub); + return; + } + + // Wildcard subject — update any matching cache entries. + foreach (var key in _cache.Keys.ToArray()) + { + if (MatchLiteral(key, subject)) + { + _cache[key] = _cache[key].AddSubToResult(sub); + } + } + } + + private void RemoveFromCache(string subject) + { + if (_cache == null) return; + + if (SubjectIsLiteral(subject)) + { + _cache.Remove(subject); + return; + } + + foreach (var key in _cache.Keys.ToArray()) + { + if (MatchLiteral(key, subject)) + _cache.Remove(key); + } + } + + private void ReduceCacheCount() + { + try + { + _lock.EnterWriteLock(); + try + { + if (_cache == null) return; + foreach (var key in _cache.Keys.ToArray()) + { + _cache.Remove(key); + if (_cache.Count <= SlCacheSweep) + break; + } + } + finally + { + _lock.ExitWriteLock(); + } + } + finally + { + Interlocked.Exchange(ref _ccSweep, 0); + } + } + + // ------------------------------------------------------------------------- + // Private: Trie matching (matchLevel) + // ------------------------------------------------------------------------- + + private static void MatchLevel(SublistLevel? l, string[] toks, SubscriptionIndexResult results) + { + SublistNode? pwc = null, n = null; + for (int i = 0; i < toks.Length; i++) + { + if (l == null) return; + if (l.Fwc != null) AddNodeToResults(l.Fwc, results); + pwc = l.Pwc; + if (pwc != null) MatchLevel(pwc.Next, toks.AsSpan(i + 1).ToArray(), results); + + l.Nodes.TryGetValue(toks[i], out n); + l = n?.Next; + } + if (n != null) AddNodeToResults(n, results); + if (pwc != null) AddNodeToResults(pwc, results); + } + + private static bool MatchLevelForAny(SublistLevel? l, ReadOnlySpan toks, ref int np, ref int nq) + { + SublistNode? pwc = null, n = null; + for (int i = 0; i < toks.Length; i++) + { + if (l == null) return false; + if (l.Fwc != null) + { + np += l.Fwc.PSubs.Count; + foreach (var qsub in l.Fwc.QSubs?.Values ?? Enumerable.Empty>()) + nq += qsub.Count; + return true; + } + pwc = l.Pwc; + if (pwc != null) + { + if (MatchLevelForAny(pwc.Next, toks[(i + 1)..], ref np, ref nq)) + return true; + } + l.Nodes.TryGetValue(toks[i], out n); + l = n?.Next; + } + if (n != null) + { + np += n.PSubs.Count; + foreach (var qsub in n.QSubs?.Values ?? Enumerable.Empty>()) + nq += qsub.Count; + if ((n.PList?.Count ?? 0) > 0 || n.PSubs.Count > 0 || (n.QSubs?.Count ?? 0) > 0) + return true; + } + if (pwc != null) + { + np += pwc.PSubs.Count; + foreach (var qsub in pwc.QSubs?.Values ?? Enumerable.Empty>()) + nq += qsub.Count; + return (pwc.PList?.Count ?? 0) > 0 || pwc.PSubs.Count > 0 || (pwc.QSubs?.Count ?? 0) > 0; + } + return false; + } + + // ------------------------------------------------------------------------- + // Private: Reverse match + // ------------------------------------------------------------------------- + + private static void ReverseMatchLevel(SublistLevel? l, ReadOnlySpan toks, SublistNode? n, SubscriptionIndexResult results) + { + if (l == null) return; + for (int i = 0; i < toks.Length; i++) + { + var t = toks[i]; + if (t.Length == 1) + { + if (t[0] == Fwc) + { + GetAllNodes(l, results); + return; + } + if (t[0] == Pwc) + { + foreach (var nd in l.Nodes.Values) + ReverseMatchLevel(nd.Next, toks[(i + 1)..], nd, results); + if (l.Pwc != null) + ReverseMatchLevel(l.Pwc.Next, toks[(i + 1)..], n, results); + if (l.Fwc != null) + GetAllNodes(l, results); + return; + } + } + + if (l.Fwc != null) + { + GetAllNodes(l, results); + return; + } + if (l.Pwc != null) + ReverseMatchLevel(l.Pwc.Next, toks[(i + 1)..], n, results); + + if (!l.Nodes.TryGetValue(t, out var next)) + { + n = null; + break; + } + n = next; + l = n.Next; + } + if (n != null) AddNodeToResults(n, results); + } + + private static void GetAllNodes(SublistLevel? l, SubscriptionIndexResult results) + { + if (l == null) return; + if (l.Pwc != null) AddNodeToResults(l.Pwc, results); + if (l.Fwc != null) AddNodeToResults(l.Fwc, results); + foreach (var n in l.Nodes.Values) + { + AddNodeToResults(n, results); + GetAllNodes(n.Next, results); + } + } + + // ------------------------------------------------------------------------- + // Private: addNodeToResults + // ------------------------------------------------------------------------- + + private static void AddNodeToResults(SublistNode n, SubscriptionIndexResult results) + { + // Plain subscriptions. + if (n.PList != null) + { + results.PSubs.AddRange(n.PList); + } + else + { + foreach (var psub in n.PSubs.Keys) + results.PSubs.Add(psub); + } + + // Queue subscriptions. + if (n.QSubs == null) return; + foreach (var (qname, qr) in n.QSubs) + { + if (qr.Count == 0) continue; + + var i = FindQSlot(Encoding.ASCII.GetBytes(qname), results.QSubs); + if (i < 0) + { + i = results.QSubs.Count; + results.QSubs.Add(new List(qr.Count)); + } + + foreach (var sub in qr.Keys) + { + if (IsRemoteQSub(sub)) + { + var ns = Interlocked.CompareExchange(ref sub.Qw, 0, 0); + for (var j = 0; j < ns; j++) + results.QSubs[i].Add(sub); + } + else + { + results.QSubs[i].Add(sub); + } + } + } + } + + internal static int FindQSlot(byte[]? queue, List> qsl) + { + if (queue == null) return -1; + for (int i = 0; i < qsl.Count; i++) + { + if (qsl[i].Count > 0 && qsl[i][0].Queue != null && queue.AsSpan().SequenceEqual(qsl[i][0].Queue)) + return i; + } + return -1; + } + + internal static bool IsRemoteQSub(Subscription sub) + { + return sub.Queue != null && sub.Queue.Length > 0 + && sub.Client != null + && (sub.Client.Kind == ClientKind.Router || sub.Client.Kind == ClientKind.Leaf); + } + + // ------------------------------------------------------------------------- + // Private: Enumerate + // ------------------------------------------------------------------------- + + private static void AddLocalSub(Subscription sub, List subs, bool includeLeafHubs) + { + if (sub.Client == null) return; + var kind = sub.Client.Kind; + if (kind == ClientKind.Client || kind == ClientKind.System || + kind == ClientKind.JetStream || kind == ClientKind.Account || + (includeLeafHubs && sub.Client.IsHubLeafNode())) + { + subs.Add(sub); + } + } + + private static void AddNodeToSubsLocal(SublistNode n, List subs, bool includeLeafHubs) + { + if (n.PList != null) + { + foreach (var sub in n.PList) + AddLocalSub(sub, subs, includeLeafHubs); + } + else + { + foreach (var sub in n.PSubs.Keys) + AddLocalSub(sub, subs, includeLeafHubs); + } + if (n.QSubs != null) + { + foreach (var qr in n.QSubs.Values) + foreach (var sub in qr.Keys) + AddLocalSub(sub, subs, includeLeafHubs); + } + } + + private static void CollectLocalSubs(SublistLevel? l, List subs, bool includeLeafHubs) + { + if (l == null) return; + foreach (var n in l.Nodes.Values) + { + AddNodeToSubsLocal(n, subs, includeLeafHubs); + CollectLocalSubs(n.Next, subs, includeLeafHubs); + } + if (l.Pwc != null) { AddNodeToSubsLocal(l.Pwc, subs, includeLeafHubs); CollectLocalSubs(l.Pwc.Next, subs, includeLeafHubs); } + if (l.Fwc != null) { AddNodeToSubsLocal(l.Fwc, subs, includeLeafHubs); CollectLocalSubs(l.Fwc.Next, subs, includeLeafHubs); } + } + + private static void AddAllNodeToSubs(SublistNode n, List subs) + { + if (n.PList != null) + subs.AddRange(n.PList); + else + foreach (var sub in n.PSubs.Keys) + subs.Add(sub); + + if (n.QSubs != null) + foreach (var qr in n.QSubs.Values) + foreach (var sub in qr.Keys) + subs.Add(sub); + } + + private static void CollectAllSubs(SublistLevel? l, List subs) + { + if (l == null) return; + foreach (var n in l.Nodes.Values) + { + AddAllNodeToSubs(n, subs); + CollectAllSubs(n.Next, subs); + } + if (l.Pwc != null) { AddAllNodeToSubs(l.Pwc, subs); CollectAllSubs(l.Pwc.Next, subs); } + if (l.Fwc != null) { AddAllNodeToSubs(l.Fwc, subs); CollectAllSubs(l.Fwc.Next, subs); } + } + + // ------------------------------------------------------------------------- + // Private: Notifications + // ------------------------------------------------------------------------- + + private Exception? RegisterNotificationInternal(string subject, string queue, Action notify) + { + if (SubjectHasWildcard(subject)) + return ErrInvalidSubject; + + bool hasInterest = false; + var r = Match(subject); + + if (r.PSubs.Count + r.QSubs.Count > 0) + { + if (queue.Length == 0) + { + foreach (var sub in r.PSubs) + { + if (Encoding.ASCII.GetString(sub.Subject) == subject) + { + hasInterest = true; + break; + } + } + } + else + { + foreach (var qsub in r.QSubs) + { + if (qsub.Count == 0) continue; + var qs = qsub[0]; + if (Encoding.ASCII.GetString(qs.Subject) == subject && + qs.Queue != null && Encoding.ASCII.GetString(qs.Queue) == queue) + { + hasInterest = true; + break; + } + } + } + } + + var key = KeyFromSubjectAndQueue(subject, queue); + Exception? err; + + _lock.EnterWriteLock(); + try + { + _notify ??= new NotifyMaps(); + err = hasInterest + ? AddNotify(_notify.Remove, key, notify) + : AddNotify(_notify.Insert, key, notify); + } + finally + { + _lock.ExitWriteLock(); + } + + if (err == null) + SendNotification(notify, hasInterest); + + return err; + } + + private bool ClearNotificationInternal(string subject, string queue, Action notify) + { + _lock.EnterWriteLock(); + try + { + if (_notify == null) return false; + var key = KeyFromSubjectAndQueue(subject, queue); + var didRemove = ChkAndRemove(key, notify, _notify.Remove); + didRemove = didRemove || ChkAndRemove(key, notify, _notify.Insert); + if (_notify.Remove.Count + _notify.Insert.Count == 0) + _notify = null; + return didRemove; + } + finally + { + _lock.ExitWriteLock(); + } + } + + private static bool ChkAndRemove(string key, Action notify, Dictionary>> ms) + { + if (!ms.TryGetValue(key, out var chs)) return false; + for (int i = 0; i < chs.Count; i++) + { + if (chs[i] == notify) + { + chs[i] = chs[^1]; + chs.RemoveAt(chs.Count - 1); + if (chs.Count == 0) ms.Remove(key); + return true; + } + } + return false; + } + + private static Exception? AddNotify(Dictionary>> m, string subject, Action notify) + { + if (m.TryGetValue(subject, out var chs)) + { + foreach (var ch in chs) + if (ch == notify) + return ErrAlreadyRegistered; + } + else + { + chs = new List>(); + m[subject] = chs; + } + chs.Add(notify); + return null; + } + + private static void SendNotification(Action notify, bool hasInterest) + { + // Non-blocking send — in Go this was select { case ch <- val: default: } + // In .NET we just invoke; the caller should ensure it doesn't block. + try { notify(hasInterest); } + catch { /* swallow if handler faults */ } + } + + private static string KeyFromSubjectAndQueue(string subject, string queue) + { + if (queue.Length == 0) return subject; + return subject + " " + queue; + } + + private void ChkForInsertNotification(string subject, string queue) + { + var key = KeyFromSubjectAndQueue(subject, queue); + if (_notify!.Insert.TryGetValue(key, out var chs) && chs.Count > 0) + { + foreach (var ch in chs) SendNotification(ch, true); + if (!_notify.Remove.TryGetValue(key, out var rmChs)) + { + rmChs = new List>(); + _notify.Remove[key] = rmChs; + } + rmChs.AddRange(chs); + _notify.Insert.Remove(key); + } + } + + private void ChkForRemoveNotification(string subject, string queue) + { + var key = KeyFromSubjectAndQueue(subject, queue); + if (!_notify!.Remove.TryGetValue(key, out var chs) || chs.Count == 0) return; + + bool hasInterest = false; + var r = MatchNoLock(subject); + if (r.PSubs.Count + r.QSubs.Count > 0) + { + if (queue.Length == 0) + { + foreach (var sub in r.PSubs) + { + if (Encoding.ASCII.GetString(sub.Subject) == subject) + { + hasInterest = true; + break; + } + } + } + else + { + foreach (var qsub in r.QSubs) + { + if (qsub.Count == 0) continue; + var qs = qsub[0]; + if (Encoding.ASCII.GetString(qs.Subject) == subject && + qs.Queue != null && Encoding.ASCII.GetString(qs.Queue) == queue) + { + hasInterest = true; + break; + } + } + } + } + + if (!hasInterest) + { + foreach (var ch in chs) SendNotification(ch, false); + if (!_notify.Insert.TryGetValue(key, out var insChs)) + { + insChs = new List>(); + _notify.Insert[key] = insChs; + } + insChs.AddRange(chs); + _notify.Remove.Remove(key); + } + } + + // ------------------------------------------------------------------------- + // Private: visitLevel (depth calculation for tests) + // ------------------------------------------------------------------------- + + private static int VisitLevel(SublistLevel? l, int depth) + { + if (l == null || l.NumNodes() == 0) return depth; + depth++; + var maxDepth = depth; + foreach (var n in l.Nodes.Values) + { + var d = VisitLevel(n.Next, depth); + if (d > maxDepth) maxDepth = d; + } + if (l.Pwc != null) { var d = VisitLevel(l.Pwc.Next, depth); if (d > maxDepth) maxDepth = d; } + if (l.Fwc != null) { var d = VisitLevel(l.Fwc.Next, depth); if (d > maxDepth) maxDepth = d; } + return maxDepth; + } + + // ------------------------------------------------------------------------- + // Private: Tokenization helpers + // ------------------------------------------------------------------------- + + private static string[]? TokenizeForMatch(string subject) + { + if (subject.Length == 0) return null; + var tokens = new List(8); + var start = 0; + for (var i = 0; i < subject.Length; i++) + { + if (subject[i] == Btsep) + { + if (i - start == 0) return null; + tokens.Add(subject.Substring(start, i - start)); + start = i + 1; + } + } + if (start >= subject.Length) return null; + tokens.Add(subject[start..]); + return tokens.ToArray(); + } + + internal static string[] TokenizeIntoArray(string subject) + { + var tokens = new List(8); + var start = 0; + for (var i = 0; i < subject.Length; i++) + { + if (subject[i] == Btsep) + { + tokens.Add(subject.Substring(start, i - start)); + start = i + 1; + } + } + tokens.Add(subject[start..]); + return tokens.ToArray(); + } + + // ------------------------------------------------------------------------- + // Public static: Subject validation and utility methods + // ------------------------------------------------------------------------- + + /// Returns true if the subject contains any wildcard tokens. + public static bool SubjectHasWildcard(string subject) + { + for (int i = 0; i < subject.Length; i++) + { + var c = subject[i]; + if (c == Pwc || c == Fwc) + { + if ((i == 0 || subject[i - 1] == Btsep) && + (i + 1 == subject.Length || subject[i + 1] == Btsep)) + return true; + } + } + return false; + } + + /// Returns true if the subject is a literal (no wildcards). + public static bool SubjectIsLiteral(string subject) + { + for (int i = 0; i < subject.Length; i++) + { + var c = subject[i]; + if (c == Pwc || c == Fwc) + { + if ((i == 0 || subject[i - 1] == Btsep) && + (i + 1 == subject.Length || subject[i + 1] == Btsep)) + return false; + } + } + return true; + } + + /// Returns true if subject is valid (allows wildcards). + public static bool IsValidSubject(string subject) => IsValidSubjectInternal(subject, false); + + /// Returns true if subject is valid and literal (no wildcards). + public static bool IsValidPublishSubject(string subject) => + IsValidSubject(subject) && SubjectIsLiteral(subject); + + /// Returns true if subject is valid and literal (no wildcards). + public static bool IsValidLiteralSubject(string subject) => + IsValidLiteralSubjectTokens(subject.Split(Btsep)); + + private static bool IsValidSubjectInternal(string subject, bool checkRunes) + { + if (string.IsNullOrEmpty(subject)) return false; + + if (checkRunes) + { + if (subject.Contains('\0')) return false; + foreach (var r in subject) + { + if (r == '\uFFFD') return false; // RuneError + } + } + + bool sfwc = false; + var start = 0; + for (var i = 0; i <= subject.Length; i++) + { + if (i < subject.Length && subject[i] != Btsep) + continue; + + var tokenLen = i - start; + if (tokenLen == 0 || sfwc) return false; + + if (tokenLen > 1) + { + var t = subject.AsSpan(start, tokenLen); + if (t.ContainsAny(" \t\n\r\f")) + return false; + } + else + { + switch (subject[start]) + { + case Fwc: sfwc = true; break; + case ' ': case '\t': case '\n': case '\r': case '\f': return false; + } + } + + start = i + 1; + } + return true; + } + + private static bool IsValidLiteralSubjectTokens(string[] tokens) + { + foreach (var t in tokens) + { + if (t.Length == 0) return false; + if (t.Length > 1) continue; + if (t[0] == Pwc || t[0] == Fwc) return false; + } + return true; + } + + /// Determines if two subjects could both match a single literal subject. + public static bool SubjectsCollide(string subj1, string subj2) + { + if (subj1 == subj2) return true; + + var toks1 = subj1.Split(Btsep); + var toks2 = subj2.Split(Btsep); + AnalyzeTokens(toks1, out var pwc1, out var fwc1); + AnalyzeTokens(toks2, out var pwc2, out var fwc2); + + bool l1 = !(pwc1 || fwc1), l2 = !(pwc2 || fwc2); + if (l1 && l2) return subj1 == subj2; + if (l1 && !l2) return IsSubsetMatch(toks1, subj2); + if (l2 && !l1) return IsSubsetMatch(toks2, subj1); + + if (!fwc1 && !fwc2 && toks1.Length != toks2.Length) return false; + if (toks1.Length != toks2.Length) + { + if ((toks1.Length < toks2.Length && !fwc1) || (toks2.Length < toks1.Length && !fwc2)) + return false; + } + + var stop = Math.Min(toks1.Length, toks2.Length); + for (int i = 0; i < stop; i++) + { + if (!TokensCanMatch(toks1[i], toks2[i])) + return false; + } + return true; + } + + /// Returns true if the subject matches the filter. Mirrors SubjectMatchesFilter. + public static bool SubjectMatchesFilter(string subject, string filter) => + SubjectIsSubsetMatch(subject, filter); + + /// Matches a literal subject against a potentially-wildcarded subject. Used in the cache layer. + public static bool MatchLiteral(string literal, string subject) + { + int li = 0; + int ll = literal.Length; + int ls = subject.Length; + + for (int i = 0; i < ls; i++) + { + if (li >= ll) return false; + + switch (subject[i]) + { + case Pwc: + if (i == 0 || subject[i - 1] == Btsep) + { + if (i == ls - 1) + { + while (true) + { + if (li >= ll) return true; + if (literal[li] == Btsep) return false; + li++; + } + } + else if (subject[i + 1] == Btsep) + { + while (true) + { + if (li >= ll) return false; + if (literal[li] == Btsep) break; + li++; + } + i++; + } + } + break; + case Fwc: + if ((i == 0 || subject[i - 1] == Btsep) && i == ls - 1) + return true; + break; + } + + if (subject[i] != literal[li]) return false; + li++; + } + return li >= ll; + } + + /// Returns the number of dot-separated tokens in a subject. + public static int NumTokens(string subject) + { + if (subject.Length == 0) return 0; + int count = 0; + for (int i = 0; i < subject.Length; i++) + if (subject[i] == Btsep) count++; + return count + 1; + } + + /// Returns the 1-based indexed token from a subject. + public static string TokenAt(string subject, int index) + { + int ti = 1, start = 0; + for (int i = 0; i < subject.Length; i++) + { + if (subject[i] == Btsep) + { + if (ti == index) return subject[start..i]; + start = i + 1; + ti++; + } + } + if (ti == index) return subject[start..]; + return string.Empty; + } + + internal static bool SubjectIsSubsetMatch(string subject, string test) + { + var tts = TokenizeIntoArray(subject); + return IsSubsetMatch(tts, test); + } + + internal static bool IsSubsetMatch(string[] tokens, string test) + { + var tts = TokenizeIntoArray(test); + return IsSubsetMatchTokenized(tokens, tts); + } + + internal static bool IsSubsetMatchTokenized(string[] tokens, string[] test) + { + for (int i = 0; i < test.Length; i++) + { + if (i >= tokens.Length) return false; + + var t2 = test[i]; + if (t2.Length == 0) return false; + if (t2[0] == Fwc && t2.Length == 1) return true; + + var t1 = tokens[i]; + if (t1.Length == 0 || (t1[0] == Fwc && t1.Length == 1)) return false; + if (t1[0] == Pwc && t1.Length == 1) + { + if (!(t2[0] == Pwc && t2.Length == 1)) return false; + if (i >= test.Length) return true; + continue; + } + if (t2[0] != Pwc && string.Compare(t1, t2, StringComparison.Ordinal) != 0) + return false; + } + return tokens.Length == test.Length; + } + + private static void AnalyzeTokens(string[] tokens, out bool hasPWC, out bool hasFWC) + { + hasPWC = false; + hasFWC = false; + foreach (var t in tokens) + { + if (t.Length == 0 || t.Length > 1) continue; + switch (t[0]) + { + case Pwc: hasPWC = true; break; + case Fwc: hasFWC = true; break; + } + } + } + + private static bool TokensCanMatch(string t1, string t2) + { + if (t1.Length == 0 || t2.Length == 0) return false; + if (t1[0] == Pwc || t2[0] == Pwc || t1[0] == Fwc || t2[0] == Fwc) return true; + return t1 == t2; + } + + // ------------------------------------------------------------------------- + // Nested types: SublistNode, SublistLevel, NotifyMaps + // ------------------------------------------------------------------------- + + internal sealed class SublistNode + { + public readonly Dictionary PSubs = new(ReferenceEqualityComparer.Instance); + public Dictionary>? QSubs; + public List? PList; + public SublistLevel? Next; + + public bool IsEmpty() + { + return PSubs.Count == 0 && (QSubs == null || QSubs.Count == 0) && + (Next == null || Next.NumNodes() == 0); + } + } + + internal sealed class SublistLevel + { + public readonly Dictionary Nodes = new(); + public SublistNode? Pwc; + public SublistNode? Fwc; + + public int NumNodes() + { + var num = Nodes.Count; + if (Pwc != null) num++; + if (Fwc != null) num++; + return num; + } + + public void PruneNode(SublistNode n, string t) + { + if (ReferenceEquals(n, Fwc)) Fwc = null; + else if (ReferenceEquals(n, Pwc)) Pwc = null; + else Nodes.Remove(t); + } + } + + private sealed class NotifyMaps + { + public readonly Dictionary>> Insert = new(); + public readonly Dictionary>> Remove = new(); + } + + private readonly struct LevelNodeToken + { + public readonly SublistLevel Level; + public readonly SublistNode Node; + public readonly string Token; + + public LevelNodeToken(SublistLevel level, SublistNode node, string token) + { + Level = level; + Node = node; + Token = token; + } + } +} + +/// +/// Result of a subscription match containing plain and queue subscribers. +/// Mirrors Go's SublistResult. +/// +public sealed class SubscriptionIndexResult +{ + public List PSubs { get; } = new(); + public List> QSubs { get; } = new(); + + /// Deep copies this result. Mirrors copyResult. + public SubscriptionIndexResult Copy() + { + var nr = new SubscriptionIndexResult(); + nr.PSubs.AddRange(PSubs); + foreach (var qr in QSubs) + nr.QSubs.Add(new List(qr)); + return nr; + } + + /// Adds a subscription to a copy of this result. Mirrors addSubToResult. + public SubscriptionIndexResult AddSubToResult(Subscription sub) + { + var nr = Copy(); + if (sub.Queue == null || sub.Queue.Length == 0) + { + nr.PSubs.Add(sub); + } + else + { + var i = SubscriptionIndex.FindQSlot(sub.Queue, nr.QSubs); + if (i >= 0) + { + nr.QSubs[i].Add(sub); + } + else + { + nr.QSubs.Add(new List { sub }); + } + } + return nr; + } +} + +/// +/// Public statistics for the subscription index. Mirrors Go's SublistStats. +/// +public sealed class SublistStats +{ + public uint NumSubs { get; set; } + public uint NumCache { get; set; } + public ulong NumInserts { get; set; } + public ulong NumRemoves { get; set; } + public ulong NumMatches { get; set; } + public double CacheHitRate { get; set; } + public uint MaxFanout { get; set; } + public double AvgFanout { get; set; } + + // Internal fields used for aggregation. + internal int TotFanout { get; set; } + internal int CacheCnt { get; set; } + internal ulong CacheHitsRaw { get; set; } + + /// Aggregates another stats object into this one. Mirrors SublistStats.add. + public void Add(SublistStats stat) + { + NumSubs += stat.NumSubs; + NumCache += stat.NumCache; + NumInserts += stat.NumInserts; + NumRemoves += stat.NumRemoves; + NumMatches += stat.NumMatches; + CacheHitsRaw += stat.CacheHitsRaw; + if (MaxFanout < stat.MaxFanout) MaxFanout = stat.MaxFanout; + + TotFanout += stat.TotFanout; + CacheCnt += stat.CacheCnt; + if (TotFanout > 0) + AvgFanout = (double)TotFanout / CacheCnt; + if (NumMatches > 0) + CacheHitRate = (double)CacheHitsRaw / NumMatches; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs new file mode 100644 index 0000000..bf63462 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs @@ -0,0 +1,77 @@ +// Copyright 2012-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/client.go (subscription struct) in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server.Internal; + +/// +/// Represents a client subscription in the NATS server. +/// Mirrors the Go subscription struct from client.go. +/// This is a minimal stub; full client integration will be added in later sessions. +/// +public sealed class Subscription +{ + /// The subject this subscription is listening on. + public byte[] Subject { get; set; } = []; + + /// The queue group name, or null/empty for non-queue subscriptions. + public byte[]? Queue { get; set; } + + /// The subscription identifier. + public byte[]? Sid { get; set; } + + /// Queue weight for remote queue subscriptions. + public int Qw; + + /// Closed flag (0 = open, 1 = closed). + private int _closed; + + /// The client that owns this subscription. Null in test/stub scenarios. + public NatsClient? Client { get; set; } + + /// Marks this subscription as closed. + public void Close() => Interlocked.Exchange(ref _closed, 1); + + /// Returns true if this subscription has been closed. + public bool IsClosed() => Interlocked.CompareExchange(ref _closed, 0, 0) == 1; +} + +/// +/// Represents the kind of client connection. +/// Mirrors Go's clientKind enum. +/// This is a minimal stub; full implementation in later sessions. +/// +public enum ClientKind +{ + Client = 0, + Router = 1, + Gateway = 2, + System = 3, + Leaf = 4, + JetStream = 5, + Account = 6, +} + +/// +/// Minimal client stub for subscription routing. +/// Full implementation will be added in later sessions. +/// +public class NatsClient +{ + /// The kind of client connection. + public ClientKind Kind { get; set; } + + /// Whether this is a hub leaf node. Stub for now. + public virtual bool IsHubLeafNode() => false; +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/DataStructures/SubscriptionIndexTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/DataStructures/SubscriptionIndexTests.cs new file mode 100644 index 0000000..fe11062 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/DataStructures/SubscriptionIndexTests.cs @@ -0,0 +1,1040 @@ +// Copyright 2012-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server.Tests.Internal.DataStructures; + +/// +/// Tests for SubscriptionIndex — mirrors tests from server/sublist_test.go. +/// Cache/no-cache pairs are collapsed into Theory with InlineData. +/// +public class SubscriptionIndexTests +{ + // Helper factories matching Go's newSub/newQSub/newRemoteQSub. + private static Subscription NewSub(string subject) => + new() { Subject = Encoding.ASCII.GetBytes(subject), Client = new NatsClient { Kind = ClientKind.Client } }; + + private static Subscription NewQSub(string subject, string queue) => + queue.Length > 0 + ? new() { Subject = Encoding.ASCII.GetBytes(subject), Queue = Encoding.ASCII.GetBytes(queue) } + : NewSub(subject); + + private static Subscription NewRemoteQSub(string subject, string queue, int weight) => + queue.Length > 0 + ? new() { Subject = Encoding.ASCII.GetBytes(subject), Queue = Encoding.ASCII.GetBytes(queue), Qw = weight, Client = new NatsClient { Kind = ClientKind.Router } } + : NewSub(subject); + + private static SubscriptionIndex MakeSl(bool cache) => + cache ? SubscriptionIndex.NewSublistWithCache() : SubscriptionIndex.NewSublistNoCache(); + + private static void VerifyMember(List subs, Subscription expected) + { + subs.ShouldContain(expected, $"Subscription for [{Encoding.ASCII.GetString(expected.Subject)}] not found in results"); + } + + private static void VerifyQMember(List> qsubs, Subscription expected) + { + var idx = SubscriptionIndex.FindQSlot(expected.Queue, qsubs); + idx.ShouldBeGreaterThanOrEqualTo(0); + qsubs[idx].ShouldContain(expected); + } + + // ------------------------------------------------------------------------- + // Init & Count (T:2962-2964) + // ------------------------------------------------------------------------- + + [Fact] // T:2962 + public void Init_StartsWithZeroCount() + { + var s = SubscriptionIndex.NewSublistWithCache(); + s.Count().ShouldBe(0u); + } + + [Theory] // T:2963, T:2964 + [InlineData(true)] + [InlineData(false)] + public void InsertCount_ReturnsCorrectCount(bool cache) + { + var s = MakeSl(cache); + s.Insert(NewSub("foo")); + s.Insert(NewSub("bar")); + s.Insert(NewSub("foo.bar")); + s.Count().ShouldBe(3u); + } + + // ------------------------------------------------------------------------- + // Simple Match (T:2965-2968) + // ------------------------------------------------------------------------- + + [Theory] // T:2965, T:2966 + [InlineData(true)] + [InlineData(false)] + public void Simple_SingleTokenMatch(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("foo"); + s.Insert(sub); + var r = s.Match("foo"); + r.PSubs.Count.ShouldBe(1); + VerifyMember(r.PSubs, sub); + } + + [Theory] // T:2967, T:2968 + [InlineData(true)] + [InlineData(false)] + public void Simple_MultiTokenMatch(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("foo.bar.baz"); + s.Insert(sub); + var r = s.Match("foo.bar.baz"); + r.PSubs.Count.ShouldBe(1); + VerifyMember(r.PSubs, sub); + } + + // ------------------------------------------------------------------------- + // Wildcard Match (T:2969-2974) + // ------------------------------------------------------------------------- + + [Theory] // T:2969, T:2970 + [InlineData(true)] + [InlineData(false)] + public void PartialWildcard_MatchesCorrectly(bool cache) + { + var s = MakeSl(cache); + var lsub = NewSub("a.b.c"); + var psub = NewSub("a.*.c"); + s.Insert(lsub); + s.Insert(psub); + var r = s.Match("a.b.c"); + r.PSubs.Count.ShouldBe(2); + VerifyMember(r.PSubs, lsub); + VerifyMember(r.PSubs, psub); + } + + [Theory] // T:2971, T:2972 + [InlineData(true)] + [InlineData(false)] + public void PartialWildcardAtEnd_MatchesCorrectly(bool cache) + { + var s = MakeSl(cache); + var lsub = NewSub("a.b.c"); + var psub = NewSub("a.b.*"); + s.Insert(lsub); + s.Insert(psub); + var r = s.Match("a.b.c"); + r.PSubs.Count.ShouldBe(2); + VerifyMember(r.PSubs, lsub); + VerifyMember(r.PSubs, psub); + } + + [Theory] // T:2973, T:2974 + [InlineData(true)] + [InlineData(false)] + public void FullWildcard_MatchesAll(bool cache) + { + var s = MakeSl(cache); + var lsub = NewSub("a.b.c"); + var fsub = NewSub("a.>"); + s.Insert(lsub); + s.Insert(fsub); + var r = s.Match("a.b.c"); + r.PSubs.Count.ShouldBe(2); + VerifyMember(r.PSubs, lsub); + VerifyMember(r.PSubs, fsub); + + r = s.Match("a.>"); + r.PSubs.Count.ShouldBe(1); + VerifyMember(r.PSubs, fsub); + } + + // ------------------------------------------------------------------------- + // Remove (T:2975-2984) + // ------------------------------------------------------------------------- + + [Theory] // T:2975, T:2976 + [InlineData(true)] + [InlineData(false)] + public void Remove_BasicRemoval(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("a.b.c.d"); + s.Insert(sub); + s.Count().ShouldBe(1u); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(1); + s.Remove(NewSub("a.b.c")); // wrong sub, no-op + s.Count().ShouldBe(1u); + s.Remove(sub); + s.Count().ShouldBe(0u); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(0); + } + + [Theory] // T:2977, T:2978 + [InlineData(true)] + [InlineData(false)] + public void RemoveWildcard_CorrectCountsAndMatch(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("a.b.c.d"); + var psub = NewSub("a.b.*.d"); + var fsub = NewSub("a.b.>"); + s.Insert(sub); s.Insert(psub); s.Insert(fsub); + s.Count().ShouldBe(3u); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(3); + s.Remove(sub); s.Count().ShouldBe(2u); + s.Remove(fsub); s.Count().ShouldBe(1u); + s.Remove(psub); s.Count().ShouldBe(0u); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(0); + } + + [Theory] // T:2979, T:2980 + [InlineData(true)] + [InlineData(false)] + public void RemoveCleanup_LevelsReclaimed(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("a.b.c.d.e.f"); + s.NumLevels().ShouldBe(0); + s.Insert(sub); + s.NumLevels().ShouldBe(6); + s.Remove(sub); + s.NumLevels().ShouldBe(0); + } + + [Theory] // T:2981, T:2982 + [InlineData(true)] + [InlineData(false)] + public void RemoveCleanupWildcards_LevelsReclaimed(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("a.b.*.d.e.>"); + s.NumLevels().ShouldBe(0); + s.Insert(sub); + s.NumLevels().ShouldBe(6); + s.Remove(sub); + s.NumLevels().ShouldBe(0); + } + + [Theory] // T:2983, T:2984 + [InlineData(true)] + [InlineData(false)] + public void RemoveWithLargeSubs_CorrectAfterRemoval(bool cache) + { + var s = MakeSl(cache); + for (int i = 0; i < SubscriptionIndex.PlistMin * 2; i++) + s.Insert(NewSub("foo")); + var r = s.Match("foo"); + r.PSubs.Count.ShouldBe(SubscriptionIndex.PlistMin * 2); + + s.Remove(r.PSubs[SubscriptionIndex.PlistMin]); + s.Remove(r.PSubs[0]); + s.Remove(r.PSubs[^1]); + r = s.Match("foo"); + r.PSubs.Count.ShouldBe(SubscriptionIndex.PlistMin * 2 - 3); + } + + // ------------------------------------------------------------------------- + // Invalid Subjects (T:2985, T:2986) + // ------------------------------------------------------------------------- + + [Theory] // T:2985, T:2986 + [InlineData(true)] + [InlineData(false)] + public void InvalidSubjects_InsertReturnsError(bool cache) + { + var s = MakeSl(cache); + s.Insert(NewSub(".foo")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.Insert(NewSub("foo.")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.Insert(NewSub("foo..bar")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.Insert(NewSub("foo.bar..baz")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.Insert(NewSub("foo.>.bar")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + } + + // ------------------------------------------------------------------------- + // RemoveBatch (T:2987, T:2988) + // ------------------------------------------------------------------------- + + [Fact] // T:2987 + public void NoCacheRemoveBatch_DoesNotEnableCache() + { + var s = SubscriptionIndex.NewSublistNoCache(); + s.Insert(NewSub("foo")); + var sub = NewSub("bar"); + s.Insert(sub); + s.RemoveBatch(new[] { sub }); + for (int i = 0; i < 10; i++) s.Match("foo"); + s.CacheEnabled().ShouldBeFalse(); + } + + [Fact] // T:2988 + public void RemoveBatchWithError_StillRemovesPresent() + { + var s = SubscriptionIndex.NewSublistNoCache(); + var sub1 = NewSub("foo"); + var sub2 = NewSub("bar"); + var sub3 = NewSub("baz"); + s.Insert(sub1); s.Insert(sub2); s.Insert(sub3); + var notPresent = NewSub("not.inserted"); + var err = s.RemoveBatch(new[] { notPresent, sub1, sub3 }); + err.ShouldBe(SubscriptionIndex.ErrNotFound); + s.Count().ShouldBe(1u); + s.Match("bar").PSubs.Count.ShouldBe(1); + VerifyMember(s.Match("bar").PSubs, sub2); + s.Match("foo").PSubs.Count.ShouldBe(0); + s.Match("baz").PSubs.Count.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Cache behavior (T:2989) + // ------------------------------------------------------------------------- + + [Fact] // T:2989 + public void Cache_BasicBehavior() + { + var s = SubscriptionIndex.NewSublistWithCache(); + var sub = NewSub("a.b.c.d"); + var psub = NewSub("a.b.*.d"); + var fsub = NewSub("a.b.>"); + + s.Insert(sub); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(1); + s.Insert(psub); s.Insert(fsub); + s.Count().ShouldBe(3u); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(3); + s.Remove(sub); s.Count().ShouldBe(2u); + s.Remove(fsub); s.Count().ShouldBe(1u); + s.Remove(psub); s.Count().ShouldBe(0u); + s.CacheCount().ShouldBe(0); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(0); + + // Fill cache beyond max. + for (int i = 0; i < 2 * SubscriptionIndex.SlCacheMax; i++) + s.Match($"foo-{i}"); + + // Cache sweep runs async, wait briefly. + Thread.Sleep(200); + s.CacheCount().ShouldBeLessThanOrEqualTo(SubscriptionIndex.SlCacheMax); + + // Test wildcard cache update. + s = SubscriptionIndex.NewSublistWithCache(); + s.Insert(NewSub("foo.*")); + s.Insert(NewSub("foo.bar")); + s.Match("foo.baz").PSubs.Count.ShouldBe(1); + s.Match("foo.bar").PSubs.Count.ShouldBe(2); + s.Insert(NewSub("foo.>")); + s.Match("foo.bar").PSubs.Count.ShouldBe(3); + } + + // ------------------------------------------------------------------------- + // Queue Results (T:2990, T:2991) + // ------------------------------------------------------------------------- + + [Theory] // T:2990, T:2991 + [InlineData(true)] + [InlineData(false)] + public void BasicQueueResults_CorrectGrouping(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("foo"); + var sub1 = NewQSub("foo", "bar"); + var sub2 = NewQSub("foo", "baz"); + + s.Insert(sub1); + var r = s.Match("foo"); + r.PSubs.Count.ShouldBe(0); + r.QSubs.Count.ShouldBe(1); + r.QSubs[0].Count.ShouldBe(1); + VerifyQMember(r.QSubs, sub1); + + s.Insert(sub2); + r = s.Match("foo"); + r.PSubs.Count.ShouldBe(0); + r.QSubs.Count.ShouldBe(2); + VerifyQMember(r.QSubs, sub1); + VerifyQMember(r.QSubs, sub2); + + s.Insert(sub); + r = s.Match("foo"); + r.PSubs.Count.ShouldBe(1); + r.QSubs.Count.ShouldBe(2); + + var sub3 = NewQSub("foo", "bar"); + var sub4 = NewQSub("foo", "baz"); + s.Insert(sub3); s.Insert(sub4); + r = s.Match("foo"); + r.PSubs.Count.ShouldBe(1); + r.QSubs.Count.ShouldBe(2); + + // Verify each group has 2 members. + var barIdx = SubscriptionIndex.FindQSlot(Encoding.ASCII.GetBytes("bar"), r.QSubs); + var bazIdx = SubscriptionIndex.FindQSlot(Encoding.ASCII.GetBytes("baz"), r.QSubs); + r.QSubs[barIdx].Count.ShouldBe(2); + r.QSubs[bazIdx].Count.ShouldBe(2); + + // Removal. + s.Remove(sub); + r = s.Match("foo"); + r.PSubs.Count.ShouldBe(0); + r.QSubs.Count.ShouldBe(2); + + s.Remove(sub1); + r = s.Match("foo"); + barIdx = SubscriptionIndex.FindQSlot(Encoding.ASCII.GetBytes("bar"), r.QSubs); + bazIdx = SubscriptionIndex.FindQSlot(Encoding.ASCII.GetBytes("baz"), r.QSubs); + r.QSubs[barIdx].Count.ShouldBe(1); + r.QSubs[bazIdx].Count.ShouldBe(2); + + s.Remove(sub3); // last bar + r = s.Match("foo"); + r.QSubs.Count.ShouldBe(1); // only baz remains + + s.Remove(sub2); s.Remove(sub4); + r = s.Match("foo"); + r.PSubs.Count.ShouldBe(0); + r.QSubs.Count.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Subject Validation (T:2992-2997) + // ------------------------------------------------------------------------- + + [Fact] // T:2992 + public void ValidLiteralSubjects_CorrectResults() + { + SubscriptionIndex.IsValidLiteralSubject("foo").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject(".foo").ShouldBeFalse(); + SubscriptionIndex.IsValidLiteralSubject("foo.").ShouldBeFalse(); + SubscriptionIndex.IsValidLiteralSubject("foo..bar").ShouldBeFalse(); + SubscriptionIndex.IsValidLiteralSubject("foo.bar.*").ShouldBeFalse(); + SubscriptionIndex.IsValidLiteralSubject("foo.bar.>").ShouldBeFalse(); + SubscriptionIndex.IsValidLiteralSubject("*").ShouldBeFalse(); + SubscriptionIndex.IsValidLiteralSubject(">").ShouldBeFalse(); + // Wildcard chars that aren't standalone tokens. + SubscriptionIndex.IsValidLiteralSubject("foo*").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo**").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo.**").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo*bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo.*bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo*.bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("*bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo>").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo>>").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo.>>").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo>bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo.>bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo>.bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject(">bar").ShouldBeTrue(); + } + + [Fact] // T:2993 + public void ValidSubjects_CorrectResults() + { + SubscriptionIndex.IsValidSubject(".").ShouldBeFalse(); + SubscriptionIndex.IsValidSubject(".foo").ShouldBeFalse(); + SubscriptionIndex.IsValidSubject("foo.").ShouldBeFalse(); + SubscriptionIndex.IsValidSubject("foo..bar").ShouldBeFalse(); + SubscriptionIndex.IsValidSubject(">.bar").ShouldBeFalse(); + SubscriptionIndex.IsValidSubject("foo.>.bar").ShouldBeFalse(); + SubscriptionIndex.IsValidSubject("foo").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo.bar.*").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo.bar.>").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("*").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject(">").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo*").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo**").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo.**").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo*bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo.*bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo*.bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("*bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo>").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo.>>").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo>bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo.>bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo>.bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject(">bar").ShouldBeTrue(); + } + + [Fact] // T:2994 + public void MatchLiterals_CorrectResults() + { + SubscriptionIndex.MatchLiteral("foo", "foo").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo", "bar").ShouldBeFalse(); + SubscriptionIndex.MatchLiteral("foo", "*").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo", ">").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo.bar", ">").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo.bar", "foo.>").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo.bar", "bar.>").ShouldBeFalse(); + SubscriptionIndex.MatchLiteral("stats.test.22", "stats.>").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("stats.test.22", "stats.*.*").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo.bar", "foo").ShouldBeFalse(); + SubscriptionIndex.MatchLiteral("stats.test.foos", "stats.test.foos").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("stats.test.foos", "stats.test.foo").ShouldBeFalse(); + SubscriptionIndex.MatchLiteral("stats.test", "stats.test.*").ShouldBeFalse(); + SubscriptionIndex.MatchLiteral("stats.test.foos", "stats.*").ShouldBeFalse(); + SubscriptionIndex.MatchLiteral("stats.test.foos", "stats.*.*.foos").ShouldBeFalse(); + // Wildcards as non-token literals. + SubscriptionIndex.MatchLiteral("*bar", "*bar").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo*", "foo*").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo*bar", "foo*bar").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo.***.bar", "foo.***.bar").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral(">bar", ">bar").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo>", "foo>").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo>bar", "foo>bar").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo.>>>.bar", "foo.>>>.bar").ShouldBeTrue(); + } + + [Fact] // T:2995 + public void SubjectIsLiteral_CorrectResults() + { + SubscriptionIndex.SubjectIsLiteral("foo").ShouldBeTrue(); + SubscriptionIndex.SubjectIsLiteral("foo.bar").ShouldBeTrue(); + SubscriptionIndex.SubjectIsLiteral("foo*.bar").ShouldBeTrue(); + SubscriptionIndex.SubjectIsLiteral("*").ShouldBeFalse(); + SubscriptionIndex.SubjectIsLiteral(">").ShouldBeFalse(); + SubscriptionIndex.SubjectIsLiteral("foo.*").ShouldBeFalse(); + SubscriptionIndex.SubjectIsLiteral("foo.>").ShouldBeFalse(); + SubscriptionIndex.SubjectIsLiteral("foo.*.>").ShouldBeFalse(); + SubscriptionIndex.SubjectIsLiteral("foo.*.bar").ShouldBeFalse(); + SubscriptionIndex.SubjectIsLiteral("foo.bar.>").ShouldBeFalse(); + } + + [Fact] // T:2997 + public void TokenAt_ReturnsCorrectTokens() + { + SubscriptionIndex.TokenAt("foo.bar.baz.*", 0).ShouldBe(string.Empty); + SubscriptionIndex.TokenAt("foo.bar.baz.*", 1).ShouldBe("foo"); + SubscriptionIndex.TokenAt("foo.bar.baz.*", 2).ShouldBe("bar"); + SubscriptionIndex.TokenAt("foo.bar.baz.*", 3).ShouldBe("baz"); + SubscriptionIndex.TokenAt("foo.bar.baz.*", 4).ShouldBe("*"); + SubscriptionIndex.TokenAt("foo.bar.baz.*", 5).ShouldBe(string.Empty); + } + + // ------------------------------------------------------------------------- + // Bad Subject on Remove (T:2998, T:2999) + // ------------------------------------------------------------------------- + + [Theory] // T:2998, T:2999 + [InlineData(true)] + [InlineData(false)] + public void BadSubjectOnRemove_ReturnsError(bool cache) + { + var s = MakeSl(cache); + s.Insert(NewSub("a.b..d")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.Remove(NewSub("a.b..d")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.Remove(NewSub("a.>.b")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + } + + // ------------------------------------------------------------------------- + // Two-token pub vs single-token sub (T:3000, T:3001) + // ------------------------------------------------------------------------- + + [Theory] // T:3000, T:3001 + [InlineData(true)] + [InlineData(false)] + public void TwoTokenPub_DoesNotMatchSingleTokenSub(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("foo"); + s.Insert(sub); + s.Match("foo").PSubs.Count.ShouldBe(1); + s.Match("foo.bar").PSubs.Count.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Wildcards as literals (T:3002-3005) + // ------------------------------------------------------------------------- + + [Theory] // T:3002, T:3003 + [InlineData(true)] + [InlineData(false)] + public void InsertWithWildcardsAsLiterals_OnlyExactMatch(bool cache) + { + var s = MakeSl(cache); + foreach (var subject in new[] { "foo.*-", "foo.>-" }) + { + var sub = NewSub(subject); + s.Insert(sub); + s.Match("foo.bar").PSubs.Count.ShouldBe(0); + s.Match(subject).PSubs.Count.ShouldBe(1); + } + } + + [Theory] // T:3004, T:3005 + [InlineData(true)] + [InlineData(false)] + public void RemoveWithWildcardsAsLiterals_CorrectRemoval(bool cache) + { + var s = MakeSl(cache); + foreach (var subject in new[] { "foo.*-", "foo.>-" }) + { + var sub = NewSub(subject); + s.Insert(sub); + s.Remove(NewSub("foo.bar")); // wrong subject, no-op + s.Count().ShouldBe(1u); + s.Remove(sub); + s.Count().ShouldBe(0u); + } + } + + // ------------------------------------------------------------------------- + // Race tests (T:3006-3010) + // ------------------------------------------------------------------------- + + [Theory] // T:3006, T:3007 + [InlineData(true)] + [InlineData(false)] + public void RaceOnRemove_NoCorruption(bool cache) + { + var s = MakeSl(cache); + var subs = new Subscription[100]; + for (int i = 0; i < subs.Length; i++) + subs[i] = NewQSub("foo", "bar"); + + for (int iter = 0; iter < 2; iter++) + { + foreach (var sub in subs) s.Insert(sub); + if (iter == 1) s.Match("foo"); + var r = s.Match("foo"); + var task = Task.Run(() => { foreach (var sub in subs) s.Remove(sub); }); + foreach (var qsub in r.QSubs) + for (int i = 0; i < qsub.Count; i++) + Encoding.ASCII.GetString(qsub[i].Queue!).ShouldBe("bar"); + task.Wait(); + } + } + + [Theory] // T:3008, T:3009 + [InlineData(true)] + [InlineData(false)] + public void RaceOnInsert_NoCorruption(bool cache) + { + var s = MakeSl(cache); + var subs = new Subscription[100]; + for (int i = 0; i < subs.Length; i++) + subs[i] = NewQSub("foo", "bar"); + + var task = Task.Run(() => { foreach (var sub in subs) s.Insert(sub); }); + for (int i = 0; i < 1000; i++) + { + var r = s.Match("foo"); + foreach (var qsub in r.QSubs) + foreach (var sub in qsub) + Encoding.ASCII.GetString(sub.Queue!).ShouldBe("bar"); + } + task.Wait(); + } + + [Fact] // T:3010 + public void RaceOnMatch_NoCorruption() + { + var s = SubscriptionIndex.NewSublistNoCache(); + s.Insert(NewQSub("foo.*", "workers")); + s.Insert(NewQSub("foo.bar", "workers")); + s.Insert(NewSub("foo.*")); + s.Insert(NewSub("foo.bar")); + + Exception? error = null; + var tasks = Enumerable.Range(0, 2).Select(_ => Task.Run(() => + { + for (int i = 0; i < 10; i++) + { + var r = s.Match("foo.bar"); + foreach (var sub in r.PSubs) + if (!Encoding.ASCII.GetString(sub.Subject).StartsWith("foo.")) + Interlocked.CompareExchange(ref error, new Exception($"Wrong subject: {Encoding.ASCII.GetString(sub.Subject)}"), null); + foreach (var qsub in r.QSubs) + foreach (var sub in qsub) + if (Encoding.ASCII.GetString(sub.Queue!) != "workers") + Interlocked.CompareExchange(ref error, new Exception($"Wrong queue: {Encoding.ASCII.GetString(sub.Queue!)}"), null); + } + })).ToArray(); + Task.WaitAll(tasks); + error.ShouldBeNull(); + } + + // ------------------------------------------------------------------------- + // Remote Queue Subscriptions (T:3011, T:3012) + // ------------------------------------------------------------------------- + + [Theory] // T:3011, T:3012 + [InlineData(true)] + [InlineData(false)] + public void RemoteQueueSubscriptions_WeightExpansion(bool cache) + { + var s = MakeSl(cache); + var s1 = NewQSub("foo", "bar"); + var s2 = NewQSub("foo", "bar"); + s.Insert(s1); s.Insert(s2); + + var rs1 = NewRemoteQSub("foo", "bar", 10); + var rs2 = NewRemoteQSub("foo", "bar", 10); + s.Insert(rs1); s.Insert(rs2); + s.Count().ShouldBe(4u); + + var r = s.Match("foo"); + r.PSubs.Count.ShouldBe(0); + r.QSubs.Count.ShouldBe(1); + r.QSubs[0].Count.ShouldBe(22); // 2 normal + 10 + 10 remote + + s.Remove(s1); s.Remove(rs1); + s.Count().ShouldBe(2u); + r = s.Match("foo"); + r.QSubs[0].Count.ShouldBe(11); // 1 normal + 10 remote + + Interlocked.Exchange(ref rs2.Qw, 1); + s.UpdateRemoteQSub(rs2); + r = s.Match("foo"); + r.QSubs[0].Count.ShouldBe(2); // 1 normal + 1 remote + } + + // ------------------------------------------------------------------------- + // Shared Empty Result (T:3013) + // ------------------------------------------------------------------------- + + [Fact] // T:3013 + public void SharedEmptyResult_SameReference() + { + var s = SubscriptionIndex.NewSublistWithCache(); + var r1 = s.Match("foo"); + var r2 = s.Match("bar"); + r1.PSubs.Count.ShouldBe(0); + r2.QSubs.Count.ShouldBe(0); + ReferenceEquals(r1, r2).ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // No-cache Stats (T:3014) + // ------------------------------------------------------------------------- + + [Fact] // T:3014 + public void NoCacheStats_CacheCountIsZero() + { + var s = SubscriptionIndex.NewSublistNoCache(); + s.Insert(NewSub("foo")); s.Insert(NewSub("bar")); s.Insert(NewSub("baz")); s.Insert(NewSub("foo.bar.baz")); + s.Match("a.b.c"); s.Match("bar"); + s.Stats().NumCache.ShouldBe(0u); + } + + // ------------------------------------------------------------------------- + // All (T:3015) + // ------------------------------------------------------------------------- + + [Fact] // T:3015 + public void All_CollectsAllSubscriptions() + { + var s = SubscriptionIndex.NewSublistNoCache(); + var subs = new[] + { + NewSub("foo.bar.baz"), + NewSub("foo"), + NewSub("baz"), + }; + subs[0].Client!.Kind = ClientKind.Leaf; + foreach (var sub in subs) s.Insert(sub); + + var output = new List(); + s.All(output); + output.Count.ShouldBe(3); + } + + // ------------------------------------------------------------------------- + // IsSubsetMatch (T:3016) + // ------------------------------------------------------------------------- + + [Theory] // T:3016 + [InlineData("foo.bar", "foo.bar", true)] + [InlineData("foo.*", ">", true)] + [InlineData("foo.*", "*.*", true)] + [InlineData("foo.*", "foo.*", true)] + [InlineData("foo.*", "foo.bar", false)] + [InlineData("foo.>", ">", true)] + [InlineData("foo.>", "*.>", true)] + [InlineData("foo.>", "foo.>", true)] + [InlineData("foo.>", "foo.bar", false)] + [InlineData("foo..bar", "foo.*", false)] + [InlineData("foo.*", "foo..bar", false)] + public void IsSubsetMatch_CorrectResults(string subject, string test, bool expected) + { + SubscriptionIndex.SubjectIsSubsetMatch(subject, test).ShouldBe(expected); + } + + // ------------------------------------------------------------------------- + // ReverseMatch (T:3018, T:3019) + // ------------------------------------------------------------------------- + + [Fact] // T:3018 + public void ReverseMatch_FindsMatchingSubscriptions() + { + var s = SubscriptionIndex.NewSublistWithCache(); + var fooSub = NewSub("foo"); + var barSub = NewSub("bar"); + var fooBarSub = NewSub("foo.bar"); + var fooBazSub = NewSub("foo.baz"); + var fooBarBazSub = NewSub("foo.bar.baz"); + s.Insert(fooSub); s.Insert(barSub); s.Insert(fooBarSub); s.Insert(fooBazSub); s.Insert(fooBarBazSub); + + var r = s.ReverseMatch("foo"); + r.PSubs.Count.ShouldBe(1); VerifyMember(r.PSubs, fooSub); + + r = s.ReverseMatch("bar"); + r.PSubs.Count.ShouldBe(1); VerifyMember(r.PSubs, barSub); + + r = s.ReverseMatch("*"); + r.PSubs.Count.ShouldBe(2); + VerifyMember(r.PSubs, fooSub); VerifyMember(r.PSubs, barSub); + + r = s.ReverseMatch("baz"); + r.PSubs.Count.ShouldBe(0); + + r = s.ReverseMatch("foo.*"); + r.PSubs.Count.ShouldBe(2); + VerifyMember(r.PSubs, fooBarSub); VerifyMember(r.PSubs, fooBazSub); + + r = s.ReverseMatch("*.*"); + r.PSubs.Count.ShouldBe(2); + + r = s.ReverseMatch("*.bar"); + r.PSubs.Count.ShouldBe(1); VerifyMember(r.PSubs, fooBarSub); + + r = s.ReverseMatch("bar.*"); + r.PSubs.Count.ShouldBe(0); + + r = s.ReverseMatch("foo.>"); + r.PSubs.Count.ShouldBe(3); + + r = s.ReverseMatch(">"); + r.PSubs.Count.ShouldBe(5); + } + + [Fact] // T:3019 + public void ReverseMatchWider_MatchesWiderFilter() + { + var s = SubscriptionIndex.NewSublistWithCache(); + var sub = NewSub("uplink.*.*.>"); + s.Insert(sub); + + var r = s.ReverseMatch("uplink.1.*.*.>"); + r.PSubs.Count.ShouldBe(1); + VerifyMember(r.PSubs, sub); + + r = s.ReverseMatch("uplink.1.2.3.>"); + r.PSubs.Count.ShouldBe(1); + VerifyMember(r.PSubs, sub); + } + + // ------------------------------------------------------------------------- + // Match with empty tokens (T:3020) + // ------------------------------------------------------------------------- + + [Fact] // T:3020 + public void MatchWithEmptyTokens_ReturnsNoResults() + { + var sl = SubscriptionIndex.NewSublistWithCache(); + sl.Insert(NewSub(">")); + sl.Insert(NewQSub(">", "queue")); + + foreach (var subj in new[] { ".foo", "..foo", "foo..", "foo.", "foo..bar", "foo...bar" }) + { + var r = sl.Match(subj); + r.PSubs.Count.ShouldBe(0); + r.QSubs.Count.ShouldBe(0); + } + } + + // ------------------------------------------------------------------------- + // SubjectsCollide (T:3021) + // ------------------------------------------------------------------------- + + [Fact] // T:3021 + public void SubjectsCollide_CorrectResults() + { + SubscriptionIndex.SubjectsCollide("foo.*", "foo.*.bar.>").ShouldBeFalse(); + SubscriptionIndex.SubjectsCollide("foo.*.bar.>", "foo.*").ShouldBeFalse(); + SubscriptionIndex.SubjectsCollide("foo.*", "foo.foo").ShouldBeTrue(); + SubscriptionIndex.SubjectsCollide("foo.*", "*.foo").ShouldBeTrue(); + SubscriptionIndex.SubjectsCollide("foo.bar.>", "*.bar.foo").ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Cache hit rate (T:3022) + // ------------------------------------------------------------------------- + + [Fact] // T:3022 + public void AddCacheHitRate_CorrectAggregation() + { + var sl1 = SubscriptionIndex.NewSublistWithCache(); + sl1.Insert(NewSub("foo")); + for (int i = 0; i < 4; i++) sl1.Match("foo"); + var stats1 = sl1.Stats(); + stats1.CacheHitRate.ShouldBe(0.75); + + var sl2 = SubscriptionIndex.NewSublistWithCache(); + sl2.Insert(NewSub("bar")); + for (int i = 0; i < 4; i++) sl2.Match("bar"); + var stats2 = sl2.Stats(); + stats2.CacheHitRate.ShouldBe(0.75); + + var ts = new SublistStats(); + ts.Add(stats1); + ts.Add(stats2); + ts.CacheHitRate.ShouldBe(0.75); + } + + // ------------------------------------------------------------------------- + // HasInterest (T:3024) + // ------------------------------------------------------------------------- + + [Fact] // T:3024 + public void HasInterest_CorrectForLiteralsAndWildcards() + { + var sl = SubscriptionIndex.NewSublistWithCache(); + var fooSub = NewSub("foo"); + sl.Insert(fooSub); + + sl.HasInterest("foo").ShouldBeTrue(); + sl.HasInterest("bar").ShouldBeFalse(); + + sl.Remove(fooSub); + sl.HasInterest("foo").ShouldBeFalse(); + + // Partial wildcard. + var sub = NewSub("foo.*"); + sl.Insert(sub); + sl.HasInterest("foo").ShouldBeFalse(); + sl.HasInterest("foo.bar").ShouldBeTrue(); + sl.HasInterest("foo.bar.baz").ShouldBeFalse(); + sl.Remove(sub); + + // Full wildcard. + sub = NewSub("foo.>"); + sl.Insert(sub); + sl.HasInterest("foo").ShouldBeFalse(); + sl.HasInterest("foo.bar").ShouldBeTrue(); + sl.HasInterest("foo.bar.baz").ShouldBeTrue(); + sl.Remove(sub); + + // Queue subs. + var qsub = NewQSub("foo", "bar"); + sl.Insert(qsub); + sl.HasInterest("foo").ShouldBeTrue(); + sl.HasInterest("foo.bar").ShouldBeFalse(); + sl.Remove(qsub); + sl.HasInterest("foo").ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // HasInterest Overlapping (T:3025) + // ------------------------------------------------------------------------- + + [Fact] // T:3025 + public void HasInterestOverlapping_BothMatch() + { + var sl = SubscriptionIndex.NewSublistWithCache(); + sl.Insert(NewSub("stream.A.child")); + sl.Insert(NewSub("stream.*")); + sl.HasInterest("stream.A.child").ShouldBeTrue(); + sl.HasInterest("stream.A").ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // NumInterest (T:3026) + // ------------------------------------------------------------------------- + + [Fact] // T:3026 + public void NumInterest_CorrectCounts() + { + var sl = SubscriptionIndex.NewSublistWithCache(); + sl.Insert(NewSub("foo")); + var (np, nq) = sl.NumInterest("foo"); + np.ShouldBe(1); + nq.ShouldBe(0); + + (np, nq) = sl.NumInterest("bar"); + np.ShouldBe(0); + nq.ShouldBe(0); + + // Add queue sub. + sl.Insert(NewQSub("foo", "q1")); + (np, nq) = sl.NumInterest("foo"); + np.ShouldBe(1); + nq.ShouldBe(1); + } + + // ------------------------------------------------------------------------- + // Notifications (T:3017) — simplified from the large Go test + // ------------------------------------------------------------------------- + + [Fact] // T:3017 + public void RegisterInterestNotification_BasicFlow() + { + var s = SubscriptionIndex.NewSublistWithCache(); + var notifications = new List(); + Action notify = b => notifications.Add(b); + + // Wildcards should be rejected. + s.RegisterNotification("foo.*", notify).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.RegisterNotification(">", notify).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + + // Register for "foo" — no existing interest, should get false. + s.RegisterNotification("foo", notify).ShouldBeNull(); + notifications.Count.ShouldBe(1); + notifications[0].ShouldBeFalse(); + notifications.Clear(); + + // Insert interest — should get true. + var sub = NewSub("foo"); + s.Insert(sub); + notifications.Count.ShouldBe(1); + notifications[0].ShouldBeTrue(); + notifications.Clear(); + + // Insert a second — no notification (already have interest). + var sub2 = NewSub("foo"); + s.Insert(sub2); + notifications.Count.ShouldBe(0); + + // Remove all interest — should get false. + s.Remove(sub); + s.Remove(sub2); + notifications.Count.ShouldBe(1); + notifications[0].ShouldBeFalse(); + notifications.Clear(); + + // Clear should return true. + s.ClearNotification("foo", notify).ShouldBeTrue(); + + // Clear again should return false (already cleared). + s.ClearNotification("foo", notify).ShouldBeFalse(); + } + + [Fact] // T:3017 (queue notification portion) + public void RegisterQueueNotification_BasicFlow() + { + var s = SubscriptionIndex.NewSublistWithCache(); + var notifications = new List(); + Action notify = b => notifications.Add(b); + + s.RegisterQueueNotification("foo.bar", "q1", notify).ShouldBeNull(); + notifications.Count.ShouldBe(1); + notifications[0].ShouldBeFalse(); + notifications.Clear(); + + var qsub = NewQSub("foo.bar", "q1"); + s.Insert(qsub); + notifications.Count.ShouldBe(1); + notifications[0].ShouldBeTrue(); + notifications.Clear(); + + s.Remove(qsub); + notifications.Count.ShouldBe(1); + notifications[0].ShouldBeFalse(); + notifications.Clear(); + + s.ClearQueueNotification("foo.bar", "q1", notify).ShouldBeTrue(); + } +} diff --git a/porting.db b/porting.db index ea5a0b7..7f3048b 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 9a95428..201b585 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-26 16:54:25 UTC +Generated: 2026-02-26 17:11:07 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-26 16:54:25 UTC | Status | Count | |--------|-------| -| complete | 368 | +| complete | 449 | | n_a | 82 | -| not_started | 3155 | +| not_started | 3074 | | stub | 68 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| complete | 155 | -| n_a | 50 | -| not_started | 2953 | +| complete | 219 | +| n_a | 82 | +| not_started | 2857 | | stub | 99 | ## Library Mappings (36 total) @@ -36,4 +36,4 @@ Generated: 2026-02-26 16:54:25 UTC ## Overall Progress -**666/6942 items complete (9.6%)** +**843/6942 items complete (12.1%)** diff --git a/reports/report_b8f2f66.md b/reports/report_b8f2f66.md new file mode 100644 index 0000000..201b585 --- /dev/null +++ b/reports/report_b8f2f66.md @@ -0,0 +1,39 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-26 17:11:07 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| complete | 11 | +| not_started | 1 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| complete | 449 | +| n_a | 82 | +| not_started | 3074 | +| stub | 68 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| complete | 219 | +| n_a | 82 | +| not_started | 2857 | +| stub | 99 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**843/6942 items complete (12.1%)**