perf: replace SubList routed-sub string keys
This commit is contained in:
7
src/NATS.Server/Subscriptions/RoutedSubKey.cs
Normal file
7
src/NATS.Server/Subscriptions/RoutedSubKey.cs
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
namespace NATS.Server.Subscriptions;
|
||||||
|
|
||||||
|
internal readonly record struct RoutedSubKey(string RouteId, string Account, string Subject, string? Queue)
|
||||||
|
{
|
||||||
|
public static RoutedSubKey FromRemoteSubscription(RemoteSubscription sub)
|
||||||
|
=> new(sub.RouteId, sub.Account, sub.Subject, sub.Queue);
|
||||||
|
}
|
||||||
@@ -16,7 +16,7 @@ public sealed class SubList : IDisposable
|
|||||||
private readonly ReaderWriterLockSlim _lock = new();
|
private readonly ReaderWriterLockSlim _lock = new();
|
||||||
private readonly TrieLevel _root = new();
|
private readonly TrieLevel _root = new();
|
||||||
private readonly SubListCacheSweeper _sweeper = new();
|
private readonly SubListCacheSweeper _sweeper = new();
|
||||||
private readonly Dictionary<string, RemoteSubscription> _remoteSubs = new(StringComparer.Ordinal);
|
private readonly Dictionary<RoutedSubKey, RemoteSubscription> _remoteSubs = [];
|
||||||
private Dictionary<string, CachedResult>? _cache = new(StringComparer.Ordinal);
|
private Dictionary<string, CachedResult>? _cache = new(StringComparer.Ordinal);
|
||||||
private uint _count;
|
private uint _count;
|
||||||
private volatile bool _disposed;
|
private volatile bool _disposed;
|
||||||
@@ -31,8 +31,6 @@ public sealed class SubList : IDisposable
|
|||||||
private readonly Dictionary<string, List<Action<bool>>> _queueRemoveNotifications = new(StringComparer.Ordinal);
|
private readonly Dictionary<string, List<Action<bool>>> _queueRemoveNotifications = new(StringComparer.Ordinal);
|
||||||
|
|
||||||
private readonly record struct CachedResult(SubListResult Result, long Generation);
|
private readonly record struct CachedResult(SubListResult Result, long Generation);
|
||||||
internal readonly record struct RoutedSubKeyInfo(string RouteId, string Account, string Subject, string? Queue);
|
|
||||||
|
|
||||||
public event Action<InterestChange>? InterestChanged;
|
public event Action<InterestChange>? InterestChanged;
|
||||||
|
|
||||||
public SubList()
|
public SubList()
|
||||||
@@ -178,7 +176,7 @@ public sealed class SubList : IDisposable
|
|||||||
_lock.EnterWriteLock();
|
_lock.EnterWriteLock();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var key = BuildRoutedSubKey(sub.RouteId, sub.Account, sub.Subject, sub.Queue);
|
var key = RoutedSubKey.FromRemoteSubscription(sub);
|
||||||
var changed = false;
|
var changed = false;
|
||||||
if (sub.IsRemoval)
|
if (sub.IsRemoval)
|
||||||
{
|
{
|
||||||
@@ -223,7 +221,7 @@ public sealed class SubList : IDisposable
|
|||||||
_lock.EnterWriteLock();
|
_lock.EnterWriteLock();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var key = BuildRoutedSubKey(sub.RouteId, sub.Account, sub.Subject, sub.Queue);
|
var key = RoutedSubKey.FromRemoteSubscription(sub);
|
||||||
if (!_remoteSubs.TryGetValue(key, out var existing))
|
if (!_remoteSubs.TryGetValue(key, out var existing))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@@ -240,28 +238,6 @@ public sealed class SubList : IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
internal static string BuildRoutedSubKey(string routeId, string account, string subject, string? queue)
|
|
||||||
=> $"{routeId}|{account}|{subject}|{queue}";
|
|
||||||
|
|
||||||
internal static string? GetAccNameFromRoutedSubKey(string routedSubKey)
|
|
||||||
=> GetRoutedSubKeyInfo(routedSubKey)?.Account;
|
|
||||||
|
|
||||||
internal static RoutedSubKeyInfo? GetRoutedSubKeyInfo(string routedSubKey)
|
|
||||||
{
|
|
||||||
if (string.IsNullOrWhiteSpace(routedSubKey))
|
|
||||||
return null;
|
|
||||||
|
|
||||||
var parts = routedSubKey.Split('|');
|
|
||||||
if (parts.Length != 4)
|
|
||||||
return null;
|
|
||||||
|
|
||||||
if (parts[0].Length == 0 || parts[1].Length == 0 || parts[2].Length == 0)
|
|
||||||
return null;
|
|
||||||
|
|
||||||
var queue = parts[3].Length == 0 ? null : parts[3];
|
|
||||||
return new RoutedSubKeyInfo(parts[0], parts[1], parts[2], queue);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int RemoveRemoteSubs(string routeId)
|
public int RemoveRemoteSubs(string routeId)
|
||||||
{
|
{
|
||||||
_lock.EnterWriteLock();
|
_lock.EnterWriteLock();
|
||||||
@@ -270,8 +246,7 @@ public sealed class SubList : IDisposable
|
|||||||
var removed = 0;
|
var removed = 0;
|
||||||
foreach (var kvp in _remoteSubs.ToArray())
|
foreach (var kvp in _remoteSubs.ToArray())
|
||||||
{
|
{
|
||||||
var info = GetRoutedSubKeyInfo(kvp.Key);
|
if (!string.Equals(kvp.Key.RouteId, routeId, StringComparison.Ordinal))
|
||||||
if (info == null || !string.Equals(info.Value.RouteId, routeId, StringComparison.Ordinal))
|
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (_remoteSubs.Remove(kvp.Key))
|
if (_remoteSubs.Remove(kvp.Key))
|
||||||
@@ -304,12 +279,8 @@ public sealed class SubList : IDisposable
|
|||||||
var removed = 0;
|
var removed = 0;
|
||||||
foreach (var kvp in _remoteSubs.ToArray())
|
foreach (var kvp in _remoteSubs.ToArray())
|
||||||
{
|
{
|
||||||
var info = GetRoutedSubKeyInfo(kvp.Key);
|
if (!string.Equals(kvp.Key.RouteId, routeId, StringComparison.Ordinal)
|
||||||
if (info == null)
|
|| !string.Equals(kvp.Key.Account, account, StringComparison.Ordinal))
|
||||||
continue;
|
|
||||||
|
|
||||||
if (!string.Equals(info.Value.RouteId, routeId, StringComparison.Ordinal)
|
|
||||||
|| !string.Equals(info.Value.Account, account, StringComparison.Ordinal))
|
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,21 +10,14 @@ namespace NATS.Server.Clustering.Tests.Routes;
|
|||||||
public class RouteRemoteSubCleanupParityBatch2Tests
|
public class RouteRemoteSubCleanupParityBatch2Tests
|
||||||
{
|
{
|
||||||
[Fact]
|
[Fact]
|
||||||
public void Routed_sub_key_helpers_parse_account_and_queue_fields()
|
public void Routed_sub_key_exposes_route_account_subject_and_queue_fields()
|
||||||
{
|
{
|
||||||
var key = SubList.BuildRoutedSubKey("R1", "A", "orders.*", "q1");
|
var key = RoutedSubKey.FromRemoteSubscription(new RemoteSubscription("orders.*", "q1", "R1", "A"));
|
||||||
|
|
||||||
SubList.GetAccNameFromRoutedSubKey(key).ShouldBe("A");
|
key.RouteId.ShouldBe("R1");
|
||||||
|
key.Account.ShouldBe("A");
|
||||||
var info = SubList.GetRoutedSubKeyInfo(key);
|
key.Subject.ShouldBe("orders.*");
|
||||||
info.ShouldNotBeNull();
|
key.Queue.ShouldBe("q1");
|
||||||
info.Value.RouteId.ShouldBe("R1");
|
|
||||||
info.Value.Account.ShouldBe("A");
|
|
||||||
info.Value.Subject.ShouldBe("orders.*");
|
|
||||||
info.Value.Queue.ShouldBe("q1");
|
|
||||||
|
|
||||||
SubList.GetRoutedSubKeyInfo("invalid").ShouldBeNull();
|
|
||||||
SubList.GetAccNameFromRoutedSubKey("invalid").ShouldBeNull();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
|
|||||||
Reference in New Issue
Block a user