From ed78a100e27c82a75b7dbdd3ecb36021eaa74e2b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 26 Feb 2026 12:11:06 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20port=20session=2005=20=E2=80=94=20Subsc?= =?UTF-8?q?ription=20Index=20(sublist)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port trie-based subject matching engine (81 features, 74 tests). Includes SubscriptionIndex with cache, wildcard matching (*/>), queue subscription groups, reverse match, notifications, stats, and subject validation utilities. Also adds minimal Subscription and NatsClient stubs needed by the index. --- .../DataStructures/SubscriptionIndex.cs | 1664 +++++++++++++++++ .../Internal/Subscription.cs | 77 + .../DataStructures/SubscriptionIndexTests.cs | 1040 +++++++++++ porting.db | Bin 2469888 -> 2469888 bytes reports/current.md | 14 +- reports/report_b8f2f66.md | 39 + 6 files changed, 2827 insertions(+), 7 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/DataStructures/SubscriptionIndexTests.cs create mode 100644 reports/report_b8f2f66.md diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs new file mode 100644 index 0000000..c910a2b --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs @@ -0,0 +1,1664 @@ +// Copyright 2016-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/sublist.go in the NATS server Go source. + +using System.Text; + +namespace ZB.MOM.NatsNet.Server.Internal.DataStructures; + +/// +/// A trie-based routing mechanism for NATS subject distribution. +/// Matches published subjects to interested subscribers, supporting +/// * (single-token) and > (full) wildcards. +/// Mirrors Go's Sublist. +/// +public sealed class SubscriptionIndex +{ + // Wildcard and separator constants. + internal const char Pwc = '*'; + internal const string Pwcs = "*"; + internal const char Fwc = '>'; + internal const string Fwcs = ">"; + internal const string Tsep = "."; + internal const char Btsep = '.'; + + // Error singletons. + public static readonly Exception ErrInvalidSubject = new ArgumentException("sublist: invalid subject"); + public static readonly Exception ErrNotFound = new KeyNotFoundException("sublist: no matches found"); + public static readonly Exception ErrNilChan = new ArgumentNullException("sublist: nil channel"); + public static readonly Exception ErrAlreadyRegistered = new InvalidOperationException("sublist: notification already registered"); + + // Cache limits. + internal const int SlCacheMax = 1024; + internal const int SlCacheSweep = 256; + // Threshold for creating a fast plist for Match. + internal const int PlistMin = 256; + + // Shared empty result singleton. + internal static readonly SubscriptionIndexResult EmptyResult = new(); + + // ------------------------------------------------------------------------- + // Fields + // ------------------------------------------------------------------------- + + private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.NoRecursion); + private long _genId; + private long _matches; + private long _cacheHits; + private long _inserts; + private long _removes; + private SublistLevel _root; + private Dictionary? _cache; + private int _ccSweep; + private NotifyMaps? _notify; + private uint _count; + + // ------------------------------------------------------------------------- + // Construction + // ------------------------------------------------------------------------- + + private SubscriptionIndex(bool enableCache) + { + _root = new SublistLevel(); + _cache = enableCache ? new Dictionary() : null; + } + + /// Creates a new SubscriptionIndex with caching controlled by . + public static SubscriptionIndex NewSublist(bool enableCache) => new(enableCache); + + /// Creates a new SubscriptionIndex with caching enabled. + public static SubscriptionIndex NewSublistWithCache() => new(true); + + /// Creates a new SubscriptionIndex with caching disabled. + public static SubscriptionIndex NewSublistNoCache() => new(false); + + // ------------------------------------------------------------------------- + // Public API — Cache + // ------------------------------------------------------------------------- + + /// Returns whether caching is enabled. + public bool CacheEnabled() + { + _lock.EnterReadLock(); + try { return _cache != null; } + finally { _lock.ExitReadLock(); } + } + + /// Returns the number of cached result entries. + public int CacheCount() + { + _lock.EnterReadLock(); + try { return _cache?.Count ?? 0; } + finally { _lock.ExitReadLock(); } + } + + // ------------------------------------------------------------------------- + // Public API — Insert / Remove + // ------------------------------------------------------------------------- + + /// Inserts a subscription into the index. Mirrors Sublist.Insert. + public Exception? Insert(Subscription sub) + { + var subject = Encoding.ASCII.GetString(sub.Subject); + + _lock.EnterWriteLock(); + try + { + bool sfwc = false, haswc = false, isnew = false; + SublistNode? n = null; + var l = _root; + + var start = 0; + for (var i = 0; i <= subject.Length; i++) + { + if (i < subject.Length && subject[i] != Btsep) + continue; + + var tokenLen = i - start; + if (tokenLen == 0 || sfwc) + return ErrInvalidSubject; + + var t = subject.Substring(start, tokenLen); + + if (tokenLen > 1) + { + if (!l.Nodes.TryGetValue(t, out n)) + { + n = new SublistNode(); + l.Nodes[t] = n; + } + } + else + { + switch (t[0]) + { + case Pwc: + n = l.Pwc; + haswc = true; + if (n == null) + { + n = new SublistNode(); + l.Pwc = n; + } + break; + case Fwc: + n = l.Fwc; + haswc = true; + sfwc = true; + if (n == null) + { + n = new SublistNode(); + l.Fwc = n; + } + break; + default: + if (!l.Nodes.TryGetValue(t, out n)) + { + n = new SublistNode(); + l.Nodes[t] = n; + } + break; + } + } + + n.Next ??= new SublistLevel(); + l = n.Next; + + start = i + 1; + } + + if (n == null) + return ErrInvalidSubject; + + if (sub.Queue == null || sub.Queue.Length == 0) + { + n.PSubs[sub] = default; + isnew = n.PSubs.Count == 1; + if (n.PList != null) + { + n.PList.Add(sub); + } + else if (n.PSubs.Count > PlistMin) + { + n.PList = new List(n.PSubs.Count); + foreach (var psub in n.PSubs.Keys) + n.PList.Add(psub); + } + } + else + { + n.QSubs ??= new Dictionary>(); + var qname = Encoding.ASCII.GetString(sub.Queue); + if (!n.QSubs.TryGetValue(qname, out var subs)) + { + subs = new Dictionary(); + n.QSubs[qname] = subs; + isnew = true; + } + subs[sub] = 0; + } + + _count++; + _inserts++; + + AddToCache(subject, sub); + Interlocked.Increment(ref _genId); + + if (_notify != null && isnew && !haswc && _notify.Insert.Count > 0) + ChkForInsertNotification(subject, sub.Queue != null ? Encoding.ASCII.GetString(sub.Queue) : string.Empty); + } + finally + { + _lock.ExitWriteLock(); + } + + return null; + } + + /// Removes a single subscription. Mirrors Sublist.Remove. + public Exception? Remove(Subscription sub) => RemoveInternal(sub, true, true); + + /// Removes a batch of subscriptions. Mirrors Sublist.RemoveBatch. + public Exception? RemoveBatch(IReadOnlyList subs) + { + if (subs.Count == 0) return null; + + _lock.EnterWriteLock(); + try + { + var wasEnabled = _cache != null; + _cache = null; + + Exception? err = null; + foreach (var sub in subs) + { + var lerr = RemoveInternal(sub, false, false); + err ??= lerr; + } + + Interlocked.Increment(ref _genId); + if (wasEnabled) + _cache = new Dictionary(); + + return err; + } + finally + { + _lock.ExitWriteLock(); + } + } + + /// Invalidates cache for a remote queue sub weight change. + public void UpdateRemoteQSub(Subscription sub) + { + _lock.EnterWriteLock(); + try + { + RemoveFromCache(Encoding.ASCII.GetString(sub.Subject)); + Interlocked.Increment(ref _genId); + } + finally + { + _lock.ExitWriteLock(); + } + } + + // ------------------------------------------------------------------------- + // Public API — Match / HasInterest / NumInterest + // ------------------------------------------------------------------------- + + /// + /// Matches all subscriptions for a literal subject. + /// Returns a result containing plain and queue subscribers. + /// Mirrors Sublist.Match. + /// + public SubscriptionIndexResult Match(string subject) => MatchInternal(subject, true, false); + + /// Matches using a byte[] subject. Mirrors Sublist.MatchBytes. + public SubscriptionIndexResult MatchBytes(byte[] subject) => MatchInternal(Encoding.ASCII.GetString(subject), true, true); + + /// Returns true if there is any interest in the literal subject. + public bool HasInterest(string subject) => HasInterestInternal(subject, true, out _, out _); + + /// Returns counts of plain and queue subscribers for the literal subject. + public (int np, int nq) NumInterest(string subject) + { + HasInterestInternal(subject, true, out var np, out var nq); + return (np, nq); + } + + // ------------------------------------------------------------------------- + // Public API — ReverseMatch + // ------------------------------------------------------------------------- + + /// + /// For a given subject (which may contain wildcards), returns all subscriptions + /// that would match that subject. Mirrors Sublist.ReverseMatch. + /// + public SubscriptionIndexResult ReverseMatch(string subject) + { + var tokens = TokenizeIntoArray(subject); + var result = new SubscriptionIndexResult(); + + _lock.EnterReadLock(); + try + { + ReverseMatchLevel(_root, tokens.AsSpan(), null, result); + if (result.PSubs.Count == 0 && result.QSubs.Count == 0) + result = EmptyResult; + } + finally + { + _lock.ExitReadLock(); + } + return result; + } + + // ------------------------------------------------------------------------- + // Public API — Enumerate + // ------------------------------------------------------------------------- + + /// Returns the subscription count. + public uint Count() + { + _lock.EnterReadLock(); + try { return _count; } + finally { _lock.ExitReadLock(); } + } + + /// Collects all subscriptions into . Mirrors Sublist.All. + public void All(List subs) + { + _lock.EnterReadLock(); + try { CollectAllSubs(_root, subs); } + finally { _lock.ExitReadLock(); } + } + + /// Collects local client subscriptions. Mirrors Sublist.localSubs. + public void LocalSubs(List subs, bool includeLeafHubs) + { + _lock.EnterReadLock(); + try { CollectLocalSubs(_root, subs, includeLeafHubs); } + finally { _lock.ExitReadLock(); } + } + + /// Returns the generation ID (incremented on every mutation). Thread-safe. + public long GenId() => Interlocked.Read(ref _genId); + + // ------------------------------------------------------------------------- + // Public API — Stats + // ------------------------------------------------------------------------- + + /// Returns statistics for the current state. Mirrors Sublist.Stats. + public SublistStats Stats() + { + var st = new SublistStats(); + + _lock.EnterReadLock(); + var cache = _cache; + var cc = _cache?.Count ?? 0; + st.NumSubs = _count; + st.NumInserts = (ulong)Interlocked.Read(ref _inserts); + st.NumRemoves = (ulong)Interlocked.Read(ref _removes); + _lock.ExitReadLock(); + + st.NumCache = (uint)cc; + st.NumMatches = (ulong)Interlocked.Read(ref _matches); + st.CacheHitsRaw = (ulong)Interlocked.Read(ref _cacheHits); + if (st.NumMatches > 0) + st.CacheHitRate = (double)st.CacheHitsRaw / st.NumMatches; + + if (cache != null) + { + int tot = 0, max = 0, clen = 0; + _lock.EnterReadLock(); + try + { + foreach (var r in _cache!.Values) + { + clen++; + var l = r.PSubs.Count + r.QSubs.Count; + tot += l; + if (l > max) max = l; + } + } + finally + { + _lock.ExitReadLock(); + } + st.TotFanout = tot; + st.CacheCnt = clen; + st.MaxFanout = (uint)max; + if (tot > 0) + st.AvgFanout = (double)tot / clen; + } + + return st; + } + + // ------------------------------------------------------------------------- + // Public API — Notifications + // ------------------------------------------------------------------------- + + /// Registers a notification for interest changes on a literal subject. + public Exception? RegisterNotification(string subject, Action notify) => + RegisterNotificationInternal(subject, string.Empty, notify); + + /// Registers a notification for queue interest changes. + public Exception? RegisterQueueNotification(string subject, string queue, Action notify) => + RegisterNotificationInternal(subject, queue, notify); + + /// Clears a notification. + public bool ClearNotification(string subject, Action notify) => + ClearNotificationInternal(subject, string.Empty, notify); + + /// Clears a queue notification. + public bool ClearQueueNotification(string subject, string queue, Action notify) => + ClearNotificationInternal(subject, queue, notify); + + // ------------------------------------------------------------------------- + // Internal: numLevels (for testing) + // ------------------------------------------------------------------------- + + /// Returns the maximum depth of the trie. Used in tests. + internal int NumLevels() => VisitLevel(_root, 0); + + // ------------------------------------------------------------------------- + // Private: Remove internals + // ------------------------------------------------------------------------- + + private Exception? RemoveInternal(Subscription sub, bool shouldLock, bool doCacheUpdates) + { + var subject = Encoding.ASCII.GetString(sub.Subject); + + if (shouldLock) _lock.EnterWriteLock(); + try + { + bool sfwc = false, haswc = false; + SublistNode? n = null; + var l = _root; + + var lnts = new LevelNodeToken[32]; + var levelCount = 0; + + var start = 0; + for (var i = 0; i <= subject.Length; i++) + { + if (i < subject.Length && subject[i] != Btsep) + continue; + + var tokenLen = i - start; + if (tokenLen == 0 || sfwc) + return ErrInvalidSubject; + + if (l == null!) + return ErrNotFound; + + var t = subject.Substring(start, tokenLen); + + if (tokenLen > 1) + { + l.Nodes.TryGetValue(t, out n); + } + else + { + switch (t[0]) + { + case Pwc: + n = l.Pwc; + haswc = true; + break; + case Fwc: + n = l.Fwc; + haswc = true; + sfwc = true; + break; + default: + l.Nodes.TryGetValue(t, out n); + break; + } + } + + if (n != null) + { + if (levelCount < lnts.Length) + lnts[levelCount++] = new LevelNodeToken(l, n, t); + l = n.Next!; + } + else + { + l = null!; + } + + start = i + 1; + } + + var (removed, last) = RemoveFromNode(n, sub); + if (!removed) + return ErrNotFound; + + _count--; + _removes++; + + for (var i = levelCount - 1; i >= 0; i--) + { + ref var lnt = ref lnts[i]; + if (lnt.Node.IsEmpty()) + lnt.Level.PruneNode(lnt.Node, lnt.Token); + } + + if (doCacheUpdates) + { + RemoveFromCache(subject); + Interlocked.Increment(ref _genId); + } + + if (_notify != null && last && !haswc && _notify.Remove.Count > 0) + ChkForRemoveNotification(subject, sub.Queue != null ? Encoding.ASCII.GetString(sub.Queue) : string.Empty); + + return null; + } + finally + { + if (shouldLock) _lock.ExitWriteLock(); + } + } + + private static (bool found, bool last) RemoveFromNode(SublistNode? n, Subscription sub) + { + if (n == null) return (false, true); + + if (sub.Queue == null || sub.Queue.Length == 0) + { + var found = n.PSubs.Remove(sub); + if (found && n.PList != null) + n.PList = null; // Will be re-populated on Match if needed. + return (found, n.PSubs.Count == 0); + } + + // Queue subscription. + var qname = Encoding.ASCII.GetString(sub.Queue); + if (n.QSubs == null || !n.QSubs.TryGetValue(qname, out var qsub)) + return (false, false); + + var removed = qsub.Remove(sub); + bool last = false; + if (qsub.Count == 0) + { + last = true; + n.QSubs.Remove(qname); + } + return (removed, last); + } + + // ------------------------------------------------------------------------- + // Private: Match internals + // ------------------------------------------------------------------------- + + private SubscriptionIndexResult MatchInternal(string subject, bool doLock, bool doCopyOnCache) + { + Interlocked.Increment(ref _matches); + + // Check cache first. + if (doLock) _lock.EnterReadLock(); + var cacheEnabled = _cache != null; + SubscriptionIndexResult? cached = null; + _cache?.TryGetValue(subject, out cached); + if (doLock) _lock.ExitReadLock(); + + if (cached != null) + { + Interlocked.Increment(ref _cacheHits); + return cached; + } + + // Tokenize. + var tokens = TokenizeForMatch(subject); + if (tokens == null) + return EmptyResult; + + var result = new SubscriptionIndexResult(); + + int n; + if (doLock) + { + if (cacheEnabled) + _lock.EnterWriteLock(); + else + _lock.EnterReadLock(); + } + + try + { + MatchLevel(_root, tokens, result); + + if (result.PSubs.Count == 0 && result.QSubs.Count == 0) + result = EmptyResult; + + if (cacheEnabled) + { + _cache![subject] = result; + n = _cache.Count; + } + else + { + n = 0; + } + } + finally + { + if (doLock) + { + if (cacheEnabled) _lock.ExitWriteLock(); + else _lock.ExitReadLock(); + } + } + + // Reduce cache if over limit. + if (cacheEnabled && n > SlCacheMax && Interlocked.CompareExchange(ref _ccSweep, 1, 0) == 0) + Task.Run(ReduceCacheCount); + + return result; + } + + internal SubscriptionIndexResult MatchNoLock(string subject) => MatchInternal(subject, false, false); + + private bool HasInterestInternal(string subject, bool doLock, out int np, out int nq) + { + np = 0; + nq = 0; + + // Check cache first. + if (doLock) _lock.EnterReadLock(); + bool matched = false; + if (_cache != null && _cache.TryGetValue(subject, out var cached)) + { + np = cached.PSubs.Count; + foreach (var qsub in cached.QSubs) + nq += qsub.Count; + matched = cached.PSubs.Count + cached.QSubs.Count > 0; + } + if (doLock) _lock.ExitReadLock(); + + if (matched) + { + Interlocked.Increment(ref _cacheHits); + return true; + } + + var tokens = TokenizeForMatch(subject); + if (tokens == null) return false; + + if (doLock) _lock.EnterReadLock(); + try + { + return MatchLevelForAny(_root, tokens.AsSpan(), ref np, ref nq); + } + finally + { + if (doLock) _lock.ExitReadLock(); + } + } + + // ------------------------------------------------------------------------- + // Private: Cache management + // ------------------------------------------------------------------------- + + private void AddToCache(string subject, Subscription sub) + { + if (_cache == null) return; + + if (SubjectIsLiteral(subject)) + { + if (_cache.TryGetValue(subject, out var r)) + _cache[subject] = r.AddSubToResult(sub); + return; + } + + // Wildcard subject — update any matching cache entries. + foreach (var key in _cache.Keys.ToArray()) + { + if (MatchLiteral(key, subject)) + { + _cache[key] = _cache[key].AddSubToResult(sub); + } + } + } + + private void RemoveFromCache(string subject) + { + if (_cache == null) return; + + if (SubjectIsLiteral(subject)) + { + _cache.Remove(subject); + return; + } + + foreach (var key in _cache.Keys.ToArray()) + { + if (MatchLiteral(key, subject)) + _cache.Remove(key); + } + } + + private void ReduceCacheCount() + { + try + { + _lock.EnterWriteLock(); + try + { + if (_cache == null) return; + foreach (var key in _cache.Keys.ToArray()) + { + _cache.Remove(key); + if (_cache.Count <= SlCacheSweep) + break; + } + } + finally + { + _lock.ExitWriteLock(); + } + } + finally + { + Interlocked.Exchange(ref _ccSweep, 0); + } + } + + // ------------------------------------------------------------------------- + // Private: Trie matching (matchLevel) + // ------------------------------------------------------------------------- + + private static void MatchLevel(SublistLevel? l, string[] toks, SubscriptionIndexResult results) + { + SublistNode? pwc = null, n = null; + for (int i = 0; i < toks.Length; i++) + { + if (l == null) return; + if (l.Fwc != null) AddNodeToResults(l.Fwc, results); + pwc = l.Pwc; + if (pwc != null) MatchLevel(pwc.Next, toks.AsSpan(i + 1).ToArray(), results); + + l.Nodes.TryGetValue(toks[i], out n); + l = n?.Next; + } + if (n != null) AddNodeToResults(n, results); + if (pwc != null) AddNodeToResults(pwc, results); + } + + private static bool MatchLevelForAny(SublistLevel? l, ReadOnlySpan toks, ref int np, ref int nq) + { + SublistNode? pwc = null, n = null; + for (int i = 0; i < toks.Length; i++) + { + if (l == null) return false; + if (l.Fwc != null) + { + np += l.Fwc.PSubs.Count; + foreach (var qsub in l.Fwc.QSubs?.Values ?? Enumerable.Empty>()) + nq += qsub.Count; + return true; + } + pwc = l.Pwc; + if (pwc != null) + { + if (MatchLevelForAny(pwc.Next, toks[(i + 1)..], ref np, ref nq)) + return true; + } + l.Nodes.TryGetValue(toks[i], out n); + l = n?.Next; + } + if (n != null) + { + np += n.PSubs.Count; + foreach (var qsub in n.QSubs?.Values ?? Enumerable.Empty>()) + nq += qsub.Count; + if ((n.PList?.Count ?? 0) > 0 || n.PSubs.Count > 0 || (n.QSubs?.Count ?? 0) > 0) + return true; + } + if (pwc != null) + { + np += pwc.PSubs.Count; + foreach (var qsub in pwc.QSubs?.Values ?? Enumerable.Empty>()) + nq += qsub.Count; + return (pwc.PList?.Count ?? 0) > 0 || pwc.PSubs.Count > 0 || (pwc.QSubs?.Count ?? 0) > 0; + } + return false; + } + + // ------------------------------------------------------------------------- + // Private: Reverse match + // ------------------------------------------------------------------------- + + private static void ReverseMatchLevel(SublistLevel? l, ReadOnlySpan toks, SublistNode? n, SubscriptionIndexResult results) + { + if (l == null) return; + for (int i = 0; i < toks.Length; i++) + { + var t = toks[i]; + if (t.Length == 1) + { + if (t[0] == Fwc) + { + GetAllNodes(l, results); + return; + } + if (t[0] == Pwc) + { + foreach (var nd in l.Nodes.Values) + ReverseMatchLevel(nd.Next, toks[(i + 1)..], nd, results); + if (l.Pwc != null) + ReverseMatchLevel(l.Pwc.Next, toks[(i + 1)..], n, results); + if (l.Fwc != null) + GetAllNodes(l, results); + return; + } + } + + if (l.Fwc != null) + { + GetAllNodes(l, results); + return; + } + if (l.Pwc != null) + ReverseMatchLevel(l.Pwc.Next, toks[(i + 1)..], n, results); + + if (!l.Nodes.TryGetValue(t, out var next)) + { + n = null; + break; + } + n = next; + l = n.Next; + } + if (n != null) AddNodeToResults(n, results); + } + + private static void GetAllNodes(SublistLevel? l, SubscriptionIndexResult results) + { + if (l == null) return; + if (l.Pwc != null) AddNodeToResults(l.Pwc, results); + if (l.Fwc != null) AddNodeToResults(l.Fwc, results); + foreach (var n in l.Nodes.Values) + { + AddNodeToResults(n, results); + GetAllNodes(n.Next, results); + } + } + + // ------------------------------------------------------------------------- + // Private: addNodeToResults + // ------------------------------------------------------------------------- + + private static void AddNodeToResults(SublistNode n, SubscriptionIndexResult results) + { + // Plain subscriptions. + if (n.PList != null) + { + results.PSubs.AddRange(n.PList); + } + else + { + foreach (var psub in n.PSubs.Keys) + results.PSubs.Add(psub); + } + + // Queue subscriptions. + if (n.QSubs == null) return; + foreach (var (qname, qr) in n.QSubs) + { + if (qr.Count == 0) continue; + + var i = FindQSlot(Encoding.ASCII.GetBytes(qname), results.QSubs); + if (i < 0) + { + i = results.QSubs.Count; + results.QSubs.Add(new List(qr.Count)); + } + + foreach (var sub in qr.Keys) + { + if (IsRemoteQSub(sub)) + { + var ns = Interlocked.CompareExchange(ref sub.Qw, 0, 0); + for (var j = 0; j < ns; j++) + results.QSubs[i].Add(sub); + } + else + { + results.QSubs[i].Add(sub); + } + } + } + } + + internal static int FindQSlot(byte[]? queue, List> qsl) + { + if (queue == null) return -1; + for (int i = 0; i < qsl.Count; i++) + { + if (qsl[i].Count > 0 && qsl[i][0].Queue != null && queue.AsSpan().SequenceEqual(qsl[i][0].Queue)) + return i; + } + return -1; + } + + internal static bool IsRemoteQSub(Subscription sub) + { + return sub.Queue != null && sub.Queue.Length > 0 + && sub.Client != null + && (sub.Client.Kind == ClientKind.Router || sub.Client.Kind == ClientKind.Leaf); + } + + // ------------------------------------------------------------------------- + // Private: Enumerate + // ------------------------------------------------------------------------- + + private static void AddLocalSub(Subscription sub, List subs, bool includeLeafHubs) + { + if (sub.Client == null) return; + var kind = sub.Client.Kind; + if (kind == ClientKind.Client || kind == ClientKind.System || + kind == ClientKind.JetStream || kind == ClientKind.Account || + (includeLeafHubs && sub.Client.IsHubLeafNode())) + { + subs.Add(sub); + } + } + + private static void AddNodeToSubsLocal(SublistNode n, List subs, bool includeLeafHubs) + { + if (n.PList != null) + { + foreach (var sub in n.PList) + AddLocalSub(sub, subs, includeLeafHubs); + } + else + { + foreach (var sub in n.PSubs.Keys) + AddLocalSub(sub, subs, includeLeafHubs); + } + if (n.QSubs != null) + { + foreach (var qr in n.QSubs.Values) + foreach (var sub in qr.Keys) + AddLocalSub(sub, subs, includeLeafHubs); + } + } + + private static void CollectLocalSubs(SublistLevel? l, List subs, bool includeLeafHubs) + { + if (l == null) return; + foreach (var n in l.Nodes.Values) + { + AddNodeToSubsLocal(n, subs, includeLeafHubs); + CollectLocalSubs(n.Next, subs, includeLeafHubs); + } + if (l.Pwc != null) { AddNodeToSubsLocal(l.Pwc, subs, includeLeafHubs); CollectLocalSubs(l.Pwc.Next, subs, includeLeafHubs); } + if (l.Fwc != null) { AddNodeToSubsLocal(l.Fwc, subs, includeLeafHubs); CollectLocalSubs(l.Fwc.Next, subs, includeLeafHubs); } + } + + private static void AddAllNodeToSubs(SublistNode n, List subs) + { + if (n.PList != null) + subs.AddRange(n.PList); + else + foreach (var sub in n.PSubs.Keys) + subs.Add(sub); + + if (n.QSubs != null) + foreach (var qr in n.QSubs.Values) + foreach (var sub in qr.Keys) + subs.Add(sub); + } + + private static void CollectAllSubs(SublistLevel? l, List subs) + { + if (l == null) return; + foreach (var n in l.Nodes.Values) + { + AddAllNodeToSubs(n, subs); + CollectAllSubs(n.Next, subs); + } + if (l.Pwc != null) { AddAllNodeToSubs(l.Pwc, subs); CollectAllSubs(l.Pwc.Next, subs); } + if (l.Fwc != null) { AddAllNodeToSubs(l.Fwc, subs); CollectAllSubs(l.Fwc.Next, subs); } + } + + // ------------------------------------------------------------------------- + // Private: Notifications + // ------------------------------------------------------------------------- + + private Exception? RegisterNotificationInternal(string subject, string queue, Action notify) + { + if (SubjectHasWildcard(subject)) + return ErrInvalidSubject; + + bool hasInterest = false; + var r = Match(subject); + + if (r.PSubs.Count + r.QSubs.Count > 0) + { + if (queue.Length == 0) + { + foreach (var sub in r.PSubs) + { + if (Encoding.ASCII.GetString(sub.Subject) == subject) + { + hasInterest = true; + break; + } + } + } + else + { + foreach (var qsub in r.QSubs) + { + if (qsub.Count == 0) continue; + var qs = qsub[0]; + if (Encoding.ASCII.GetString(qs.Subject) == subject && + qs.Queue != null && Encoding.ASCII.GetString(qs.Queue) == queue) + { + hasInterest = true; + break; + } + } + } + } + + var key = KeyFromSubjectAndQueue(subject, queue); + Exception? err; + + _lock.EnterWriteLock(); + try + { + _notify ??= new NotifyMaps(); + err = hasInterest + ? AddNotify(_notify.Remove, key, notify) + : AddNotify(_notify.Insert, key, notify); + } + finally + { + _lock.ExitWriteLock(); + } + + if (err == null) + SendNotification(notify, hasInterest); + + return err; + } + + private bool ClearNotificationInternal(string subject, string queue, Action notify) + { + _lock.EnterWriteLock(); + try + { + if (_notify == null) return false; + var key = KeyFromSubjectAndQueue(subject, queue); + var didRemove = ChkAndRemove(key, notify, _notify.Remove); + didRemove = didRemove || ChkAndRemove(key, notify, _notify.Insert); + if (_notify.Remove.Count + _notify.Insert.Count == 0) + _notify = null; + return didRemove; + } + finally + { + _lock.ExitWriteLock(); + } + } + + private static bool ChkAndRemove(string key, Action notify, Dictionary>> ms) + { + if (!ms.TryGetValue(key, out var chs)) return false; + for (int i = 0; i < chs.Count; i++) + { + if (chs[i] == notify) + { + chs[i] = chs[^1]; + chs.RemoveAt(chs.Count - 1); + if (chs.Count == 0) ms.Remove(key); + return true; + } + } + return false; + } + + private static Exception? AddNotify(Dictionary>> m, string subject, Action notify) + { + if (m.TryGetValue(subject, out var chs)) + { + foreach (var ch in chs) + if (ch == notify) + return ErrAlreadyRegistered; + } + else + { + chs = new List>(); + m[subject] = chs; + } + chs.Add(notify); + return null; + } + + private static void SendNotification(Action notify, bool hasInterest) + { + // Non-blocking send — in Go this was select { case ch <- val: default: } + // In .NET we just invoke; the caller should ensure it doesn't block. + try { notify(hasInterest); } + catch { /* swallow if handler faults */ } + } + + private static string KeyFromSubjectAndQueue(string subject, string queue) + { + if (queue.Length == 0) return subject; + return subject + " " + queue; + } + + private void ChkForInsertNotification(string subject, string queue) + { + var key = KeyFromSubjectAndQueue(subject, queue); + if (_notify!.Insert.TryGetValue(key, out var chs) && chs.Count > 0) + { + foreach (var ch in chs) SendNotification(ch, true); + if (!_notify.Remove.TryGetValue(key, out var rmChs)) + { + rmChs = new List>(); + _notify.Remove[key] = rmChs; + } + rmChs.AddRange(chs); + _notify.Insert.Remove(key); + } + } + + private void ChkForRemoveNotification(string subject, string queue) + { + var key = KeyFromSubjectAndQueue(subject, queue); + if (!_notify!.Remove.TryGetValue(key, out var chs) || chs.Count == 0) return; + + bool hasInterest = false; + var r = MatchNoLock(subject); + if (r.PSubs.Count + r.QSubs.Count > 0) + { + if (queue.Length == 0) + { + foreach (var sub in r.PSubs) + { + if (Encoding.ASCII.GetString(sub.Subject) == subject) + { + hasInterest = true; + break; + } + } + } + else + { + foreach (var qsub in r.QSubs) + { + if (qsub.Count == 0) continue; + var qs = qsub[0]; + if (Encoding.ASCII.GetString(qs.Subject) == subject && + qs.Queue != null && Encoding.ASCII.GetString(qs.Queue) == queue) + { + hasInterest = true; + break; + } + } + } + } + + if (!hasInterest) + { + foreach (var ch in chs) SendNotification(ch, false); + if (!_notify.Insert.TryGetValue(key, out var insChs)) + { + insChs = new List>(); + _notify.Insert[key] = insChs; + } + insChs.AddRange(chs); + _notify.Remove.Remove(key); + } + } + + // ------------------------------------------------------------------------- + // Private: visitLevel (depth calculation for tests) + // ------------------------------------------------------------------------- + + private static int VisitLevel(SublistLevel? l, int depth) + { + if (l == null || l.NumNodes() == 0) return depth; + depth++; + var maxDepth = depth; + foreach (var n in l.Nodes.Values) + { + var d = VisitLevel(n.Next, depth); + if (d > maxDepth) maxDepth = d; + } + if (l.Pwc != null) { var d = VisitLevel(l.Pwc.Next, depth); if (d > maxDepth) maxDepth = d; } + if (l.Fwc != null) { var d = VisitLevel(l.Fwc.Next, depth); if (d > maxDepth) maxDepth = d; } + return maxDepth; + } + + // ------------------------------------------------------------------------- + // Private: Tokenization helpers + // ------------------------------------------------------------------------- + + private static string[]? TokenizeForMatch(string subject) + { + if (subject.Length == 0) return null; + var tokens = new List(8); + var start = 0; + for (var i = 0; i < subject.Length; i++) + { + if (subject[i] == Btsep) + { + if (i - start == 0) return null; + tokens.Add(subject.Substring(start, i - start)); + start = i + 1; + } + } + if (start >= subject.Length) return null; + tokens.Add(subject[start..]); + return tokens.ToArray(); + } + + internal static string[] TokenizeIntoArray(string subject) + { + var tokens = new List(8); + var start = 0; + for (var i = 0; i < subject.Length; i++) + { + if (subject[i] == Btsep) + { + tokens.Add(subject.Substring(start, i - start)); + start = i + 1; + } + } + tokens.Add(subject[start..]); + return tokens.ToArray(); + } + + // ------------------------------------------------------------------------- + // Public static: Subject validation and utility methods + // ------------------------------------------------------------------------- + + /// Returns true if the subject contains any wildcard tokens. + public static bool SubjectHasWildcard(string subject) + { + for (int i = 0; i < subject.Length; i++) + { + var c = subject[i]; + if (c == Pwc || c == Fwc) + { + if ((i == 0 || subject[i - 1] == Btsep) && + (i + 1 == subject.Length || subject[i + 1] == Btsep)) + return true; + } + } + return false; + } + + /// Returns true if the subject is a literal (no wildcards). + public static bool SubjectIsLiteral(string subject) + { + for (int i = 0; i < subject.Length; i++) + { + var c = subject[i]; + if (c == Pwc || c == Fwc) + { + if ((i == 0 || subject[i - 1] == Btsep) && + (i + 1 == subject.Length || subject[i + 1] == Btsep)) + return false; + } + } + return true; + } + + /// Returns true if subject is valid (allows wildcards). + public static bool IsValidSubject(string subject) => IsValidSubjectInternal(subject, false); + + /// Returns true if subject is valid and literal (no wildcards). + public static bool IsValidPublishSubject(string subject) => + IsValidSubject(subject) && SubjectIsLiteral(subject); + + /// Returns true if subject is valid and literal (no wildcards). + public static bool IsValidLiteralSubject(string subject) => + IsValidLiteralSubjectTokens(subject.Split(Btsep)); + + private static bool IsValidSubjectInternal(string subject, bool checkRunes) + { + if (string.IsNullOrEmpty(subject)) return false; + + if (checkRunes) + { + if (subject.Contains('\0')) return false; + foreach (var r in subject) + { + if (r == '\uFFFD') return false; // RuneError + } + } + + bool sfwc = false; + var start = 0; + for (var i = 0; i <= subject.Length; i++) + { + if (i < subject.Length && subject[i] != Btsep) + continue; + + var tokenLen = i - start; + if (tokenLen == 0 || sfwc) return false; + + if (tokenLen > 1) + { + var t = subject.AsSpan(start, tokenLen); + if (t.ContainsAny(" \t\n\r\f")) + return false; + } + else + { + switch (subject[start]) + { + case Fwc: sfwc = true; break; + case ' ': case '\t': case '\n': case '\r': case '\f': return false; + } + } + + start = i + 1; + } + return true; + } + + private static bool IsValidLiteralSubjectTokens(string[] tokens) + { + foreach (var t in tokens) + { + if (t.Length == 0) return false; + if (t.Length > 1) continue; + if (t[0] == Pwc || t[0] == Fwc) return false; + } + return true; + } + + /// Determines if two subjects could both match a single literal subject. + public static bool SubjectsCollide(string subj1, string subj2) + { + if (subj1 == subj2) return true; + + var toks1 = subj1.Split(Btsep); + var toks2 = subj2.Split(Btsep); + AnalyzeTokens(toks1, out var pwc1, out var fwc1); + AnalyzeTokens(toks2, out var pwc2, out var fwc2); + + bool l1 = !(pwc1 || fwc1), l2 = !(pwc2 || fwc2); + if (l1 && l2) return subj1 == subj2; + if (l1 && !l2) return IsSubsetMatch(toks1, subj2); + if (l2 && !l1) return IsSubsetMatch(toks2, subj1); + + if (!fwc1 && !fwc2 && toks1.Length != toks2.Length) return false; + if (toks1.Length != toks2.Length) + { + if ((toks1.Length < toks2.Length && !fwc1) || (toks2.Length < toks1.Length && !fwc2)) + return false; + } + + var stop = Math.Min(toks1.Length, toks2.Length); + for (int i = 0; i < stop; i++) + { + if (!TokensCanMatch(toks1[i], toks2[i])) + return false; + } + return true; + } + + /// Returns true if the subject matches the filter. Mirrors SubjectMatchesFilter. + public static bool SubjectMatchesFilter(string subject, string filter) => + SubjectIsSubsetMatch(subject, filter); + + /// Matches a literal subject against a potentially-wildcarded subject. Used in the cache layer. + public static bool MatchLiteral(string literal, string subject) + { + int li = 0; + int ll = literal.Length; + int ls = subject.Length; + + for (int i = 0; i < ls; i++) + { + if (li >= ll) return false; + + switch (subject[i]) + { + case Pwc: + if (i == 0 || subject[i - 1] == Btsep) + { + if (i == ls - 1) + { + while (true) + { + if (li >= ll) return true; + if (literal[li] == Btsep) return false; + li++; + } + } + else if (subject[i + 1] == Btsep) + { + while (true) + { + if (li >= ll) return false; + if (literal[li] == Btsep) break; + li++; + } + i++; + } + } + break; + case Fwc: + if ((i == 0 || subject[i - 1] == Btsep) && i == ls - 1) + return true; + break; + } + + if (subject[i] != literal[li]) return false; + li++; + } + return li >= ll; + } + + /// Returns the number of dot-separated tokens in a subject. + public static int NumTokens(string subject) + { + if (subject.Length == 0) return 0; + int count = 0; + for (int i = 0; i < subject.Length; i++) + if (subject[i] == Btsep) count++; + return count + 1; + } + + /// Returns the 1-based indexed token from a subject. + public static string TokenAt(string subject, int index) + { + int ti = 1, start = 0; + for (int i = 0; i < subject.Length; i++) + { + if (subject[i] == Btsep) + { + if (ti == index) return subject[start..i]; + start = i + 1; + ti++; + } + } + if (ti == index) return subject[start..]; + return string.Empty; + } + + internal static bool SubjectIsSubsetMatch(string subject, string test) + { + var tts = TokenizeIntoArray(subject); + return IsSubsetMatch(tts, test); + } + + internal static bool IsSubsetMatch(string[] tokens, string test) + { + var tts = TokenizeIntoArray(test); + return IsSubsetMatchTokenized(tokens, tts); + } + + internal static bool IsSubsetMatchTokenized(string[] tokens, string[] test) + { + for (int i = 0; i < test.Length; i++) + { + if (i >= tokens.Length) return false; + + var t2 = test[i]; + if (t2.Length == 0) return false; + if (t2[0] == Fwc && t2.Length == 1) return true; + + var t1 = tokens[i]; + if (t1.Length == 0 || (t1[0] == Fwc && t1.Length == 1)) return false; + if (t1[0] == Pwc && t1.Length == 1) + { + if (!(t2[0] == Pwc && t2.Length == 1)) return false; + if (i >= test.Length) return true; + continue; + } + if (t2[0] != Pwc && string.Compare(t1, t2, StringComparison.Ordinal) != 0) + return false; + } + return tokens.Length == test.Length; + } + + private static void AnalyzeTokens(string[] tokens, out bool hasPWC, out bool hasFWC) + { + hasPWC = false; + hasFWC = false; + foreach (var t in tokens) + { + if (t.Length == 0 || t.Length > 1) continue; + switch (t[0]) + { + case Pwc: hasPWC = true; break; + case Fwc: hasFWC = true; break; + } + } + } + + private static bool TokensCanMatch(string t1, string t2) + { + if (t1.Length == 0 || t2.Length == 0) return false; + if (t1[0] == Pwc || t2[0] == Pwc || t1[0] == Fwc || t2[0] == Fwc) return true; + return t1 == t2; + } + + // ------------------------------------------------------------------------- + // Nested types: SublistNode, SublistLevel, NotifyMaps + // ------------------------------------------------------------------------- + + internal sealed class SublistNode + { + public readonly Dictionary PSubs = new(ReferenceEqualityComparer.Instance); + public Dictionary>? QSubs; + public List? PList; + public SublistLevel? Next; + + public bool IsEmpty() + { + return PSubs.Count == 0 && (QSubs == null || QSubs.Count == 0) && + (Next == null || Next.NumNodes() == 0); + } + } + + internal sealed class SublistLevel + { + public readonly Dictionary Nodes = new(); + public SublistNode? Pwc; + public SublistNode? Fwc; + + public int NumNodes() + { + var num = Nodes.Count; + if (Pwc != null) num++; + if (Fwc != null) num++; + return num; + } + + public void PruneNode(SublistNode n, string t) + { + if (ReferenceEquals(n, Fwc)) Fwc = null; + else if (ReferenceEquals(n, Pwc)) Pwc = null; + else Nodes.Remove(t); + } + } + + private sealed class NotifyMaps + { + public readonly Dictionary>> Insert = new(); + public readonly Dictionary>> Remove = new(); + } + + private readonly struct LevelNodeToken + { + public readonly SublistLevel Level; + public readonly SublistNode Node; + public readonly string Token; + + public LevelNodeToken(SublistLevel level, SublistNode node, string token) + { + Level = level; + Node = node; + Token = token; + } + } +} + +/// +/// Result of a subscription match containing plain and queue subscribers. +/// Mirrors Go's SublistResult. +/// +public sealed class SubscriptionIndexResult +{ + public List PSubs { get; } = new(); + public List> QSubs { get; } = new(); + + /// Deep copies this result. Mirrors copyResult. + public SubscriptionIndexResult Copy() + { + var nr = new SubscriptionIndexResult(); + nr.PSubs.AddRange(PSubs); + foreach (var qr in QSubs) + nr.QSubs.Add(new List(qr)); + return nr; + } + + /// Adds a subscription to a copy of this result. Mirrors addSubToResult. + public SubscriptionIndexResult AddSubToResult(Subscription sub) + { + var nr = Copy(); + if (sub.Queue == null || sub.Queue.Length == 0) + { + nr.PSubs.Add(sub); + } + else + { + var i = SubscriptionIndex.FindQSlot(sub.Queue, nr.QSubs); + if (i >= 0) + { + nr.QSubs[i].Add(sub); + } + else + { + nr.QSubs.Add(new List { sub }); + } + } + return nr; + } +} + +/// +/// Public statistics for the subscription index. Mirrors Go's SublistStats. +/// +public sealed class SublistStats +{ + public uint NumSubs { get; set; } + public uint NumCache { get; set; } + public ulong NumInserts { get; set; } + public ulong NumRemoves { get; set; } + public ulong NumMatches { get; set; } + public double CacheHitRate { get; set; } + public uint MaxFanout { get; set; } + public double AvgFanout { get; set; } + + // Internal fields used for aggregation. + internal int TotFanout { get; set; } + internal int CacheCnt { get; set; } + internal ulong CacheHitsRaw { get; set; } + + /// Aggregates another stats object into this one. Mirrors SublistStats.add. + public void Add(SublistStats stat) + { + NumSubs += stat.NumSubs; + NumCache += stat.NumCache; + NumInserts += stat.NumInserts; + NumRemoves += stat.NumRemoves; + NumMatches += stat.NumMatches; + CacheHitsRaw += stat.CacheHitsRaw; + if (MaxFanout < stat.MaxFanout) MaxFanout = stat.MaxFanout; + + TotFanout += stat.TotFanout; + CacheCnt += stat.CacheCnt; + if (TotFanout > 0) + AvgFanout = (double)TotFanout / CacheCnt; + if (NumMatches > 0) + CacheHitRate = (double)CacheHitsRaw / NumMatches; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs new file mode 100644 index 0000000..bf63462 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/Subscription.cs @@ -0,0 +1,77 @@ +// Copyright 2012-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/client.go (subscription struct) in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server.Internal; + +/// +/// Represents a client subscription in the NATS server. +/// Mirrors the Go subscription struct from client.go. +/// This is a minimal stub; full client integration will be added in later sessions. +/// +public sealed class Subscription +{ + /// The subject this subscription is listening on. + public byte[] Subject { get; set; } = []; + + /// The queue group name, or null/empty for non-queue subscriptions. + public byte[]? Queue { get; set; } + + /// The subscription identifier. + public byte[]? Sid { get; set; } + + /// Queue weight for remote queue subscriptions. + public int Qw; + + /// Closed flag (0 = open, 1 = closed). + private int _closed; + + /// The client that owns this subscription. Null in test/stub scenarios. + public NatsClient? Client { get; set; } + + /// Marks this subscription as closed. + public void Close() => Interlocked.Exchange(ref _closed, 1); + + /// Returns true if this subscription has been closed. + public bool IsClosed() => Interlocked.CompareExchange(ref _closed, 0, 0) == 1; +} + +/// +/// Represents the kind of client connection. +/// Mirrors Go's clientKind enum. +/// This is a minimal stub; full implementation in later sessions. +/// +public enum ClientKind +{ + Client = 0, + Router = 1, + Gateway = 2, + System = 3, + Leaf = 4, + JetStream = 5, + Account = 6, +} + +/// +/// Minimal client stub for subscription routing. +/// Full implementation will be added in later sessions. +/// +public class NatsClient +{ + /// The kind of client connection. + public ClientKind Kind { get; set; } + + /// Whether this is a hub leaf node. Stub for now. + public virtual bool IsHubLeafNode() => false; +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/DataStructures/SubscriptionIndexTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/DataStructures/SubscriptionIndexTests.cs new file mode 100644 index 0000000..fe11062 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/DataStructures/SubscriptionIndexTests.cs @@ -0,0 +1,1040 @@ +// Copyright 2012-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server.Tests.Internal.DataStructures; + +/// +/// Tests for SubscriptionIndex — mirrors tests from server/sublist_test.go. +/// Cache/no-cache pairs are collapsed into Theory with InlineData. +/// +public class SubscriptionIndexTests +{ + // Helper factories matching Go's newSub/newQSub/newRemoteQSub. + private static Subscription NewSub(string subject) => + new() { Subject = Encoding.ASCII.GetBytes(subject), Client = new NatsClient { Kind = ClientKind.Client } }; + + private static Subscription NewQSub(string subject, string queue) => + queue.Length > 0 + ? new() { Subject = Encoding.ASCII.GetBytes(subject), Queue = Encoding.ASCII.GetBytes(queue) } + : NewSub(subject); + + private static Subscription NewRemoteQSub(string subject, string queue, int weight) => + queue.Length > 0 + ? new() { Subject = Encoding.ASCII.GetBytes(subject), Queue = Encoding.ASCII.GetBytes(queue), Qw = weight, Client = new NatsClient { Kind = ClientKind.Router } } + : NewSub(subject); + + private static SubscriptionIndex MakeSl(bool cache) => + cache ? SubscriptionIndex.NewSublistWithCache() : SubscriptionIndex.NewSublistNoCache(); + + private static void VerifyMember(List subs, Subscription expected) + { + subs.ShouldContain(expected, $"Subscription for [{Encoding.ASCII.GetString(expected.Subject)}] not found in results"); + } + + private static void VerifyQMember(List> qsubs, Subscription expected) + { + var idx = SubscriptionIndex.FindQSlot(expected.Queue, qsubs); + idx.ShouldBeGreaterThanOrEqualTo(0); + qsubs[idx].ShouldContain(expected); + } + + // ------------------------------------------------------------------------- + // Init & Count (T:2962-2964) + // ------------------------------------------------------------------------- + + [Fact] // T:2962 + public void Init_StartsWithZeroCount() + { + var s = SubscriptionIndex.NewSublistWithCache(); + s.Count().ShouldBe(0u); + } + + [Theory] // T:2963, T:2964 + [InlineData(true)] + [InlineData(false)] + public void InsertCount_ReturnsCorrectCount(bool cache) + { + var s = MakeSl(cache); + s.Insert(NewSub("foo")); + s.Insert(NewSub("bar")); + s.Insert(NewSub("foo.bar")); + s.Count().ShouldBe(3u); + } + + // ------------------------------------------------------------------------- + // Simple Match (T:2965-2968) + // ------------------------------------------------------------------------- + + [Theory] // T:2965, T:2966 + [InlineData(true)] + [InlineData(false)] + public void Simple_SingleTokenMatch(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("foo"); + s.Insert(sub); + var r = s.Match("foo"); + r.PSubs.Count.ShouldBe(1); + VerifyMember(r.PSubs, sub); + } + + [Theory] // T:2967, T:2968 + [InlineData(true)] + [InlineData(false)] + public void Simple_MultiTokenMatch(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("foo.bar.baz"); + s.Insert(sub); + var r = s.Match("foo.bar.baz"); + r.PSubs.Count.ShouldBe(1); + VerifyMember(r.PSubs, sub); + } + + // ------------------------------------------------------------------------- + // Wildcard Match (T:2969-2974) + // ------------------------------------------------------------------------- + + [Theory] // T:2969, T:2970 + [InlineData(true)] + [InlineData(false)] + public void PartialWildcard_MatchesCorrectly(bool cache) + { + var s = MakeSl(cache); + var lsub = NewSub("a.b.c"); + var psub = NewSub("a.*.c"); + s.Insert(lsub); + s.Insert(psub); + var r = s.Match("a.b.c"); + r.PSubs.Count.ShouldBe(2); + VerifyMember(r.PSubs, lsub); + VerifyMember(r.PSubs, psub); + } + + [Theory] // T:2971, T:2972 + [InlineData(true)] + [InlineData(false)] + public void PartialWildcardAtEnd_MatchesCorrectly(bool cache) + { + var s = MakeSl(cache); + var lsub = NewSub("a.b.c"); + var psub = NewSub("a.b.*"); + s.Insert(lsub); + s.Insert(psub); + var r = s.Match("a.b.c"); + r.PSubs.Count.ShouldBe(2); + VerifyMember(r.PSubs, lsub); + VerifyMember(r.PSubs, psub); + } + + [Theory] // T:2973, T:2974 + [InlineData(true)] + [InlineData(false)] + public void FullWildcard_MatchesAll(bool cache) + { + var s = MakeSl(cache); + var lsub = NewSub("a.b.c"); + var fsub = NewSub("a.>"); + s.Insert(lsub); + s.Insert(fsub); + var r = s.Match("a.b.c"); + r.PSubs.Count.ShouldBe(2); + VerifyMember(r.PSubs, lsub); + VerifyMember(r.PSubs, fsub); + + r = s.Match("a.>"); + r.PSubs.Count.ShouldBe(1); + VerifyMember(r.PSubs, fsub); + } + + // ------------------------------------------------------------------------- + // Remove (T:2975-2984) + // ------------------------------------------------------------------------- + + [Theory] // T:2975, T:2976 + [InlineData(true)] + [InlineData(false)] + public void Remove_BasicRemoval(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("a.b.c.d"); + s.Insert(sub); + s.Count().ShouldBe(1u); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(1); + s.Remove(NewSub("a.b.c")); // wrong sub, no-op + s.Count().ShouldBe(1u); + s.Remove(sub); + s.Count().ShouldBe(0u); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(0); + } + + [Theory] // T:2977, T:2978 + [InlineData(true)] + [InlineData(false)] + public void RemoveWildcard_CorrectCountsAndMatch(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("a.b.c.d"); + var psub = NewSub("a.b.*.d"); + var fsub = NewSub("a.b.>"); + s.Insert(sub); s.Insert(psub); s.Insert(fsub); + s.Count().ShouldBe(3u); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(3); + s.Remove(sub); s.Count().ShouldBe(2u); + s.Remove(fsub); s.Count().ShouldBe(1u); + s.Remove(psub); s.Count().ShouldBe(0u); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(0); + } + + [Theory] // T:2979, T:2980 + [InlineData(true)] + [InlineData(false)] + public void RemoveCleanup_LevelsReclaimed(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("a.b.c.d.e.f"); + s.NumLevels().ShouldBe(0); + s.Insert(sub); + s.NumLevels().ShouldBe(6); + s.Remove(sub); + s.NumLevels().ShouldBe(0); + } + + [Theory] // T:2981, T:2982 + [InlineData(true)] + [InlineData(false)] + public void RemoveCleanupWildcards_LevelsReclaimed(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("a.b.*.d.e.>"); + s.NumLevels().ShouldBe(0); + s.Insert(sub); + s.NumLevels().ShouldBe(6); + s.Remove(sub); + s.NumLevels().ShouldBe(0); + } + + [Theory] // T:2983, T:2984 + [InlineData(true)] + [InlineData(false)] + public void RemoveWithLargeSubs_CorrectAfterRemoval(bool cache) + { + var s = MakeSl(cache); + for (int i = 0; i < SubscriptionIndex.PlistMin * 2; i++) + s.Insert(NewSub("foo")); + var r = s.Match("foo"); + r.PSubs.Count.ShouldBe(SubscriptionIndex.PlistMin * 2); + + s.Remove(r.PSubs[SubscriptionIndex.PlistMin]); + s.Remove(r.PSubs[0]); + s.Remove(r.PSubs[^1]); + r = s.Match("foo"); + r.PSubs.Count.ShouldBe(SubscriptionIndex.PlistMin * 2 - 3); + } + + // ------------------------------------------------------------------------- + // Invalid Subjects (T:2985, T:2986) + // ------------------------------------------------------------------------- + + [Theory] // T:2985, T:2986 + [InlineData(true)] + [InlineData(false)] + public void InvalidSubjects_InsertReturnsError(bool cache) + { + var s = MakeSl(cache); + s.Insert(NewSub(".foo")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.Insert(NewSub("foo.")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.Insert(NewSub("foo..bar")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.Insert(NewSub("foo.bar..baz")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.Insert(NewSub("foo.>.bar")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + } + + // ------------------------------------------------------------------------- + // RemoveBatch (T:2987, T:2988) + // ------------------------------------------------------------------------- + + [Fact] // T:2987 + public void NoCacheRemoveBatch_DoesNotEnableCache() + { + var s = SubscriptionIndex.NewSublistNoCache(); + s.Insert(NewSub("foo")); + var sub = NewSub("bar"); + s.Insert(sub); + s.RemoveBatch(new[] { sub }); + for (int i = 0; i < 10; i++) s.Match("foo"); + s.CacheEnabled().ShouldBeFalse(); + } + + [Fact] // T:2988 + public void RemoveBatchWithError_StillRemovesPresent() + { + var s = SubscriptionIndex.NewSublistNoCache(); + var sub1 = NewSub("foo"); + var sub2 = NewSub("bar"); + var sub3 = NewSub("baz"); + s.Insert(sub1); s.Insert(sub2); s.Insert(sub3); + var notPresent = NewSub("not.inserted"); + var err = s.RemoveBatch(new[] { notPresent, sub1, sub3 }); + err.ShouldBe(SubscriptionIndex.ErrNotFound); + s.Count().ShouldBe(1u); + s.Match("bar").PSubs.Count.ShouldBe(1); + VerifyMember(s.Match("bar").PSubs, sub2); + s.Match("foo").PSubs.Count.ShouldBe(0); + s.Match("baz").PSubs.Count.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Cache behavior (T:2989) + // ------------------------------------------------------------------------- + + [Fact] // T:2989 + public void Cache_BasicBehavior() + { + var s = SubscriptionIndex.NewSublistWithCache(); + var sub = NewSub("a.b.c.d"); + var psub = NewSub("a.b.*.d"); + var fsub = NewSub("a.b.>"); + + s.Insert(sub); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(1); + s.Insert(psub); s.Insert(fsub); + s.Count().ShouldBe(3u); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(3); + s.Remove(sub); s.Count().ShouldBe(2u); + s.Remove(fsub); s.Count().ShouldBe(1u); + s.Remove(psub); s.Count().ShouldBe(0u); + s.CacheCount().ShouldBe(0); + s.Match("a.b.c.d").PSubs.Count.ShouldBe(0); + + // Fill cache beyond max. + for (int i = 0; i < 2 * SubscriptionIndex.SlCacheMax; i++) + s.Match($"foo-{i}"); + + // Cache sweep runs async, wait briefly. + Thread.Sleep(200); + s.CacheCount().ShouldBeLessThanOrEqualTo(SubscriptionIndex.SlCacheMax); + + // Test wildcard cache update. + s = SubscriptionIndex.NewSublistWithCache(); + s.Insert(NewSub("foo.*")); + s.Insert(NewSub("foo.bar")); + s.Match("foo.baz").PSubs.Count.ShouldBe(1); + s.Match("foo.bar").PSubs.Count.ShouldBe(2); + s.Insert(NewSub("foo.>")); + s.Match("foo.bar").PSubs.Count.ShouldBe(3); + } + + // ------------------------------------------------------------------------- + // Queue Results (T:2990, T:2991) + // ------------------------------------------------------------------------- + + [Theory] // T:2990, T:2991 + [InlineData(true)] + [InlineData(false)] + public void BasicQueueResults_CorrectGrouping(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("foo"); + var sub1 = NewQSub("foo", "bar"); + var sub2 = NewQSub("foo", "baz"); + + s.Insert(sub1); + var r = s.Match("foo"); + r.PSubs.Count.ShouldBe(0); + r.QSubs.Count.ShouldBe(1); + r.QSubs[0].Count.ShouldBe(1); + VerifyQMember(r.QSubs, sub1); + + s.Insert(sub2); + r = s.Match("foo"); + r.PSubs.Count.ShouldBe(0); + r.QSubs.Count.ShouldBe(2); + VerifyQMember(r.QSubs, sub1); + VerifyQMember(r.QSubs, sub2); + + s.Insert(sub); + r = s.Match("foo"); + r.PSubs.Count.ShouldBe(1); + r.QSubs.Count.ShouldBe(2); + + var sub3 = NewQSub("foo", "bar"); + var sub4 = NewQSub("foo", "baz"); + s.Insert(sub3); s.Insert(sub4); + r = s.Match("foo"); + r.PSubs.Count.ShouldBe(1); + r.QSubs.Count.ShouldBe(2); + + // Verify each group has 2 members. + var barIdx = SubscriptionIndex.FindQSlot(Encoding.ASCII.GetBytes("bar"), r.QSubs); + var bazIdx = SubscriptionIndex.FindQSlot(Encoding.ASCII.GetBytes("baz"), r.QSubs); + r.QSubs[barIdx].Count.ShouldBe(2); + r.QSubs[bazIdx].Count.ShouldBe(2); + + // Removal. + s.Remove(sub); + r = s.Match("foo"); + r.PSubs.Count.ShouldBe(0); + r.QSubs.Count.ShouldBe(2); + + s.Remove(sub1); + r = s.Match("foo"); + barIdx = SubscriptionIndex.FindQSlot(Encoding.ASCII.GetBytes("bar"), r.QSubs); + bazIdx = SubscriptionIndex.FindQSlot(Encoding.ASCII.GetBytes("baz"), r.QSubs); + r.QSubs[barIdx].Count.ShouldBe(1); + r.QSubs[bazIdx].Count.ShouldBe(2); + + s.Remove(sub3); // last bar + r = s.Match("foo"); + r.QSubs.Count.ShouldBe(1); // only baz remains + + s.Remove(sub2); s.Remove(sub4); + r = s.Match("foo"); + r.PSubs.Count.ShouldBe(0); + r.QSubs.Count.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Subject Validation (T:2992-2997) + // ------------------------------------------------------------------------- + + [Fact] // T:2992 + public void ValidLiteralSubjects_CorrectResults() + { + SubscriptionIndex.IsValidLiteralSubject("foo").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject(".foo").ShouldBeFalse(); + SubscriptionIndex.IsValidLiteralSubject("foo.").ShouldBeFalse(); + SubscriptionIndex.IsValidLiteralSubject("foo..bar").ShouldBeFalse(); + SubscriptionIndex.IsValidLiteralSubject("foo.bar.*").ShouldBeFalse(); + SubscriptionIndex.IsValidLiteralSubject("foo.bar.>").ShouldBeFalse(); + SubscriptionIndex.IsValidLiteralSubject("*").ShouldBeFalse(); + SubscriptionIndex.IsValidLiteralSubject(">").ShouldBeFalse(); + // Wildcard chars that aren't standalone tokens. + SubscriptionIndex.IsValidLiteralSubject("foo*").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo**").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo.**").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo*bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo.*bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo*.bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("*bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo>").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo>>").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo.>>").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo>bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo.>bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject("foo>.bar").ShouldBeTrue(); + SubscriptionIndex.IsValidLiteralSubject(">bar").ShouldBeTrue(); + } + + [Fact] // T:2993 + public void ValidSubjects_CorrectResults() + { + SubscriptionIndex.IsValidSubject(".").ShouldBeFalse(); + SubscriptionIndex.IsValidSubject(".foo").ShouldBeFalse(); + SubscriptionIndex.IsValidSubject("foo.").ShouldBeFalse(); + SubscriptionIndex.IsValidSubject("foo..bar").ShouldBeFalse(); + SubscriptionIndex.IsValidSubject(">.bar").ShouldBeFalse(); + SubscriptionIndex.IsValidSubject("foo.>.bar").ShouldBeFalse(); + SubscriptionIndex.IsValidSubject("foo").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo.bar.*").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo.bar.>").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("*").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject(">").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo*").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo**").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo.**").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo*bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo.*bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo*.bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("*bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo>").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo.>>").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo>bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo.>bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject("foo>.bar").ShouldBeTrue(); + SubscriptionIndex.IsValidSubject(">bar").ShouldBeTrue(); + } + + [Fact] // T:2994 + public void MatchLiterals_CorrectResults() + { + SubscriptionIndex.MatchLiteral("foo", "foo").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo", "bar").ShouldBeFalse(); + SubscriptionIndex.MatchLiteral("foo", "*").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo", ">").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo.bar", ">").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo.bar", "foo.>").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo.bar", "bar.>").ShouldBeFalse(); + SubscriptionIndex.MatchLiteral("stats.test.22", "stats.>").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("stats.test.22", "stats.*.*").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo.bar", "foo").ShouldBeFalse(); + SubscriptionIndex.MatchLiteral("stats.test.foos", "stats.test.foos").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("stats.test.foos", "stats.test.foo").ShouldBeFalse(); + SubscriptionIndex.MatchLiteral("stats.test", "stats.test.*").ShouldBeFalse(); + SubscriptionIndex.MatchLiteral("stats.test.foos", "stats.*").ShouldBeFalse(); + SubscriptionIndex.MatchLiteral("stats.test.foos", "stats.*.*.foos").ShouldBeFalse(); + // Wildcards as non-token literals. + SubscriptionIndex.MatchLiteral("*bar", "*bar").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo*", "foo*").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo*bar", "foo*bar").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo.***.bar", "foo.***.bar").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral(">bar", ">bar").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo>", "foo>").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo>bar", "foo>bar").ShouldBeTrue(); + SubscriptionIndex.MatchLiteral("foo.>>>.bar", "foo.>>>.bar").ShouldBeTrue(); + } + + [Fact] // T:2995 + public void SubjectIsLiteral_CorrectResults() + { + SubscriptionIndex.SubjectIsLiteral("foo").ShouldBeTrue(); + SubscriptionIndex.SubjectIsLiteral("foo.bar").ShouldBeTrue(); + SubscriptionIndex.SubjectIsLiteral("foo*.bar").ShouldBeTrue(); + SubscriptionIndex.SubjectIsLiteral("*").ShouldBeFalse(); + SubscriptionIndex.SubjectIsLiteral(">").ShouldBeFalse(); + SubscriptionIndex.SubjectIsLiteral("foo.*").ShouldBeFalse(); + SubscriptionIndex.SubjectIsLiteral("foo.>").ShouldBeFalse(); + SubscriptionIndex.SubjectIsLiteral("foo.*.>").ShouldBeFalse(); + SubscriptionIndex.SubjectIsLiteral("foo.*.bar").ShouldBeFalse(); + SubscriptionIndex.SubjectIsLiteral("foo.bar.>").ShouldBeFalse(); + } + + [Fact] // T:2997 + public void TokenAt_ReturnsCorrectTokens() + { + SubscriptionIndex.TokenAt("foo.bar.baz.*", 0).ShouldBe(string.Empty); + SubscriptionIndex.TokenAt("foo.bar.baz.*", 1).ShouldBe("foo"); + SubscriptionIndex.TokenAt("foo.bar.baz.*", 2).ShouldBe("bar"); + SubscriptionIndex.TokenAt("foo.bar.baz.*", 3).ShouldBe("baz"); + SubscriptionIndex.TokenAt("foo.bar.baz.*", 4).ShouldBe("*"); + SubscriptionIndex.TokenAt("foo.bar.baz.*", 5).ShouldBe(string.Empty); + } + + // ------------------------------------------------------------------------- + // Bad Subject on Remove (T:2998, T:2999) + // ------------------------------------------------------------------------- + + [Theory] // T:2998, T:2999 + [InlineData(true)] + [InlineData(false)] + public void BadSubjectOnRemove_ReturnsError(bool cache) + { + var s = MakeSl(cache); + s.Insert(NewSub("a.b..d")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.Remove(NewSub("a.b..d")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.Remove(NewSub("a.>.b")).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + } + + // ------------------------------------------------------------------------- + // Two-token pub vs single-token sub (T:3000, T:3001) + // ------------------------------------------------------------------------- + + [Theory] // T:3000, T:3001 + [InlineData(true)] + [InlineData(false)] + public void TwoTokenPub_DoesNotMatchSingleTokenSub(bool cache) + { + var s = MakeSl(cache); + var sub = NewSub("foo"); + s.Insert(sub); + s.Match("foo").PSubs.Count.ShouldBe(1); + s.Match("foo.bar").PSubs.Count.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Wildcards as literals (T:3002-3005) + // ------------------------------------------------------------------------- + + [Theory] // T:3002, T:3003 + [InlineData(true)] + [InlineData(false)] + public void InsertWithWildcardsAsLiterals_OnlyExactMatch(bool cache) + { + var s = MakeSl(cache); + foreach (var subject in new[] { "foo.*-", "foo.>-" }) + { + var sub = NewSub(subject); + s.Insert(sub); + s.Match("foo.bar").PSubs.Count.ShouldBe(0); + s.Match(subject).PSubs.Count.ShouldBe(1); + } + } + + [Theory] // T:3004, T:3005 + [InlineData(true)] + [InlineData(false)] + public void RemoveWithWildcardsAsLiterals_CorrectRemoval(bool cache) + { + var s = MakeSl(cache); + foreach (var subject in new[] { "foo.*-", "foo.>-" }) + { + var sub = NewSub(subject); + s.Insert(sub); + s.Remove(NewSub("foo.bar")); // wrong subject, no-op + s.Count().ShouldBe(1u); + s.Remove(sub); + s.Count().ShouldBe(0u); + } + } + + // ------------------------------------------------------------------------- + // Race tests (T:3006-3010) + // ------------------------------------------------------------------------- + + [Theory] // T:3006, T:3007 + [InlineData(true)] + [InlineData(false)] + public void RaceOnRemove_NoCorruption(bool cache) + { + var s = MakeSl(cache); + var subs = new Subscription[100]; + for (int i = 0; i < subs.Length; i++) + subs[i] = NewQSub("foo", "bar"); + + for (int iter = 0; iter < 2; iter++) + { + foreach (var sub in subs) s.Insert(sub); + if (iter == 1) s.Match("foo"); + var r = s.Match("foo"); + var task = Task.Run(() => { foreach (var sub in subs) s.Remove(sub); }); + foreach (var qsub in r.QSubs) + for (int i = 0; i < qsub.Count; i++) + Encoding.ASCII.GetString(qsub[i].Queue!).ShouldBe("bar"); + task.Wait(); + } + } + + [Theory] // T:3008, T:3009 + [InlineData(true)] + [InlineData(false)] + public void RaceOnInsert_NoCorruption(bool cache) + { + var s = MakeSl(cache); + var subs = new Subscription[100]; + for (int i = 0; i < subs.Length; i++) + subs[i] = NewQSub("foo", "bar"); + + var task = Task.Run(() => { foreach (var sub in subs) s.Insert(sub); }); + for (int i = 0; i < 1000; i++) + { + var r = s.Match("foo"); + foreach (var qsub in r.QSubs) + foreach (var sub in qsub) + Encoding.ASCII.GetString(sub.Queue!).ShouldBe("bar"); + } + task.Wait(); + } + + [Fact] // T:3010 + public void RaceOnMatch_NoCorruption() + { + var s = SubscriptionIndex.NewSublistNoCache(); + s.Insert(NewQSub("foo.*", "workers")); + s.Insert(NewQSub("foo.bar", "workers")); + s.Insert(NewSub("foo.*")); + s.Insert(NewSub("foo.bar")); + + Exception? error = null; + var tasks = Enumerable.Range(0, 2).Select(_ => Task.Run(() => + { + for (int i = 0; i < 10; i++) + { + var r = s.Match("foo.bar"); + foreach (var sub in r.PSubs) + if (!Encoding.ASCII.GetString(sub.Subject).StartsWith("foo.")) + Interlocked.CompareExchange(ref error, new Exception($"Wrong subject: {Encoding.ASCII.GetString(sub.Subject)}"), null); + foreach (var qsub in r.QSubs) + foreach (var sub in qsub) + if (Encoding.ASCII.GetString(sub.Queue!) != "workers") + Interlocked.CompareExchange(ref error, new Exception($"Wrong queue: {Encoding.ASCII.GetString(sub.Queue!)}"), null); + } + })).ToArray(); + Task.WaitAll(tasks); + error.ShouldBeNull(); + } + + // ------------------------------------------------------------------------- + // Remote Queue Subscriptions (T:3011, T:3012) + // ------------------------------------------------------------------------- + + [Theory] // T:3011, T:3012 + [InlineData(true)] + [InlineData(false)] + public void RemoteQueueSubscriptions_WeightExpansion(bool cache) + { + var s = MakeSl(cache); + var s1 = NewQSub("foo", "bar"); + var s2 = NewQSub("foo", "bar"); + s.Insert(s1); s.Insert(s2); + + var rs1 = NewRemoteQSub("foo", "bar", 10); + var rs2 = NewRemoteQSub("foo", "bar", 10); + s.Insert(rs1); s.Insert(rs2); + s.Count().ShouldBe(4u); + + var r = s.Match("foo"); + r.PSubs.Count.ShouldBe(0); + r.QSubs.Count.ShouldBe(1); + r.QSubs[0].Count.ShouldBe(22); // 2 normal + 10 + 10 remote + + s.Remove(s1); s.Remove(rs1); + s.Count().ShouldBe(2u); + r = s.Match("foo"); + r.QSubs[0].Count.ShouldBe(11); // 1 normal + 10 remote + + Interlocked.Exchange(ref rs2.Qw, 1); + s.UpdateRemoteQSub(rs2); + r = s.Match("foo"); + r.QSubs[0].Count.ShouldBe(2); // 1 normal + 1 remote + } + + // ------------------------------------------------------------------------- + // Shared Empty Result (T:3013) + // ------------------------------------------------------------------------- + + [Fact] // T:3013 + public void SharedEmptyResult_SameReference() + { + var s = SubscriptionIndex.NewSublistWithCache(); + var r1 = s.Match("foo"); + var r2 = s.Match("bar"); + r1.PSubs.Count.ShouldBe(0); + r2.QSubs.Count.ShouldBe(0); + ReferenceEquals(r1, r2).ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // No-cache Stats (T:3014) + // ------------------------------------------------------------------------- + + [Fact] // T:3014 + public void NoCacheStats_CacheCountIsZero() + { + var s = SubscriptionIndex.NewSublistNoCache(); + s.Insert(NewSub("foo")); s.Insert(NewSub("bar")); s.Insert(NewSub("baz")); s.Insert(NewSub("foo.bar.baz")); + s.Match("a.b.c"); s.Match("bar"); + s.Stats().NumCache.ShouldBe(0u); + } + + // ------------------------------------------------------------------------- + // All (T:3015) + // ------------------------------------------------------------------------- + + [Fact] // T:3015 + public void All_CollectsAllSubscriptions() + { + var s = SubscriptionIndex.NewSublistNoCache(); + var subs = new[] + { + NewSub("foo.bar.baz"), + NewSub("foo"), + NewSub("baz"), + }; + subs[0].Client!.Kind = ClientKind.Leaf; + foreach (var sub in subs) s.Insert(sub); + + var output = new List(); + s.All(output); + output.Count.ShouldBe(3); + } + + // ------------------------------------------------------------------------- + // IsSubsetMatch (T:3016) + // ------------------------------------------------------------------------- + + [Theory] // T:3016 + [InlineData("foo.bar", "foo.bar", true)] + [InlineData("foo.*", ">", true)] + [InlineData("foo.*", "*.*", true)] + [InlineData("foo.*", "foo.*", true)] + [InlineData("foo.*", "foo.bar", false)] + [InlineData("foo.>", ">", true)] + [InlineData("foo.>", "*.>", true)] + [InlineData("foo.>", "foo.>", true)] + [InlineData("foo.>", "foo.bar", false)] + [InlineData("foo..bar", "foo.*", false)] + [InlineData("foo.*", "foo..bar", false)] + public void IsSubsetMatch_CorrectResults(string subject, string test, bool expected) + { + SubscriptionIndex.SubjectIsSubsetMatch(subject, test).ShouldBe(expected); + } + + // ------------------------------------------------------------------------- + // ReverseMatch (T:3018, T:3019) + // ------------------------------------------------------------------------- + + [Fact] // T:3018 + public void ReverseMatch_FindsMatchingSubscriptions() + { + var s = SubscriptionIndex.NewSublistWithCache(); + var fooSub = NewSub("foo"); + var barSub = NewSub("bar"); + var fooBarSub = NewSub("foo.bar"); + var fooBazSub = NewSub("foo.baz"); + var fooBarBazSub = NewSub("foo.bar.baz"); + s.Insert(fooSub); s.Insert(barSub); s.Insert(fooBarSub); s.Insert(fooBazSub); s.Insert(fooBarBazSub); + + var r = s.ReverseMatch("foo"); + r.PSubs.Count.ShouldBe(1); VerifyMember(r.PSubs, fooSub); + + r = s.ReverseMatch("bar"); + r.PSubs.Count.ShouldBe(1); VerifyMember(r.PSubs, barSub); + + r = s.ReverseMatch("*"); + r.PSubs.Count.ShouldBe(2); + VerifyMember(r.PSubs, fooSub); VerifyMember(r.PSubs, barSub); + + r = s.ReverseMatch("baz"); + r.PSubs.Count.ShouldBe(0); + + r = s.ReverseMatch("foo.*"); + r.PSubs.Count.ShouldBe(2); + VerifyMember(r.PSubs, fooBarSub); VerifyMember(r.PSubs, fooBazSub); + + r = s.ReverseMatch("*.*"); + r.PSubs.Count.ShouldBe(2); + + r = s.ReverseMatch("*.bar"); + r.PSubs.Count.ShouldBe(1); VerifyMember(r.PSubs, fooBarSub); + + r = s.ReverseMatch("bar.*"); + r.PSubs.Count.ShouldBe(0); + + r = s.ReverseMatch("foo.>"); + r.PSubs.Count.ShouldBe(3); + + r = s.ReverseMatch(">"); + r.PSubs.Count.ShouldBe(5); + } + + [Fact] // T:3019 + public void ReverseMatchWider_MatchesWiderFilter() + { + var s = SubscriptionIndex.NewSublistWithCache(); + var sub = NewSub("uplink.*.*.>"); + s.Insert(sub); + + var r = s.ReverseMatch("uplink.1.*.*.>"); + r.PSubs.Count.ShouldBe(1); + VerifyMember(r.PSubs, sub); + + r = s.ReverseMatch("uplink.1.2.3.>"); + r.PSubs.Count.ShouldBe(1); + VerifyMember(r.PSubs, sub); + } + + // ------------------------------------------------------------------------- + // Match with empty tokens (T:3020) + // ------------------------------------------------------------------------- + + [Fact] // T:3020 + public void MatchWithEmptyTokens_ReturnsNoResults() + { + var sl = SubscriptionIndex.NewSublistWithCache(); + sl.Insert(NewSub(">")); + sl.Insert(NewQSub(">", "queue")); + + foreach (var subj in new[] { ".foo", "..foo", "foo..", "foo.", "foo..bar", "foo...bar" }) + { + var r = sl.Match(subj); + r.PSubs.Count.ShouldBe(0); + r.QSubs.Count.ShouldBe(0); + } + } + + // ------------------------------------------------------------------------- + // SubjectsCollide (T:3021) + // ------------------------------------------------------------------------- + + [Fact] // T:3021 + public void SubjectsCollide_CorrectResults() + { + SubscriptionIndex.SubjectsCollide("foo.*", "foo.*.bar.>").ShouldBeFalse(); + SubscriptionIndex.SubjectsCollide("foo.*.bar.>", "foo.*").ShouldBeFalse(); + SubscriptionIndex.SubjectsCollide("foo.*", "foo.foo").ShouldBeTrue(); + SubscriptionIndex.SubjectsCollide("foo.*", "*.foo").ShouldBeTrue(); + SubscriptionIndex.SubjectsCollide("foo.bar.>", "*.bar.foo").ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Cache hit rate (T:3022) + // ------------------------------------------------------------------------- + + [Fact] // T:3022 + public void AddCacheHitRate_CorrectAggregation() + { + var sl1 = SubscriptionIndex.NewSublistWithCache(); + sl1.Insert(NewSub("foo")); + for (int i = 0; i < 4; i++) sl1.Match("foo"); + var stats1 = sl1.Stats(); + stats1.CacheHitRate.ShouldBe(0.75); + + var sl2 = SubscriptionIndex.NewSublistWithCache(); + sl2.Insert(NewSub("bar")); + for (int i = 0; i < 4; i++) sl2.Match("bar"); + var stats2 = sl2.Stats(); + stats2.CacheHitRate.ShouldBe(0.75); + + var ts = new SublistStats(); + ts.Add(stats1); + ts.Add(stats2); + ts.CacheHitRate.ShouldBe(0.75); + } + + // ------------------------------------------------------------------------- + // HasInterest (T:3024) + // ------------------------------------------------------------------------- + + [Fact] // T:3024 + public void HasInterest_CorrectForLiteralsAndWildcards() + { + var sl = SubscriptionIndex.NewSublistWithCache(); + var fooSub = NewSub("foo"); + sl.Insert(fooSub); + + sl.HasInterest("foo").ShouldBeTrue(); + sl.HasInterest("bar").ShouldBeFalse(); + + sl.Remove(fooSub); + sl.HasInterest("foo").ShouldBeFalse(); + + // Partial wildcard. + var sub = NewSub("foo.*"); + sl.Insert(sub); + sl.HasInterest("foo").ShouldBeFalse(); + sl.HasInterest("foo.bar").ShouldBeTrue(); + sl.HasInterest("foo.bar.baz").ShouldBeFalse(); + sl.Remove(sub); + + // Full wildcard. + sub = NewSub("foo.>"); + sl.Insert(sub); + sl.HasInterest("foo").ShouldBeFalse(); + sl.HasInterest("foo.bar").ShouldBeTrue(); + sl.HasInterest("foo.bar.baz").ShouldBeTrue(); + sl.Remove(sub); + + // Queue subs. + var qsub = NewQSub("foo", "bar"); + sl.Insert(qsub); + sl.HasInterest("foo").ShouldBeTrue(); + sl.HasInterest("foo.bar").ShouldBeFalse(); + sl.Remove(qsub); + sl.HasInterest("foo").ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // HasInterest Overlapping (T:3025) + // ------------------------------------------------------------------------- + + [Fact] // T:3025 + public void HasInterestOverlapping_BothMatch() + { + var sl = SubscriptionIndex.NewSublistWithCache(); + sl.Insert(NewSub("stream.A.child")); + sl.Insert(NewSub("stream.*")); + sl.HasInterest("stream.A.child").ShouldBeTrue(); + sl.HasInterest("stream.A").ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // NumInterest (T:3026) + // ------------------------------------------------------------------------- + + [Fact] // T:3026 + public void NumInterest_CorrectCounts() + { + var sl = SubscriptionIndex.NewSublistWithCache(); + sl.Insert(NewSub("foo")); + var (np, nq) = sl.NumInterest("foo"); + np.ShouldBe(1); + nq.ShouldBe(0); + + (np, nq) = sl.NumInterest("bar"); + np.ShouldBe(0); + nq.ShouldBe(0); + + // Add queue sub. + sl.Insert(NewQSub("foo", "q1")); + (np, nq) = sl.NumInterest("foo"); + np.ShouldBe(1); + nq.ShouldBe(1); + } + + // ------------------------------------------------------------------------- + // Notifications (T:3017) — simplified from the large Go test + // ------------------------------------------------------------------------- + + [Fact] // T:3017 + public void RegisterInterestNotification_BasicFlow() + { + var s = SubscriptionIndex.NewSublistWithCache(); + var notifications = new List(); + Action notify = b => notifications.Add(b); + + // Wildcards should be rejected. + s.RegisterNotification("foo.*", notify).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + s.RegisterNotification(">", notify).ShouldBe(SubscriptionIndex.ErrInvalidSubject); + + // Register for "foo" — no existing interest, should get false. + s.RegisterNotification("foo", notify).ShouldBeNull(); + notifications.Count.ShouldBe(1); + notifications[0].ShouldBeFalse(); + notifications.Clear(); + + // Insert interest — should get true. + var sub = NewSub("foo"); + s.Insert(sub); + notifications.Count.ShouldBe(1); + notifications[0].ShouldBeTrue(); + notifications.Clear(); + + // Insert a second — no notification (already have interest). + var sub2 = NewSub("foo"); + s.Insert(sub2); + notifications.Count.ShouldBe(0); + + // Remove all interest — should get false. + s.Remove(sub); + s.Remove(sub2); + notifications.Count.ShouldBe(1); + notifications[0].ShouldBeFalse(); + notifications.Clear(); + + // Clear should return true. + s.ClearNotification("foo", notify).ShouldBeTrue(); + + // Clear again should return false (already cleared). + s.ClearNotification("foo", notify).ShouldBeFalse(); + } + + [Fact] // T:3017 (queue notification portion) + public void RegisterQueueNotification_BasicFlow() + { + var s = SubscriptionIndex.NewSublistWithCache(); + var notifications = new List(); + Action notify = b => notifications.Add(b); + + s.RegisterQueueNotification("foo.bar", "q1", notify).ShouldBeNull(); + notifications.Count.ShouldBe(1); + notifications[0].ShouldBeFalse(); + notifications.Clear(); + + var qsub = NewQSub("foo.bar", "q1"); + s.Insert(qsub); + notifications.Count.ShouldBe(1); + notifications[0].ShouldBeTrue(); + notifications.Clear(); + + s.Remove(qsub); + notifications.Count.ShouldBe(1); + notifications[0].ShouldBeFalse(); + notifications.Clear(); + + s.ClearQueueNotification("foo.bar", "q1", notify).ShouldBeTrue(); + } +} diff --git a/porting.db b/porting.db index ea5a0b70e0891b39ecc45c428b573315da49689d..7f3048b8d8e936b1403181d22556ccb9dc9fb83c 100644 GIT binary patch delta 15885 zcmc(GcYGA(_WsVy&dj!XXVbHjWK))qM%W~qkZd53(0k~;1VZm!h(g?`fS^|_aA4zA zY}bN91a%d?iWNZwt|BVZt~5au@Lm;B^mpFb+06{!`~MF<%V*9r&pGEU=e+N{g*#7g zV(!FWOK>~WaaZ&9S|)LeN&VxIxHPtY{@ev~rY@N(&D`0-WXSSSQw(Xw*UU5Wf8>+9 zoQHRjBu7`2A9QjacHaCY6BjR;v}noHDfZZhkDdH0$3kJ`-@N6u=>~#-ShzTL-o#1v ze>a_e?KHzw3zp6d!&ZjG40a#W$p6Hj<&W|o^Se0*`zL#q{gyq#e#E}czF;Y`WLe@Z zf_PoLBpw$JihIPJ;?v?Lajn>3nPC~D{`y-wtM$C4Fsm>rxG)NN+()dmWahyRKYmmkmq|w+{sn9i-&6aQJq0q8icD>jr_G9AN zYEJl7xF~!hd@8&nyevG$e#ahT53p~uE$ox*BkXE+IXjCT$KAu-#?9o$as$M3;+Nt+ z@lEk1@p17%@h)~C*M%$QJeFP-zoo#EW|1w7_#ZAoTq;htG+Gu~>Mg_BYH_q!C3?k7 zF;3*oUzqor-!N}$H2>ZFfH`1ZY@TW!Y3^h0Xf8CTn`6uz_cM2a`c%J<;Q`8+;_ zm&|{eubIzqX7(5PC;6;=RQ_1rEx#x~CU20NNXv|oy43J;i9uRykQN!Fg^*strZ*a}M-0+^25G%PT4#{%G)P99 zs|@c}8l*cUgLoPma@ds+%3|k3eioYwzvQqzLm8#)D~vHQ78um#8>D##X|6$L6uH z@zQ#!L7FLzlCDXor9;x&(o52#WCfW)Mv|VSjI<$fgoTU^=J-_6DqNRO%Ad+_$=l?h ze6PG*o-U7&d&s4HEi4$s&)T?zI~sbL;fC>WeVS8-E1$}qP`=0w;FW`3%T^!bL(Is)?0>!8Ifa>S~BZ{U+q zVtt$&!Zt9^TP{Lz9a9k+*Fp*zv-pH51MYvBq{@K-!C^5bribKBDPx&5*!(gX0)cHL z1ByN(1P;7P;=#L<)@fr2)FGU5w5R1up~GLx!!=wD|xBi>2Hv` zboG$lA~wk0P1lQp+0w0V={y2DlK5U39hM1{T*^ zdvpp1-iuVBv?9m`;g>n z)bc>jU>#EP>b)s0N`3J|GAUYepH{*0rmEmXtl)qE(yAz};Un@P{4_iquDvu`2c%6f z(B9rlX+y%aMA-c?Db~1Uf!l)Pk?RjIUdc~QvinFN>Y2bR`$$=oTOS;UTyMbYO-V=Y zrv0QL8s{;vZ9mD?sHIKj;8>(~4CH8icWFP_7;fLKpODTP&DX>Q$EdVyjrQ{=$PbRjCMuM?QXwtqp9e_CaC{E>9J6fpr=-0`EeUiFjzVg*b9a1-olD1= zO&G#zDXp17Zed`dR*A>|MP3S*>@1Gk zt3M-cU14f@rLQ6|vDt)$E1vC@imBoEJ|pv>q#RGd?HzicV!hun`?(29mUx zd;W8>B;1|VM@YHG>_Ck#3$8+@H{cC-Cj9LPxnEc8{$G$>jb29SLxU@ko`Z3cl0*x$ z{tGOO+NNi*O$&|^kDfg_cssI-DO(NwsYl83=wK2H@|Pr6t7s`T-Zj{O70sEX*&gsE znE*R?2wI|?_>y=uwvV!h2Uj55KiHe1CSc-MWJb7(>MVl8LZQYkq3kiirO5Wp^`>}e zxqteKEHJVK*zz@Lqp1{8m8#$pRLMgJYBGNQH5sLA8FU~(>FXq&R=A?F%uiM7f{U@j zc>!M2*?f#Fy46j|U`GyhgE~07RRjhF7owZYm0DM=`i4x`sU#jJc^P508VvctMaYhX zf4R3%9jf0s$t1zz<0MU|`R;Mz)%!^b-VxT!j-xFgoFJ89cKr!bpw%0@p*&C>Tu$pf z+nba|iBF#(Q|Yjg26-n*JFR<5DoaW*Yi`5ZW(VxvgcQo0bCN8CMPt*_;O0q^ubC_k zGnb*sg4?}Gbi{zFQ)E63;WRjKiWJ6%1Gfl?G`nQel6RQL>(OcmBr;viOo*r<^+~{r5x&O>nAywf* z6{}4-zFEdXm5udE>SxH}Gh|knh$BfdB%F;@wwNYLQ&KZQyJdj^8hzAR@{}$d)HL*+ zBkeNP+7#1lY2uqD)Ts)@cqKYemwT(!-r9bInFJ@#k!)RWm7b5}rK%d3+w4Hqlt3^1 zB%#%4I!{*X=>7|lF``&aovLO#)+K$sS5g~z=>;-eH%9z^A(F)^yNPMGq49!17q6sF zsg>W6hr-QymL!1rB2fbrn^R4++GZ<_hkCE1CR)u!GFexWBNs_wI4FwMM4Q)aLEAZj zY_0v1FX2oQ`3#DI{gvm0DS71~<8zd6hj}WB2_5W1;oS@z={(+qw3bbS)~>tZ&zBrnSZS zMXl|;A4z$ugM(Ly1J3?Pa){zvut!--Q3^KD6K(*}ykSwcI}a$NxG6Ue^QnBJivpSc||WJ+KCWd-T9+1eWN5 zdl0DC19u}ZgaVD)-l23CA~kwS69PqgAb>!+9=H<$q6e@t>`gte3W3XdU?l?I=z+#N z5ZSLsZb#q^JJiA)1CtSO>VZj8t?4g4 zP=~;edSD^~Ct88VDbfT)3`^$m2pE>k;}9?`na3hvSTc`6U~Q|_Dbi>JZqoy!5SXS1 zMj|kLUboimb7=%3-Sw2=2>A5CFa)ynz)%EYLT}w553?+JB{cXZsb?DHd8QUU=}`n8 z*8|N6{7nx85m>1QwjeM^4{SzYtR4Ua`s#sA2>2-wP3et@V$@DKub>VXFln6C#mATU7>JTMKB0ea+b2z1f|bR)Gu58Q`-*0B4TGvra}IREXd$?_+RGf>~0i1R{podmBja)b7P!ODKR z_V8Dh6b1av2{_?Y;o3u8&}?PpzR~$I7E(D`UC^l1qULy{mLpaDw5j9d?z)-y3r=nq zUSbscn+lp^(Rp?gPQ&UBL$XeIcyP8kd6Bo z;hl^_Hn|EODo9CF*UVk)a{KVKs*>-X)U03)%2qE9?~#P=w#z>=@Z`?)DBT9HNwPM1 zm#CjA{^n$K5N*rvC2Y&qouz3+R=quL{h3SH287w8%w` z0>}T%U*NyydJBa@x)8%vT2ri&Hc$uDd*s^U&Hmmh_>krmb)_+;w zx4vq9*1E;I-n!B{-&#A#I@H?3T5ip=eQJBh_Ok6M+eX_O+X~xk+jv{8t;XhK$Js~Q z``N4PUVElJ&d%F@wOzD*!w$5+Xn)MU!QLcRitR+V=oDGuzruNVrA02x-pM`9ZQ|CN z?=~+p&oGZM_cwPo7n!rn@n(U$&Ryb;a|gLSkWs`YHgXO8Mt%*yf}hQg=WC@k(h6y| zgyX7IBl&PsHc7GyBZL7$ccCL!EG!ge2or?{E?2mVOBe3r67eYIF-{Vm<;;9L`}_Q1 zelPzv9-q9(KgBJyye)1MpA@%<8^kr@Dm=fLBi4&!#KB^3`wII4`*gMk+liY!LO3BD z5k3*#6W-uva1(_V`)+O`CUfNU@l@pDXyoAw{eH6jQ-k!LL3+|4JppMu<*(wtjMzCD zc{mZW?2>zOUVGHn@yM%hA`izR4_`+fz6z~;O%@o)^{`T}tp-nz8>Gh!(%%h|;ncw1 z40w<>6vQ@cKJySPe_ft!I4!ZiYS4YfAnh_pI}Osy2C2m$?J!8&4bn@769@Z?hL_t6 z(hCOZd4u$vL3-98{lg$VW00PfxW?8K2m4com+u;+JqGC=gY>pR`lms9%OJgJkanZg z7(HXRzhQX!xVklpeldU_Welc`#z(bm;vz<<90tw$XHsIbvdMEW=!9t}y4>vFs4G z4_nQgV9VKM>|AyVbA*j!+q2nhs{E_`qkK+2E`KiX$K%r%@Bn9>+#t`A$H;x}2b_*^b%v+jiStux+uewcTc$W*ctnZu8l)Y%w;I^@{bR z^;7Fx)@|0Hweeo-a_e;K2pq^utsZNvm9bp4oU$CW{L}KHrP;F1vI6fsjI{K$l;O+} zXJN%_;%V`a_%@z-KPs*l8^oF7D6yB=LChB8MNarh__uHvPtUgte;4i(ZWm?=qlMl= zxzJWf5X}64_;2x$dJn&Ye~iDMzXQkZF^zm5zJkx;6M5eJU-KFBXXba!E#}9~e>1N% z&oPfR_ci;?x#lDs{D0=oa-VbWaW8XQxd*sa++1!P*N>~@^0;J9#3}C_UgCJ4-N`<| zZm@8XUcMT6_&)M*KJsu*eMrY!B4;DenaIPpk%w#W!%jJ?hy8K{xDA~*;0 z$)SuoXIBPVCOJF76`7NvQsiP`AA?@;6fp+AXE-N(O~v*szW9Sa-T9I1gMyFb;o#u7 zEV%w5inV5Z(e6WeI&9=PC({NdeIV~%^Y9cW{$lVKi@!Mh#p5ple~I`@3Ozg}d0{h7 z;fr4tUCV5&?Qv+dxiZY8jF}pGD&O%ChsRvl6n^6R_(73hXbN(RO|a+AqzbTXb=?71 zPYYGBa6xwWxW2ak^B1b0a8-rwf5P=7Q*LQAtYYj}5YVq;>^~x)U&YveKp8QPxj7|w*|J?n~L%<{d=C!yjxW-kM$1`=l|d9zf* zhMHe={fI+`$*}ISe~*BE-DSUoz;NBSzWpKs`em2>I|K{`zJP$Cz~>Q&(c3sDvD-{e z(-~{X(&GBfn}qM3rsMgto#Z%4;?zC$V z-AhUX=XIB7(IU@~NTWI(QKuu-dEks|V5<+R!IHE5+qwK6>0u!~|qx#%j@+^OrHnUF+PdZ0=V)!k^Ac-oZ! zJ1)BHu=b*>%%I;nyoKyjRcAvyRqchUz3`B=n}4jjh5W!JR~oFiq}PP$mt3*X=Vw=; zrmwDXMnD|Z_oKdF)gKViPlPGoyV|#UhA+N%RcJkq zGTXw{KU@ktec4qUUh(*fLtkHZ6>{v}LaJ*}5bxHieHCMIHaj&0TCE;?Gs`0r9+EIEmf;Rkw5fmwB z2AufGMPQRVRttbqHR|R-0o75PRqc_nkbbI7A{71?d*r46xH`~fZyTr+6%WNK!o8{L zHrXJb>W;ea5uz~Hr;;>Oa6Hjh%n5oxXs_{1-JX9kxAa(N(rWy%w_&S>3 z^C2Tb%XM94^T2r%U-bWoiPTdKrKON%sOP-jboDHNmX8<#(r-kPrc~9bgG{Q^1uILh zeRT5=rJmDZ-3?bsw4R!-^m`;nOVy2k3EEQK@u)kVw(H>ET??W*Y8joN{1Nr1?j0Hv^Z1^EP^)FfIbeB!|im}O;;DAdl6#)jJj77bv$HH z?f=uzOaF9L=yZQUH7!Zi!-w9GPSv6lK+Rve2_O%o{fu_L=A(3azUDNoF5GBQ(wWsX zOUJqcID*!}0BS|o)^wB@Hg)k5$Yhj+jk}Z*_^f9*JUo!gC>?P|DK5f8%)lb(PgSF} zZeWzw>{B3xRcdfNQn_>&WhV5ablrHMjz=%DN-At(l`C^Gj0dh>z(9RkP>L{> zqVzG)Q(yjW{@U=EV-_RT#^GkT=0KY_dQgo>W5EGY8Kn+Pn=ZIgVY)@>tl6jW*Axfc zDSHwYXOfyQOKHNy!9kl659cjP-)I=+Ks!5P>#a&DUIL;8qz3NFfo^eVU})3$@YL4L zKV4^mu_USv#myoJS z%sM>pe#&ZkN-iMXr5=uH_EEOJ)=rl8VuldQn>pMRWR98gX{NS;{!V2iZYu-5trh)t zfpJb{80`EqGA;kdsdU1fUD^hI*<%_6clwoF805l6{3}Lr!lyBcDEJFR!x9Yb#RJCff!cp!h0~c$U z26Cb@E?UA#VJ>c#+hF9aT(vbEP?V$$jw+}Pp<;IUE-6wkRe!{-`fga2jM1N+sI-HX z$%+y-Qr%H048Qm@IjUcQ{)gYHulgTvbe#n8N>r;HwT9iQl}?R|us~6U!b1mKZX72P zpt@2?j^YyJrx_#$vIWUc(pkq* z`^VP%EUUx?!eo9Jca1&Dd<+>2tQ}!|hBC<6lmxS=;i)-VmdwpiR!6%e3o0{}F5#B( z(b%4;aw8eFAybzOnfTvFlxmGzgmz~b?AGOA9(ApB9Z8dx#`-ivIWaF(BMMB<*5#n8 zufRReGLiJ7VLJ6z)nt%!loVLmR;QXnu}aM$4O4o5vo-0lvaLz0)>DF$xmfzt;8oR6rekM5wUiq6kNT6sLv2Y324e;sj#pCY*gDCmPkZ-NfifgoYPHq` zP8Z;uwYi4ybZ6c-_$mm@t*ZP2TuIvY7kCXDwgu~6uZv>wh3YCXj83pz7|C&Ut;Wbxv( zw!;z0G23qIi0m}D(>VsM zh_4u^?X1`!sk5%#Rj#@Rx$W)X?v}(uCL39k-br z>>-nVqRnUdL3~cQA&lU+l7r@h+$-!MsYB@Hs`zd!)U)n6a$r($KAxc#-mcwyUBtTe z*CP`+H=Swd&BzW-i9y=-Re>Jbx%dRm{SV!FYnj>Y4PW0aD+>$^&PB7?bG5VX2D7`K zesgL6L4fdX?fNiMOMcbOm^}u?B?5@_G5bu@UC7M-VAR{;nsX0hhue#Rq?ou7~ z72e%0O3e?>)LZ?9N2{#`m@c@rtN&#sO*KI}_WPRrS_z&L+(&h0cZfP}*Wh%O+fn0Q z72RLz>|VD-xy4PnK{~D%H(_xz>G!JN>V8|t&9%Anw6-m3iVxC(Jg14%sy4&s-l606 zw!7PeZ@ib$8}IMH?GE>w&b#7+H1E>ywuUbmhSoUT1>swm;s3aCMsjO6zst&-+6QSk zSK&3x@U@04g2idQwHzb6y>#;kySj9>)4kf6zS^0w+L^i9nYF1UUWolHhx|@h$?d4L zPqxir-ZOm$$xhF1CvGWqK)cb68}K_lV`$N}4a{`K6ARz?WNpW@tQ0qs%BVtgGpSvS zr#iBRN``GQo(}j0f>#>Kd`($UMX9rK3ZJbmpc?7&CkYl*$O$0Fc|;%yxD46;2kye2 z{?%oM({Y|S`ob11L17dV;m$aZwh5wM^lg%$6E&~jlt_-+83sd_gxA-aiBXlZK|2dN z>I(;7#d}so-Fje4f@iQ+5xQ7LXO#wLOrocC2SlZ(fzE|`#kDQ?>2N$3>pz!I6TA7PiE48#j&2&fTq0+~*z^7xRf{Yg;SR+!F1iL! zg!mLsueRYbtE<$(;G^_1^g=KFcJud%lzAw$`0*Qa>vy<)b?29vX4szMalm_TXQ|tr ziSS#BN4r~ErtR01P)m{h8t9wq8KnCXK9L%wt6#$7q1ZrgpJt%Ho93y+*^|bh`X#&w ziYPs@tJ2Ls5sK41@i4*dk)guvsnEG<Z^@HTF#f6dKw5B}POZl-5*8~z{7XtfIf delta 11715 zcmZu%349aP*3axSN#;({eI{+Xq@|QnQYZ*56bc1FDbP|75D-c!sAVY>6htT~iYw@& zpFNu56U5yoAbb$P4TYy5sB9vlAOfQBTu=~kLBBIINgC#T{C>ATa^`=}e$T!4%q%_G zN-f2)F}R+p+T6B&1eNrj&G_e&nKZg~_RNMEb@S@vlbbeD845mdlQzm`qn=g1Q;uwQ z?cYq29Y5jkf4CSQJ!|&7NsaSr=FY2|>gdvP?8M)>S1n{JTj%|!!uY5d@Py`*TTfC{ z8P9E}Y_vBdWYBx4;o`00&0>YvS1c5>!~~HSt_bJp&a{_t(tptx=u`Az`aSwJ`X%8D z;Ui&(ut|7EXcg`k?&5yt&TvP#-Q0G=j_@A{bf-JQ^;z^B@OGzX!}*Y+gnIbsduiA( z)p7mS4)lE#tS_P);HB>LAaM7jOQ7sEr8=~yBi&0FO82$>M5i;{d)#Z>OWbO1CAXBD z%T48O;)ZY~TxZV9xj34=#GYl3vLCQ-u?-yi_73(|_GY$%?aLOjS!@DbK&OYk zE2IZeO*_O5;y=X|;@x6B+IfT6OUxCML{7LQoE8oW?+6=(r-YTlVqu0bUbs;x7CH*a z0?+@#pWzSj@A9wmYxqa_CHzc&0zZ^5;q&+uUf?csUvr0tim_3H|m z5L--NeNW=A*w#|pY}^QFsN`;c>?7s~lr`jZgF9+mC84bpGk{~Ai)>l{=8kZ{;>IB+;|$f3U!!W0Tl*0zRq+F)`O zml5h%Mv^JvqT`g~fMW-g^(R@OTlm*g*r#UkTWrKO z!N%{gO;9?L4m|HHbyV9&N@K*EnO^in>H(p}olzW`u#pr{Y@jkY3Rb*IQen@lq(5ZU zNg1&J9TE>En@BP2_?i~Um|(TrMvajDo_;jtg9kQ|39!jSGGXawBn}cclk0Sx;JWw5 z;K(=|HRyiD@9BZa(3H)jnvQ`Y%Q!f(lcbw`^Me)0*VErGdnn}Fc`F$Z1-oGNR?@-X zo8Y`)OE4lG^9S~9B}dsnkKkx{Hbl0tfrY^;xaBqS5Omp29I)vXE(6LKB@S-gPC6Ol zlski!sGUEc`enav$BWy^vKV@O@!?X_)ywMotg zVdD-m42Hf#ZiQ1jX(N#RV8`Q}s1yWiQOmB)Zoel}H@?HWWCx7=l{YYMNZ3j8Ou_ez z^>COKGlNq=+(k~ecHtZ(J6MBCXQrct`rL2-O?ts8fi*lF50&qs(ng}(5S)y-yjs6U z)p0A{BQ@}}l9~aoA29G~?~`m(nX=#{RAyKJfu%Zd?EB>3(EO@B9I}etdPeD5mj!RR z2KdBo1kN2Ign=ven5ic!cLgURusA^ZE>j4OM^?Q9xqeT+&gzZ5!%<%4&729`{e?i;Rhd*-Z2SM^bzr!xWU1(i0g;z%*f9LACU?(L`EWi zw3E1?VpGJ|!-HcGd;@|FgUdc9H^m6w^>Nr>(l2;ZEYs?p$TYHeUGU*fGwFra3ldM? zX#;UT@j>&;&I(s^LK}_t^$hg(%RP0yAKXt?$4nh{fOIz{Lm*T zc~hUnu$s9?^z=lAN?mXf0=w!!Bj4+ekOeUn#rbK(ltIC}(3ISoFvDS=l3U>EN6g9- z51)TZvJ8gqvB8CiE3Wc;@=$P3e;&BrAxYprN=Cq`*D;&1Qg~qZQB=xEn2Evph%cJy z_xN;0e?CeYqLwE^>t|s_Nm+0n;_{HMvEB}SM#e@hPl8Fu!bL}^4K|`dc>yjAeEb-h z3r{~~R{CV<`*|d(s)KWoVdmnn%a?pkro+fjaG@Ke%5ylRlR9(VcQ`>hL*ZCKRE7udM5A&74u7IY$ILiE=0LgT$bjEZpd=kbh6nFJ zHu;PE$>}bf+aCCkkn12IX^`y1TcjZ7?S zbD+-Y6aBK`!1>>h(J_nl$~O@wux)l^m>=lpmyM!X@-0~fS-T_Ki=i%cE|O*=+ax`@ zwSHM&577S{nG#d151hl;7z=Sun}}R<136*WrkuyB-d?lcKX2MEkv74QJ~S*{{ttN^ zrfiRtibMa2l(uIMXs#!JS(KpN|~ftIGSy0`wGyUS z2{l&2WGi9PwFG#-Ke=t(wX~`77%Sl>D`B*iFv?1()(N3EekUz7t=tl-x=L!P|4Pu+ z)>&;&Az0BM=i`o(p(i0)Zp`evf z1_J$NWJg-UcqORm<;_>Rw8i0$kkgD?T#mjM4rP_Mp?SCxdvYq}6!Rub^Vx9V0PSb4 zYg1tsr_6)0_wZ6MomcX8sRAnuw9SXElc_nOCwOH!L%eONsD%$VC*vSlU{|`D+nsUb zc-XFlk0ym}9>n3j*t{8kY*$KQQ%}`UA|CoU6yqi%xbB(SrlGn$mN)u6y1Joz9Lg^g zWZi2XB@L(@-g{XK-|+jirNlbwKQcNhaxry7Em6WZpT2F$x}KB#9^*v#GEv6Gc)pvW zm=4LTjaq5AiPLkEeKHne7ic4X`Jd&$6Gf`{25bO5t!W z$UV$0;Tq^Kxn0~=?gjdQ_=)(wxQ$*xH_>zGe7cq%O{dX=>0-w|$G47SVnBRZd{TTw zm@iBh#tB3D1N=^Y3;#U-1pg4f7`nfz6g1ty^kxc}bk@Vl%-_sU%xUHm=6z-x^D^@! z^9Xjd8kss~47-JWo_&ISh+WLiW^34Lb`aZ>&12lLJvjM3d7)e{Pn1W<{pBJ#M^2U{ z?tAPCe$E}Fck?3mH+PwL({J+{y_NUTukfAu9`rL!d|$ea-RaoLP3LamMspQhIak7n z;sRj+w%-bcT)`_O3QmFHf9HSVzvaKA|E4d~-}CeMdVUH&j<4hgJ2pGkIa(ac9F2}z zN42BeQRMJB;vJ0rqW!e}kbS3pvwfYt#lFnmh^LWid%3;H?z6|+8R?>QTIzd9+9_?8 z)=4eWGO1Cj#REyXR3!PNc!?1&il@bDJc1O7J~3Wogp1fEJS6NCHVf;777P(K@Rn~< zjzadkiWXY1S?R~Xfsd8K&^KEZj)E?0vb#gsWa=Gw>oui*bJa^H>Cn#CmA-5f^?_{{ zwT)qpF&{A7nU|O#^B{8*!TIws*8A_jN9^^U^QQhov*pA?aP`lguB?dFfSYjq`2idg&2qi8NE1APseX<=iio zNO@9oBAVGUNPNZ$N5bZ^-HX*c!D4su~`lqfDbshOZ9U zQ#Ox!eg!39Cs9COwH$b{0e+I1On$9>rH!+-1YWF!&uU%Up|aMsAl}lRuGeSOyqE!h z(o8ove*Nd46yt&}Q(Rf#oZ|AqpEWLbs6t@ApkU1V%4oQW zFazOVG6KJpnJftIR*K+y{9nt0=dxN}ey*tH$LHMaBQ_faHP2=YejcMmwS4tL1H8OP zDTn>NXg55Ge_Ad)>xMUV2LE~=*Yq2N8Qt>my5lXu=iH(CUCMTfTD5h*!amBr!Y;Fc ztBOkr&6?~QK*8#-7!4k6aP)%zt#S1Vl{IKU8$DTQV3`N*^VGR;xN*ech{utDBN0au zj^vQ1F6CU1*}=$L1oaL(jp^28S2IFSb#hMOo~K;uqk^E>o@Bf>C|jf64X!n6A*`Vo z7DyF07qD+jL}m(8#gs9As9dX-!lbpTE3{&*`ZNW54mwALmK}2LrLDWO<4Mcjvrw1Zr6_Jc0D5U_9D7pM+nyK+F{+U9W7Q>SBXtN%Aga(-0VyZdUEsQ;#>0qHDi6(Hsn}1_`!7Qw5jPndUcb%l=)pkJ$sN%w!Fns;1g8mLfn z6179FNdVudMHc+RU1+X;DP<@~fsfwJN{0Gx)$s;OcSsjVK8L1e97&hlkgQuW2rU_e zmXsHdgZ*EriLl|E>VW&tsXd~T#v2+Z&4L7d!eE>*7~9e1#p8{>!6WC@G+1~(f_Gh3 z)8WE-OnW^%N#h`1pHz&Kij7IbB9oF}>VHrlBUF(|5***B8=wbvAn1o3TRzUr*Sp`@ z-yzR#doZvDddr6to(Is<(DCooJcizdZJ7&dI<-qb(yWdT%z!TX^jPyqx)1w37f{pc zVy_vn**~d$O>sk@K*txLIOCkQ>nF7g=qjxPCP*3#{8{a0;4rYA&A_((jDejSpWFfV z|AiTH{AZOwt2eF?mY$_KW;W!5`$vpa?w8pFI$c7azH(8$4i28lG&%Y%sb;sGk)ac; zxQwat_%CWIoVooi zQ5m)(R)&Pe>_Vs<=;CScLFGx+3rl~)mAU^4>c8XPlmHvAMD#Z%W&9RdvI`*V8r5h0 z7E#>*u78gbVdZbgrakd%Art%@t`<}$5gz;; z`Y&2bg;rYYVfIw_vsw*Xv)D~FNi@bZB%N!Jl}xXS3+s<+4p+@SLw zkC`~0rhPDw)n>!0`H5llSXR?O;Uc*`6oy>mgPS;Q05sPo=qo1)4scqD>8k-SSjXid zSEECg!=qYp#}#vht>rcI9UT_+>rEw%#mEX_Td=Zsf~A7i%`l(9pjh)^pP-F3maASj zy-+7=SklmzxFi^;18Y!fjj^ic>#HgrKC){Ga8}fAjD|lG@*GIJU($^53kl0QL3upR zt!|!JELR(or^igD?An=YCxFjw4M>=`o}2uhJuFlQ}AIs(~zQJNyMvZN13_~ z-(ptSY!!-+43q~sr#UL^B~rFX_$%Bgb|15qUPC=->!jEEY^ZQ)W1zyV*=arj0&|jb zjgmdsjmei8mHA50cPT0ybkk$ z60c2(KJ1|XOojdN`h6Ai0AV-lur&A}5v%p=1kDM>3EK712^p}m4@#Jvpn2d#Li?<7 z(OFqAIPw3TpuovQ4X^2@n6dhVj<7UIONP-&IN_eonK^Jl(>g=HBozI7Qakf8Iu?`Ih4a>27hFP|25Y*~~T?zfZDF)N{2MCT3`r za5W?1(#e_a$$6XZ7%z0r(lWu3g$TBhn2vOHz&JdTrD0o1k83*I3N;z%l1Zv2wEDCe z(GOx<|Gaj85(t^Po#kdqzyt z^~%v7Y5+MA&2d>76Ex`4p&eVNW3$1Zi+TBM2gDwpDMW2>>vSyfW^PW^p&d|R87exW zE}L^>SnGxu^yp%#jcOsoxIBD_>~mpbHkzXUq{Ro!td8hqpv?DZxT2@%vvXlWC(Ou? z^CE)E^0lt5oisPx`#cq0{1Z*+%W;|nu6#_*WQwTp%K5`5)mxGZoh_K zRK#hzQ1cfp`lD`$Qo|`)=H^`+oRI{zufi1dhms04Rb-oQfH_dq3-^hqdTEbA^;<5Z zZf1a|7|X-*;&w|KqUy2WEz#oP$6_?+6OS{hsA|$5JgC2THbXtzob*s{EJIgI5YNnT z=fLFN?YTcoN2Ec1A1wxc;YuX5BBwKH?2O(JQ-&9(?saeFJ_h@A^hN2Kb)n%q7%rMwj=C(s#R|X zEUM(53{5W6Mp1feq8|-Aej#FL@j&e^CiE-FFVLag>(m#i`-W6#I6OE~LqjUkwrx&# zG}@eWEn^=mx+?nv)qywAlnMjvMps@FbJ6`Y9;)SQ{&Zu4>0TO0HKaTez_8p$H@ z1#d@OF1D|YPbKPjZ+CdwiC^-xe+aOT_nL1t@%xtB5L|>{qsuG_-m(~`##aM(3*HG>!Q3?E71XDyw4%Q(sx0wl>HrwId0F5(qVofs|*!pK!6#INv z>}0Xal7kJf$mx9@ev!R*$9&uJ6!C^X1Lzu@fqYBxGXV3YzzKp1M80c@hkc6IyycM| z!TMNlZQ#&bb6>=dMhw@YS;nnYSnl#Jccm|PWh{4PE_Y?6$kexZe#*0vKL{;*oh6Pb z_8C;FEe}#$zU@%4YV_V} zzxd`HDAgy`;~}`-IL9`bSIdT8_*IP(?-PI|;-Pc>pLiua`76ph8SjhNAK0S@J|9eq z_l3)q3;ICjUc4^jrA>}b7N_Y({%-9Hd9u(_a;Y>7aroIyQ|;wb&d1j)ow}-j6aKw%ljGJ5Z9-mHxBLQ%sUsdF%6Gnp{LGaG>RK-aJUWn zxDeV!Ll@$?9a|pQ%Q9rAYQF1@h+!A2yLptpPxDnpE<8!#_V@;yfKgBc8_y*rQ`K;? z8298g-{Q@5>8F`2toQhwa7t5kmMUEK_{^&mUQD~07t;x;zP=DQD15**HW&nlJpt+