perf: pool SubList match builders and cleanup scans
This commit is contained in:
@@ -12,6 +12,10 @@ public sealed class SubList : IDisposable
|
|||||||
{
|
{
|
||||||
private const int CacheMax = 1024;
|
private const int CacheMax = 1024;
|
||||||
private const int CacheSweep = 256;
|
private const int CacheSweep = 256;
|
||||||
|
[ThreadStatic]
|
||||||
|
private static MatchBuilder? s_matchBuilder;
|
||||||
|
[ThreadStatic]
|
||||||
|
private static List<RoutedSubKey>? s_remoteSubRemovalKeys;
|
||||||
|
|
||||||
private readonly ReaderWriterLockSlim _lock = new();
|
private readonly ReaderWriterLockSlim _lock = new();
|
||||||
private readonly TrieLevel _root = new();
|
private readonly TrieLevel _root = new();
|
||||||
@@ -243,23 +247,31 @@ public sealed class SubList : IDisposable
|
|||||||
_lock.EnterWriteLock();
|
_lock.EnterWriteLock();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
var removalKeys = RentRemoteSubRemovalKeys();
|
||||||
var removed = 0;
|
var removed = 0;
|
||||||
foreach (var kvp in _remoteSubs.ToArray())
|
foreach (var (key, _) in _remoteSubs)
|
||||||
{
|
{
|
||||||
if (!string.Equals(kvp.Key.RouteId, routeId, StringComparison.Ordinal))
|
if (!string.Equals(key.RouteId, routeId, StringComparison.Ordinal))
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (_remoteSubs.Remove(kvp.Key))
|
removalKeys.Add(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var key in removalKeys)
|
||||||
|
{
|
||||||
|
if (_remoteSubs.Remove(key, out var removedSub))
|
||||||
{
|
{
|
||||||
removed++;
|
removed++;
|
||||||
InterestChanged?.Invoke(new InterestChange(
|
InterestChanged?.Invoke(new InterestChange(
|
||||||
InterestChangeKind.RemoteRemoved,
|
InterestChangeKind.RemoteRemoved,
|
||||||
kvp.Value.Subject,
|
removedSub.Subject,
|
||||||
kvp.Value.Queue,
|
removedSub.Queue,
|
||||||
kvp.Value.Account));
|
removedSub.Account));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
removalKeys.Clear();
|
||||||
|
|
||||||
if (removed > 0)
|
if (removed > 0)
|
||||||
Interlocked.Increment(ref _generation);
|
Interlocked.Increment(ref _generation);
|
||||||
|
|
||||||
@@ -276,26 +288,34 @@ public sealed class SubList : IDisposable
|
|||||||
_lock.EnterWriteLock();
|
_lock.EnterWriteLock();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
var removalKeys = RentRemoteSubRemovalKeys();
|
||||||
var removed = 0;
|
var removed = 0;
|
||||||
foreach (var kvp in _remoteSubs.ToArray())
|
foreach (var (key, _) in _remoteSubs)
|
||||||
{
|
{
|
||||||
if (!string.Equals(kvp.Key.RouteId, routeId, StringComparison.Ordinal)
|
if (!string.Equals(key.RouteId, routeId, StringComparison.Ordinal)
|
||||||
|| !string.Equals(kvp.Key.Account, account, StringComparison.Ordinal))
|
|| !string.Equals(key.Account, account, StringComparison.Ordinal))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_remoteSubs.Remove(kvp.Key))
|
removalKeys.Add(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var key in removalKeys)
|
||||||
|
{
|
||||||
|
if (_remoteSubs.Remove(key, out var removedSub))
|
||||||
{
|
{
|
||||||
removed++;
|
removed++;
|
||||||
InterestChanged?.Invoke(new InterestChange(
|
InterestChanged?.Invoke(new InterestChange(
|
||||||
InterestChangeKind.RemoteRemoved,
|
InterestChangeKind.RemoteRemoved,
|
||||||
kvp.Value.Subject,
|
removedSub.Subject,
|
||||||
kvp.Value.Queue,
|
removedSub.Queue,
|
||||||
kvp.Value.Account));
|
removedSub.Account));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
removalKeys.Clear();
|
||||||
|
|
||||||
if (removed > 0)
|
if (removed > 0)
|
||||||
Interlocked.Increment(ref _generation);
|
Interlocked.Increment(ref _generation);
|
||||||
|
|
||||||
@@ -565,22 +585,9 @@ public sealed class SubList : IDisposable
|
|||||||
return cached.Result;
|
return cached.Result;
|
||||||
}
|
}
|
||||||
|
|
||||||
var plainSubs = new List<Subscription>();
|
var builder = RentMatchBuilder();
|
||||||
var queueSubs = new List<List<Subscription>>();
|
MatchLevel(_root, tokens, 0, builder);
|
||||||
MatchLevel(_root, tokens, 0, plainSubs, queueSubs);
|
var result = builder.ToResult();
|
||||||
|
|
||||||
SubListResult result;
|
|
||||||
if (plainSubs.Count == 0 && queueSubs.Count == 0)
|
|
||||||
{
|
|
||||||
result = SubListResult.Empty;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
var queueSubsArr = new Subscription[queueSubs.Count][];
|
|
||||||
for (int i = 0; i < queueSubs.Count; i++)
|
|
||||||
queueSubsArr[i] = queueSubs[i].ToArray();
|
|
||||||
result = new SubListResult(plainSubs.ToArray(), queueSubsArr);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_cache != null)
|
if (_cache != null)
|
||||||
{
|
{
|
||||||
@@ -676,6 +683,20 @@ public sealed class SubList : IDisposable
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static MatchBuilder RentMatchBuilder()
|
||||||
|
{
|
||||||
|
var builder = s_matchBuilder ??= new MatchBuilder();
|
||||||
|
builder.Reset();
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<RoutedSubKey> RentRemoteSubRemovalKeys()
|
||||||
|
{
|
||||||
|
var keys = s_remoteSubRemovalKeys ??= [];
|
||||||
|
keys.Clear();
|
||||||
|
return keys;
|
||||||
|
}
|
||||||
|
|
||||||
private bool HasExactQueueInterestNoLock(string subject, string queue)
|
private bool HasExactQueueInterestNoLock(string subject, string queue)
|
||||||
{
|
{
|
||||||
var subs = new List<Subscription>();
|
var subs = new List<Subscription>();
|
||||||
@@ -822,6 +843,42 @@ public sealed class SubList : IDisposable
|
|||||||
AddNodeToResults(pwc, plainSubs, queueSubs);
|
AddNodeToResults(pwc, plainSubs, queueSubs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void MatchLevel(TrieLevel? level, string[] tokens, int tokenIndex, MatchBuilder builder)
|
||||||
|
{
|
||||||
|
TrieNode? pwc = null;
|
||||||
|
TrieNode? node = null;
|
||||||
|
|
||||||
|
for (int i = tokenIndex; i < tokens.Length; i++)
|
||||||
|
{
|
||||||
|
if (level == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (level.Fwc != null)
|
||||||
|
AddNodeToResults(level.Fwc, builder);
|
||||||
|
|
||||||
|
pwc = level.Pwc;
|
||||||
|
if (pwc != null)
|
||||||
|
MatchLevel(pwc.Next, tokens, i + 1, builder);
|
||||||
|
|
||||||
|
node = null;
|
||||||
|
if (level.Nodes.TryGetValue(tokens[i], out var found))
|
||||||
|
{
|
||||||
|
node = found;
|
||||||
|
level = node.Next;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
level = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (node != null)
|
||||||
|
AddNodeToResults(node, builder);
|
||||||
|
|
||||||
|
if (pwc != null)
|
||||||
|
AddNodeToResults(pwc, builder);
|
||||||
|
}
|
||||||
|
|
||||||
private static void AddNodeToResults(TrieNode node,
|
private static void AddNodeToResults(TrieNode node,
|
||||||
List<Subscription> plainSubs, List<List<Subscription>> queueSubs)
|
List<Subscription> plainSubs, List<List<Subscription>> queueSubs)
|
||||||
{
|
{
|
||||||
@@ -853,6 +910,19 @@ public sealed class SubList : IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void AddNodeToResults(TrieNode node, MatchBuilder builder)
|
||||||
|
{
|
||||||
|
builder.PlainSubs.AddRange(node.PlainSubs);
|
||||||
|
|
||||||
|
foreach (var (queueName, subs) in node.QueueSubs)
|
||||||
|
{
|
||||||
|
if (subs.Count == 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
builder.AddQueueGroup(queueName, subs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public SubListStats Stats()
|
public SubListStats Stats()
|
||||||
{
|
{
|
||||||
_lock.EnterReadLock();
|
_lock.EnterReadLock();
|
||||||
@@ -1368,4 +1438,47 @@ public sealed class SubList : IDisposable
|
|||||||
public bool IsEmpty => PlainSubs.Count == 0 && QueueSubs.Count == 0 &&
|
public bool IsEmpty => PlainSubs.Count == 0 && QueueSubs.Count == 0 &&
|
||||||
(Next == null || (Next.Nodes.Count == 0 && Next.Pwc == null && Next.Fwc == null));
|
(Next == null || (Next.Nodes.Count == 0 && Next.Pwc == null && Next.Fwc == null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private sealed class MatchBuilder
|
||||||
|
{
|
||||||
|
private readonly Dictionary<string, int> _queueIndexes = new(StringComparer.Ordinal);
|
||||||
|
private readonly List<List<Subscription>> _queueGroups = [];
|
||||||
|
private int _queueGroupCount;
|
||||||
|
|
||||||
|
public List<Subscription> PlainSubs { get; } = [];
|
||||||
|
|
||||||
|
public void Reset()
|
||||||
|
{
|
||||||
|
PlainSubs.Clear();
|
||||||
|
_queueIndexes.Clear();
|
||||||
|
for (var i = 0; i < _queueGroupCount; i++)
|
||||||
|
_queueGroups[i].Clear();
|
||||||
|
_queueGroupCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void AddQueueGroup(string queueName, HashSet<Subscription> subs)
|
||||||
|
{
|
||||||
|
if (!_queueIndexes.TryGetValue(queueName, out var index))
|
||||||
|
{
|
||||||
|
index = _queueGroupCount++;
|
||||||
|
_queueIndexes[queueName] = index;
|
||||||
|
if (index == _queueGroups.Count)
|
||||||
|
_queueGroups.Add([]);
|
||||||
|
}
|
||||||
|
|
||||||
|
_queueGroups[index].AddRange(subs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SubListResult ToResult()
|
||||||
|
{
|
||||||
|
if (PlainSubs.Count == 0 && _queueGroupCount == 0)
|
||||||
|
return SubListResult.Empty;
|
||||||
|
|
||||||
|
var queueSubsArr = new Subscription[_queueGroupCount][];
|
||||||
|
for (var i = 0; i < _queueGroupCount; i++)
|
||||||
|
queueSubsArr[i] = _queueGroups[i].ToArray();
|
||||||
|
|
||||||
|
return new SubListResult(PlainSubs.ToArray(), queueSubsArr);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -208,6 +208,50 @@ public class RouteSubscriptionTests
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Removing_one_subject_keeps_other_remote_interest_intact()
|
||||||
|
{
|
||||||
|
var cluster = Guid.NewGuid().ToString("N");
|
||||||
|
var a = await StartServerAsync(MakeClusterOpts(cluster));
|
||||||
|
var b = await StartServerAsync(MakeClusterOpts(cluster, a.Server.ClusterListen!));
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await WaitForRouteFormation(a.Server, b.Server);
|
||||||
|
|
||||||
|
await using var nc = new NatsConnection(new NatsOpts
|
||||||
|
{
|
||||||
|
Url = $"nats://127.0.0.1:{a.Server.Port}",
|
||||||
|
});
|
||||||
|
await nc.ConnectAsync();
|
||||||
|
|
||||||
|
await using var sub1 = await nc.SubscribeCoreAsync<string>("multi.one");
|
||||||
|
await using var sub2 = await nc.SubscribeCoreAsync<string>("multi.two");
|
||||||
|
await nc.PingAsync();
|
||||||
|
|
||||||
|
await WaitForCondition(() => b.Server.HasRemoteInterest("multi.one") && b.Server.HasRemoteInterest("multi.two"));
|
||||||
|
b.Server.HasRemoteInterest("multi.one").ShouldBeTrue();
|
||||||
|
b.Server.HasRemoteInterest("multi.two").ShouldBeTrue();
|
||||||
|
|
||||||
|
await sub1.DisposeAsync();
|
||||||
|
await nc.PingAsync();
|
||||||
|
|
||||||
|
await WaitForCondition(() => !b.Server.HasRemoteInterest("multi.one"));
|
||||||
|
b.Server.HasRemoteInterest("multi.one").ShouldBeFalse();
|
||||||
|
b.Server.HasRemoteInterest("multi.two").ShouldBeTrue();
|
||||||
|
|
||||||
|
await sub2.DisposeAsync();
|
||||||
|
await nc.PingAsync();
|
||||||
|
|
||||||
|
await WaitForCondition(() => !b.Server.HasRemoteInterest("multi.two"));
|
||||||
|
b.Server.HasRemoteInterest("multi.two").ShouldBeFalse();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
await DisposeServers(a, b);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Go: RS+ wire protocol parsing (low-level)
|
// Go: RS+ wire protocol parsing (low-level)
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task RSplus_frame_registers_remote_interest_via_wire()
|
public async Task RSplus_frame_registers_remote_interest_via_wire()
|
||||||
|
|||||||
@@ -45,4 +45,20 @@ public class SubListAllocationGuardTests
|
|||||||
matches.Count.ShouldBe(4);
|
matches.Count.ShouldBe(4);
|
||||||
matches.ShouldAllBe(match => match.Queue == "workers");
|
matches.ShouldAllBe(match => match.Queue == "workers");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Match_merges_queue_groups_from_multiple_matching_nodes_by_queue_name()
|
||||||
|
{
|
||||||
|
using var sl = new SubList();
|
||||||
|
sl.Insert(new Subscription { Subject = "orders.created", Queue = "workers", Sid = "1" });
|
||||||
|
sl.Insert(new Subscription { Subject = "orders.*", Queue = "workers", Sid = "2" });
|
||||||
|
sl.Insert(new Subscription { Subject = "orders.>", Queue = "audit", Sid = "3" });
|
||||||
|
|
||||||
|
var result = sl.Match("orders.created");
|
||||||
|
|
||||||
|
result.PlainSubs.ShouldBeEmpty();
|
||||||
|
result.QueueSubs.Length.ShouldBe(2);
|
||||||
|
result.QueueSubs.Single(group => group[0].Queue == "workers").Length.ShouldBe(2);
|
||||||
|
result.QueueSubs.Single(group => group[0].Queue == "audit").Length.ShouldBe(1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user