refactor: extract NATS.Server.Clustering.Tests project
Move 29 clustering/routing test files from NATS.Server.Tests to a dedicated NATS.Server.Clustering.Tests project. Update namespaces, replace private GetFreePort/ReadUntilAsync helpers with TestUtilities calls, and extract TestServerFactory/ClusterTestServer to TestUtilities to fix cross-project reference from JetStreamStartupTests.
This commit is contained in:
@@ -10,6 +10,7 @@
|
||||
<Project Path="tests/NATS.Server.Mqtt.Tests/NATS.Server.Mqtt.Tests.csproj" />
|
||||
<Project Path="tests/NATS.Server.Gateways.Tests/NATS.Server.Gateways.Tests.csproj" />
|
||||
<Project Path="tests/NATS.Server.LeafNodes.Tests/NATS.Server.LeafNodes.Tests.csproj" />
|
||||
<Project Path="tests/NATS.Server.Clustering.Tests/NATS.Server.Clustering.Tests.csproj" />
|
||||
<Project Path="tests/NATS.E2E.Tests/NATS.E2E.Tests.csproj" />
|
||||
</Folder>
|
||||
</Solution>
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
<InternalsVisibleTo Include="NATS.Server.Mqtt.Tests" />
|
||||
<InternalsVisibleTo Include="NATS.Server.Gateways.Tests" />
|
||||
<InternalsVisibleTo Include="NATS.Server.LeafNodes.Tests" />
|
||||
<InternalsVisibleTo Include="NATS.Server.Clustering.Tests" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<FrameworkReference Include="Microsoft.AspNetCore.App" />
|
||||
|
||||
@@ -5,7 +5,7 @@ using NATS.Server.Gateways;
|
||||
using NATS.Server.Protocol;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
namespace NATS.Server.Clustering.Tests;
|
||||
|
||||
// Go reference: server/route.go processImplicitRoute, server/gateway.go processImplicitGateway
|
||||
|
||||
@@ -4,7 +4,7 @@ using System.Text;
|
||||
using NATS.Server.Gateways;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
namespace NATS.Server.Clustering.Tests;
|
||||
|
||||
public class InterServerAccountProtocolTests
|
||||
{
|
||||
@@ -0,0 +1,27 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<IsPackable>false</IsPackable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="coverlet.collector" />
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" />
|
||||
<PackageReference Include="NATS.Client.Core" />
|
||||
<PackageReference Include="NSubstitute" />
|
||||
<PackageReference Include="Shouldly" />
|
||||
<PackageReference Include="xunit" />
|
||||
<PackageReference Include="xunit.runner.visualstudio" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Using Include="Xunit" />
|
||||
<Using Include="Shouldly" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\NATS.Server\NATS.Server.csproj" />
|
||||
<ProjectReference Include="..\NATS.Server.TestUtilities\NATS.Server.TestUtilities.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@@ -8,7 +8,7 @@ using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests.Route;
|
||||
namespace NATS.Server.Clustering.Tests.Route;
|
||||
|
||||
/// <summary>
|
||||
/// Go parity tests for the .NET route subsystem ported from
|
||||
25
tests/NATS.Server.Clustering.Tests/RouteHandshakeTests.cs
Normal file
25
tests/NATS.Server.Clustering.Tests/RouteHandshakeTests.cs
Normal file
@@ -0,0 +1,25 @@
|
||||
using NATS.Server.TestUtilities;
|
||||
|
||||
namespace NATS.Server.Clustering.Tests;
|
||||
|
||||
public class RouteHandshakeTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Two_servers_establish_route_connection()
|
||||
{
|
||||
await using var a = await TestServerFactory.CreateClusterEnabledAsync();
|
||||
await using var b = await TestServerFactory.CreateClusterEnabledAsync(seed: a.ClusterListen);
|
||||
|
||||
await a.WaitForReadyAsync();
|
||||
await b.WaitForReadyAsync();
|
||||
|
||||
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
while (!timeout.IsCancellationRequested && (a.Stats.Routes == 0 || b.Stats.Routes == 0))
|
||||
{
|
||||
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
|
||||
}
|
||||
|
||||
a.Stats.Routes.ShouldBeGreaterThan(0);
|
||||
b.Stats.Routes.ShouldBeGreaterThan(0);
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
namespace NATS.Server.Tests;
|
||||
namespace NATS.Server.Clustering.Tests;
|
||||
|
||||
public class RoutePoolTests
|
||||
{
|
||||
@@ -1,4 +1,4 @@
|
||||
namespace NATS.Server.Tests;
|
||||
namespace NATS.Server.Clustering.Tests;
|
||||
|
||||
public class RouteRmsgForwardingTests
|
||||
{
|
||||
@@ -3,8 +3,9 @@ using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.TestUtilities;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
namespace NATS.Server.Clustering.Tests;
|
||||
|
||||
public class RouteSubscriptionPropagationTests
|
||||
{
|
||||
@@ -90,7 +91,7 @@ internal sealed class RouteFixture : IAsyncDisposable
|
||||
|
||||
await ReadLineAsync(sock); // INFO
|
||||
await sock.SendAsync(Encoding.ASCII.GetBytes($"CONNECT {{}}\r\nSUB {subject} 1\r\nPING\r\n"));
|
||||
await ReadUntilAsync(sock, "PONG");
|
||||
await SocketTestHelper.ReadUntilAsync(sock, "PONG");
|
||||
}
|
||||
|
||||
public async Task SendRouteSubFrameAsync(string subject)
|
||||
@@ -123,11 +124,11 @@ internal sealed class RouteFixture : IAsyncDisposable
|
||||
_publisherOnA = sock;
|
||||
_ = await ReadLineAsync(sock); // INFO
|
||||
await sock.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n"));
|
||||
await ReadUntilAsync(sock, "PONG");
|
||||
await SocketTestHelper.ReadUntilAsync(sock, "PONG");
|
||||
}
|
||||
|
||||
await sock.SendAsync(Encoding.ASCII.GetBytes($"PUB {subject} {payload.Length}\r\n{payload}\r\nPING\r\n"));
|
||||
await ReadUntilAsync(sock, "PONG");
|
||||
await SocketTestHelper.ReadUntilAsync(sock, "PONG");
|
||||
}
|
||||
|
||||
public async Task<string> ReadServerBMessageAsync()
|
||||
@@ -135,7 +136,7 @@ internal sealed class RouteFixture : IAsyncDisposable
|
||||
if (_subscriberOnB == null)
|
||||
throw new InvalidOperationException("No subscriber socket on server B.");
|
||||
|
||||
return await ReadUntilAsync(_subscriberOnB, "MSG ");
|
||||
return await SocketTestHelper.ReadUntilAsync(_subscriberOnB, "MSG ");
|
||||
}
|
||||
|
||||
public async Task<bool> ServerAHasRemoteInterestAsync(string subject, bool expected = true)
|
||||
@@ -184,22 +185,6 @@ internal sealed class RouteFixture : IAsyncDisposable
|
||||
return Encoding.ASCII.GetString(buf, 0, n);
|
||||
}
|
||||
|
||||
private static async Task<string> ReadUntilAsync(Socket sock, string expected)
|
||||
{
|
||||
var sb = new StringBuilder();
|
||||
var buf = new byte[4096];
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
|
||||
while (!sb.ToString().Contains(expected, StringComparison.Ordinal))
|
||||
{
|
||||
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
|
||||
if (n == 0)
|
||||
break;
|
||||
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
|
||||
}
|
||||
|
||||
return sb.ToString();
|
||||
}
|
||||
|
||||
private static (string Host, int Port) ParseHostPort(string endpoint)
|
||||
{
|
||||
@@ -1,4 +1,4 @@
|
||||
namespace NATS.Server.Tests;
|
||||
namespace NATS.Server.Clustering.Tests;
|
||||
|
||||
public class RouteWireSubscriptionProtocolTests
|
||||
{
|
||||
@@ -7,7 +7,7 @@ using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for per-account dedicated route connections (Gap 13.2).
|
||||
@@ -8,7 +8,7 @@ using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for <see cref="RouteManager.RemoveRoute"/>,
|
||||
@@ -7,7 +7,7 @@ using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for no-pool (legacy) route fallback — Gap 13.6.
|
||||
@@ -7,7 +7,7 @@ using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for route pool size negotiation (Gap 13.3).
|
||||
@@ -3,7 +3,7 @@ using NATS.Client.Core;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Configuration;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
public class RouteAccountScopedDeliveryTests
|
||||
{
|
||||
@@ -1,6 +1,6 @@
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
namespace NATS.Server.Clustering.Tests;
|
||||
|
||||
public class RouteAccountScopedTests
|
||||
{
|
||||
@@ -4,7 +4,7 @@ using System.Text;
|
||||
using NATS.Server.Routes;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
public class RouteBatchProtoParityBatch3Tests
|
||||
{
|
||||
@@ -1,7 +1,7 @@
|
||||
using System.Text;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
namespace NATS.Server.Clustering.Tests;
|
||||
|
||||
public class RouteCompressionTests
|
||||
{
|
||||
@@ -2,7 +2,7 @@ using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Client.Core;
|
||||
using NATS.Server.Configuration;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests cluster route formation and message forwarding between servers.
|
||||
@@ -7,7 +7,7 @@ using NATS.Server.Auth;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for route configuration validation, compression options, topology gossip,
|
||||
@@ -7,7 +7,7 @@ using NATS.Server.Configuration;
|
||||
using NATS.Server.Routes;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for route connection establishment, handshake, reconnection, and lifecycle.
|
||||
@@ -7,8 +7,9 @@ using NATS.Server.Auth;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Routes;
|
||||
using NATS.Server.Subscriptions;
|
||||
using NATS.Server.TestUtilities;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for route message forwarding (RMSG), reply propagation, payload delivery,
|
||||
@@ -140,7 +141,7 @@ public class RouteForwardingTests
|
||||
var buf = new byte[4096];
|
||||
_ = await responderSock.ReceiveAsync(buf); // INFO
|
||||
await responderSock.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB service.echo 1\r\nPING\r\n"));
|
||||
await ReadUntilAsync(responderSock, "PONG");
|
||||
await SocketTestHelper.ReadUntilAsync(responderSock, "PONG");
|
||||
await WaitForCondition(() => b.Server.HasRemoteInterest("service.echo"));
|
||||
|
||||
// Requester on server B: subscribe to reply inbox via raw socket
|
||||
@@ -150,26 +151,26 @@ public class RouteForwardingTests
|
||||
var replyInbox = $"_INBOX.{Guid.NewGuid():N}";
|
||||
await requesterSock.SendAsync(Encoding.ASCII.GetBytes(
|
||||
$"CONNECT {{}}\r\nSUB {replyInbox} 2\r\nPING\r\n"));
|
||||
await ReadUntilAsync(requesterSock, "PONG");
|
||||
await SocketTestHelper.ReadUntilAsync(requesterSock, "PONG");
|
||||
await WaitForCondition(() => a.Server.HasRemoteInterest(replyInbox));
|
||||
|
||||
// Publish request with reply-to from B
|
||||
await requesterSock.SendAsync(Encoding.ASCII.GetBytes(
|
||||
$"PUB service.echo {replyInbox} 4\r\nping\r\nPING\r\n"));
|
||||
await ReadUntilAsync(requesterSock, "PONG");
|
||||
await SocketTestHelper.ReadUntilAsync(requesterSock, "PONG");
|
||||
|
||||
// Read the request on A, verify reply-to
|
||||
var requestData = await ReadUntilAsync(responderSock, "ping");
|
||||
var requestData = await SocketTestHelper.ReadUntilAsync(responderSock, "ping");
|
||||
requestData.ShouldContain($"MSG service.echo 1 {replyInbox} 4");
|
||||
requestData.ShouldContain("ping");
|
||||
|
||||
// Publish reply from A to the reply-to subject
|
||||
await responderSock.SendAsync(Encoding.ASCII.GetBytes(
|
||||
$"PUB {replyInbox} 4\r\npong\r\nPING\r\n"));
|
||||
await ReadUntilAsync(responderSock, "PONG");
|
||||
await SocketTestHelper.ReadUntilAsync(responderSock, "PONG");
|
||||
|
||||
// Read the reply on B
|
||||
var replyData = await ReadUntilAsync(requesterSock, "pong");
|
||||
var replyData = await SocketTestHelper.ReadUntilAsync(requesterSock, "pong");
|
||||
replyData.ShouldContain($"MSG {replyInbox} 2 4");
|
||||
replyData.ShouldContain("pong");
|
||||
}
|
||||
@@ -595,7 +596,7 @@ public class RouteForwardingTests
|
||||
_ = await sock.ReceiveAsync(buf); // INFO
|
||||
await sock.SendAsync(Encoding.ASCII.GetBytes(
|
||||
"CONNECT {}\r\nPUB reply.subject.test _INBOX.reply123 5\r\nHello\r\nPING\r\n"));
|
||||
await ReadUntilAsync(sock, "PONG");
|
||||
await SocketTestHelper.ReadUntilAsync(sock, "PONG");
|
||||
|
||||
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
var msg = await sub.Msgs.ReadAsync(timeout.Token);
|
||||
@@ -684,7 +685,7 @@ public class RouteForwardingTests
|
||||
await route.SendRmsgAsync("$G", "subject.test", "_INBOX.reply", payload, timeout.Token);
|
||||
|
||||
// Read the RMSG frame from the remote side, waiting until expected content arrives
|
||||
var data = await ReadUntilAsync(remote, "test-payload");
|
||||
var data = await SocketTestHelper.ReadUntilAsync(remote, "test-payload");
|
||||
data.ShouldContain("RMSG $G subject.test _INBOX.reply 12");
|
||||
data.ShouldContain("test-payload");
|
||||
}
|
||||
@@ -803,18 +804,4 @@ public class RouteForwardingTests
|
||||
return sb.ToString();
|
||||
}
|
||||
|
||||
private static async Task<string> ReadUntilAsync(Socket sock, string expected)
|
||||
{
|
||||
var sb = new StringBuilder();
|
||||
var buf = new byte[4096];
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
while (!sb.ToString().Contains(expected, StringComparison.Ordinal))
|
||||
{
|
||||
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
|
||||
if (n == 0) break;
|
||||
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
|
||||
}
|
||||
|
||||
return sb.ToString();
|
||||
}
|
||||
}
|
||||
@@ -8,7 +8,7 @@ using NATS.Server;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for the FNV-1a hash-based route storage on <see cref="RouteManager"/>.
|
||||
@@ -3,15 +3,16 @@ using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server;
|
||||
using NATS.Server.TestUtilities;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
public class RouteInfoBroadcastParityBatch4Tests
|
||||
{
|
||||
[Fact]
|
||||
public async Task UpdateServerINFOAndSendINFOToClients_broadcasts_INFO_to_connected_clients()
|
||||
{
|
||||
var port = GetFreePort();
|
||||
var port = TestPortAllocator.GetFreePort();
|
||||
using var server = new NatsServer(new NatsOptions { Host = "127.0.0.1", Port = port }, NullLoggerFactory.Instance);
|
||||
using var cts = new CancellationTokenSource();
|
||||
_ = server.StartAsync(cts.Token);
|
||||
@@ -68,17 +69,4 @@ public class RouteInfoBroadcastParityBatch4Tests
|
||||
return Encoding.ASCII.GetString([.. buffer]);
|
||||
}
|
||||
|
||||
private static int GetFreePort()
|
||||
{
|
||||
var listener = new TcpListener(IPAddress.Loopback, 0);
|
||||
listener.Start();
|
||||
try
|
||||
{
|
||||
return ((IPEndPoint)listener.LocalEndpoint).Port;
|
||||
}
|
||||
finally
|
||||
{
|
||||
listener.Stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,7 @@ using System.Text;
|
||||
using NATS.Server.Routes;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
public class RouteInterestIdempotencyTests
|
||||
{
|
||||
@@ -5,7 +5,7 @@ using NATS.Server.Configuration;
|
||||
using NATS.Server.Protocol;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
public class RouteParityHelpersBatch1Tests
|
||||
{
|
||||
@@ -6,7 +6,7 @@ using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for route pool accounting per account, matching Go's
|
||||
@@ -5,7 +5,7 @@ using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
public class RouteRemoteSubCleanupParityBatch2Tests
|
||||
{
|
||||
@@ -4,7 +4,7 @@
|
||||
using System.Text;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for route S2/Snappy compression codec, matching Go's route compression
|
||||
@@ -7,8 +7,9 @@ using NATS.Server.Auth;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Routes;
|
||||
using NATS.Server.Subscriptions;
|
||||
using NATS.Server.TestUtilities;
|
||||
|
||||
namespace NATS.Server.Tests.Routes;
|
||||
namespace NATS.Server.Clustering.Tests.Routes;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for route subscription propagation: RS+/RS-, wildcard subs, queue subs,
|
||||
@@ -357,7 +358,7 @@ public class RouteSubscriptionTests
|
||||
await sock.ConnectAsync(IPAddress.Loopback, a.Server.Port);
|
||||
_ = await ReadLineAsync(sock, default);
|
||||
await sock.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo queue1 1\r\nPING\r\n"));
|
||||
await ReadUntilAsync(sock, "PONG");
|
||||
await SocketTestHelper.ReadUntilAsync(sock, "PONG");
|
||||
|
||||
await WaitForCondition(() => b.Server.HasRemoteInterest("foo"));
|
||||
b.Server.HasRemoteInterest("foo").ShouldBeTrue();
|
||||
@@ -829,20 +830,6 @@ public class RouteSubscriptionTests
|
||||
private static Task WriteLineAsync(Socket socket, string line, CancellationToken ct)
|
||||
=> socket.SendAsync(Encoding.ASCII.GetBytes($"{line}\r\n"), SocketFlags.None, ct).AsTask();
|
||||
|
||||
private static async Task<string> ReadUntilAsync(Socket sock, string expected)
|
||||
{
|
||||
var sb = new StringBuilder();
|
||||
var buf = new byte[4096];
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
while (!sb.ToString().Contains(expected, StringComparison.Ordinal))
|
||||
{
|
||||
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
|
||||
if (n == 0) break;
|
||||
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
|
||||
}
|
||||
|
||||
return sb.ToString();
|
||||
}
|
||||
}
|
||||
|
||||
file static class CancellationTokenExtensions
|
||||
@@ -2,7 +2,7 @@ using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Routes;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
namespace NATS.Server.Clustering.Tests;
|
||||
|
||||
public class RouteTopologyGossipTests
|
||||
{
|
||||
@@ -1,31 +1,9 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server.Configuration;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
namespace NATS.Server.TestUtilities;
|
||||
|
||||
public class RouteHandshakeTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Two_servers_establish_route_connection()
|
||||
{
|
||||
await using var a = await TestServerFactory.CreateClusterEnabledAsync();
|
||||
await using var b = await TestServerFactory.CreateClusterEnabledAsync(seed: a.ClusterListen);
|
||||
|
||||
await a.WaitForReadyAsync();
|
||||
await b.WaitForReadyAsync();
|
||||
|
||||
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
while (!timeout.IsCancellationRequested && (a.Stats.Routes == 0 || b.Stats.Routes == 0))
|
||||
{
|
||||
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
|
||||
}
|
||||
|
||||
a.Stats.Routes.ShouldBeGreaterThan(0);
|
||||
b.Stats.Routes.ShouldBeGreaterThan(0);
|
||||
}
|
||||
}
|
||||
|
||||
internal static class TestServerFactory
|
||||
public static class TestServerFactory
|
||||
{
|
||||
public static async Task<ClusterTestServer> CreateClusterEnabledAsync(string? seed = null)
|
||||
{
|
||||
@@ -100,7 +78,7 @@ internal static class TestServerFactory
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class ClusterTestServer(NatsServer server, CancellationTokenSource cts) : IAsyncDisposable
|
||||
public sealed class ClusterTestServer(NatsServer server, CancellationTokenSource cts) : IAsyncDisposable
|
||||
{
|
||||
public ServerStats Stats => server.Stats;
|
||||
public string ClusterListen => server.ClusterListen!;
|
||||
@@ -1,3 +1,4 @@
|
||||
using NATS.Server.TestUtilities;
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class JetStreamStartupTests
|
||||
|
||||
Reference in New Issue
Block a user