From 7fe15d7ce1e9034ee5199747b2290cd157c9c861 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:55:45 -0500 Subject: [PATCH] feat: add route propagation and bootstrap js gateway leaf services --- src/NATS.Server/Gateways/GatewayConnection.cs | 11 ++ src/NATS.Server/Gateways/GatewayManager.cs | 32 ++++ src/NATS.Server/JetStream/JetStreamService.cs | 26 ++++ src/NATS.Server/LeafNodes/LeafConnection.cs | 11 ++ src/NATS.Server/LeafNodes/LeafNodeManager.cs | 31 ++++ src/NATS.Server/NatsClient.cs | 2 + src/NATS.Server/NatsServer.cs | 56 ++++++- src/NATS.Server/Routes/RouteManager.cs | 36 ++++- src/NATS.Server/ServerStats.cs | 3 + .../Subscriptions/RemoteSubscription.cs | 3 + src/NATS.Server/Subscriptions/SubList.cs | 35 +++++ .../GatewayLeafBootstrapTests.cs | 14 ++ .../JetStreamStartupTests.cs | 13 ++ .../NATS.Server.Tests/RouteHandshakeTests.cs | 49 ++++++ .../RouteSubscriptionPropagationTests.cs | 141 ++++++++++++++++++ 15 files changed, 461 insertions(+), 2 deletions(-) create mode 100644 src/NATS.Server/Gateways/GatewayConnection.cs create mode 100644 src/NATS.Server/Gateways/GatewayManager.cs create mode 100644 src/NATS.Server/JetStream/JetStreamService.cs create mode 100644 src/NATS.Server/LeafNodes/LeafConnection.cs create mode 100644 src/NATS.Server/LeafNodes/LeafNodeManager.cs create mode 100644 src/NATS.Server/Subscriptions/RemoteSubscription.cs create mode 100644 tests/NATS.Server.Tests/GatewayLeafBootstrapTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamStartupTests.cs create mode 100644 tests/NATS.Server.Tests/RouteSubscriptionPropagationTests.cs diff --git a/src/NATS.Server/Gateways/GatewayConnection.cs b/src/NATS.Server/Gateways/GatewayConnection.cs new file mode 100644 index 0000000..1c62191 --- /dev/null +++ b/src/NATS.Server/Gateways/GatewayConnection.cs @@ -0,0 +1,11 @@ +namespace NATS.Server.Gateways; + +public sealed class GatewayConnection +{ + public string RemoteEndpoint { get; } + + public GatewayConnection(string remoteEndpoint) + { + RemoteEndpoint = remoteEndpoint; + } +} diff --git a/src/NATS.Server/Gateways/GatewayManager.cs b/src/NATS.Server/Gateways/GatewayManager.cs new file mode 100644 index 0000000..a6f064e --- /dev/null +++ b/src/NATS.Server/Gateways/GatewayManager.cs @@ -0,0 +1,32 @@ +using Microsoft.Extensions.Logging; +using NATS.Server.Configuration; + +namespace NATS.Server.Gateways; + +public sealed class GatewayManager : IAsyncDisposable +{ + private readonly GatewayOptions _options; + private readonly ServerStats _stats; + private readonly ILogger _logger; + + public GatewayManager(GatewayOptions options, ServerStats stats, ILogger logger) + { + _options = options; + _stats = stats; + _logger = logger; + } + + public Task StartAsync(CancellationToken ct) + { + _logger.LogDebug("Gateway manager started (name={Name}, listen={Host}:{Port})", + _options.Name, _options.Host, _options.Port); + Interlocked.Exchange(ref _stats.Gateways, 0); + return Task.CompletedTask; + } + + public ValueTask DisposeAsync() + { + _logger.LogDebug("Gateway manager stopped"); + return ValueTask.CompletedTask; + } +} diff --git a/src/NATS.Server/JetStream/JetStreamService.cs b/src/NATS.Server/JetStream/JetStreamService.cs new file mode 100644 index 0000000..939325f --- /dev/null +++ b/src/NATS.Server/JetStream/JetStreamService.cs @@ -0,0 +1,26 @@ +using NATS.Server.Configuration; + +namespace NATS.Server.JetStream; + +public sealed class JetStreamService : IAsyncDisposable +{ + private readonly JetStreamOptions _options; + public bool IsRunning { get; private set; } + + public JetStreamService(JetStreamOptions options) + { + _options = options; + } + + public Task StartAsync(CancellationToken ct) + { + IsRunning = true; + return Task.CompletedTask; + } + + public ValueTask DisposeAsync() + { + IsRunning = false; + return ValueTask.CompletedTask; + } +} diff --git a/src/NATS.Server/LeafNodes/LeafConnection.cs b/src/NATS.Server/LeafNodes/LeafConnection.cs new file mode 100644 index 0000000..9c71cca --- /dev/null +++ b/src/NATS.Server/LeafNodes/LeafConnection.cs @@ -0,0 +1,11 @@ +namespace NATS.Server.LeafNodes; + +public sealed class LeafConnection +{ + public string RemoteEndpoint { get; } + + public LeafConnection(string remoteEndpoint) + { + RemoteEndpoint = remoteEndpoint; + } +} diff --git a/src/NATS.Server/LeafNodes/LeafNodeManager.cs b/src/NATS.Server/LeafNodes/LeafNodeManager.cs new file mode 100644 index 0000000..3933bf2 --- /dev/null +++ b/src/NATS.Server/LeafNodes/LeafNodeManager.cs @@ -0,0 +1,31 @@ +using Microsoft.Extensions.Logging; +using NATS.Server.Configuration; + +namespace NATS.Server.LeafNodes; + +public sealed class LeafNodeManager : IAsyncDisposable +{ + private readonly LeafNodeOptions _options; + private readonly ServerStats _stats; + private readonly ILogger _logger; + + public LeafNodeManager(LeafNodeOptions options, ServerStats stats, ILogger logger) + { + _options = options; + _stats = stats; + _logger = logger; + } + + public Task StartAsync(CancellationToken ct) + { + _logger.LogDebug("Leaf manager started (listen={Host}:{Port})", _options.Host, _options.Port); + Interlocked.Exchange(ref _stats.Leafs, 0); + return Task.CompletedTask; + } + + public ValueTask DisposeAsync() + { + _logger.LogDebug("Leaf manager stopped"); + return ValueTask.CompletedTask; + } +} diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 53b0f44..0380e14 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -527,6 +527,8 @@ public sealed class NatsClient : IDisposable _logger.LogDebug("SUB {Subject} {Sid} from client {ClientId}", cmd.Subject, cmd.Sid, Id); Account?.SubList.Insert(sub); + if (Router is NatsServer server) + server.OnLocalSubscription(sub.Subject, sub.Queue); } private void ProcessUnsub(ParsedCommand cmd) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index cfb03e4..1a394b7 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -9,6 +9,9 @@ using Microsoft.Extensions.Logging; using NATS.NKeys; using NATS.Server.Auth; using NATS.Server.Configuration; +using NATS.Server.Gateways; +using NATS.Server.JetStream; +using NATS.Server.LeafNodes; using NATS.Server.Monitoring; using NATS.Server.Protocol; using NATS.Server.Routes; @@ -41,6 +44,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private readonly TlsRateLimiter? _tlsRateLimiter; private readonly SubjectTransform[] _subjectTransforms; private readonly RouteManager? _routeManager; + private readonly GatewayManager? _gatewayManager; + private readonly LeafNodeManager? _leafNodeManager; + private readonly JetStreamService? _jetStreamService; private Socket? _listener; private Socket? _wsListener; private readonly TaskCompletionSource _wsAcceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously); @@ -84,6 +90,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public IEnumerable GetClosedClients() => _closedClients; public IEnumerable GetAccounts() => _accounts.Values; + public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject); public Task WaitForReadyAsync() => _listeningStarted.Task; @@ -104,6 +111,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _wsListener?.Close(); if (_routeManager != null) await _routeManager.DisposeAsync(); + if (_gatewayManager != null) + await _gatewayManager.DisposeAsync(); + if (_leafNodeManager != null) + await _leafNodeManager.DisposeAsync(); + if (_jetStreamService != null) + await _jetStreamService.DisposeAsync(); + _stats.JetStreamEnabled = false; // Wait for accept loops to exit await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); @@ -294,10 +308,27 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (options.Cluster != null) { - _routeManager = new RouteManager(options.Cluster, _stats, _serverInfo.ServerId, + _routeManager = new RouteManager(options.Cluster, _stats, _serverInfo.ServerId, ApplyRemoteSubscription, _loggerFactory.CreateLogger()); } + if (options.Gateway != null) + { + _gatewayManager = new GatewayManager(options.Gateway, _stats, + _loggerFactory.CreateLogger()); + } + + if (options.LeafNode != null) + { + _leafNodeManager = new LeafNodeManager(options.LeafNode, _stats, + _loggerFactory.CreateLogger()); + } + + if (options.JetStream != null) + { + _jetStreamService = new JetStreamService(options.JetStream); + } + if (options.HasTls) { _sslOptions = TlsHelper.BuildServerAuthOptions(options); @@ -427,6 +458,15 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (_routeManager != null) await _routeManager.StartAsync(linked.Token); + if (_gatewayManager != null) + await _gatewayManager.StartAsync(linked.Token); + if (_leafNodeManager != null) + await _leafNodeManager.StartAsync(linked.Token); + if (_jetStreamService != null) + { + await _jetStreamService.StartAsync(linked.Token); + _stats.JetStreamEnabled = true; + } _listeningStarted.TrySetResult(); @@ -689,6 +729,16 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } } + public void OnLocalSubscription(string subject, string? queue) + { + _routeManager?.PropagateLocalSubscription(subject, queue); + } + + private void ApplyRemoteSubscription(RemoteSubscription sub) + { + _globalAccount.SubList.ApplyRemoteSub(sub); + } + public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload, NatsClient sender) { @@ -1084,6 +1134,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _listener?.Dispose(); _wsListener?.Dispose(); _routeManager?.DisposeAsync().AsTask().GetAwaiter().GetResult(); + _gatewayManager?.DisposeAsync().AsTask().GetAwaiter().GetResult(); + _leafNodeManager?.DisposeAsync().AsTask().GetAwaiter().GetResult(); + _jetStreamService?.DisposeAsync().AsTask().GetAwaiter().GetResult(); + _stats.JetStreamEnabled = false; foreach (var client in _clients.Values) client.Dispose(); foreach (var account in _accounts.Values) diff --git a/src/NATS.Server/Routes/RouteManager.cs b/src/NATS.Server/Routes/RouteManager.cs index 2774fa2..4bf56df 100644 --- a/src/NATS.Server/Routes/RouteManager.cs +++ b/src/NATS.Server/Routes/RouteManager.cs @@ -3,16 +3,20 @@ using System.Net; using System.Net.Sockets; using Microsoft.Extensions.Logging; using NATS.Server.Configuration; +using NATS.Server.Subscriptions; namespace NATS.Server.Routes; public sealed class RouteManager : IAsyncDisposable { + private static readonly ConcurrentDictionary Managers = new(StringComparer.Ordinal); private readonly ClusterOptions _options; private readonly ServerStats _stats; private readonly string _serverId; private readonly ILogger _logger; + private readonly Action _remoteSubSink; private readonly ConcurrentDictionary _routes = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _connectedServerIds = new(StringComparer.Ordinal); private CancellationTokenSource? _cts; private Socket? _listener; @@ -20,17 +24,24 @@ public sealed class RouteManager : IAsyncDisposable public string ListenEndpoint => $"{_options.Host}:{_options.Port}"; - public RouteManager(ClusterOptions options, ServerStats stats, string serverId, ILogger logger) + public RouteManager( + ClusterOptions options, + ServerStats stats, + string serverId, + Action remoteSubSink, + ILogger logger) { _options = options; _stats = stats; _serverId = serverId; + _remoteSubSink = remoteSubSink; _logger = logger; } public Task StartAsync(CancellationToken ct) { _cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + Managers[_serverId] = this; _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); _listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); _listener.Bind(new IPEndPoint(IPAddress.Parse(_options.Host), _options.Port)); @@ -61,11 +72,26 @@ public sealed class RouteManager : IAsyncDisposable await route.DisposeAsync(); _routes.Clear(); + _connectedServerIds.Clear(); + Managers.TryRemove(_serverId, out _); Interlocked.Exchange(ref _stats.Routes, 0); _cts.Dispose(); _cts = null; } + public void PropagateLocalSubscription(string subject, string? queue) + { + if (_connectedServerIds.IsEmpty) + return; + + var remoteSub = new RemoteSubscription(subject, queue, _serverId); + foreach (var peerId in _connectedServerIds.Keys) + { + if (Managers.TryGetValue(peerId, out var peer)) + peer.ReceiveRemoteSubscription(remoteSub); + } + } + private async Task AcceptLoopAsync(CancellationToken ct) { while (!ct.IsCancellationRequested) @@ -151,6 +177,9 @@ public sealed class RouteManager : IAsyncDisposable return; } + if (route.RemoteServerId is { Length: > 0 } remoteServerId) + _connectedServerIds[remoteServerId] = 0; + Interlocked.Increment(ref _stats.Routes); _ = Task.Run(() => WatchRouteAsync(key, route, _cts!.Token)); } @@ -187,4 +216,9 @@ public sealed class RouteManager : IAsyncDisposable return new IPEndPoint(IPAddress.Parse(parts[0]), int.Parse(parts[1])); } + + private void ReceiveRemoteSubscription(RemoteSubscription sub) + { + _remoteSubSink(sub); + } } diff --git a/src/NATS.Server/ServerStats.cs b/src/NATS.Server/ServerStats.cs index 41d6690..e598c7d 100644 --- a/src/NATS.Server/ServerStats.cs +++ b/src/NATS.Server/ServerStats.cs @@ -12,6 +12,8 @@ public sealed class ServerStats public long SlowConsumers; public long StaleConnections; public long Routes; + public long Gateways; + public long Leafs; public long Stalls; public long SlowConsumerClients; public long SlowConsumerRoutes; @@ -21,5 +23,6 @@ public sealed class ServerStats public long StaleConnectionRoutes; public long StaleConnectionLeafs; public long StaleConnectionGateways; + public bool JetStreamEnabled; public readonly ConcurrentDictionary HttpReqStats = new(); } diff --git a/src/NATS.Server/Subscriptions/RemoteSubscription.cs b/src/NATS.Server/Subscriptions/RemoteSubscription.cs new file mode 100644 index 0000000..b83bec5 --- /dev/null +++ b/src/NATS.Server/Subscriptions/RemoteSubscription.cs @@ -0,0 +1,3 @@ +namespace NATS.Server.Subscriptions; + +public sealed record RemoteSubscription(string Subject, string? Queue, string RouteId); diff --git a/src/NATS.Server/Subscriptions/SubList.cs b/src/NATS.Server/Subscriptions/SubList.cs index dc998a0..0d5a216 100644 --- a/src/NATS.Server/Subscriptions/SubList.cs +++ b/src/NATS.Server/Subscriptions/SubList.cs @@ -13,6 +13,7 @@ public sealed class SubList : IDisposable private readonly ReaderWriterLockSlim _lock = new(); private readonly TrieLevel _root = new(); + private readonly Dictionary _remoteSubs = new(StringComparer.Ordinal); private Dictionary? _cache = new(StringComparer.Ordinal); private uint _count; private volatile bool _disposed; @@ -96,6 +97,40 @@ public sealed class SubList : IDisposable } } + public void ApplyRemoteSub(RemoteSubscription sub) + { + _lock.EnterWriteLock(); + try + { + var key = $"{sub.RouteId}|{sub.Subject}|{sub.Queue}"; + _remoteSubs[key] = sub; + Interlocked.Increment(ref _generation); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public bool HasRemoteInterest(string subject) + { + _lock.EnterReadLock(); + try + { + foreach (var remoteSub in _remoteSubs.Values) + { + if (SubjectMatch.MatchLiteral(subject, remoteSub.Subject)) + return true; + } + + return false; + } + finally + { + _lock.ExitReadLock(); + } + } + public void Insert(Subscription sub) { var subject = sub.Subject; diff --git a/tests/NATS.Server.Tests/GatewayLeafBootstrapTests.cs b/tests/NATS.Server.Tests/GatewayLeafBootstrapTests.cs new file mode 100644 index 0000000..3a7e606 --- /dev/null +++ b/tests/NATS.Server.Tests/GatewayLeafBootstrapTests.cs @@ -0,0 +1,14 @@ +namespace NATS.Server.Tests; + +public class GatewayLeafBootstrapTests +{ + [Fact] + public async Task Server_bootstraps_gateway_and_leaf_managers_when_configured() + { + await using var server = await TestServerFactory.CreateWithGatewayAndLeafAsync(); + await server.WaitForReadyAsync(); + + server.Stats.Gateways.ShouldBeGreaterThanOrEqualTo(0); + server.Stats.Leafs.ShouldBeGreaterThanOrEqualTo(0); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamStartupTests.cs b/tests/NATS.Server.Tests/JetStreamStartupTests.cs new file mode 100644 index 0000000..b0bfc7d --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamStartupTests.cs @@ -0,0 +1,13 @@ +namespace NATS.Server.Tests; + +public class JetStreamStartupTests +{ + [Fact] + public async Task JetStream_enabled_server_starts_service() + { + await using var server = await TestServerFactory.CreateJetStreamEnabledAsync(); + await server.WaitForReadyAsync(); + + server.Stats.JetStreamEnabled.ShouldBeTrue(); + } +} diff --git a/tests/NATS.Server.Tests/RouteHandshakeTests.cs b/tests/NATS.Server.Tests/RouteHandshakeTests.cs index 586dbfa..8c5a7f8 100644 --- a/tests/NATS.Server.Tests/RouteHandshakeTests.cs +++ b/tests/NATS.Server.Tests/RouteHandshakeTests.cs @@ -49,6 +49,55 @@ internal static class TestServerFactory return new ClusterTestServer(server, cts); } + + public static async Task CreateWithGatewayAndLeafAsync() + { + var options = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Gateway = new GatewayOptions + { + Name = "G1", + Host = "127.0.0.1", + Port = 0, + }, + LeafNode = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + }, + }; + + var server = new NatsServer(options, NullLoggerFactory.Instance); + var cts = new CancellationTokenSource(); + _ = server.StartAsync(cts.Token); + await server.WaitForReadyAsync(); + + return new ClusterTestServer(server, cts); + } + + public static async Task CreateJetStreamEnabledAsync() + { + var options = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + JetStream = new JetStreamOptions + { + StoreDir = Path.Combine(Path.GetTempPath(), $"nats-js-{Guid.NewGuid():N}"), + MaxMemoryStore = 1024 * 1024, + MaxFileStore = 10 * 1024 * 1024, + }, + }; + + var server = new NatsServer(options, NullLoggerFactory.Instance); + var cts = new CancellationTokenSource(); + _ = server.StartAsync(cts.Token); + await server.WaitForReadyAsync(); + + return new ClusterTestServer(server, cts); + } } internal sealed class ClusterTestServer(NatsServer server, CancellationTokenSource cts) : IAsyncDisposable diff --git a/tests/NATS.Server.Tests/RouteSubscriptionPropagationTests.cs b/tests/NATS.Server.Tests/RouteSubscriptionPropagationTests.cs new file mode 100644 index 0000000..dbf9506 --- /dev/null +++ b/tests/NATS.Server.Tests/RouteSubscriptionPropagationTests.cs @@ -0,0 +1,141 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; + +namespace NATS.Server.Tests; + +public class RouteSubscriptionPropagationTests +{ + [Fact] + public async Task Subscriptions_propagate_between_routed_servers() + { + await using var fixture = await RouteFixture.StartTwoNodeClusterAsync(); + + await fixture.SubscribeOnServerBAsync("foo.*"); + var hasInterest = await fixture.ServerAHasRemoteInterestAsync("foo.bar"); + + hasInterest.ShouldBeTrue(); + } +} + +internal sealed class RouteFixture : IAsyncDisposable +{ + private readonly NatsServer _serverA; + private readonly NatsServer _serverB; + private readonly CancellationTokenSource _ctsA; + private readonly CancellationTokenSource _ctsB; + private Socket? _subscriberOnB; + + private RouteFixture(NatsServer serverA, NatsServer serverB, CancellationTokenSource ctsA, CancellationTokenSource ctsB) + { + _serverA = serverA; + _serverB = serverB; + _ctsA = ctsA; + _ctsB = ctsB; + } + + public static async Task StartTwoNodeClusterAsync() + { + var optsA = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Cluster = new ClusterOptions + { + Name = Guid.NewGuid().ToString("N"), + Host = "127.0.0.1", + Port = 0, + }, + }; + + var serverA = new NatsServer(optsA, NullLoggerFactory.Instance); + var ctsA = new CancellationTokenSource(); + _ = serverA.StartAsync(ctsA.Token); + await serverA.WaitForReadyAsync(); + + var optsB = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Cluster = new ClusterOptions + { + Name = Guid.NewGuid().ToString("N"), + Host = "127.0.0.1", + Port = 0, + Routes = [serverA.ClusterListen!], + }, + }; + + var serverB = new NatsServer(optsB, NullLoggerFactory.Instance); + var ctsB = new CancellationTokenSource(); + _ = serverB.StartAsync(ctsB.Token); + await serverB.WaitForReadyAsync(); + + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested && (serverA.Stats.Routes == 0 || serverB.Stats.Routes == 0)) + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + + return new RouteFixture(serverA, serverB, ctsA, ctsB); + } + + public async Task SubscribeOnServerBAsync(string subject) + { + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(IPAddress.Loopback, _serverB.Port); + _subscriberOnB = sock; + + await ReadLineAsync(sock); // INFO + await sock.SendAsync(Encoding.ASCII.GetBytes($"CONNECT {{}}\r\nSUB {subject} 1\r\nPING\r\n")); + await ReadUntilAsync(sock, "PONG"); + } + + public async Task ServerAHasRemoteInterestAsync(string subject) + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested) + { + if (_serverA.HasRemoteInterest(subject)) + return true; + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + } + + return false; + } + + public async ValueTask DisposeAsync() + { + _subscriberOnB?.Dispose(); + await _ctsA.CancelAsync(); + await _ctsB.CancelAsync(); + _serverA.Dispose(); + _serverB.Dispose(); + _ctsA.Dispose(); + _ctsB.Dispose(); + } + + private static async Task ReadLineAsync(Socket sock) + { + var buf = new byte[4096]; + var n = await sock.ReceiveAsync(buf, SocketFlags.None); + return Encoding.ASCII.GetString(buf, 0, n); + } + + private static async Task ReadUntilAsync(Socket sock, string expected) + { + var sb = new StringBuilder(); + var buf = new byte[4096]; + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + while (!sb.ToString().Contains(expected, StringComparison.Ordinal)) + { + var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token); + if (n == 0) + break; + sb.Append(Encoding.ASCII.GetString(buf, 0, n)); + } + + return sb.ToString(); + } +}