feat: add generation-based cache, Stats, HasInterest, NumInterest, RemoveBatch, All, ReverseMatch to SubList

This commit is contained in:
Joseph Doherty
2026-02-23 00:45:28 -05:00
parent 4ad821394b
commit cc0fe04f3c
3 changed files with 562 additions and 173 deletions

View File

@@ -13,9 +13,16 @@ public sealed class SubList : IDisposable
private readonly ReaderWriterLockSlim _lock = new();
private readonly TrieLevel _root = new();
private Dictionary<string, SubListResult>? _cache = new(StringComparer.Ordinal);
private Dictionary<string, CachedResult>? _cache = new(StringComparer.Ordinal);
private uint _count;
private volatile bool _disposed;
private long _generation;
private ulong _matches;
private ulong _cacheHits;
private ulong _inserts;
private ulong _removes;
private readonly record struct CachedResult(SubListResult Result, long Generation);
public void Dispose()
{
@@ -90,7 +97,8 @@ public sealed class SubList : IDisposable
}
_count++;
AddToCache(subject, sub);
_inserts++;
Interlocked.Increment(ref _generation);
}
finally
{
@@ -104,78 +112,10 @@ public sealed class SubList : IDisposable
_lock.EnterWriteLock();
try
{
var level = _root;
TrieNode? node = null;
bool sawFwc = false;
var pathList = new List<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)>();
foreach (var token in new TokenEnumerator(sub.Subject))
if (RemoveInternal(sub))
{
if (token.Length == 0 || sawFwc)
return;
bool isPwc = token.Length == 1 && token[0] == SubjectMatch.Pwc;
bool isFwc = token.Length == 1 && token[0] == SubjectMatch.Fwc;
if (isPwc)
{
node = level.Pwc;
}
else if (isFwc)
{
node = level.Fwc;
sawFwc = true;
}
else
{
level.Nodes.TryGetValue(token.ToString(), out node);
}
if (node == null)
return; // not found
var tokenStr = token.ToString();
pathList.Add((level, node, tokenStr, isPwc, isFwc));
if (node.Next == null)
return; // corrupted trie state
level = node.Next;
}
if (node == null) return;
// Remove from node
bool removed;
if (sub.Queue == null)
{
removed = node.PlainSubs.Remove(sub);
}
else
{
removed = false;
if (node.QueueSubs.TryGetValue(sub.Queue, out var qset))
{
removed = qset.Remove(sub);
if (qset.Count == 0)
node.QueueSubs.Remove(sub.Queue);
}
}
if (!removed) return;
_count--;
RemoveFromCache(sub.Subject);
// Prune empty nodes (walk backwards)
for (int i = pathList.Count - 1; i >= 0; i--)
{
var (l, n, t, isPwc, isFwc) = pathList[i];
if (n.IsEmpty)
{
if (isPwc) l.Pwc = null;
else if (isFwc) l.Fwc = null;
else l.Nodes.Remove(t);
}
_removes++;
Interlocked.Increment(ref _generation);
}
}
finally
@@ -184,22 +124,107 @@ public sealed class SubList : IDisposable
}
}
/// <summary>
/// Core remove logic without lock acquisition or generation bumping.
/// Assumes write lock is held. Returns true if a subscription was actually removed.
/// </summary>
private bool RemoveInternal(Subscription sub)
{
var level = _root;
TrieNode? node = null;
bool sawFwc = false;
var pathList = new List<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)>();
foreach (var token in new TokenEnumerator(sub.Subject))
{
if (token.Length == 0 || sawFwc)
return false;
bool isPwc = token.Length == 1 && token[0] == SubjectMatch.Pwc;
bool isFwc = token.Length == 1 && token[0] == SubjectMatch.Fwc;
if (isPwc)
{
node = level.Pwc;
}
else if (isFwc)
{
node = level.Fwc;
sawFwc = true;
}
else
{
level.Nodes.TryGetValue(token.ToString(), out node);
}
if (node == null)
return false; // not found
var tokenStr = token.ToString();
pathList.Add((level, node, tokenStr, isPwc, isFwc));
if (node.Next == null)
return false; // corrupted trie state
level = node.Next;
}
if (node == null) return false;
// Remove from node
bool removed;
if (sub.Queue == null)
{
removed = node.PlainSubs.Remove(sub);
}
else
{
removed = false;
if (node.QueueSubs.TryGetValue(sub.Queue, out var qset))
{
removed = qset.Remove(sub);
if (qset.Count == 0)
node.QueueSubs.Remove(sub.Queue);
}
}
if (!removed) return false;
_count--;
// Prune empty nodes (walk backwards)
for (int i = pathList.Count - 1; i >= 0; i--)
{
var (l, n, t, isPwc, isFwc) = pathList[i];
if (n.IsEmpty)
{
if (isPwc) l.Pwc = null;
else if (isFwc) l.Fwc = null;
else l.Nodes.Remove(t);
}
}
return true;
}
public SubListResult Match(string subject)
{
// Check cache under read lock first.
Interlocked.Increment(ref _matches);
var currentGen = Interlocked.Read(ref _generation);
_lock.EnterReadLock();
try
{
if (_cache != null && _cache.TryGetValue(subject, out var cached))
return cached;
if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen)
{
Interlocked.Increment(ref _cacheHits);
return cached.Result;
}
}
finally
{
_lock.ExitReadLock();
}
// Cache miss -- tokenize and match under write lock (needed for cache update).
// Tokenize the subject.
var tokens = Tokenize(subject);
if (tokens == null)
return SubListResult.Empty;
@@ -207,13 +232,15 @@ public sealed class SubList : IDisposable
_lock.EnterWriteLock();
try
{
// Re-check cache after acquiring write lock.
if (_cache != null && _cache.TryGetValue(subject, out var cached))
return cached;
currentGen = Interlocked.Read(ref _generation);
if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen)
{
Interlocked.Increment(ref _cacheHits);
return cached.Result;
}
var plainSubs = new List<Subscription>();
var queueSubs = new List<List<Subscription>>();
MatchLevel(_root, tokens, 0, plainSubs, queueSubs);
SubListResult result;
@@ -226,19 +253,14 @@ public sealed class SubList : IDisposable
var queueSubsArr = new Subscription[queueSubs.Count][];
for (int i = 0; i < queueSubs.Count; i++)
queueSubsArr[i] = queueSubs[i].ToArray();
result = new SubListResult(
plainSubs.ToArray(),
queueSubsArr);
result = new SubListResult(plainSubs.ToArray(), queueSubsArr);
}
if (_cache != null)
{
_cache[subject] = result;
_cache[subject] = new CachedResult(result, currentGen);
if (_cache.Count > CacheMax)
{
// Sweep: remove entries until at CacheSweep count.
var keys = _cache.Keys.Take(_cache.Count - CacheSweep).ToList();
foreach (var key in keys)
_cache.Remove(key);
@@ -356,119 +378,352 @@ public sealed class SubList : IDisposable
}
}
/// <summary>
/// Adds a subscription to matching cache entries.
/// Assumes write lock is held.
/// </summary>
private void AddToCache(string subject, Subscription sub)
public SubListStats Stats()
{
if (_cache == null)
return;
// If literal subject, we can do a direct lookup.
if (SubjectMatch.IsLiteral(subject))
_lock.EnterReadLock();
uint numSubs, numCache;
ulong inserts, removes;
try
{
if (_cache.TryGetValue(subject, out var r))
{
_cache[subject] = AddSubToResult(r, sub);
}
return;
numSubs = _count;
numCache = (uint)(_cache?.Count ?? 0);
inserts = _inserts;
removes = _removes;
}
finally
{
_lock.ExitReadLock();
}
// Wildcard subscription -- check all cached keys.
var keysToUpdate = new List<(string key, SubListResult result)>();
foreach (var (key, r) in _cache)
var matches = Interlocked.Read(ref _matches);
var cacheHits = Interlocked.Read(ref _cacheHits);
var hitRate = matches > 0 ? (double)cacheHits / matches : 0.0;
uint maxFanout = 0;
long totalFanout = 0;
int cacheEntries = 0;
_lock.EnterReadLock();
try
{
if (SubjectMatch.MatchLiteral(key, subject))
if (_cache != null)
{
keysToUpdate.Add((key, r));
foreach (var (_, entry) in _cache)
{
var r = entry.Result;
var f = r.PlainSubs.Length + r.QueueSubs.Length;
totalFanout += f;
if (f > maxFanout) maxFanout = (uint)f;
cacheEntries++;
}
}
}
foreach (var (key, r) in keysToUpdate)
finally
{
_cache[key] = AddSubToResult(r, sub);
_lock.ExitReadLock();
}
return new SubListStats
{
NumSubs = numSubs,
NumCache = numCache,
NumInserts = inserts,
NumRemoves = removes,
NumMatches = matches,
CacheHitRate = hitRate,
MaxFanout = maxFanout,
AvgFanout = cacheEntries > 0 ? (double)totalFanout / cacheEntries : 0.0,
};
}
public bool HasInterest(string subject)
{
var currentGen = Interlocked.Read(ref _generation);
_lock.EnterReadLock();
try
{
if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen)
{
var r = cached.Result;
return r.PlainSubs.Length > 0 || r.QueueSubs.Length > 0;
}
}
finally
{
_lock.ExitReadLock();
}
var tokens = Tokenize(subject);
if (tokens == null) return false;
_lock.EnterReadLock();
try
{
return HasInterestLevel(_root, tokens, 0);
}
finally
{
_lock.ExitReadLock();
}
}
/// <summary>
/// Removes cache entries that match the given subject.
/// Assumes write lock is held.
/// </summary>
private void RemoveFromCache(string subject)
public (int plainCount, int queueCount) NumInterest(string subject)
{
if (_cache == null)
return;
var tokens = Tokenize(subject);
if (tokens == null) return (0, 0);
// If literal subject, we can do a direct removal.
if (SubjectMatch.IsLiteral(subject))
_lock.EnterReadLock();
try
{
_cache.Remove(subject);
return;
int np = 0, nq = 0;
CountInterestLevel(_root, tokens, 0, ref np, ref nq);
return (np, nq);
}
// Wildcard subscription -- remove all matching cached keys.
var keysToRemove = new List<string>();
foreach (var key in _cache.Keys)
finally
{
if (SubjectMatch.MatchLiteral(key, subject))
{
keysToRemove.Add(key);
}
}
foreach (var key in keysToRemove)
{
_cache.Remove(key);
_lock.ExitReadLock();
}
}
/// <summary>
/// Creates a new result with the given subscription added.
/// </summary>
private static SubListResult AddSubToResult(SubListResult result, Subscription sub)
public void RemoveBatch(IEnumerable<Subscription> subs)
{
if (sub.Queue == null)
_lock.EnterWriteLock();
try
{
var newPlain = new Subscription[result.PlainSubs.Length + 1];
result.PlainSubs.CopyTo(newPlain, 0);
newPlain[^1] = sub;
return new SubListResult(newPlain, result.QueueSubs);
var wasEnabled = _cache != null;
_cache = null;
foreach (var sub in subs)
RemoveInternal(sub);
Interlocked.Increment(ref _generation);
if (wasEnabled)
_cache = new Dictionary<string, CachedResult>(StringComparer.Ordinal);
}
finally
{
_lock.ExitWriteLock();
}
}
public IReadOnlyList<Subscription> All()
{
var subs = new List<Subscription>();
_lock.EnterReadLock();
try
{
CollectAllSubs(_root, subs);
}
finally
{
_lock.ExitReadLock();
}
return subs;
}
public SubListResult ReverseMatch(string subject)
{
var tokens = Tokenize(subject);
if (tokens == null)
return SubListResult.Empty;
_lock.EnterReadLock();
try
{
var plainSubs = new List<Subscription>();
var queueSubs = new List<List<Subscription>>();
ReverseMatchLevel(_root, tokens, 0, plainSubs, queueSubs);
if (plainSubs.Count == 0 && queueSubs.Count == 0)
return SubListResult.Empty;
var queueSubsArr = new Subscription[queueSubs.Count][];
for (int i = 0; i < queueSubs.Count; i++)
queueSubsArr[i] = queueSubs[i].ToArray();
return new SubListResult(plainSubs.ToArray(), queueSubsArr);
}
finally
{
_lock.ExitReadLock();
}
}
private static bool HasInterestLevel(TrieLevel? level, string[] tokens, int tokenIndex)
{
TrieNode? pwc = null;
TrieNode? node = null;
for (int i = tokenIndex; i < tokens.Length; i++)
{
if (level == null) return false;
if (level.Fwc != null && NodeHasInterest(level.Fwc)) return true;
pwc = level.Pwc;
if (pwc != null && HasInterestLevel(pwc.Next, tokens, i + 1)) return true;
node = null;
if (level.Nodes.TryGetValue(tokens[i], out var found))
{
node = found;
level = node.Next;
}
else
{
level = null;
}
}
if (node != null && NodeHasInterest(node)) return true;
if (pwc != null && NodeHasInterest(pwc)) return true;
return false;
}
private static bool NodeHasInterest(TrieNode node)
{
return node.PlainSubs.Count > 0 || node.QueueSubs.Count > 0;
}
private static void CountInterestLevel(TrieLevel? level, string[] tokens, int tokenIndex,
ref int np, ref int nq)
{
TrieNode? pwc = null;
TrieNode? node = null;
for (int i = tokenIndex; i < tokens.Length; i++)
{
if (level == null) return;
if (level.Fwc != null) AddNodeCounts(level.Fwc, ref np, ref nq);
pwc = level.Pwc;
if (pwc != null) CountInterestLevel(pwc.Next, tokens, i + 1, ref np, ref nq);
node = null;
if (level.Nodes.TryGetValue(tokens[i], out var found))
{
node = found;
level = node.Next;
}
else
{
level = null;
}
}
if (node != null) AddNodeCounts(node, ref np, ref nq);
if (pwc != null) AddNodeCounts(pwc, ref np, ref nq);
}
private static void AddNodeCounts(TrieNode node, ref int np, ref int nq)
{
np += node.PlainSubs.Count;
foreach (var (_, qset) in node.QueueSubs)
nq += qset.Count;
}
private static void CollectAllSubs(TrieLevel level, List<Subscription> subs)
{
foreach (var (_, node) in level.Nodes)
{
foreach (var sub in node.PlainSubs)
subs.Add(sub);
foreach (var (_, qset) in node.QueueSubs)
foreach (var sub in qset)
subs.Add(sub);
if (node.Next != null)
CollectAllSubs(node.Next, subs);
}
if (level.Pwc != null)
{
foreach (var sub in level.Pwc.PlainSubs)
subs.Add(sub);
foreach (var (_, qset) in level.Pwc.QueueSubs)
foreach (var sub in qset)
subs.Add(sub);
if (level.Pwc.Next != null)
CollectAllSubs(level.Pwc.Next, subs);
}
if (level.Fwc != null)
{
foreach (var sub in level.Fwc.PlainSubs)
subs.Add(sub);
foreach (var (_, qset) in level.Fwc.QueueSubs)
foreach (var sub in qset)
subs.Add(sub);
if (level.Fwc.Next != null)
CollectAllSubs(level.Fwc.Next, subs);
}
}
private static void ReverseMatchLevel(TrieLevel? level, string[] tokens, int tokenIndex,
List<Subscription> plainSubs, List<List<Subscription>> queueSubs)
{
if (level == null || tokenIndex >= tokens.Length)
return;
var token = tokens[tokenIndex];
bool isLast = tokenIndex == tokens.Length - 1;
if (token == ">")
{
CollectAllNodes(level, plainSubs, queueSubs);
return;
}
if (token == "*")
{
foreach (var (_, node) in level.Nodes)
{
if (isLast)
AddNodeToResults(node, plainSubs, queueSubs);
else
ReverseMatchLevel(node.Next, tokens, tokenIndex + 1, plainSubs, queueSubs);
}
}
else
{
// Find existing queue group
var queueSubs = result.QueueSubs;
int slot = -1;
for (int i = 0; i < queueSubs.Length; i++)
if (level.Nodes.TryGetValue(token, out var node))
{
if (queueSubs[i].Length > 0 && queueSubs[i][0].Queue == sub.Queue)
{
slot = i;
break;
}
}
// Deep copy queue subs
var newQueueSubs = new Subscription[queueSubs.Length + (slot < 0 ? 1 : 0)][];
for (int i = 0; i < queueSubs.Length; i++)
{
if (i == slot)
{
var newGroup = new Subscription[queueSubs[i].Length + 1];
queueSubs[i].CopyTo(newGroup, 0);
newGroup[^1] = sub;
newQueueSubs[i] = newGroup;
}
if (isLast)
AddNodeToResults(node, plainSubs, queueSubs);
else
{
newQueueSubs[i] = (Subscription[])queueSubs[i].Clone();
}
}
if (slot < 0)
{
newQueueSubs[^1] = [sub];
ReverseMatchLevel(node.Next, tokens, tokenIndex + 1, plainSubs, queueSubs);
}
}
return new SubListResult(result.PlainSubs, newQueueSubs);
if (level.Pwc != null)
{
if (isLast)
AddNodeToResults(level.Pwc, plainSubs, queueSubs);
else
ReverseMatchLevel(level.Pwc.Next, tokens, tokenIndex + 1, plainSubs, queueSubs);
}
if (level.Fwc != null)
{
AddNodeToResults(level.Fwc, plainSubs, queueSubs);
}
}
private static void CollectAllNodes(TrieLevel level, List<Subscription> plainSubs,
List<List<Subscription>> queueSubs)
{
foreach (var (_, node) in level.Nodes)
{
AddNodeToResults(node, plainSubs, queueSubs);
if (node.Next != null)
CollectAllNodes(node.Next, plainSubs, queueSubs);
}
if (level.Pwc != null)
{
AddNodeToResults(level.Pwc, plainSubs, queueSubs);
if (level.Pwc.Next != null)
CollectAllNodes(level.Pwc.Next, plainSubs, queueSubs);
}
if (level.Fwc != null)
{
AddNodeToResults(level.Fwc, plainSubs, queueSubs);
if (level.Fwc.Next != null)
CollectAllNodes(level.Fwc.Next, plainSubs, queueSubs);
}
}