Files
natsdotnet/src/NATS.Server/Subscriptions/SubList.cs
2026-03-13 09:53:37 -04:00

1372 lines
41 KiB
C#

using System.Text;
namespace NATS.Server.Subscriptions;
/// <summary>
/// SubList is a routing mechanism to handle subject distribution and
/// provides a facility to match subjects from published messages to
/// interested subscribers. Subscribers can have wildcard subjects to
/// match multiple published subjects.
/// </summary>
public sealed class SubList : IDisposable
{
private const int CacheMax = 1024;
private const int CacheSweep = 256;
private readonly ReaderWriterLockSlim _lock = new();
private readonly TrieLevel _root = new();
private readonly SubListCacheSweeper _sweeper = new();
private readonly Dictionary<RoutedSubKey, RemoteSubscription> _remoteSubs = [];
private Dictionary<string, CachedResult>? _cache = new(StringComparer.Ordinal);
private uint _count;
private volatile bool _disposed;
private long _generation;
private ulong _matches;
private ulong _cacheHits;
private ulong _inserts;
private ulong _removes;
private int _highFanoutNodes;
private Action<bool>? _interestStateNotification;
private readonly Dictionary<string, List<Action<bool>>> _queueInsertNotifications = new(StringComparer.Ordinal);
private readonly Dictionary<string, List<Action<bool>>> _queueRemoveNotifications = new(StringComparer.Ordinal);
private readonly record struct CachedResult(SubListResult Result, long Generation);
public event Action<InterestChange>? InterestChanged;
public SubList()
: this(enableCache: true)
{
}
public SubList(bool enableCache)
{
if (!enableCache)
_cache = null;
}
public static SubList NewSublistNoCache() => new(enableCache: false);
public bool CacheEnabled() => _cache != null;
public void RegisterNotification(Action<bool> callback) => _interestStateNotification = callback;
public void ClearNotification() => _interestStateNotification = null;
public bool RegisterQueueNotification(string subject, string queue, Action<bool> callback)
{
if (callback == null || string.IsNullOrWhiteSpace(subject) || string.IsNullOrWhiteSpace(queue))
return false;
if (SubjectMatch.SubjectHasWildcard(subject) || SubjectMatch.SubjectHasWildcard(queue))
return false;
bool hasInterest;
var key = QueueNotifyKey(subject, queue);
_lock.EnterWriteLock();
try
{
hasInterest = HasExactQueueInterestNoLock(subject, queue);
var map = hasInterest ? _queueRemoveNotifications : _queueInsertNotifications;
if (!AddQueueNotifyNoLock(map, key, callback))
return false;
}
finally
{
_lock.ExitWriteLock();
}
callback(hasInterest);
return true;
}
public bool ClearQueueNotification(string subject, string queue, Action<bool> callback)
{
var key = QueueNotifyKey(subject, queue);
_lock.EnterWriteLock();
try
{
var removed = RemoveQueueNotifyNoLock(_queueRemoveNotifications, key, callback);
removed |= RemoveQueueNotifyNoLock(_queueInsertNotifications, key, callback);
return removed;
}
finally
{
_lock.ExitWriteLock();
}
}
public void Dispose()
{
_disposed = true;
_lock.Dispose();
}
public uint Count
{
get
{
_lock.EnterReadLock();
try { return _count; }
finally { _lock.ExitReadLock(); }
}
}
/// <summary>
/// Returns all subscriptions in the trie. For monitoring only.
/// </summary>
public List<Subscription> GetAllSubscriptions()
{
_lock.EnterReadLock();
try
{
var result = new List<Subscription>();
CollectAll(_root, result);
return result;
}
finally
{
_lock.ExitReadLock();
}
}
private static void CollectAll(TrieLevel level, List<Subscription> result)
{
foreach (var (_, node) in level.Nodes)
{
foreach (var sub in node.PlainSubs) result.Add(sub);
foreach (var (_, qset) in node.QueueSubs)
foreach (var sub in qset) result.Add(sub);
if (node.Next != null) CollectAll(node.Next, result);
}
if (level.Pwc != null)
{
foreach (var sub in level.Pwc.PlainSubs) result.Add(sub);
foreach (var (_, qset) in level.Pwc.QueueSubs)
foreach (var sub in qset) result.Add(sub);
if (level.Pwc.Next != null) CollectAll(level.Pwc.Next, result);
}
if (level.Fwc != null)
{
foreach (var sub in level.Fwc.PlainSubs) result.Add(sub);
foreach (var (_, qset) in level.Fwc.QueueSubs)
foreach (var sub in qset) result.Add(sub);
if (level.Fwc.Next != null) CollectAll(level.Fwc.Next, result);
}
}
/// <summary>
/// Returns the current number of entries in the cache.
/// </summary>
public int CacheCount
{
get
{
_lock.EnterReadLock();
try { return _cache?.Count ?? 0; }
finally { _lock.ExitReadLock(); }
}
}
internal int HighFanoutNodeCountForTest => Volatile.Read(ref _highFanoutNodes);
internal Task TriggerCacheSweepAsyncForTest() => _sweeper.TriggerSweepAsync(SweepCache);
public void ApplyRemoteSub(RemoteSubscription sub)
{
_lock.EnterWriteLock();
try
{
var key = RoutedSubKey.FromRemoteSubscription(sub);
var changed = false;
if (sub.IsRemoval)
{
changed = _remoteSubs.Remove(key);
if (changed)
{
InterestChanged?.Invoke(new InterestChange(
InterestChangeKind.RemoteRemoved,
sub.Subject,
sub.Queue,
sub.Account));
}
}
else
{
if (!_remoteSubs.TryGetValue(key, out var existing) || existing != sub)
{
_remoteSubs[key] = sub;
changed = true;
InterestChanged?.Invoke(new InterestChange(
InterestChangeKind.RemoteAdded,
sub.Subject,
sub.Queue,
sub.Account));
}
}
if (changed)
Interlocked.Increment(ref _generation);
}
finally
{
_lock.ExitWriteLock();
}
}
public void UpdateRemoteQSub(RemoteSubscription sub)
{
if (sub.Queue == null)
return;
_lock.EnterWriteLock();
try
{
var key = RoutedSubKey.FromRemoteSubscription(sub);
if (!_remoteSubs.TryGetValue(key, out var existing))
return;
var nextWeight = Math.Max(1, sub.QueueWeight);
if (existing.QueueWeight == nextWeight)
return;
_remoteSubs[key] = existing with { QueueWeight = nextWeight };
Interlocked.Increment(ref _generation);
}
finally
{
_lock.ExitWriteLock();
}
}
public int RemoveRemoteSubs(string routeId)
{
_lock.EnterWriteLock();
try
{
var removed = 0;
foreach (var kvp in _remoteSubs.ToArray())
{
if (!string.Equals(kvp.Key.RouteId, routeId, StringComparison.Ordinal))
continue;
if (_remoteSubs.Remove(kvp.Key))
{
removed++;
InterestChanged?.Invoke(new InterestChange(
InterestChangeKind.RemoteRemoved,
kvp.Value.Subject,
kvp.Value.Queue,
kvp.Value.Account));
}
}
if (removed > 0)
Interlocked.Increment(ref _generation);
return removed;
}
finally
{
_lock.ExitWriteLock();
}
}
public int RemoveRemoteSubsForAccount(string routeId, string account)
{
_lock.EnterWriteLock();
try
{
var removed = 0;
foreach (var kvp in _remoteSubs.ToArray())
{
if (!string.Equals(kvp.Key.RouteId, routeId, StringComparison.Ordinal)
|| !string.Equals(kvp.Key.Account, account, StringComparison.Ordinal))
{
continue;
}
if (_remoteSubs.Remove(kvp.Key))
{
removed++;
InterestChanged?.Invoke(new InterestChange(
InterestChangeKind.RemoteRemoved,
kvp.Value.Subject,
kvp.Value.Queue,
kvp.Value.Account));
}
}
if (removed > 0)
Interlocked.Increment(ref _generation);
return removed;
}
finally
{
_lock.ExitWriteLock();
}
}
public bool HasRemoteInterest(string subject)
=> HasRemoteInterest("$G", subject);
public bool HasRemoteInterest(string account, string subject)
{
_lock.EnterReadLock();
try
{
foreach (var remoteSub in _remoteSubs.Values)
{
if (remoteSub.IsRemoval)
continue;
if (!string.Equals(remoteSub.Account, account, StringComparison.Ordinal))
continue;
if (SubjectMatch.MatchLiteral(subject, remoteSub.Subject))
return true;
}
return false;
}
finally
{
_lock.ExitReadLock();
}
}
public void Insert(Subscription sub)
{
var subject = sub.Subject;
_lock.EnterWriteLock();
try
{
var hadInterest = _count > 0;
var level = _root;
TrieNode? node = null;
bool sawFwc = false;
foreach (var token in new TokenEnumerator(subject))
{
if (token.Length == 0 || sawFwc)
throw new ArgumentException("Invalid subject", nameof(sub));
if (token.Length == 1 && token[0] == SubjectMatch.Pwc)
{
node = level.Pwc ??= new TrieNode();
}
else if (token.Length == 1 && token[0] == SubjectMatch.Fwc)
{
node = level.Fwc ??= new TrieNode();
sawFwc = true;
}
else
{
if (!TryGetLiteralNode(level, token, out _, out node))
{
var key = token.ToString();
node = new TrieNode();
level.Nodes[key] = node;
}
}
node.Next ??= new TrieLevel();
level = node.Next;
}
if (node == null)
throw new ArgumentException("Invalid subject", nameof(sub));
if (sub.Queue == null)
{
node.PlainSubs.Add(sub);
if (!node.PackedListEnabled && node.PlainSubs.Count > 256)
{
node.PackedListEnabled = true;
Interlocked.Increment(ref _highFanoutNodes);
}
}
else
{
if (!node.QueueSubs.TryGetValue(sub.Queue, out var qset))
{
qset = [];
node.QueueSubs[sub.Queue] = qset;
}
qset.Add(sub);
}
_count++;
_inserts++;
Interlocked.Increment(ref _generation);
if (sub.Queue != null && _queueInsertNotifications.Count > 0)
CheckForQueueInsertNotificationNoLock(sub.Subject, sub.Queue);
if (!hadInterest && _count > 0)
_interestStateNotification?.Invoke(true);
InterestChanged?.Invoke(new InterestChange(
InterestChangeKind.LocalAdded,
sub.Subject,
sub.Queue,
sub.Client?.Account?.Name ?? "$G"));
}
finally
{
_lock.ExitWriteLock();
}
}
public void Remove(Subscription sub)
{
if (_disposed) return;
_lock.EnterWriteLock();
try
{
var hadInterest = _count > 0;
if (RemoveInternal(sub))
{
_removes++;
Interlocked.Increment(ref _generation);
if (sub.Queue != null && _queueRemoveNotifications.Count > 0)
CheckForQueueRemoveNotificationNoLock(sub.Subject, sub.Queue);
if (hadInterest && _count == 0)
_interestStateNotification?.Invoke(false);
InterestChanged?.Invoke(new InterestChange(
InterestChangeKind.LocalRemoved,
sub.Subject,
sub.Queue,
sub.Client?.Account?.Name ?? "$G"));
}
}
finally
{
_lock.ExitWriteLock();
}
}
/// <summary>
/// Core remove logic without lock acquisition or generation bumping.
/// Assumes write lock is held. Returns true if a subscription was actually removed.
/// </summary>
private bool RemoveInternal(Subscription sub)
{
var level = _root;
TrieNode? node = null;
bool sawFwc = false;
var pathList = new List<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)>();
foreach (var token in new TokenEnumerator(sub.Subject))
{
if (token.Length == 0 || sawFwc)
return false;
bool isPwc = token.Length == 1 && token[0] == SubjectMatch.Pwc;
bool isFwc = token.Length == 1 && token[0] == SubjectMatch.Fwc;
if (isPwc)
{
node = level.Pwc;
}
else if (isFwc)
{
node = level.Fwc;
sawFwc = true;
}
else
{
if (!TryGetLiteralNode(level, token, out var existingToken, out node))
return false;
pathList.Add((level, node, existingToken, isPwc: false, isFwc: false));
if (node.Next == null)
return false; // corrupted trie state
level = node.Next;
continue;
}
if (node == null)
return false; // not found
var tokenStr = isPwc ? "*" : ">";
pathList.Add((level, node, tokenStr, isPwc, isFwc));
if (node.Next == null)
return false; // corrupted trie state
level = node.Next;
}
if (node == null) return false;
// Remove from node
bool removed;
if (sub.Queue == null)
{
removed = node.PlainSubs.Remove(sub);
}
else
{
removed = false;
if (node.QueueSubs.TryGetValue(sub.Queue, out var qset))
{
removed = qset.Remove(sub);
if (qset.Count == 0)
node.QueueSubs.Remove(sub.Queue);
}
}
if (!removed) return false;
_count--;
// Prune empty nodes (walk backwards)
for (int i = pathList.Count - 1; i >= 0; i--)
{
var (l, n, t, isPwc, isFwc) = pathList[i];
if (n.IsEmpty)
{
if (isPwc) l.Pwc = null;
else if (isFwc) l.Fwc = null;
else l.Nodes.Remove(t);
}
}
return true;
}
public SubListResult Match(string subject)
{
Interlocked.Increment(ref _matches);
var currentGen = Interlocked.Read(ref _generation);
_lock.EnterReadLock();
try
{
if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen)
{
Interlocked.Increment(ref _cacheHits);
return cached.Result;
}
}
finally
{
_lock.ExitReadLock();
}
var tokens = Tokenize(subject);
if (tokens == null)
return SubListResult.Empty;
_lock.EnterWriteLock();
try
{
currentGen = Interlocked.Read(ref _generation);
if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen)
{
Interlocked.Increment(ref _cacheHits);
return cached.Result;
}
var plainSubs = new List<Subscription>();
var queueSubs = new List<List<Subscription>>();
MatchLevel(_root, tokens, 0, plainSubs, queueSubs);
SubListResult result;
if (plainSubs.Count == 0 && queueSubs.Count == 0)
{
result = SubListResult.Empty;
}
else
{
var queueSubsArr = new Subscription[queueSubs.Count][];
for (int i = 0; i < queueSubs.Count; i++)
queueSubsArr[i] = queueSubs[i].ToArray();
result = new SubListResult(plainSubs.ToArray(), queueSubsArr);
}
if (_cache != null)
{
_cache[subject] = new CachedResult(result, currentGen);
if (_cache.Count > CacheMax)
_sweeper.ScheduleSweep(SweepCache);
}
return result;
}
finally
{
_lock.ExitWriteLock();
}
}
public SubListResult MatchBytes(ReadOnlySpan<byte> subjectUtf8)
{
return Match(Encoding.ASCII.GetString(subjectUtf8));
}
public IReadOnlyList<RemoteSubscription> MatchRemote(string account, string subject)
{
_lock.EnterReadLock();
try
{
var expanded = new List<RemoteSubscription>();
foreach (var remoteSub in _remoteSubs.Values)
{
if (remoteSub.IsRemoval)
continue;
if (!string.Equals(remoteSub.Account, account, StringComparison.Ordinal))
continue;
if (!SubjectMatch.MatchLiteral(subject, remoteSub.Subject))
continue;
var weight = Math.Max(1, remoteSub.QueueWeight);
for (var i = 0; i < weight; i++)
expanded.Add(remoteSub);
}
return expanded;
}
finally
{
_lock.ExitReadLock();
}
}
private static string QueueNotifyKey(string subject, string queue) => $"{subject} {queue}";
private static bool AddQueueNotifyNoLock(Dictionary<string, List<Action<bool>>> map, string key, Action<bool> callback)
{
if (!map.TryGetValue(key, out var callbacks))
{
callbacks = [];
map[key] = callbacks;
}
else if (callbacks.Contains(callback))
{
return false;
}
callbacks.Add(callback);
return true;
}
private static bool RemoveQueueNotifyNoLock(Dictionary<string, List<Action<bool>>> map, string key, Action<bool> callback)
{
if (!map.TryGetValue(key, out var callbacks))
return false;
var removed = callbacks.Remove(callback);
if (callbacks.Count == 0)
map.Remove(key);
return removed;
}
private static bool TryGetLiteralNode(TrieLevel level, ReadOnlySpan<char> token, out string existingToken, out TrieNode node)
{
foreach (var (candidate, existingNode) in level.Nodes)
{
if (!SubjectMatch.TokenEquals(token, candidate))
continue;
existingToken = candidate;
node = existingNode;
return true;
}
existingToken = string.Empty;
node = null!;
return false;
}
private bool HasExactQueueInterestNoLock(string subject, string queue)
{
var subs = new List<Subscription>();
CollectAll(_root, subs);
foreach (var sub in subs)
{
if (sub.Queue != null
&& string.Equals(sub.Subject, subject, StringComparison.Ordinal)
&& string.Equals(sub.Queue, queue, StringComparison.Ordinal))
{
return true;
}
}
return false;
}
private void CheckForQueueInsertNotificationNoLock(string subject, string queue)
{
var key = QueueNotifyKey(subject, queue);
if (!_queueInsertNotifications.TryGetValue(key, out var callbacks) || callbacks.Count == 0)
return;
foreach (var callback in callbacks)
callback(true);
if (!_queueRemoveNotifications.TryGetValue(key, out var removeCallbacks))
{
removeCallbacks = [];
_queueRemoveNotifications[key] = removeCallbacks;
}
removeCallbacks.AddRange(callbacks);
_queueInsertNotifications.Remove(key);
}
private void CheckForQueueRemoveNotificationNoLock(string subject, string queue)
{
var key = QueueNotifyKey(subject, queue);
if (!_queueRemoveNotifications.TryGetValue(key, out var callbacks) || callbacks.Count == 0)
return;
if (HasExactQueueInterestNoLock(subject, queue))
return;
foreach (var callback in callbacks)
callback(false);
if (!_queueInsertNotifications.TryGetValue(key, out var insertCallbacks))
{
insertCallbacks = [];
_queueInsertNotifications[key] = insertCallbacks;
}
insertCallbacks.AddRange(callbacks);
_queueRemoveNotifications.Remove(key);
}
private void SweepCache()
{
_lock.EnterWriteLock();
try
{
if (_cache == null || _cache.Count <= CacheMax)
return;
var removeCount = Math.Min(CacheSweep, _cache.Count - CacheMax);
var keys = _cache.Keys.Take(removeCount).ToArray();
foreach (var key in keys)
_cache.Remove(key);
}
finally
{
_lock.ExitWriteLock();
}
}
/// <summary>
/// Tokenize the subject into an array of token strings.
/// Returns null if the subject is invalid (empty tokens).
/// </summary>
private static string[]? Tokenize(string subject)
{
if (string.IsNullOrEmpty(subject))
return null;
var tokens = new List<string>();
int start = 0;
for (int i = 0; i <= subject.Length; i++)
{
if (i == subject.Length || subject[i] == SubjectMatch.Sep)
{
if (i - start == 0)
return null; // empty token
tokens.Add(subject[start..i]);
start = i + 1;
}
}
return tokens.Count > 0 ? tokens.ToArray() : null;
}
/// <summary>
/// Recursively descend into the trie matching tokens.
/// This follows the Go matchLevel() algorithm closely.
/// </summary>
private static void MatchLevel(TrieLevel? level, string[] tokens, int tokenIndex,
List<Subscription> plainSubs, List<List<Subscription>> queueSubs)
{
TrieNode? pwc = null;
TrieNode? node = null;
for (int i = tokenIndex; i < tokens.Length; i++)
{
if (level == null)
return;
// Full wildcard (>) at this level matches all remaining tokens.
if (level.Fwc != null)
AddNodeToResults(level.Fwc, plainSubs, queueSubs);
// Partial wildcard (*) -- recurse with remaining tokens.
pwc = level.Pwc;
if (pwc != null)
MatchLevel(pwc.Next, tokens, i + 1, plainSubs, queueSubs);
// Literal match
node = null;
if (level.Nodes.TryGetValue(tokens[i], out var found))
{
node = found;
level = node.Next;
}
else
{
level = null;
}
}
// After processing all tokens, add results from the final literal node.
if (node != null)
AddNodeToResults(node, plainSubs, queueSubs);
// Also add results from the partial wildcard at the last level,
// which handles the case where * matches the final token.
if (pwc != null)
AddNodeToResults(pwc, plainSubs, queueSubs);
}
private static void AddNodeToResults(TrieNode node,
List<Subscription> plainSubs, List<List<Subscription>> queueSubs)
{
// Add plain subscriptions
foreach (var sub in node.PlainSubs)
plainSubs.Add(sub);
// Add queue subscriptions grouped by queue name
foreach (var (queueName, subs) in node.QueueSubs)
{
if (subs.Count == 0) continue;
// Find existing queue group or create new one
List<Subscription>? existing = null;
foreach (var qs in queueSubs)
{
if (qs.Count > 0 && qs[0].Queue == queueName)
{
existing = qs;
break;
}
}
if (existing == null)
{
existing = new List<Subscription>();
queueSubs.Add(existing);
}
existing.AddRange(subs);
}
}
public SubListStats Stats()
{
_lock.EnterReadLock();
uint numSubs, numCache;
ulong inserts, removes;
try
{
numSubs = _count;
numCache = (uint)(_cache?.Count ?? 0);
inserts = _inserts;
removes = _removes;
}
finally
{
_lock.ExitReadLock();
}
var matches = Interlocked.Read(ref _matches);
var cacheHits = Interlocked.Read(ref _cacheHits);
var hitRate = matches > 0 ? (double)cacheHits / matches : 0.0;
uint maxFanout = 0;
long totalFanout = 0;
int cacheEntries = 0;
_lock.EnterReadLock();
try
{
if (_cache != null)
{
foreach (var (_, entry) in _cache)
{
var r = entry.Result;
var f = r.PlainSubs.Length + r.QueueSubs.Length;
totalFanout += f;
if (f > maxFanout) maxFanout = (uint)f;
cacheEntries++;
}
}
}
finally
{
_lock.ExitReadLock();
}
return new SubListStats
{
NumSubs = numSubs,
NumCache = numCache,
NumInserts = inserts,
NumRemoves = removes,
NumMatches = matches,
CacheHitRate = hitRate,
MaxFanout = maxFanout,
AvgFanout = cacheEntries > 0 ? (double)totalFanout / cacheEntries : 0.0,
TotalFanout = (int)totalFanout,
CacheEntries = cacheEntries,
CacheHits = cacheHits,
};
}
public bool HasInterest(string subject)
{
var currentGen = Interlocked.Read(ref _generation);
_lock.EnterReadLock();
try
{
if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen)
{
var r = cached.Result;
return r.PlainSubs.Length > 0 || r.QueueSubs.Length > 0;
}
}
finally
{
_lock.ExitReadLock();
}
var tokens = Tokenize(subject);
if (tokens == null) return false;
_lock.EnterReadLock();
try
{
return HasInterestLevel(_root, tokens, 0);
}
finally
{
_lock.ExitReadLock();
}
}
public (int plainCount, int queueCount) NumInterest(string subject)
{
var tokens = Tokenize(subject);
if (tokens == null) return (0, 0);
_lock.EnterReadLock();
try
{
int np = 0, nq = 0;
CountInterestLevel(_root, tokens, 0, ref np, ref nq);
return (np, nq);
}
finally
{
_lock.ExitReadLock();
}
}
public void RemoveBatch(IEnumerable<Subscription> subs)
{
_lock.EnterWriteLock();
try
{
var wasEnabled = _cache != null;
_cache = null;
foreach (var sub in subs)
{
if (RemoveInternal(sub))
{
_removes++;
if (sub.Queue != null && _queueRemoveNotifications.Count > 0)
CheckForQueueRemoveNotificationNoLock(sub.Subject, sub.Queue);
}
}
Interlocked.Increment(ref _generation);
if (wasEnabled)
_cache = new Dictionary<string, CachedResult>(StringComparer.Ordinal);
}
finally
{
_lock.ExitWriteLock();
}
}
public IReadOnlyList<Subscription> All()
{
var subs = new List<Subscription>();
_lock.EnterReadLock();
try
{
CollectAllSubs(_root, subs);
}
finally
{
_lock.ExitReadLock();
}
return subs;
}
public IReadOnlyList<Subscription> LocalSubs(bool includeLeafHubs = false)
{
var subs = new List<Subscription>();
_lock.EnterReadLock();
try
{
CollectLocalSubs(_root, subs, includeLeafHubs);
}
finally
{
_lock.ExitReadLock();
}
return subs;
}
internal int NumLevels()
{
_lock.EnterReadLock();
try
{
return VisitLevel(_root, 0);
}
finally
{
_lock.ExitReadLock();
}
}
public SubListResult ReverseMatch(string subject)
{
var tokens = Tokenize(subject);
if (tokens == null)
return SubListResult.Empty;
_lock.EnterReadLock();
try
{
var plainSubs = new List<Subscription>();
var queueSubs = new List<List<Subscription>>();
ReverseMatchLevel(_root, tokens, 0, plainSubs, queueSubs);
if (plainSubs.Count == 0 && queueSubs.Count == 0)
return SubListResult.Empty;
var queueSubsArr = new Subscription[queueSubs.Count][];
for (int i = 0; i < queueSubs.Count; i++)
queueSubsArr[i] = queueSubs[i].ToArray();
return new SubListResult(plainSubs.ToArray(), queueSubsArr);
}
finally
{
_lock.ExitReadLock();
}
}
private static bool HasInterestLevel(TrieLevel? level, string[] tokens, int tokenIndex)
{
TrieNode? pwc = null;
TrieNode? node = null;
for (int i = tokenIndex; i < tokens.Length; i++)
{
if (level == null) return false;
if (level.Fwc != null && NodeHasInterest(level.Fwc)) return true;
pwc = level.Pwc;
if (pwc != null && HasInterestLevel(pwc.Next, tokens, i + 1)) return true;
node = null;
if (level.Nodes.TryGetValue(tokens[i], out var found))
{
node = found;
level = node.Next;
}
else
{
level = null;
}
}
if (node != null && NodeHasInterest(node)) return true;
if (pwc != null && NodeHasInterest(pwc)) return true;
return false;
}
private static bool NodeHasInterest(TrieNode node)
{
return node.PlainSubs.Count > 0 || node.QueueSubs.Count > 0;
}
private static void CountInterestLevel(TrieLevel? level, string[] tokens, int tokenIndex,
ref int np, ref int nq)
{
TrieNode? pwc = null;
TrieNode? node = null;
for (int i = tokenIndex; i < tokens.Length; i++)
{
if (level == null) return;
if (level.Fwc != null) AddNodeCounts(level.Fwc, ref np, ref nq);
pwc = level.Pwc;
if (pwc != null) CountInterestLevel(pwc.Next, tokens, i + 1, ref np, ref nq);
node = null;
if (level.Nodes.TryGetValue(tokens[i], out var found))
{
node = found;
level = node.Next;
}
else
{
level = null;
}
}
if (node != null) AddNodeCounts(node, ref np, ref nq);
if (pwc != null) AddNodeCounts(pwc, ref np, ref nq);
}
private static void AddNodeCounts(TrieNode node, ref int np, ref int nq)
{
np += node.PlainSubs.Count;
foreach (var (_, qset) in node.QueueSubs)
nq += qset.Count;
}
private static void CollectAllSubs(TrieLevel level, List<Subscription> subs)
{
foreach (var (_, node) in level.Nodes)
{
foreach (var sub in node.PlainSubs)
subs.Add(sub);
foreach (var (_, qset) in node.QueueSubs)
foreach (var sub in qset)
subs.Add(sub);
if (node.Next != null)
CollectAllSubs(node.Next, subs);
}
if (level.Pwc != null)
{
foreach (var sub in level.Pwc.PlainSubs)
subs.Add(sub);
foreach (var (_, qset) in level.Pwc.QueueSubs)
foreach (var sub in qset)
subs.Add(sub);
if (level.Pwc.Next != null)
CollectAllSubs(level.Pwc.Next, subs);
}
if (level.Fwc != null)
{
foreach (var sub in level.Fwc.PlainSubs)
subs.Add(sub);
foreach (var (_, qset) in level.Fwc.QueueSubs)
foreach (var sub in qset)
subs.Add(sub);
if (level.Fwc.Next != null)
CollectAllSubs(level.Fwc.Next, subs);
}
}
private static void CollectLocalSubs(TrieLevel level, List<Subscription> subs, bool includeLeafHubs)
{
foreach (var (_, node) in level.Nodes)
{
AddNodeLocalSubs(node, subs, includeLeafHubs);
if (node.Next != null)
CollectLocalSubs(node.Next, subs, includeLeafHubs);
}
if (level.Pwc != null)
{
AddNodeLocalSubs(level.Pwc, subs, includeLeafHubs);
if (level.Pwc.Next != null)
CollectLocalSubs(level.Pwc.Next, subs, includeLeafHubs);
}
if (level.Fwc != null)
{
AddNodeLocalSubs(level.Fwc, subs, includeLeafHubs);
if (level.Fwc.Next != null)
CollectLocalSubs(level.Fwc.Next, subs, includeLeafHubs);
}
}
private static void AddNodeLocalSubs(TrieNode node, List<Subscription> subs, bool includeLeafHubs)
{
foreach (var sub in node.PlainSubs)
AddLocalSub(sub, subs, includeLeafHubs);
foreach (var (_, qset) in node.QueueSubs)
foreach (var sub in qset)
AddLocalSub(sub, subs, includeLeafHubs);
}
private static void AddLocalSub(Subscription sub, List<Subscription> subs, bool includeLeafHubs)
{
if (sub.Client == null)
return;
var kind = sub.Client.Kind;
if (kind is global::NATS.Server.ClientKind.Client
or global::NATS.Server.ClientKind.System
or global::NATS.Server.ClientKind.JetStream
or global::NATS.Server.ClientKind.Account
|| (includeLeafHubs && kind == global::NATS.Server.ClientKind.Leaf))
{
subs.Add(sub);
}
}
private static int VisitLevel(TrieLevel? level, int depth)
{
if (level == null || (level.Nodes.Count == 0 && level.Pwc == null && level.Fwc == null))
return depth;
depth++;
var maxDepth = depth;
foreach (var (_, node) in level.Nodes)
{
var childDepth = VisitLevel(node.Next, depth);
if (childDepth > maxDepth)
maxDepth = childDepth;
}
if (level.Pwc != null)
{
var pwcDepth = VisitLevel(level.Pwc.Next, depth);
if (pwcDepth > maxDepth)
maxDepth = pwcDepth;
}
if (level.Fwc != null)
{
var fwcDepth = VisitLevel(level.Fwc.Next, depth);
if (fwcDepth > maxDepth)
maxDepth = fwcDepth;
}
return maxDepth;
}
private static void ReverseMatchLevel(TrieLevel? level, string[] tokens, int tokenIndex,
List<Subscription> plainSubs, List<List<Subscription>> queueSubs)
{
if (level == null || tokenIndex >= tokens.Length)
return;
var token = tokens[tokenIndex];
bool isLast = tokenIndex == tokens.Length - 1;
if (token == ">")
{
CollectAllNodes(level, plainSubs, queueSubs);
return;
}
if (token == "*")
{
foreach (var (_, node) in level.Nodes)
{
if (isLast)
AddNodeToResults(node, plainSubs, queueSubs);
else
ReverseMatchLevel(node.Next, tokens, tokenIndex + 1, plainSubs, queueSubs);
}
}
else
{
if (level.Nodes.TryGetValue(token, out var node))
{
if (isLast)
AddNodeToResults(node, plainSubs, queueSubs);
else
ReverseMatchLevel(node.Next, tokens, tokenIndex + 1, plainSubs, queueSubs);
}
}
if (level.Pwc != null)
{
if (isLast)
AddNodeToResults(level.Pwc, plainSubs, queueSubs);
else
ReverseMatchLevel(level.Pwc.Next, tokens, tokenIndex + 1, plainSubs, queueSubs);
}
if (level.Fwc != null)
{
AddNodeToResults(level.Fwc, plainSubs, queueSubs);
}
}
private static void CollectAllNodes(TrieLevel level, List<Subscription> plainSubs,
List<List<Subscription>> queueSubs)
{
foreach (var (_, node) in level.Nodes)
{
AddNodeToResults(node, plainSubs, queueSubs);
if (node.Next != null)
CollectAllNodes(node.Next, plainSubs, queueSubs);
}
if (level.Pwc != null)
{
AddNodeToResults(level.Pwc, plainSubs, queueSubs);
if (level.Pwc.Next != null)
CollectAllNodes(level.Pwc.Next, plainSubs, queueSubs);
}
if (level.Fwc != null)
{
AddNodeToResults(level.Fwc, plainSubs, queueSubs);
if (level.Fwc.Next != null)
CollectAllNodes(level.Fwc.Next, plainSubs, queueSubs);
}
}
/// <summary>Enumerates '.' separated tokens in a subject without allocating.</summary>
private ref struct TokenEnumerator
{
private ReadOnlySpan<char> _remaining;
public TokenEnumerator(string subject)
{
_remaining = subject.AsSpan();
Current = default;
}
public ReadOnlySpan<char> Current { get; private set; }
public TokenEnumerator GetEnumerator() => this;
public bool MoveNext()
{
if (_remaining.IsEmpty)
return false;
int sep = _remaining.IndexOf(SubjectMatch.Sep);
if (sep < 0)
{
Current = _remaining;
_remaining = default;
}
else
{
Current = _remaining[..sep];
_remaining = _remaining[(sep + 1)..];
}
return true;
}
}
private sealed class TrieLevel
{
public readonly Dictionary<string, TrieNode> Nodes = new(StringComparer.Ordinal);
public TrieNode? Pwc; // partial wildcard (*)
public TrieNode? Fwc; // full wildcard (>)
}
private sealed class TrieNode
{
public TrieLevel? Next;
public readonly HashSet<Subscription> PlainSubs = [];
public readonly Dictionary<string, HashSet<Subscription>> QueueSubs = new(StringComparer.Ordinal);
public bool PackedListEnabled;
public bool IsEmpty => PlainSubs.Count == 0 && QueueSubs.Count == 0 &&
(Next == null || (Next.Nodes.Count == 0 && Next.Pwc == null && Next.Fwc == null));
}
}