// Copyright 2016-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/sublist.go in the NATS server Go source.
using System.Text;
namespace ZB.MOM.NatsNet.Server.Internal.DataStructures;
///
/// A trie-based routing mechanism for NATS subject distribution.
/// Matches published subjects to interested subscribers, supporting
/// * (single-token) and > (full) wildcards.
/// Mirrors Go's Sublist.
///
public sealed class SubscriptionIndex
{
// Wildcard and separator constants.
internal const char Pwc = '*';
internal const string Pwcs = "*";
internal const char Fwc = '>';
internal const string Fwcs = ">";
internal const string Tsep = ".";
internal const char Btsep = '.';
// Error singletons.
public static readonly Exception ErrInvalidSubject = new ArgumentException("sublist: invalid subject");
public static readonly Exception ErrNotFound = new KeyNotFoundException("sublist: no matches found");
public static readonly Exception ErrNilChan = new ArgumentNullException("sublist: nil channel");
public static readonly Exception ErrAlreadyRegistered = new InvalidOperationException("sublist: notification already registered");
// Cache limits.
internal const int SlCacheMax = 1024;
internal const int SlCacheSweep = 256;
// Threshold for creating a fast plist for Match.
internal const int PlistMin = 256;
// Shared empty result singleton.
internal static readonly SubscriptionIndexResult EmptyResult = new();
// -------------------------------------------------------------------------
// Fields
// -------------------------------------------------------------------------
private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.NoRecursion);
private long _genId;
private long _matches;
private long _cacheHits;
private long _inserts;
private long _removes;
private SublistLevel _root;
private Dictionary? _cache;
private int _ccSweep;
private NotifyMaps? _notify;
private uint _count;
// -------------------------------------------------------------------------
// Construction
// -------------------------------------------------------------------------
private SubscriptionIndex(bool enableCache)
{
_root = new SublistLevel();
_cache = enableCache ? new Dictionary() : null;
}
/// Creates a new SubscriptionIndex with caching controlled by .
public static SubscriptionIndex NewSublist(bool enableCache) => new(enableCache);
/// Creates a new SubscriptionIndex with caching enabled.
public static SubscriptionIndex NewSublistWithCache() => new(true);
/// Creates a new SubscriptionIndex with caching disabled.
public static SubscriptionIndex NewSublistNoCache() => new(false);
// -------------------------------------------------------------------------
// Public API — Cache
// -------------------------------------------------------------------------
/// Returns whether caching is enabled.
public bool CacheEnabled()
{
_lock.EnterReadLock();
try { return _cache != null; }
finally { _lock.ExitReadLock(); }
}
/// Returns the number of cached result entries.
public int CacheCount()
{
_lock.EnterReadLock();
try { return _cache?.Count ?? 0; }
finally { _lock.ExitReadLock(); }
}
// -------------------------------------------------------------------------
// Public API — Insert / Remove
// -------------------------------------------------------------------------
/// Inserts a subscription into the index. Mirrors Sublist.Insert.
public Exception? Insert(Subscription sub)
{
var subject = Encoding.ASCII.GetString(sub.Subject);
_lock.EnterWriteLock();
try
{
bool sfwc = false, haswc = false, isnew = false;
SublistNode? n = null;
var l = _root;
var start = 0;
for (var i = 0; i <= subject.Length; i++)
{
if (i < subject.Length && subject[i] != Btsep)
continue;
var tokenLen = i - start;
if (tokenLen == 0 || sfwc)
return ErrInvalidSubject;
var t = subject.Substring(start, tokenLen);
if (tokenLen > 1)
{
if (!l.Nodes.TryGetValue(t, out n))
{
n = new SublistNode();
l.Nodes[t] = n;
}
}
else
{
switch (t[0])
{
case Pwc:
n = l.Pwc;
haswc = true;
if (n == null)
{
n = new SublistNode();
l.Pwc = n;
}
break;
case Fwc:
n = l.Fwc;
haswc = true;
sfwc = true;
if (n == null)
{
n = new SublistNode();
l.Fwc = n;
}
break;
default:
if (!l.Nodes.TryGetValue(t, out n))
{
n = new SublistNode();
l.Nodes[t] = n;
}
break;
}
}
n.Next ??= new SublistLevel();
l = n.Next;
start = i + 1;
}
if (n == null)
return ErrInvalidSubject;
if (sub.Queue == null || sub.Queue.Length == 0)
{
n.PSubs[sub] = default;
isnew = n.PSubs.Count == 1;
if (n.PList != null)
{
n.PList.Add(sub);
}
else if (n.PSubs.Count > PlistMin)
{
n.PList = new List(n.PSubs.Count);
foreach (var psub in n.PSubs.Keys)
n.PList.Add(psub);
}
}
else
{
n.QSubs ??= new Dictionary>();
var qname = Encoding.ASCII.GetString(sub.Queue);
if (!n.QSubs.TryGetValue(qname, out var subs))
{
subs = new Dictionary();
n.QSubs[qname] = subs;
isnew = true;
}
subs[sub] = 0;
}
_count++;
_inserts++;
AddToCache(subject, sub);
Interlocked.Increment(ref _genId);
if (_notify != null && isnew && !haswc && _notify.Insert.Count > 0)
ChkForInsertNotification(subject, sub.Queue != null ? Encoding.ASCII.GetString(sub.Queue) : string.Empty);
}
finally
{
_lock.ExitWriteLock();
}
return null;
}
/// Removes a single subscription. Mirrors Sublist.Remove.
public Exception? Remove(Subscription sub) => RemoveInternal(sub, true, true);
/// Removes a batch of subscriptions. Mirrors Sublist.RemoveBatch.
public Exception? RemoveBatch(IReadOnlyList subs)
{
if (subs.Count == 0) return null;
_lock.EnterWriteLock();
try
{
var wasEnabled = _cache != null;
_cache = null;
Exception? err = null;
foreach (var sub in subs)
{
var lerr = RemoveInternal(sub, false, false);
err ??= lerr;
}
Interlocked.Increment(ref _genId);
if (wasEnabled)
_cache = new Dictionary();
return err;
}
finally
{
_lock.ExitWriteLock();
}
}
/// Invalidates cache for a remote queue sub weight change.
public void UpdateRemoteQSub(Subscription sub)
{
_lock.EnterWriteLock();
try
{
RemoveFromCache(Encoding.ASCII.GetString(sub.Subject));
Interlocked.Increment(ref _genId);
}
finally
{
_lock.ExitWriteLock();
}
}
// -------------------------------------------------------------------------
// Public API — Match / HasInterest / NumInterest
// -------------------------------------------------------------------------
///
/// Matches all subscriptions for a literal subject.
/// Returns a result containing plain and queue subscribers.
/// Mirrors Sublist.Match.
///
public SubscriptionIndexResult Match(string subject) => MatchInternal(subject, true, false);
/// Matches using a byte[] subject. Mirrors Sublist.MatchBytes.
public SubscriptionIndexResult MatchBytes(byte[] subject) => MatchInternal(Encoding.ASCII.GetString(subject), true, true);
/// Returns true if there is any interest in the literal subject.
public bool HasInterest(string subject) => HasInterestInternal(subject, true, out _, out _);
/// Returns counts of plain and queue subscribers for the literal subject.
public (int np, int nq) NumInterest(string subject)
{
HasInterestInternal(subject, true, out var np, out var nq);
return (np, nq);
}
// -------------------------------------------------------------------------
// Public API — ReverseMatch
// -------------------------------------------------------------------------
///
/// For a given subject (which may contain wildcards), returns all subscriptions
/// that would match that subject. Mirrors Sublist.ReverseMatch.
///
public SubscriptionIndexResult ReverseMatch(string subject)
{
var tokens = TokenizeIntoArray(subject);
var result = new SubscriptionIndexResult();
_lock.EnterReadLock();
try
{
ReverseMatchLevel(_root, tokens.AsSpan(), null, result);
if (result.PSubs.Count == 0 && result.QSubs.Count == 0)
result = EmptyResult;
}
finally
{
_lock.ExitReadLock();
}
return result;
}
// -------------------------------------------------------------------------
// Public API — Enumerate
// -------------------------------------------------------------------------
/// Returns the subscription count.
public uint Count()
{
_lock.EnterReadLock();
try { return _count; }
finally { _lock.ExitReadLock(); }
}
/// Collects all subscriptions into . Mirrors Sublist.All.
public void All(List subs)
{
_lock.EnterReadLock();
try { CollectAllSubs(_root, subs); }
finally { _lock.ExitReadLock(); }
}
/// Collects local client subscriptions. Mirrors Sublist.localSubs.
public void LocalSubs(List subs, bool includeLeafHubs)
{
_lock.EnterReadLock();
try { CollectLocalSubs(_root, subs, includeLeafHubs); }
finally { _lock.ExitReadLock(); }
}
/// Returns the generation ID (incremented on every mutation). Thread-safe.
public long GenId() => Interlocked.Read(ref _genId);
// -------------------------------------------------------------------------
// Public API — Stats
// -------------------------------------------------------------------------
/// Returns statistics for the current state. Mirrors Sublist.Stats.
public SublistStats Stats()
{
var st = new SublistStats();
_lock.EnterReadLock();
var cache = _cache;
var cc = _cache?.Count ?? 0;
st.NumSubs = _count;
st.NumInserts = (ulong)Interlocked.Read(ref _inserts);
st.NumRemoves = (ulong)Interlocked.Read(ref _removes);
_lock.ExitReadLock();
st.NumCache = (uint)cc;
st.NumMatches = (ulong)Interlocked.Read(ref _matches);
st.CacheHitsRaw = (ulong)Interlocked.Read(ref _cacheHits);
if (st.NumMatches > 0)
st.CacheHitRate = (double)st.CacheHitsRaw / st.NumMatches;
if (cache != null)
{
int tot = 0, max = 0, clen = 0;
_lock.EnterReadLock();
try
{
foreach (var r in _cache!.Values)
{
clen++;
var l = r.PSubs.Count + r.QSubs.Count;
tot += l;
if (l > max) max = l;
}
}
finally
{
_lock.ExitReadLock();
}
st.TotFanout = tot;
st.CacheCnt = clen;
st.MaxFanout = (uint)max;
if (tot > 0)
st.AvgFanout = (double)tot / clen;
}
return st;
}
// -------------------------------------------------------------------------
// Public API — Notifications
// -------------------------------------------------------------------------
/// Registers a notification for interest changes on a literal subject.
public Exception? RegisterNotification(string subject, Action notify) =>
RegisterNotificationInternal(subject, string.Empty, notify);
/// Registers a notification for queue interest changes.
public Exception? RegisterQueueNotification(string subject, string queue, Action notify) =>
RegisterNotificationInternal(subject, queue, notify);
/// Clears a notification.
public bool ClearNotification(string subject, Action notify) =>
ClearNotificationInternal(subject, string.Empty, notify);
/// Clears a queue notification.
public bool ClearQueueNotification(string subject, string queue, Action notify) =>
ClearNotificationInternal(subject, queue, notify);
// -------------------------------------------------------------------------
// Internal: numLevels (for testing)
// -------------------------------------------------------------------------
/// Returns the maximum depth of the trie. Used in tests.
internal int NumLevels() => VisitLevel(_root, 0);
// -------------------------------------------------------------------------
// Private: Remove internals
// -------------------------------------------------------------------------
private Exception? RemoveInternal(Subscription sub, bool shouldLock, bool doCacheUpdates)
{
var subject = Encoding.ASCII.GetString(sub.Subject);
if (shouldLock) _lock.EnterWriteLock();
try
{
bool sfwc = false, haswc = false;
SublistNode? n = null;
var l = _root;
var lnts = new LevelNodeToken[32];
var levelCount = 0;
var start = 0;
for (var i = 0; i <= subject.Length; i++)
{
if (i < subject.Length && subject[i] != Btsep)
continue;
var tokenLen = i - start;
if (tokenLen == 0 || sfwc)
return ErrInvalidSubject;
if (l == null!)
return ErrNotFound;
var t = subject.Substring(start, tokenLen);
if (tokenLen > 1)
{
l.Nodes.TryGetValue(t, out n);
}
else
{
switch (t[0])
{
case Pwc:
n = l.Pwc;
haswc = true;
break;
case Fwc:
n = l.Fwc;
haswc = true;
sfwc = true;
break;
default:
l.Nodes.TryGetValue(t, out n);
break;
}
}
if (n != null)
{
if (levelCount < lnts.Length)
lnts[levelCount++] = new LevelNodeToken(l, n, t);
l = n.Next!;
}
else
{
l = null!;
}
start = i + 1;
}
var (removed, last) = RemoveFromNode(n, sub);
if (!removed)
return ErrNotFound;
_count--;
_removes++;
for (var i = levelCount - 1; i >= 0; i--)
{
ref var lnt = ref lnts[i];
if (lnt.Node.IsEmpty())
lnt.Level.PruneNode(lnt.Node, lnt.Token);
}
if (doCacheUpdates)
{
RemoveFromCache(subject);
Interlocked.Increment(ref _genId);
}
if (_notify != null && last && !haswc && _notify.Remove.Count > 0)
ChkForRemoveNotification(subject, sub.Queue != null ? Encoding.ASCII.GetString(sub.Queue) : string.Empty);
return null;
}
finally
{
if (shouldLock) _lock.ExitWriteLock();
}
}
private static (bool found, bool last) RemoveFromNode(SublistNode? n, Subscription sub)
{
if (n == null) return (false, true);
if (sub.Queue == null || sub.Queue.Length == 0)
{
var found = n.PSubs.Remove(sub);
if (found && n.PList != null)
n.PList = null; // Will be re-populated on Match if needed.
return (found, n.PSubs.Count == 0);
}
// Queue subscription.
var qname = Encoding.ASCII.GetString(sub.Queue);
if (n.QSubs == null || !n.QSubs.TryGetValue(qname, out var qsub))
return (false, false);
var removed = qsub.Remove(sub);
bool last = false;
if (qsub.Count == 0)
{
last = true;
n.QSubs.Remove(qname);
}
return (removed, last);
}
// -------------------------------------------------------------------------
// Private: Match internals
// -------------------------------------------------------------------------
private SubscriptionIndexResult MatchInternal(string subject, bool doLock, bool doCopyOnCache)
{
Interlocked.Increment(ref _matches);
// Check cache first.
if (doLock) _lock.EnterReadLock();
var cacheEnabled = _cache != null;
SubscriptionIndexResult? cached = null;
_cache?.TryGetValue(subject, out cached);
if (doLock) _lock.ExitReadLock();
if (cached != null)
{
Interlocked.Increment(ref _cacheHits);
return cached;
}
// Tokenize.
var tokens = TokenizeForMatch(subject);
if (tokens == null)
return EmptyResult;
var result = new SubscriptionIndexResult();
int n;
if (doLock)
{
if (cacheEnabled)
_lock.EnterWriteLock();
else
_lock.EnterReadLock();
}
try
{
MatchLevel(_root, tokens, result);
if (result.PSubs.Count == 0 && result.QSubs.Count == 0)
result = EmptyResult;
if (cacheEnabled)
{
_cache![subject] = result;
n = _cache.Count;
}
else
{
n = 0;
}
}
finally
{
if (doLock)
{
if (cacheEnabled) _lock.ExitWriteLock();
else _lock.ExitReadLock();
}
}
// Reduce cache if over limit.
if (cacheEnabled && n > SlCacheMax && Interlocked.CompareExchange(ref _ccSweep, 1, 0) == 0)
Task.Run(ReduceCacheCount);
return result;
}
internal SubscriptionIndexResult MatchNoLock(string subject) => MatchInternal(subject, false, false);
private bool HasInterestInternal(string subject, bool doLock, out int np, out int nq)
{
np = 0;
nq = 0;
// Check cache first.
if (doLock) _lock.EnterReadLock();
bool matched = false;
if (_cache != null && _cache.TryGetValue(subject, out var cached))
{
np = cached.PSubs.Count;
foreach (var qsub in cached.QSubs)
nq += qsub.Count;
matched = cached.PSubs.Count + cached.QSubs.Count > 0;
}
if (doLock) _lock.ExitReadLock();
if (matched)
{
Interlocked.Increment(ref _cacheHits);
return true;
}
var tokens = TokenizeForMatch(subject);
if (tokens == null) return false;
if (doLock) _lock.EnterReadLock();
try
{
return MatchLevelForAny(_root, tokens.AsSpan(), ref np, ref nq);
}
finally
{
if (doLock) _lock.ExitReadLock();
}
}
// -------------------------------------------------------------------------
// Private: Cache management
// -------------------------------------------------------------------------
private void AddToCache(string subject, Subscription sub)
{
if (_cache == null) return;
if (SubjectIsLiteral(subject))
{
if (_cache.TryGetValue(subject, out var r))
_cache[subject] = r.AddSubToResult(sub);
return;
}
// Wildcard subject — update any matching cache entries.
foreach (var key in _cache.Keys.ToArray())
{
if (MatchLiteral(key, subject))
{
_cache[key] = _cache[key].AddSubToResult(sub);
}
}
}
private void RemoveFromCache(string subject)
{
if (_cache == null) return;
if (SubjectIsLiteral(subject))
{
_cache.Remove(subject);
return;
}
foreach (var key in _cache.Keys.ToArray())
{
if (MatchLiteral(key, subject))
_cache.Remove(key);
}
}
private void ReduceCacheCount()
{
try
{
_lock.EnterWriteLock();
try
{
if (_cache == null) return;
foreach (var key in _cache.Keys.ToArray())
{
_cache.Remove(key);
if (_cache.Count <= SlCacheSweep)
break;
}
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
Interlocked.Exchange(ref _ccSweep, 0);
}
}
// -------------------------------------------------------------------------
// Private: Trie matching (matchLevel)
// -------------------------------------------------------------------------
private static void MatchLevel(SublistLevel? l, string[] toks, SubscriptionIndexResult results)
{
SublistNode? pwc = null, n = null;
for (int i = 0; i < toks.Length; i++)
{
if (l == null) return;
if (l.Fwc != null) AddNodeToResults(l.Fwc, results);
pwc = l.Pwc;
if (pwc != null) MatchLevel(pwc.Next, toks.AsSpan(i + 1).ToArray(), results);
l.Nodes.TryGetValue(toks[i], out n);
l = n?.Next;
}
if (n != null) AddNodeToResults(n, results);
if (pwc != null) AddNodeToResults(pwc, results);
}
private static bool MatchLevelForAny(SublistLevel? l, ReadOnlySpan toks, ref int np, ref int nq)
{
SublistNode? pwc = null, n = null;
for (int i = 0; i < toks.Length; i++)
{
if (l == null) return false;
if (l.Fwc != null)
{
np += l.Fwc.PSubs.Count;
foreach (var qsub in l.Fwc.QSubs?.Values ?? Enumerable.Empty>())
nq += qsub.Count;
return true;
}
pwc = l.Pwc;
if (pwc != null)
{
if (MatchLevelForAny(pwc.Next, toks[(i + 1)..], ref np, ref nq))
return true;
}
l.Nodes.TryGetValue(toks[i], out n);
l = n?.Next;
}
if (n != null)
{
np += n.PSubs.Count;
foreach (var qsub in n.QSubs?.Values ?? Enumerable.Empty>())
nq += qsub.Count;
if ((n.PList?.Count ?? 0) > 0 || n.PSubs.Count > 0 || (n.QSubs?.Count ?? 0) > 0)
return true;
}
if (pwc != null)
{
np += pwc.PSubs.Count;
foreach (var qsub in pwc.QSubs?.Values ?? Enumerable.Empty>())
nq += qsub.Count;
return (pwc.PList?.Count ?? 0) > 0 || pwc.PSubs.Count > 0 || (pwc.QSubs?.Count ?? 0) > 0;
}
return false;
}
// -------------------------------------------------------------------------
// Private: Reverse match
// -------------------------------------------------------------------------
private static void ReverseMatchLevel(SublistLevel? l, ReadOnlySpan toks, SublistNode? n, SubscriptionIndexResult results)
{
if (l == null) return;
for (int i = 0; i < toks.Length; i++)
{
var t = toks[i];
if (t.Length == 1)
{
if (t[0] == Fwc)
{
GetAllNodes(l, results);
return;
}
if (t[0] == Pwc)
{
foreach (var nd in l.Nodes.Values)
ReverseMatchLevel(nd.Next, toks[(i + 1)..], nd, results);
if (l.Pwc != null)
ReverseMatchLevel(l.Pwc.Next, toks[(i + 1)..], n, results);
if (l.Fwc != null)
GetAllNodes(l, results);
return;
}
}
if (l.Fwc != null)
{
GetAllNodes(l, results);
return;
}
if (l.Pwc != null)
ReverseMatchLevel(l.Pwc.Next, toks[(i + 1)..], n, results);
if (!l.Nodes.TryGetValue(t, out var next))
{
n = null;
break;
}
n = next;
l = n.Next;
}
if (n != null) AddNodeToResults(n, results);
}
private static void GetAllNodes(SublistLevel? l, SubscriptionIndexResult results)
{
if (l == null) return;
if (l.Pwc != null) AddNodeToResults(l.Pwc, results);
if (l.Fwc != null) AddNodeToResults(l.Fwc, results);
foreach (var n in l.Nodes.Values)
{
AddNodeToResults(n, results);
GetAllNodes(n.Next, results);
}
}
// -------------------------------------------------------------------------
// Private: addNodeToResults
// -------------------------------------------------------------------------
private static void AddNodeToResults(SublistNode n, SubscriptionIndexResult results)
{
// Plain subscriptions.
if (n.PList != null)
{
results.PSubs.AddRange(n.PList);
}
else
{
foreach (var psub in n.PSubs.Keys)
results.PSubs.Add(psub);
}
// Queue subscriptions.
if (n.QSubs == null) return;
foreach (var (qname, qr) in n.QSubs)
{
if (qr.Count == 0) continue;
var i = FindQSlot(Encoding.ASCII.GetBytes(qname), results.QSubs);
if (i < 0)
{
i = results.QSubs.Count;
results.QSubs.Add(new List(qr.Count));
}
foreach (var sub in qr.Keys)
{
if (IsRemoteQSub(sub))
{
var ns = Interlocked.CompareExchange(ref sub.Qw, 0, 0);
for (var j = 0; j < ns; j++)
results.QSubs[i].Add(sub);
}
else
{
results.QSubs[i].Add(sub);
}
}
}
}
internal static int FindQSlot(byte[]? queue, List> qsl)
{
if (queue == null) return -1;
for (int i = 0; i < qsl.Count; i++)
{
if (qsl[i].Count > 0 && qsl[i][0].Queue != null && queue.AsSpan().SequenceEqual(qsl[i][0].Queue))
return i;
}
return -1;
}
internal static bool IsRemoteQSub(Subscription sub)
{
return sub.Queue != null && sub.Queue.Length > 0
&& sub.Client != null
&& (sub.Client.Kind == ClientKind.Router || sub.Client.Kind == ClientKind.Leaf);
}
// -------------------------------------------------------------------------
// Private: Enumerate
// -------------------------------------------------------------------------
private static void AddLocalSub(Subscription sub, List subs, bool includeLeafHubs)
{
if (sub.Client == null) return;
var kind = sub.Client.Kind;
if (kind == ClientKind.Client || kind == ClientKind.System ||
kind == ClientKind.JetStream || kind == ClientKind.Account ||
(includeLeafHubs && sub.Client.IsHubLeafNode()))
{
subs.Add(sub);
}
}
private static void AddNodeToSubsLocal(SublistNode n, List subs, bool includeLeafHubs)
{
if (n.PList != null)
{
foreach (var sub in n.PList)
AddLocalSub(sub, subs, includeLeafHubs);
}
else
{
foreach (var sub in n.PSubs.Keys)
AddLocalSub(sub, subs, includeLeafHubs);
}
if (n.QSubs != null)
{
foreach (var qr in n.QSubs.Values)
foreach (var sub in qr.Keys)
AddLocalSub(sub, subs, includeLeafHubs);
}
}
private static void CollectLocalSubs(SublistLevel? l, List subs, bool includeLeafHubs)
{
if (l == null) return;
foreach (var n in l.Nodes.Values)
{
AddNodeToSubsLocal(n, subs, includeLeafHubs);
CollectLocalSubs(n.Next, subs, includeLeafHubs);
}
if (l.Pwc != null) { AddNodeToSubsLocal(l.Pwc, subs, includeLeafHubs); CollectLocalSubs(l.Pwc.Next, subs, includeLeafHubs); }
if (l.Fwc != null) { AddNodeToSubsLocal(l.Fwc, subs, includeLeafHubs); CollectLocalSubs(l.Fwc.Next, subs, includeLeafHubs); }
}
private static void AddAllNodeToSubs(SublistNode n, List subs)
{
if (n.PList != null)
subs.AddRange(n.PList);
else
foreach (var sub in n.PSubs.Keys)
subs.Add(sub);
if (n.QSubs != null)
foreach (var qr in n.QSubs.Values)
foreach (var sub in qr.Keys)
subs.Add(sub);
}
private static void CollectAllSubs(SublistLevel? l, List subs)
{
if (l == null) return;
foreach (var n in l.Nodes.Values)
{
AddAllNodeToSubs(n, subs);
CollectAllSubs(n.Next, subs);
}
if (l.Pwc != null) { AddAllNodeToSubs(l.Pwc, subs); CollectAllSubs(l.Pwc.Next, subs); }
if (l.Fwc != null) { AddAllNodeToSubs(l.Fwc, subs); CollectAllSubs(l.Fwc.Next, subs); }
}
// -------------------------------------------------------------------------
// Private: Notifications
// -------------------------------------------------------------------------
private Exception? RegisterNotificationInternal(string subject, string queue, Action notify)
{
if (SubjectHasWildcard(subject))
return ErrInvalidSubject;
bool hasInterest = false;
var r = Match(subject);
if (r.PSubs.Count + r.QSubs.Count > 0)
{
if (queue.Length == 0)
{
foreach (var sub in r.PSubs)
{
if (Encoding.ASCII.GetString(sub.Subject) == subject)
{
hasInterest = true;
break;
}
}
}
else
{
foreach (var qsub in r.QSubs)
{
if (qsub.Count == 0) continue;
var qs = qsub[0];
if (Encoding.ASCII.GetString(qs.Subject) == subject &&
qs.Queue != null && Encoding.ASCII.GetString(qs.Queue) == queue)
{
hasInterest = true;
break;
}
}
}
}
var key = KeyFromSubjectAndQueue(subject, queue);
Exception? err;
_lock.EnterWriteLock();
try
{
_notify ??= new NotifyMaps();
err = hasInterest
? AddNotify(_notify.Remove, key, notify)
: AddNotify(_notify.Insert, key, notify);
}
finally
{
_lock.ExitWriteLock();
}
if (err == null)
SendNotification(notify, hasInterest);
return err;
}
private bool ClearNotificationInternal(string subject, string queue, Action notify)
{
_lock.EnterWriteLock();
try
{
if (_notify == null) return false;
var key = KeyFromSubjectAndQueue(subject, queue);
var didRemove = ChkAndRemove(key, notify, _notify.Remove);
didRemove = didRemove || ChkAndRemove(key, notify, _notify.Insert);
if (_notify.Remove.Count + _notify.Insert.Count == 0)
_notify = null;
return didRemove;
}
finally
{
_lock.ExitWriteLock();
}
}
private static bool ChkAndRemove(string key, Action notify, Dictionary>> ms)
{
if (!ms.TryGetValue(key, out var chs)) return false;
for (int i = 0; i < chs.Count; i++)
{
if (chs[i] == notify)
{
chs[i] = chs[^1];
chs.RemoveAt(chs.Count - 1);
if (chs.Count == 0) ms.Remove(key);
return true;
}
}
return false;
}
// Write lock must be held.
private Exception? AddInsertNotify(string subject, Action notify)
=> AddNotify(_notify!.Insert, subject, notify);
// Write lock must be held.
private Exception? AddRemoveNotify(string subject, Action notify)
=> AddNotify(_notify!.Remove, subject, notify);
private static Exception? AddNotify(Dictionary>> m, string subject, Action notify)
{
if (m.TryGetValue(subject, out var chs))
{
foreach (var ch in chs)
if (ch == notify)
return ErrAlreadyRegistered;
}
else
{
chs = new List>();
m[subject] = chs;
}
chs.Add(notify);
return null;
}
private static void SendNotification(Action notify, bool hasInterest)
{
// Non-blocking send — in Go this was select { case ch <- val: default: }
// In .NET we just invoke; the caller should ensure it doesn't block.
try { notify(hasInterest); }
catch { /* swallow if handler faults */ }
}
private static string KeyFromSubjectAndQueue(string subject, string queue)
{
if (queue.Length == 0) return subject;
return subject + " " + queue;
}
private void ChkForInsertNotification(string subject, string queue)
{
var key = KeyFromSubjectAndQueue(subject, queue);
if (_notify!.Insert.TryGetValue(key, out var chs) && chs.Count > 0)
{
foreach (var ch in chs) SendNotification(ch, true);
if (!_notify.Remove.TryGetValue(key, out var rmChs))
{
rmChs = new List>();
_notify.Remove[key] = rmChs;
}
rmChs.AddRange(chs);
_notify.Insert.Remove(key);
}
}
private void ChkForRemoveNotification(string subject, string queue)
{
var key = KeyFromSubjectAndQueue(subject, queue);
if (!_notify!.Remove.TryGetValue(key, out var chs) || chs.Count == 0) return;
bool hasInterest = false;
var r = MatchNoLock(subject);
if (r.PSubs.Count + r.QSubs.Count > 0)
{
if (queue.Length == 0)
{
foreach (var sub in r.PSubs)
{
if (Encoding.ASCII.GetString(sub.Subject) == subject)
{
hasInterest = true;
break;
}
}
}
else
{
foreach (var qsub in r.QSubs)
{
if (qsub.Count == 0) continue;
var qs = qsub[0];
if (Encoding.ASCII.GetString(qs.Subject) == subject &&
qs.Queue != null && Encoding.ASCII.GetString(qs.Queue) == queue)
{
hasInterest = true;
break;
}
}
}
}
if (!hasInterest)
{
foreach (var ch in chs) SendNotification(ch, false);
if (!_notify.Insert.TryGetValue(key, out var insChs))
{
insChs = new List>();
_notify.Insert[key] = insChs;
}
insChs.AddRange(chs);
_notify.Remove.Remove(key);
}
}
// -------------------------------------------------------------------------
// Private: visitLevel (depth calculation for tests)
// -------------------------------------------------------------------------
private static int VisitLevel(SublistLevel? l, int depth)
{
if (l == null || l.NumNodes() == 0) return depth;
depth++;
var maxDepth = depth;
foreach (var n in l.Nodes.Values)
{
var d = VisitLevel(n.Next, depth);
if (d > maxDepth) maxDepth = d;
}
if (l.Pwc != null) { var d = VisitLevel(l.Pwc.Next, depth); if (d > maxDepth) maxDepth = d; }
if (l.Fwc != null) { var d = VisitLevel(l.Fwc.Next, depth); if (d > maxDepth) maxDepth = d; }
return maxDepth;
}
// -------------------------------------------------------------------------
// Private: Tokenization helpers
// -------------------------------------------------------------------------
private static string[]? TokenizeForMatch(string subject)
{
if (subject.Length == 0) return null;
var tokens = new List(8);
var start = 0;
for (var i = 0; i < subject.Length; i++)
{
if (subject[i] == Btsep)
{
if (i - start == 0) return null;
tokens.Add(subject.Substring(start, i - start));
start = i + 1;
}
}
if (start >= subject.Length) return null;
tokens.Add(subject[start..]);
return tokens.ToArray();
}
internal static string[] TokenizeIntoArray(string subject)
{
var tokens = new List(8);
var start = 0;
for (var i = 0; i < subject.Length; i++)
{
if (subject[i] == Btsep)
{
tokens.Add(subject.Substring(start, i - start));
start = i + 1;
}
}
tokens.Add(subject[start..]);
return tokens.ToArray();
}
// -------------------------------------------------------------------------
// Public static: Subject validation and utility methods
// -------------------------------------------------------------------------
/// Returns true if the subject contains any wildcard tokens.
public static bool SubjectHasWildcard(string subject)
{
for (int i = 0; i < subject.Length; i++)
{
var c = subject[i];
if (c == Pwc || c == Fwc)
{
if ((i == 0 || subject[i - 1] == Btsep) &&
(i + 1 == subject.Length || subject[i + 1] == Btsep))
return true;
}
}
return false;
}
/// Returns true if the subject is a literal (no wildcards).
public static bool SubjectIsLiteral(string subject)
{
for (int i = 0; i < subject.Length; i++)
{
var c = subject[i];
if (c == Pwc || c == Fwc)
{
if ((i == 0 || subject[i - 1] == Btsep) &&
(i + 1 == subject.Length || subject[i + 1] == Btsep))
return false;
}
}
return true;
}
/// Returns true if subject is valid (allows wildcards).
public static bool IsValidSubject(string subject) => IsValidSubjectInternal(subject, false);
/// Returns true if subject is valid and literal (no wildcards).
public static bool IsValidPublishSubject(string subject) =>
IsValidSubject(subject) && SubjectIsLiteral(subject);
/// Returns true if subject is valid and literal (no wildcards).
public static bool IsValidLiteralSubject(string subject) =>
IsValidLiteralSubjectTokens(subject.Split(Btsep));
private static bool IsValidSubjectInternal(string subject, bool checkRunes)
{
if (string.IsNullOrEmpty(subject)) return false;
if (checkRunes)
{
if (subject.Contains('\0')) return false;
foreach (var r in subject)
{
if (r == '\uFFFD') return false; // RuneError
}
}
bool sfwc = false;
var start = 0;
for (var i = 0; i <= subject.Length; i++)
{
if (i < subject.Length && subject[i] != Btsep)
continue;
var tokenLen = i - start;
if (tokenLen == 0 || sfwc) return false;
if (tokenLen > 1)
{
var t = subject.AsSpan(start, tokenLen);
if (t.ContainsAny(" \t\n\r\f"))
return false;
}
else
{
switch (subject[start])
{
case Fwc: sfwc = true; break;
case ' ': case '\t': case '\n': case '\r': case '\f': return false;
}
}
start = i + 1;
}
return true;
}
private static bool IsValidLiteralSubjectTokens(string[] tokens)
{
foreach (var t in tokens)
{
if (t.Length == 0) return false;
if (t.Length > 1) continue;
if (t[0] == Pwc || t[0] == Fwc) return false;
}
return true;
}
/// Determines if two subjects could both match a single literal subject.
public static bool SubjectsCollide(string subj1, string subj2)
{
if (subj1 == subj2) return true;
var toks1 = subj1.Split(Btsep);
var toks2 = subj2.Split(Btsep);
AnalyzeTokens(toks1, out var pwc1, out var fwc1);
AnalyzeTokens(toks2, out var pwc2, out var fwc2);
bool l1 = !(pwc1 || fwc1), l2 = !(pwc2 || fwc2);
if (l1 && l2) return subj1 == subj2;
if (l1 && !l2) return IsSubsetMatch(toks1, subj2);
if (l2 && !l1) return IsSubsetMatch(toks2, subj1);
if (!fwc1 && !fwc2 && toks1.Length != toks2.Length) return false;
if (toks1.Length != toks2.Length)
{
if ((toks1.Length < toks2.Length && !fwc1) || (toks2.Length < toks1.Length && !fwc2))
return false;
}
var stop = Math.Min(toks1.Length, toks2.Length);
for (int i = 0; i < stop; i++)
{
if (!TokensCanMatch(toks1[i], toks2[i]))
return false;
}
return true;
}
/// Returns true if the subject matches the filter. Mirrors SubjectMatchesFilter.
public static bool SubjectMatchesFilter(string subject, string filter) =>
SubjectIsSubsetMatch(subject, filter);
/// Matches a literal subject against a potentially-wildcarded subject. Used in the cache layer.
public static bool MatchLiteral(string literal, string subject)
{
int li = 0;
int ll = literal.Length;
int ls = subject.Length;
for (int i = 0; i < ls; i++)
{
if (li >= ll) return false;
switch (subject[i])
{
case Pwc:
if (i == 0 || subject[i - 1] == Btsep)
{
if (i == ls - 1)
{
while (true)
{
if (li >= ll) return true;
if (literal[li] == Btsep) return false;
li++;
}
}
else if (subject[i + 1] == Btsep)
{
while (true)
{
if (li >= ll) return false;
if (literal[li] == Btsep) break;
li++;
}
i++;
}
}
break;
case Fwc:
if ((i == 0 || subject[i - 1] == Btsep) && i == ls - 1)
return true;
break;
}
if (subject[i] != literal[li]) return false;
li++;
}
return li >= ll;
}
/// Returns the number of dot-separated tokens in a subject.
public static int NumTokens(string subject)
{
if (subject.Length == 0) return 0;
int count = 0;
for (int i = 0; i < subject.Length; i++)
if (subject[i] == Btsep) count++;
return count + 1;
}
/// Returns the 1-based indexed token from a subject.
public static string TokenAt(string subject, int index)
{
int ti = 1, start = 0;
for (int i = 0; i < subject.Length; i++)
{
if (subject[i] == Btsep)
{
if (ti == index) return subject[start..i];
start = i + 1;
ti++;
}
}
if (ti == index) return subject[start..];
return string.Empty;
}
internal static bool SubjectIsSubsetMatch(string subject, string test)
{
var tts = TokenizeIntoArray(subject);
return IsSubsetMatch(tts, test);
}
internal static bool IsSubsetMatch(string[] tokens, string test)
{
var tts = TokenizeIntoArray(test);
return IsSubsetMatchTokenized(tokens, tts);
}
internal static bool IsSubsetMatchTokenized(string[] tokens, string[] test)
{
for (int i = 0; i < test.Length; i++)
{
if (i >= tokens.Length) return false;
var t2 = test[i];
if (t2.Length == 0) return false;
if (t2[0] == Fwc && t2.Length == 1) return true;
var t1 = tokens[i];
if (t1.Length == 0 || (t1[0] == Fwc && t1.Length == 1)) return false;
if (t1[0] == Pwc && t1.Length == 1)
{
if (!(t2[0] == Pwc && t2.Length == 1)) return false;
if (i >= test.Length) return true;
continue;
}
if (t2[0] != Pwc && string.Compare(t1, t2, StringComparison.Ordinal) != 0)
return false;
}
return tokens.Length == test.Length;
}
private static void AnalyzeTokens(string[] tokens, out bool hasPWC, out bool hasFWC)
{
hasPWC = false;
hasFWC = false;
foreach (var t in tokens)
{
if (t.Length == 0 || t.Length > 1) continue;
switch (t[0])
{
case Pwc: hasPWC = true; break;
case Fwc: hasFWC = true; break;
}
}
}
private static bool TokensCanMatch(string t1, string t2)
{
if (t1.Length == 0 || t2.Length == 0) return false;
if (t1[0] == Pwc || t2[0] == Pwc || t1[0] == Fwc || t2[0] == Fwc) return true;
return t1 == t2;
}
// -------------------------------------------------------------------------
// Nested types: SublistNode, SublistLevel, NotifyMaps
// -------------------------------------------------------------------------
internal sealed class SublistNode
{
public readonly Dictionary PSubs = new(ReferenceEqualityComparer.Instance);
public Dictionary>? QSubs;
public List? PList;
public SublistLevel? Next;
/// Factory method matching Go's newNode().
public static SublistNode NewNode() => new();
public bool IsEmpty()
{
return PSubs.Count == 0 && (QSubs == null || QSubs.Count == 0) &&
(Next == null || Next.NumNodes() == 0);
}
}
internal sealed class SublistLevel
{
public readonly Dictionary Nodes = new();
public SublistNode? Pwc;
public SublistNode? Fwc;
/// Factory method matching Go's newLevel().
public static SublistLevel NewLevel() => new();
public int NumNodes()
{
var num = Nodes.Count;
if (Pwc != null) num++;
if (Fwc != null) num++;
return num;
}
public void PruneNode(SublistNode n, string t)
{
if (ReferenceEquals(n, Fwc)) Fwc = null;
else if (ReferenceEquals(n, Pwc)) Pwc = null;
else Nodes.Remove(t);
}
}
private sealed class NotifyMaps
{
public readonly Dictionary>> Insert = new();
public readonly Dictionary>> Remove = new();
}
private readonly struct LevelNodeToken
{
public readonly SublistLevel Level;
public readonly SublistNode Node;
public readonly string Token;
public LevelNodeToken(SublistLevel level, SublistNode node, string token)
{
Level = level;
Node = node;
Token = token;
}
}
}
///
/// Result of a subscription match containing plain and queue subscribers.
/// Mirrors Go's SublistResult.
///
public sealed class SubscriptionIndexResult
{
public List PSubs { get; } = new();
public List> QSubs { get; } = new();
/// Deep copies this result. Mirrors copyResult.
public SubscriptionIndexResult Copy()
{
var nr = new SubscriptionIndexResult();
nr.PSubs.AddRange(PSubs);
foreach (var qr in QSubs)
nr.QSubs.Add(new List(qr));
return nr;
}
/// Adds a subscription to a copy of this result. Mirrors addSubToResult.
public SubscriptionIndexResult AddSubToResult(Subscription sub)
{
var nr = Copy();
if (sub.Queue == null || sub.Queue.Length == 0)
{
nr.PSubs.Add(sub);
}
else
{
var i = SubscriptionIndex.FindQSlot(sub.Queue, nr.QSubs);
if (i >= 0)
{
nr.QSubs[i].Add(sub);
}
else
{
nr.QSubs.Add(new List { sub });
}
}
return nr;
}
}
///
/// Public statistics for the subscription index. Mirrors Go's SublistStats.
///
public sealed class SublistStats
{
public uint NumSubs { get; set; }
public uint NumCache { get; set; }
public ulong NumInserts { get; set; }
public ulong NumRemoves { get; set; }
public ulong NumMatches { get; set; }
public double CacheHitRate { get; set; }
public uint MaxFanout { get; set; }
public double AvgFanout { get; set; }
// Internal fields used for aggregation.
internal int TotFanout { get; set; }
internal int CacheCnt { get; set; }
internal ulong CacheHitsRaw { get; set; }
/// Aggregates another stats object into this one. Mirrors SublistStats.add.
public void Add(SublistStats stat)
{
NumSubs += stat.NumSubs;
NumCache += stat.NumCache;
NumInserts += stat.NumInserts;
NumRemoves += stat.NumRemoves;
NumMatches += stat.NumMatches;
CacheHitsRaw += stat.CacheHitsRaw;
if (MaxFanout < stat.MaxFanout) MaxFanout = stat.MaxFanout;
TotFanout += stat.TotFanout;
CacheCnt += stat.CacheCnt;
if (TotFanout > 0)
AvgFanout = (double)TotFanout / CacheCnt;
if (NumMatches > 0)
CacheHitRate = (double)CacheHitsRaw / NumMatches;
}
}