diff --git a/src/NATS.Server/Subscriptions/SubList.cs b/src/NATS.Server/Subscriptions/SubList.cs index 33059f7..099d85d 100644 --- a/src/NATS.Server/Subscriptions/SubList.cs +++ b/src/NATS.Server/Subscriptions/SubList.cs @@ -12,6 +12,10 @@ public sealed class SubList : IDisposable { private const int CacheMax = 1024; private const int CacheSweep = 256; + [ThreadStatic] + private static MatchBuilder? s_matchBuilder; + [ThreadStatic] + private static List? s_remoteSubRemovalKeys; private readonly ReaderWriterLockSlim _lock = new(); private readonly TrieLevel _root = new(); @@ -243,23 +247,31 @@ public sealed class SubList : IDisposable _lock.EnterWriteLock(); try { + var removalKeys = RentRemoteSubRemovalKeys(); var removed = 0; - foreach (var kvp in _remoteSubs.ToArray()) + foreach (var (key, _) in _remoteSubs) { - if (!string.Equals(kvp.Key.RouteId, routeId, StringComparison.Ordinal)) + if (!string.Equals(key.RouteId, routeId, StringComparison.Ordinal)) continue; - if (_remoteSubs.Remove(kvp.Key)) + removalKeys.Add(key); + } + + foreach (var key in removalKeys) + { + if (_remoteSubs.Remove(key, out var removedSub)) { removed++; InterestChanged?.Invoke(new InterestChange( InterestChangeKind.RemoteRemoved, - kvp.Value.Subject, - kvp.Value.Queue, - kvp.Value.Account)); + removedSub.Subject, + removedSub.Queue, + removedSub.Account)); } } + removalKeys.Clear(); + if (removed > 0) Interlocked.Increment(ref _generation); @@ -276,26 +288,34 @@ public sealed class SubList : IDisposable _lock.EnterWriteLock(); try { + var removalKeys = RentRemoteSubRemovalKeys(); var removed = 0; - foreach (var kvp in _remoteSubs.ToArray()) + foreach (var (key, _) in _remoteSubs) { - if (!string.Equals(kvp.Key.RouteId, routeId, StringComparison.Ordinal) - || !string.Equals(kvp.Key.Account, account, StringComparison.Ordinal)) + if (!string.Equals(key.RouteId, routeId, StringComparison.Ordinal) + || !string.Equals(key.Account, account, StringComparison.Ordinal)) { continue; } - if (_remoteSubs.Remove(kvp.Key)) + removalKeys.Add(key); + } + + foreach (var key in removalKeys) + { + if (_remoteSubs.Remove(key, out var removedSub)) { removed++; InterestChanged?.Invoke(new InterestChange( InterestChangeKind.RemoteRemoved, - kvp.Value.Subject, - kvp.Value.Queue, - kvp.Value.Account)); + removedSub.Subject, + removedSub.Queue, + removedSub.Account)); } } + removalKeys.Clear(); + if (removed > 0) Interlocked.Increment(ref _generation); @@ -565,22 +585,9 @@ public sealed class SubList : IDisposable return cached.Result; } - var plainSubs = new List(); - var queueSubs = new List>(); - MatchLevel(_root, tokens, 0, plainSubs, queueSubs); - - SubListResult result; - if (plainSubs.Count == 0 && queueSubs.Count == 0) - { - result = SubListResult.Empty; - } - else - { - var queueSubsArr = new Subscription[queueSubs.Count][]; - for (int i = 0; i < queueSubs.Count; i++) - queueSubsArr[i] = queueSubs[i].ToArray(); - result = new SubListResult(plainSubs.ToArray(), queueSubsArr); - } + var builder = RentMatchBuilder(); + MatchLevel(_root, tokens, 0, builder); + var result = builder.ToResult(); if (_cache != null) { @@ -676,6 +683,20 @@ public sealed class SubList : IDisposable return false; } + private static MatchBuilder RentMatchBuilder() + { + var builder = s_matchBuilder ??= new MatchBuilder(); + builder.Reset(); + return builder; + } + + private static List RentRemoteSubRemovalKeys() + { + var keys = s_remoteSubRemovalKeys ??= []; + keys.Clear(); + return keys; + } + private bool HasExactQueueInterestNoLock(string subject, string queue) { var subs = new List(); @@ -822,6 +843,42 @@ public sealed class SubList : IDisposable AddNodeToResults(pwc, plainSubs, queueSubs); } + private static void MatchLevel(TrieLevel? level, string[] tokens, int tokenIndex, MatchBuilder builder) + { + TrieNode? pwc = null; + TrieNode? node = null; + + for (int i = tokenIndex; i < tokens.Length; i++) + { + if (level == null) + return; + + if (level.Fwc != null) + AddNodeToResults(level.Fwc, builder); + + pwc = level.Pwc; + if (pwc != null) + MatchLevel(pwc.Next, tokens, i + 1, builder); + + node = null; + if (level.Nodes.TryGetValue(tokens[i], out var found)) + { + node = found; + level = node.Next; + } + else + { + level = null; + } + } + + if (node != null) + AddNodeToResults(node, builder); + + if (pwc != null) + AddNodeToResults(pwc, builder); + } + private static void AddNodeToResults(TrieNode node, List plainSubs, List> queueSubs) { @@ -853,6 +910,19 @@ public sealed class SubList : IDisposable } } + private static void AddNodeToResults(TrieNode node, MatchBuilder builder) + { + builder.PlainSubs.AddRange(node.PlainSubs); + + foreach (var (queueName, subs) in node.QueueSubs) + { + if (subs.Count == 0) + continue; + + builder.AddQueueGroup(queueName, subs); + } + } + public SubListStats Stats() { _lock.EnterReadLock(); @@ -1368,4 +1438,47 @@ public sealed class SubList : IDisposable public bool IsEmpty => PlainSubs.Count == 0 && QueueSubs.Count == 0 && (Next == null || (Next.Nodes.Count == 0 && Next.Pwc == null && Next.Fwc == null)); } + + private sealed class MatchBuilder + { + private readonly Dictionary _queueIndexes = new(StringComparer.Ordinal); + private readonly List> _queueGroups = []; + private int _queueGroupCount; + + public List PlainSubs { get; } = []; + + public void Reset() + { + PlainSubs.Clear(); + _queueIndexes.Clear(); + for (var i = 0; i < _queueGroupCount; i++) + _queueGroups[i].Clear(); + _queueGroupCount = 0; + } + + public void AddQueueGroup(string queueName, HashSet subs) + { + if (!_queueIndexes.TryGetValue(queueName, out var index)) + { + index = _queueGroupCount++; + _queueIndexes[queueName] = index; + if (index == _queueGroups.Count) + _queueGroups.Add([]); + } + + _queueGroups[index].AddRange(subs); + } + + public SubListResult ToResult() + { + if (PlainSubs.Count == 0 && _queueGroupCount == 0) + return SubListResult.Empty; + + var queueSubsArr = new Subscription[_queueGroupCount][]; + for (var i = 0; i < _queueGroupCount; i++) + queueSubsArr[i] = _queueGroups[i].ToArray(); + + return new SubListResult(PlainSubs.ToArray(), queueSubsArr); + } + } } diff --git a/tests/NATS.Server.Clustering.Tests/Routes/RouteSubscriptionTests.cs b/tests/NATS.Server.Clustering.Tests/Routes/RouteSubscriptionTests.cs index 051de09..c572c8e 100644 --- a/tests/NATS.Server.Clustering.Tests/Routes/RouteSubscriptionTests.cs +++ b/tests/NATS.Server.Clustering.Tests/Routes/RouteSubscriptionTests.cs @@ -208,6 +208,50 @@ public class RouteSubscriptionTests } } + [Fact] + public async Task Removing_one_subject_keeps_other_remote_interest_intact() + { + var cluster = Guid.NewGuid().ToString("N"); + var a = await StartServerAsync(MakeClusterOpts(cluster)); + var b = await StartServerAsync(MakeClusterOpts(cluster, a.Server.ClusterListen!)); + + try + { + await WaitForRouteFormation(a.Server, b.Server); + + await using var nc = new NatsConnection(new NatsOpts + { + Url = $"nats://127.0.0.1:{a.Server.Port}", + }); + await nc.ConnectAsync(); + + await using var sub1 = await nc.SubscribeCoreAsync("multi.one"); + await using var sub2 = await nc.SubscribeCoreAsync("multi.two"); + await nc.PingAsync(); + + await WaitForCondition(() => b.Server.HasRemoteInterest("multi.one") && b.Server.HasRemoteInterest("multi.two")); + b.Server.HasRemoteInterest("multi.one").ShouldBeTrue(); + b.Server.HasRemoteInterest("multi.two").ShouldBeTrue(); + + await sub1.DisposeAsync(); + await nc.PingAsync(); + + await WaitForCondition(() => !b.Server.HasRemoteInterest("multi.one")); + b.Server.HasRemoteInterest("multi.one").ShouldBeFalse(); + b.Server.HasRemoteInterest("multi.two").ShouldBeTrue(); + + await sub2.DisposeAsync(); + await nc.PingAsync(); + + await WaitForCondition(() => !b.Server.HasRemoteInterest("multi.two")); + b.Server.HasRemoteInterest("multi.two").ShouldBeFalse(); + } + finally + { + await DisposeServers(a, b); + } + } + // Go: RS+ wire protocol parsing (low-level) [Fact] public async Task RSplus_frame_registers_remote_interest_via_wire() diff --git a/tests/NATS.Server.Core.Tests/Subscriptions/SubListAllocationGuardTests.cs b/tests/NATS.Server.Core.Tests/Subscriptions/SubListAllocationGuardTests.cs index b48277c..f00306d 100644 --- a/tests/NATS.Server.Core.Tests/Subscriptions/SubListAllocationGuardTests.cs +++ b/tests/NATS.Server.Core.Tests/Subscriptions/SubListAllocationGuardTests.cs @@ -45,4 +45,20 @@ public class SubListAllocationGuardTests matches.Count.ShouldBe(4); matches.ShouldAllBe(match => match.Queue == "workers"); } + + [Fact] + public void Match_merges_queue_groups_from_multiple_matching_nodes_by_queue_name() + { + using var sl = new SubList(); + sl.Insert(new Subscription { Subject = "orders.created", Queue = "workers", Sid = "1" }); + sl.Insert(new Subscription { Subject = "orders.*", Queue = "workers", Sid = "2" }); + sl.Insert(new Subscription { Subject = "orders.>", Queue = "audit", Sid = "3" }); + + var result = sl.Match("orders.created"); + + result.PlainSubs.ShouldBeEmpty(); + result.QueueSubs.Length.ShouldBe(2); + result.QueueSubs.Single(group => group[0].Queue == "workers").Length.ShouldBe(2); + result.QueueSubs.Single(group => group[0].Queue == "audit").Length.ShouldBe(1); + } }