refactor: extract NATS.Server.LeafNodes.Tests project

Move 28 leaf node test files from NATS.Server.Tests into a dedicated
NATS.Server.LeafNodes.Tests project. Update namespaces, add
InternalsVisibleTo, register in solution file. Replace all Task.Delay
polling loops with PollHelper.WaitUntilAsync/YieldForAsync from
TestUtilities. Replace private ReadUntilAsync in LeafProtocolTests
with SocketTestHelper.ReadUntilAsync.

All 281 tests pass.
This commit is contained in:
Joseph Doherty
2026-03-12 15:23:33 -04:00
parent 9972b74bc3
commit 3f7d896a34
31 changed files with 132 additions and 243 deletions

View File

@@ -9,6 +9,7 @@
<Project Path="tests/NATS.Server.Transport.Tests/NATS.Server.Transport.Tests.csproj" /> <Project Path="tests/NATS.Server.Transport.Tests/NATS.Server.Transport.Tests.csproj" />
<Project Path="tests/NATS.Server.Mqtt.Tests/NATS.Server.Mqtt.Tests.csproj" /> <Project Path="tests/NATS.Server.Mqtt.Tests/NATS.Server.Mqtt.Tests.csproj" />
<Project Path="tests/NATS.Server.Gateways.Tests/NATS.Server.Gateways.Tests.csproj" /> <Project Path="tests/NATS.Server.Gateways.Tests/NATS.Server.Gateways.Tests.csproj" />
<Project Path="tests/NATS.Server.LeafNodes.Tests/NATS.Server.LeafNodes.Tests.csproj" />
<Project Path="tests/NATS.E2E.Tests/NATS.E2E.Tests.csproj" /> <Project Path="tests/NATS.E2E.Tests/NATS.E2E.Tests.csproj" />
</Folder> </Folder>
</Solution> </Solution>

View File

@@ -4,6 +4,7 @@
<InternalsVisibleTo Include="NATS.Server.Transport.Tests" /> <InternalsVisibleTo Include="NATS.Server.Transport.Tests" />
<InternalsVisibleTo Include="NATS.Server.Mqtt.Tests" /> <InternalsVisibleTo Include="NATS.Server.Mqtt.Tests" />
<InternalsVisibleTo Include="NATS.Server.Gateways.Tests" /> <InternalsVisibleTo Include="NATS.Server.Gateways.Tests" />
<InternalsVisibleTo Include="NATS.Server.LeafNodes.Tests" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" /> <FrameworkReference Include="Microsoft.AspNetCore.App" />

View File

@@ -4,7 +4,7 @@ using System.Text;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions; using NATS.Server.Subscriptions;
namespace NATS.Server.Tests; namespace NATS.Server.LeafNodes.Tests;
public class LeafAdvancedSemanticsTests public class LeafAdvancedSemanticsTests
{ {

View File

@@ -7,8 +7,9 @@ using NATS.Server.Auth;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions; using NATS.Server.Subscriptions;
using NATS.Server.TestUtilities;
namespace NATS.Server.Tests.LeafNode; namespace NATS.Server.LeafNodes.Tests.LeafNode;
/// <summary> /// <summary>
/// Go-parity tests for leaf node functionality. /// Go-parity tests for leaf node functionality.
@@ -330,12 +331,9 @@ public class LeafNodeGoParityTests
// Wait for all three leaf connections to be established. // Wait for all three leaf connections to be established.
// B has TWO leaf connections: one outbound to A, one inbound from C. // B has TWO leaf connections: one outbound to A, one inbound from C.
using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)); await PollHelper.WaitUntilAsync(() => serverA.Stats.Leafs > 0
while (!waitTimeout.IsCancellationRequested && Interlocked.Read(ref serverB.Stats.Leafs) >= 2
&& (serverA.Stats.Leafs == 0 && serverC.Stats.Leafs > 0, timeoutMs: 10000);
|| Interlocked.Read(ref serverB.Stats.Leafs) < 2
|| serverC.Stats.Leafs == 0))
await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
// Verify the connection counts match the expected topology // Verify the connection counts match the expected topology
Interlocked.Read(ref serverA.Stats.Leafs).ShouldBe(1); // A has 1 inbound from B Interlocked.Read(ref serverA.Stats.Leafs).ShouldBe(1); // A has 1 inbound from B
@@ -542,13 +540,13 @@ public class LeafNodeGoParityTests
await conn.ConnectAsync(); await conn.ConnectAsync();
var sub = await conn.SubscribeCoreAsync<string>($"concurrent.leaf.{i}"); var sub = await conn.SubscribeCoreAsync<string>($"concurrent.leaf.{i}");
await conn.PingAsync(); await conn.PingAsync();
await Task.Delay(30); await PollHelper.YieldForAsync(30);
await sub.DisposeAsync(); await sub.DisposeAsync();
await conn.PingAsync(); await conn.PingAsync();
})).ToList(); })).ToList();
await Task.WhenAll(tasks); await Task.WhenAll(tasks);
await Task.Delay(200); await PollHelper.YieldForAsync(200);
// All subs should be gone from hub's perspective // All subs should be gone from hub's perspective
for (var i = 0; i < 8; i++) for (var i = 0; i < 8; i++)
@@ -831,7 +829,7 @@ public class LeafNodeGoParityTests
using var cts = new CancellationTokenSource(); using var cts = new CancellationTokenSource();
await manager.StartAsync(cts.Token); await manager.StartAsync(cts.Token);
await Task.Delay(300); await PollHelper.YieldForAsync(300);
stats.Leafs.ShouldBe(0); stats.Leafs.ShouldBe(0);
await cts.CancelAsync(); await cts.CancelAsync();
@@ -857,7 +855,7 @@ public class LeafNodeGoParityTests
using var cts = new CancellationTokenSource(); using var cts = new CancellationTokenSource();
await manager.StartAsync(cts.Token); await manager.StartAsync(cts.Token);
await Task.Delay(150); await PollHelper.YieldForAsync(150);
await cts.CancelAsync(); await cts.CancelAsync();
await manager.DisposeAsync(); // Must not hang await manager.DisposeAsync(); // Must not hang
@@ -1678,14 +1676,7 @@ public class LeafNodeGoParityTests
private static async Task WaitForConditionAsync(Func<bool> predicate, int timeoutMs = 5000) private static async Task WaitForConditionAsync(Func<bool> predicate, int timeoutMs = 5000)
{ {
using var cts = new CancellationTokenSource(timeoutMs); await PollHelper.WaitOrThrowAsync(predicate, $"Condition not met within {timeoutMs}ms.", timeoutMs: timeoutMs);
while (!cts.IsCancellationRequested)
{
if (predicate()) return;
await Task.Delay(20, cts.Token).ContinueWith(_ => { }, TaskScheduler.Default);
}
throw new TimeoutException($"Condition not met within {timeoutMs}ms.");
} }
} }
@@ -1739,36 +1730,19 @@ internal sealed class LeafGoFixture : IAsyncDisposable
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!timeout.IsCancellationRequested
&& (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
return new LeafGoFixture(hub, spoke, hubCts, spokeCts); return new LeafGoFixture(hub, spoke, hubCts, spokeCts);
} }
public async Task WaitForRemoteInterestOnHubAsync(string subject) public async Task WaitForRemoteInterestOnHubAsync(string subject)
{ {
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitOrThrowAsync(() => Hub.HasRemoteInterest(subject), $"Timed out waiting for hub remote interest on '{subject}'.", timeoutMs: 5000);
while (!timeout.IsCancellationRequested)
{
if (Hub.HasRemoteInterest(subject)) return;
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
}
throw new TimeoutException($"Timed out waiting for hub remote interest on '{subject}'.");
} }
public async Task WaitForRemoteInterestOnSpokeAsync(string subject) public async Task WaitForRemoteInterestOnSpokeAsync(string subject)
{ {
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitOrThrowAsync(() => Spoke.HasRemoteInterest(subject), $"Timed out waiting for spoke remote interest on '{subject}'.", timeoutMs: 5000);
while (!timeout.IsCancellationRequested)
{
if (Spoke.HasRemoteInterest(subject)) return;
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
}
throw new TimeoutException($"Timed out waiting for spoke remote interest on '{subject}'.");
} }
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()

View File

@@ -2,8 +2,9 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core; using NATS.Client.Core;
using NATS.Server.Auth; using NATS.Server.Auth;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.TestUtilities;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafAccountScopedDeliveryTests public class LeafAccountScopedDeliveryTests
{ {
@@ -95,9 +96,7 @@ internal sealed class LeafAccountDeliveryFixture : IAsyncDisposable
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
return new LeafAccountDeliveryFixture(hub, spoke, hubCts, spokeCts); return new LeafAccountDeliveryFixture(hub, spoke, hubCts, spokeCts);
} }
@@ -114,16 +113,7 @@ internal sealed class LeafAccountDeliveryFixture : IAsyncDisposable
public async Task WaitForRemoteInterestOnHubAsync(string account, string subject) public async Task WaitForRemoteInterestOnHubAsync(string account, string subject)
{ {
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitOrThrowAsync(() => Hub.HasRemoteInterest(account, subject), $"Timed out waiting for remote interest {account}:{subject}.", timeoutMs: 5000);
while (!timeout.IsCancellationRequested)
{
if (Hub.HasRemoteInterest(account, subject))
return;
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
}
throw new TimeoutException($"Timed out waiting for remote interest {account}:{subject}.");
} }
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()

View File

@@ -1,8 +1,9 @@
using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core; using NATS.Client.Core;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.TestUtilities;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Basic leaf node hub-spoke connectivity tests. /// Basic leaf node hub-spoke connectivity tests.
@@ -133,39 +134,19 @@ internal sealed class LeafBasicFixture : IAsyncDisposable
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
// Wait for the leaf node connection to be established on both sides // Wait for the leaf node connection to be established on both sides
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
return new LeafBasicFixture(hub, spoke, hubCts, spokeCts); return new LeafBasicFixture(hub, spoke, hubCts, spokeCts);
} }
public async Task WaitForRemoteInterestOnHubAsync(string subject) public async Task WaitForRemoteInterestOnHubAsync(string subject)
{ {
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitOrThrowAsync(() => Hub.HasRemoteInterest(subject), $"Timed out waiting for remote interest on hub for '{subject}'.", timeoutMs: 5000);
while (!timeout.IsCancellationRequested)
{
if (Hub.HasRemoteInterest(subject))
return;
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
}
throw new TimeoutException($"Timed out waiting for remote interest on hub for '{subject}'.");
} }
public async Task WaitForRemoteInterestOnSpokeAsync(string subject) public async Task WaitForRemoteInterestOnSpokeAsync(string subject)
{ {
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitOrThrowAsync(() => Spoke.HasRemoteInterest(subject), $"Timed out waiting for remote interest on spoke for '{subject}'.", timeoutMs: 5000);
while (!timeout.IsCancellationRequested)
{
if (Spoke.HasRemoteInterest(subject))
return;
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
}
throw new TimeoutException($"Timed out waiting for remote interest on spoke for '{subject}'.");
} }
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()

View File

@@ -2,7 +2,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Unit tests for leaf cluster topology registration (Gap 12.6). /// Unit tests for leaf cluster topology registration (Gap 12.6).

View File

@@ -4,7 +4,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafConnectionAndRemoteConfigParityBatch1Tests public class LeafConnectionAndRemoteConfigParityBatch1Tests
{ {

View File

@@ -5,7 +5,7 @@ using System.Text.Json;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafConnectionParityBatch3Tests public class LeafConnectionParityBatch3Tests
{ {

View File

@@ -4,7 +4,7 @@ using System.Text;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions; using NATS.Server.Subscriptions;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafConnectionParityBatch4Tests public class LeafConnectionParityBatch4Tests
{ {

View File

@@ -2,7 +2,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Unit tests for leaf connection disable flag (Gap 12.7). /// Unit tests for leaf connection disable flag (Gap 12.7).

View File

@@ -1,6 +1,6 @@
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafHubSpokeMappingParityTests public class LeafHubSpokeMappingParityTests
{ {

View File

@@ -3,8 +3,9 @@ using System.Net.Sockets;
using System.Text; using System.Text;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions; using NATS.Server.Subscriptions;
using NATS.Server.TestUtilities;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafInterestIdempotencyTests public class LeafInterestIdempotencyTests
{ {
@@ -45,7 +46,7 @@ public class LeafInterestIdempotencyTests
await WaitForAsync(() => subList.HasRemoteInterest("A", "orders.created"), timeout.Token); await WaitForAsync(() => subList.HasRemoteInterest("A", "orders.created"), timeout.Token);
await WriteLineAsync(remoteSocket, "LS+ A orders.*", timeout.Token); await WriteLineAsync(remoteSocket, "LS+ A orders.*", timeout.Token);
await Task.Delay(100, timeout.Token); await PollHelper.YieldForAsync(100);
subList.MatchRemote("A", "orders.created").Count.ShouldBe(1); subList.MatchRemote("A", "orders.created").Count.ShouldBe(1);
remoteAdded.ShouldBe(1); remoteAdded.ShouldBe(1);
@@ -74,14 +75,6 @@ public class LeafInterestIdempotencyTests
private static async Task WaitForAsync(Func<bool> predicate, CancellationToken ct) private static async Task WaitForAsync(Func<bool> predicate, CancellationToken ct)
{ {
while (!ct.IsCancellationRequested) await PollHelper.WaitOrThrowAsync(predicate, "Timed out waiting for condition.");
{
if (predicate())
return;
await Task.Delay(20, ct);
}
throw new TimeoutException("Timed out waiting for condition.");
} }
} }

View File

@@ -4,7 +4,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Unit tests for JetStream migration checks on leaf node connections (Gap 12.4). /// Unit tests for JetStream migration checks on leaf node connections (Gap 12.4).

View File

@@ -1,6 +1,6 @@
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafLoopTransparencyRuntimeTests public class LeafLoopTransparencyRuntimeTests
{ {

View File

@@ -9,7 +9,7 @@ using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions; using NATS.Server.Subscriptions;
using NATS.Server.TestUtilities; using NATS.Server.TestUtilities;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Advanced leaf node behavior tests: daisy chains, account scoping, concurrency, /// Advanced leaf node behavior tests: daisy chains, account scoping, concurrency,
@@ -68,10 +68,7 @@ public class LeafNodeAdvancedTests
await serverC.WaitForReadyAsync(); await serverC.WaitForReadyAsync();
// Wait for leaf connections // Wait for leaf connections
using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((serverA.Stats.Leafs == 0 || Interlocked.Read(ref serverB.Stats.Leafs) < 2 || serverC.Stats.Leafs == 0)), timeoutMs: 5000);
while (!waitTimeout.IsCancellationRequested
&& (serverA.Stats.Leafs == 0 || Interlocked.Read(ref serverB.Stats.Leafs) < 2 || serverC.Stats.Leafs == 0))
await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
Interlocked.Read(ref serverA.Stats.Leafs).ShouldBe(1); Interlocked.Read(ref serverA.Stats.Leafs).ShouldBe(1);
Interlocked.Read(ref serverB.Stats.Leafs).ShouldBeGreaterThanOrEqualTo(2); Interlocked.Read(ref serverB.Stats.Leafs).ShouldBeGreaterThanOrEqualTo(2);
@@ -171,9 +168,7 @@ public class LeafNodeAdvancedTests
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!waitTimeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
// Subscribe with account A on spoke // Subscribe with account A on spoke
await using var connA = new NatsConnection(new NatsOpts await using var connA = new NatsConnection(new NatsOpts
@@ -195,9 +190,7 @@ public class LeafNodeAdvancedTests
await connB.PingAsync(); await connB.PingAsync();
// Wait for account A interest to propagate // Wait for account A interest to propagate
using var interestTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !(!hub.HasRemoteInterest("ACCT_A", "acct.test")), timeoutMs: 5000);
while (!interestTimeout.IsCancellationRequested && !hub.HasRemoteInterest("ACCT_A", "acct.test"))
await Task.Delay(50, interestTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
// Publish from account A on hub // Publish from account A on hub
await using var pubA = new NatsConnection(new NatsOpts await using var pubA = new NatsConnection(new NatsOpts
@@ -245,7 +238,7 @@ public class LeafNodeAdvancedTests
var sub = await conn.SubscribeCoreAsync<string>($"concurrent.{index}"); var sub = await conn.SubscribeCoreAsync<string>($"concurrent.{index}");
await conn.PingAsync(); await conn.PingAsync();
await Task.Delay(50); await PollHelper.YieldForAsync(50);
await sub.DisposeAsync(); await sub.DisposeAsync();
await conn.PingAsync(); await conn.PingAsync();
})); }));
@@ -254,7 +247,7 @@ public class LeafNodeAdvancedTests
await Task.WhenAll(tasks); await Task.WhenAll(tasks);
// After all subs are unsubscribed, interest should be gone // After all subs are unsubscribed, interest should be gone
await Task.Delay(200); await PollHelper.YieldForAsync(200);
for (var i = 0; i < 10; i++) for (var i = 0; i < 10; i++)
fixture.Hub.HasRemoteInterest($"concurrent.{i}").ShouldBeFalse(); fixture.Hub.HasRemoteInterest($"concurrent.{i}").ShouldBeFalse();
} }
@@ -383,9 +376,7 @@ public class LeafNodeAdvancedTests
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !(hub.Stats.Leafs == 0), timeoutMs: 5000);
while (!waitTimeout.IsCancellationRequested && hub.Stats.Leafs == 0)
await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
Interlocked.Read(ref hub.Stats.Leafs).ShouldBe(1); Interlocked.Read(ref hub.Stats.Leafs).ShouldBe(1);
@@ -393,9 +384,7 @@ public class LeafNodeAdvancedTests
spoke.Dispose(); spoke.Dispose();
// After spoke disconnects, wait for count to drop // After spoke disconnects, wait for count to drop
using var disconnTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !(Interlocked.Read(ref hub.Stats.Leafs) > 0), timeoutMs: 5000);
while (!disconnTimeout.IsCancellationRequested && Interlocked.Read(ref hub.Stats.Leafs) > 0)
await Task.Delay(50, disconnTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
Interlocked.Read(ref hub.Stats.Leafs).ShouldBe(0); Interlocked.Read(ref hub.Stats.Leafs).ShouldBe(0);
@@ -469,9 +458,7 @@ public class LeafNodeAdvancedTests
await conn1.PingAsync(); await conn1.PingAsync();
await conn2.PingAsync(); await conn2.PingAsync();
using var interestTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !(!fixture.Hub.HasRemoteInterest("dist.test")), timeoutMs: 5000);
while (!interestTimeout.IsCancellationRequested && !fixture.Hub.HasRemoteInterest("dist.test"))
await Task.Delay(50, interestTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
// Hub should have remote interest from at least one spoke // Hub should have remote interest from at least one spoke
fixture.Hub.HasRemoteInterest("dist.test").ShouldBeTrue(); fixture.Hub.HasRemoteInterest("dist.test").ShouldBeTrue();
@@ -643,9 +630,7 @@ public class LeafNodeAdvancedTests
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !(hub.Stats.Leafs == 0), timeoutMs: 5000);
while (!waitTimeout.IsCancellationRequested && hub.Stats.Leafs == 0)
await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
Interlocked.Read(ref hub.Stats.Leafs).ShouldBe(1); Interlocked.Read(ref hub.Stats.Leafs).ShouldBe(1);
@@ -653,9 +638,7 @@ public class LeafNodeAdvancedTests
await spokeCts.CancelAsync(); await spokeCts.CancelAsync();
spoke.Dispose(); spoke.Dispose();
using var disconnTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !(Interlocked.Read(ref hub.Stats.Leafs) > 0), timeoutMs: 5000);
while (!disconnTimeout.IsCancellationRequested && Interlocked.Read(ref hub.Stats.Leafs) > 0)
await Task.Delay(50, disconnTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
Interlocked.Read(ref hub.Stats.Leafs).ShouldBe(0); Interlocked.Read(ref hub.Stats.Leafs).ShouldBe(0);

View File

@@ -9,7 +9,7 @@ using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions; using NATS.Server.Subscriptions;
using NATS.Server.TestUtilities; using NATS.Server.TestUtilities;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Tests for leaf node connection establishment, authentication, and lifecycle. /// Tests for leaf node connection establishment, authentication, and lifecycle.
@@ -60,9 +60,7 @@ public class LeafNodeConnectionTests
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
hub.Stats.Leafs.ShouldBeGreaterThan(0); hub.Stats.Leafs.ShouldBeGreaterThan(0);
spoke.Stats.Leafs.ShouldBeGreaterThan(0); spoke.Stats.Leafs.ShouldBeGreaterThan(0);
@@ -103,9 +101,7 @@ public class LeafNodeConnectionTests
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
hub.Stats.Leafs.ShouldBeGreaterThan(0); hub.Stats.Leafs.ShouldBeGreaterThan(0);
@@ -527,12 +523,6 @@ public class LeafNodeConnectionTests
private static async Task WaitForAsync(Func<bool> predicate, CancellationToken ct) private static async Task WaitForAsync(Func<bool> predicate, CancellationToken ct)
{ {
while (!ct.IsCancellationRequested) await PollHelper.WaitOrThrowAsync(predicate, "Timed out waiting for condition.");
{
if (predicate()) return;
await Task.Delay(20, ct);
}
throw new TimeoutException("Timed out waiting for condition.");
} }
} }

View File

@@ -3,7 +3,7 @@ using NATS.Client.Core;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.TestUtilities; using NATS.Server.TestUtilities;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Tests for message forwarding through leaf node connections (hub-to-leaf, leaf-to-hub, leaf-to-leaf). /// Tests for message forwarding through leaf node connections (hub-to-leaf, leaf-to-hub, leaf-to-leaf).
@@ -155,10 +155,7 @@ public class LeafNodeForwardingTests
await spoke2Conn.PingAsync(); await spoke2Conn.PingAsync();
// Both spokes' interests should propagate to the hub // Both spokes' interests should propagate to the hub
using var waitCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((!fixture.Hub.HasRemoteInterest("spoke1.interest") || !fixture.Hub.HasRemoteInterest("spoke2.interest"))), timeoutMs: 5000);
while (!waitCts.IsCancellationRequested
&& (!fixture.Hub.HasRemoteInterest("spoke1.interest") || !fixture.Hub.HasRemoteInterest("spoke2.interest")))
await Task.Delay(50, waitCts.Token).ContinueWith(_ => { }, TaskScheduler.Default);
fixture.Hub.HasRemoteInterest("spoke1.interest").ShouldBeTrue(); fixture.Hub.HasRemoteInterest("spoke1.interest").ShouldBeTrue();
fixture.Hub.HasRemoteInterest("spoke2.interest").ShouldBeTrue(); fixture.Hub.HasRemoteInterest("spoke2.interest").ShouldBeTrue();
@@ -280,7 +277,7 @@ public class LeafNodeForwardingTests
await hubConn.ConnectAsync(); await hubConn.ConnectAsync();
await hubConn.PublishAsync("no.subscriber", "lost"); await hubConn.PublishAsync("no.subscriber", "lost");
await Task.Delay(200); await PollHelper.YieldForAsync(200);
true.ShouldBeTrue(); // No crash = success true.ShouldBeTrue(); // No crash = success
} }
@@ -364,12 +361,9 @@ internal sealed class TwoSpokeFixture : IAsyncDisposable
_ = spoke2.StartAsync(spoke2Cts.Token); _ = spoke2.StartAsync(spoke2Cts.Token);
await spoke2.WaitForReadyAsync(); await spoke2.WaitForReadyAsync();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => Interlocked.Read(ref hub.Stats.Leafs) >= 2
while (!timeout.IsCancellationRequested && spoke1.Stats.Leafs > 0
&& (Interlocked.Read(ref hub.Stats.Leafs) < 2 && spoke2.Stats.Leafs > 0);
|| spoke1.Stats.Leafs == 0
|| spoke2.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
return new TwoSpokeFixture(hub, spoke1, spoke2, hubCts, spoke1Cts, spoke2Cts); return new TwoSpokeFixture(hub, spoke1, spoke2, hubCts, spoke1Cts, spoke2Cts);
} }

View File

@@ -1,8 +1,9 @@
using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core; using NATS.Client.Core;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.TestUtilities;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Tests for JetStream behavior over leaf node connections. /// Tests for JetStream behavior over leaf node connections.
@@ -44,9 +45,7 @@ public class LeafNodeJetStreamTests
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!waitTimeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
hub.Stats.JetStreamEnabled.ShouldBeTrue(); hub.Stats.JetStreamEnabled.ShouldBeTrue();
@@ -100,9 +99,7 @@ public class LeafNodeJetStreamTests
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!waitTimeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
// Subscribe on hub for a subject // Subscribe on hub for a subject
await using var hubConn = new NatsConnection(new NatsOpts await using var hubConn = new NatsConnection(new NatsOpts
@@ -115,9 +112,7 @@ public class LeafNodeJetStreamTests
await hubConn.PingAsync(); await hubConn.PingAsync();
// Wait for interest propagation // Wait for interest propagation
using var interestTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !(!spoke.HasRemoteInterest("js.leaf.test")), timeoutMs: 5000);
while (!interestTimeout.IsCancellationRequested && !spoke.HasRemoteInterest("js.leaf.test"))
await Task.Delay(50, interestTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
// Publish from spoke // Publish from spoke
await using var spokeConn = new NatsConnection(new NatsOpts await using var spokeConn = new NatsConnection(new NatsOpts
@@ -178,9 +173,7 @@ public class LeafNodeJetStreamTests
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!waitTimeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
hub.Stats.JetStreamEnabled.ShouldBeTrue(); hub.Stats.JetStreamEnabled.ShouldBeTrue();
spoke.Stats.JetStreamEnabled.ShouldBeFalse(); spoke.Stats.JetStreamEnabled.ShouldBeFalse();
@@ -191,9 +184,7 @@ public class LeafNodeJetStreamTests
await using var sub = await hubConn.SubscribeCoreAsync<string>("njs.forward"); await using var sub = await hubConn.SubscribeCoreAsync<string>("njs.forward");
await hubConn.PingAsync(); await hubConn.PingAsync();
using var interestTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !(!spoke.HasRemoteInterest("njs.forward")), timeoutMs: 5000);
while (!interestTimeout.IsCancellationRequested && !spoke.HasRemoteInterest("njs.forward"))
await Task.Delay(50, interestTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
// Publish from spoke // Publish from spoke
await using var spokeConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" }); await using var spokeConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" });
@@ -253,9 +244,7 @@ public class LeafNodeJetStreamTests
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!waitTimeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
hub.Stats.JetStreamEnabled.ShouldBeTrue(); hub.Stats.JetStreamEnabled.ShouldBeTrue();
spoke.Stats.JetStreamEnabled.ShouldBeTrue(); spoke.Stats.JetStreamEnabled.ShouldBeTrue();
@@ -309,9 +298,7 @@ public class LeafNodeJetStreamTests
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!waitTimeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
// Regular pub/sub should still work alongside JS // Regular pub/sub should still work alongside JS
await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" }); await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" });
@@ -322,9 +309,7 @@ public class LeafNodeJetStreamTests
await using var sub = await leafConn.SubscribeCoreAsync<string>("combo.test"); await using var sub = await leafConn.SubscribeCoreAsync<string>("combo.test");
await leafConn.PingAsync(); await leafConn.PingAsync();
using var interestTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !(!hub.HasRemoteInterest("combo.test")), timeoutMs: 5000);
while (!interestTimeout.IsCancellationRequested && !hub.HasRemoteInterest("combo.test"))
await Task.Delay(50, interestTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
await hubConn.PublishAsync("combo.test", "js-combo"); await hubConn.PublishAsync("combo.test", "js-combo");

View File

@@ -1,6 +1,6 @@
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Tests for leaf node loop detection via $LDS. prefix. /// Tests for leaf node loop detection via $LDS. prefix.

View File

@@ -5,7 +5,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafNodeManagerParityBatch5Tests public class LeafNodeManagerParityBatch5Tests
{ {

View File

@@ -1,7 +1,7 @@
using NATS.Client.Core; using NATS.Client.Core;
using NATS.Server.TestUtilities; using NATS.Server.TestUtilities;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Tests for subject filter propagation through leaf nodes. /// Tests for subject filter propagation through leaf nodes.
@@ -105,9 +105,7 @@ public class LeafNodeSubjectFilterTests
await sub.DisposeAsync(); await sub.DisposeAsync();
await leafConn.PingAsync(); await leafConn.PingAsync();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !(fixture.Hub.HasRemoteInterest("unsub.test")), timeoutMs: 5000);
while (!timeout.IsCancellationRequested && fixture.Hub.HasRemoteInterest("unsub.test"))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
fixture.Hub.HasRemoteInterest("unsub.test").ShouldBeFalse(); fixture.Hub.HasRemoteInterest("unsub.test").ShouldBeFalse();
} }

View File

@@ -5,7 +5,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Unit tests for leaf node permission and account syncing (Gap 12.2). /// Unit tests for leaf node permission and account syncing (Gap 12.2).

View File

@@ -5,8 +5,9 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions; using NATS.Server.Subscriptions;
using NATS.Server.TestUtilities;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Tests for solicited (outbound) leaf node connections with retry logic, /// Tests for solicited (outbound) leaf node connections with retry logic,
@@ -55,9 +56,7 @@ public class LeafSolicitedConnectionTests
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
// Wait for leaf connections to establish // Wait for leaf connections to establish
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
hub.Stats.Leafs.ShouldBeGreaterThan(0); hub.Stats.Leafs.ShouldBeGreaterThan(0);
spoke.Stats.Leafs.ShouldBeGreaterThan(0); spoke.Stats.Leafs.ShouldBeGreaterThan(0);
@@ -93,7 +92,7 @@ public class LeafSolicitedConnectionTests
await manager.StartAsync(cts.Token); await manager.StartAsync(cts.Token);
// Give it some time to attempt connections // Give it some time to attempt connections
await Task.Delay(500); await PollHelper.YieldForAsync(500);
// No connections should have succeeded // No connections should have succeeded
stats.Leafs.ShouldBe(0); stats.Leafs.ShouldBe(0);
@@ -185,7 +184,7 @@ public class LeafSolicitedConnectionTests
await manager.StartAsync(cts.Token); await manager.StartAsync(cts.Token);
// Let it attempt at least one retry // Let it attempt at least one retry
await Task.Delay(200); await PollHelper.YieldForAsync(200);
// Cancel — the retry loop should stop promptly // Cancel — the retry loop should stop promptly
await cts.CancelAsync(); await cts.CancelAsync();

View File

@@ -1,7 +1,7 @@
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions; using NATS.Server.Subscriptions;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafSubKeyParityBatch2Tests public class LeafSubKeyParityBatch2Tests
{ {

View File

@@ -6,8 +6,9 @@ using NATS.Client.Core;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions; using NATS.Server.Subscriptions;
using NATS.Server.TestUtilities;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Tests for leaf node subject filtering via DenyExports/DenyImports (deny-lists) and /// Tests for leaf node subject filtering via DenyExports/DenyImports (deny-lists) and
@@ -196,9 +197,7 @@ public class LeafSubjectFilterTests
try try
{ {
// Wait for leaf connection // Wait for leaf connection
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" }); await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" });
await leafConn.ConnectAsync(); await leafConn.ConnectAsync();
@@ -211,7 +210,7 @@ public class LeafSubjectFilterTests
await leafConn.PingAsync(); await leafConn.PingAsync();
// Wait for interest propagation // Wait for interest propagation
await Task.Delay(500); await PollHelper.YieldForAsync(500);
// Publish from hub // Publish from hub
await hubConn.PublishAsync("public.data", "allowed-msg"); await hubConn.PublishAsync("public.data", "allowed-msg");
@@ -285,9 +284,7 @@ public class LeafSubjectFilterTests
try try
{ {
// Wait for leaf connection // Wait for leaf connection
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
await using var hubConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{hub.Port}" }); await using var hubConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{hub.Port}" });
await hubConn.ConnectAsync(); await hubConn.ConnectAsync();
@@ -300,7 +297,7 @@ public class LeafSubjectFilterTests
await hubConn.PingAsync(); await hubConn.PingAsync();
// Wait for interest propagation // Wait for interest propagation
await Task.Delay(500); await PollHelper.YieldForAsync(500);
// Publish from spoke (leaf) // Publish from spoke (leaf)
await leafConn.PublishAsync("public.data", "allowed-msg"); await leafConn.PublishAsync("public.data", "allowed-msg");
@@ -372,9 +369,7 @@ public class LeafSubjectFilterTests
try try
{ {
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" }); await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" });
await leafConn.ConnectAsync(); await leafConn.ConnectAsync();
@@ -385,7 +380,7 @@ public class LeafSubjectFilterTests
await using var blockedSub = await leafConn.SubscribeCoreAsync<string>("admin.users"); await using var blockedSub = await leafConn.SubscribeCoreAsync<string>("admin.users");
await using var allowedSub = await leafConn.SubscribeCoreAsync<string>("admin.deep.nested"); await using var allowedSub = await leafConn.SubscribeCoreAsync<string>("admin.deep.nested");
await leafConn.PingAsync(); await leafConn.PingAsync();
await Task.Delay(500); await PollHelper.YieldForAsync(500);
await hubConn.PublishAsync("admin.users", "blocked"); await hubConn.PublishAsync("admin.users", "blocked");
await hubConn.PublishAsync("admin.deep.nested", "allowed"); await hubConn.PublishAsync("admin.deep.nested", "allowed");
@@ -451,11 +446,11 @@ public class LeafSubjectFilterTests
var line = await ReadLineAsync(remoteSocket, cts.Token); var line = await ReadLineAsync(remoteSocket, cts.Token);
line.ShouldStartWith("LEAF "); line.ShouldStartWith("LEAF ");
await Task.Delay(200); await PollHelper.YieldForAsync(200);
// Propagate allowed subscription // Propagate allowed subscription
manager.PropagateLocalSubscription("$G", "public.data", null); manager.PropagateLocalSubscription("$G", "public.data", null);
await Task.Delay(100); await PollHelper.YieldForAsync(100);
var lsLine = await ReadLineAsync(remoteSocket, cts.Token); var lsLine = await ReadLineAsync(remoteSocket, cts.Token);
lsLine.ShouldBe("LS+ $G public.data"); lsLine.ShouldBe("LS+ $G public.data");
@@ -464,7 +459,7 @@ public class LeafSubjectFilterTests
// Send a PING to verify nothing else was sent // Send a PING to verify nothing else was sent
manager.PropagateLocalSubscription("$G", "allowed.check", null); manager.PropagateLocalSubscription("$G", "allowed.check", null);
await Task.Delay(100); await PollHelper.YieldForAsync(100);
var nextLine = await ReadLineAsync(remoteSocket, cts.Token); var nextLine = await ReadLineAsync(remoteSocket, cts.Token);
nextLine.ShouldBe("LS+ $G allowed.check"); nextLine.ShouldBe("LS+ $G allowed.check");
} }
@@ -687,9 +682,7 @@ public class LeafSubjectFilterTests
try try
{ {
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" }); await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" });
await leafConn.ConnectAsync(); await leafConn.ConnectAsync();
@@ -699,7 +692,7 @@ public class LeafSubjectFilterTests
await using var allowedSub = await leafConn.SubscribeCoreAsync<string>("allowed.data"); await using var allowedSub = await leafConn.SubscribeCoreAsync<string>("allowed.data");
await using var blockedSub = await leafConn.SubscribeCoreAsync<string>("blocked.data"); await using var blockedSub = await leafConn.SubscribeCoreAsync<string>("blocked.data");
await leafConn.PingAsync(); await leafConn.PingAsync();
await Task.Delay(500); await PollHelper.YieldForAsync(500);
await hubConn.PublishAsync("allowed.data", "yes"); await hubConn.PublishAsync("allowed.data", "yes");
await hubConn.PublishAsync("blocked.data", "no"); await hubConn.PublishAsync("blocked.data", "no");
@@ -768,9 +761,7 @@ public class LeafSubjectFilterTests
try try
{ {
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
await using var hubConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{hub.Port}" }); await using var hubConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{hub.Port}" });
await hubConn.ConnectAsync(); await hubConn.ConnectAsync();
@@ -780,7 +771,7 @@ public class LeafSubjectFilterTests
await using var allowedSub = await hubConn.SubscribeCoreAsync<string>("allowed.data"); await using var allowedSub = await hubConn.SubscribeCoreAsync<string>("allowed.data");
await using var blockedSub = await hubConn.SubscribeCoreAsync<string>("blocked.data"); await using var blockedSub = await hubConn.SubscribeCoreAsync<string>("blocked.data");
await hubConn.PingAsync(); await hubConn.PingAsync();
await Task.Delay(500); await PollHelper.YieldForAsync(500);
await leafConn.PublishAsync("allowed.data", "yes"); await leafConn.PublishAsync("allowed.data", "yes");
await leafConn.PublishAsync("blocked.data", "no"); await leafConn.PublishAsync("blocked.data", "no");
@@ -839,11 +830,11 @@ public class LeafSubjectFilterTests
var line = await ReadLineAsync(remoteSocket, cts.Token); var line = await ReadLineAsync(remoteSocket, cts.Token);
line.ShouldStartWith("LEAF "); line.ShouldStartWith("LEAF ");
await Task.Delay(200); await PollHelper.YieldForAsync(200);
// Propagate allowed subscription // Propagate allowed subscription
manager.PropagateLocalSubscription("$G", "allowed.data", null); manager.PropagateLocalSubscription("$G", "allowed.data", null);
await Task.Delay(100); await PollHelper.YieldForAsync(100);
var lsLine = await ReadLineAsync(remoteSocket, cts.Token); var lsLine = await ReadLineAsync(remoteSocket, cts.Token);
lsLine.ShouldBe("LS+ $G allowed.data"); lsLine.ShouldBe("LS+ $G allowed.data");
@@ -852,7 +843,7 @@ public class LeafSubjectFilterTests
// Verify by sending another allowed subscription // Verify by sending another allowed subscription
manager.PropagateLocalSubscription("$G", "allowed.check", null); manager.PropagateLocalSubscription("$G", "allowed.check", null);
await Task.Delay(100); await PollHelper.YieldForAsync(100);
var nextLine = await ReadLineAsync(remoteSocket, cts.Token); var nextLine = await ReadLineAsync(remoteSocket, cts.Token);
nextLine.ShouldBe("LS+ $G allowed.check"); nextLine.ShouldBe("LS+ $G allowed.check");
} }

View File

@@ -2,7 +2,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Unit tests for leaf node TLS certificate hot-reload (Gap 12.1). /// Unit tests for leaf node TLS certificate hot-reload (Gap 12.1).

View File

@@ -4,7 +4,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Unit tests for leaf node reconnect state validation (Gap 12.3). /// Unit tests for leaf node reconnect state validation (Gap 12.3).

View File

@@ -3,7 +3,7 @@ using System.Net.WebSockets;
using NSubstitute; using NSubstitute;
using NATS.Server.LeafNodes; using NATS.Server.LeafNodes;
namespace NATS.Server.Tests.LeafNodes; namespace NATS.Server.LeafNodes.Tests.LeafNodes;
/// <summary> /// <summary>
/// Unit tests for <see cref="WebSocketStreamAdapter"/> (Gap 12.5). /// Unit tests for <see cref="WebSocketStreamAdapter"/> (Gap 12.5).

View File

@@ -3,8 +3,9 @@ using System.Net.Sockets;
using System.Text; using System.Text;
using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration; using NATS.Server.Configuration;
using NATS.Server.TestUtilities;
namespace NATS.Server.Tests; namespace NATS.Server.LeafNodes.Tests;
public class LeafProtocolTests public class LeafProtocolTests
{ {
@@ -70,9 +71,7 @@ internal sealed class LeafProtocolTestFixture : IAsyncDisposable
_ = spoke.StartAsync(spokeCts.Token); _ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync(); await spoke.WaitForReadyAsync();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await PollHelper.WaitUntilAsync(() => hub.Stats.Leafs > 0 && spoke.Stats.Leafs > 0);
while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
return new LeafProtocolTestFixture(hub, spoke, hubCts, spokeCts); return new LeafProtocolTestFixture(hub, spoke, hubCts, spokeCts);
} }
@@ -85,7 +84,7 @@ internal sealed class LeafProtocolTestFixture : IAsyncDisposable
_ = await ReadLineAsync(sock); // INFO _ = await ReadLineAsync(sock); // INFO
await sock.SendAsync(Encoding.ASCII.GetBytes($"CONNECT {{}}\r\nSUB {subject} 1\r\nPING\r\n")); await sock.SendAsync(Encoding.ASCII.GetBytes($"CONNECT {{}}\r\nSUB {subject} 1\r\nPING\r\n"));
await ReadUntilAsync(sock, "PONG"); await SocketTestHelper.ReadUntilAsync(sock, "PONG");
} }
public async Task PublishHubAsync(string subject, string payload) public async Task PublishHubAsync(string subject, string payload)
@@ -98,11 +97,11 @@ internal sealed class LeafProtocolTestFixture : IAsyncDisposable
_hubPublisher = sock; _hubPublisher = sock;
_ = await ReadLineAsync(sock); // INFO _ = await ReadLineAsync(sock); // INFO
await sock.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n")); await sock.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n"));
await ReadUntilAsync(sock, "PONG"); await SocketTestHelper.ReadUntilAsync(sock, "PONG");
} }
await sock.SendAsync(Encoding.ASCII.GetBytes($"PUB {subject} {payload.Length}\r\n{payload}\r\nPING\r\n")); await sock.SendAsync(Encoding.ASCII.GetBytes($"PUB {subject} {payload.Length}\r\n{payload}\r\nPING\r\n"));
await ReadUntilAsync(sock, "PONG"); await SocketTestHelper.ReadUntilAsync(sock, "PONG");
} }
public Task<string> ReadSpokeMessageAsync() public Task<string> ReadSpokeMessageAsync()
@@ -110,7 +109,7 @@ internal sealed class LeafProtocolTestFixture : IAsyncDisposable
if (_spokeSubscriber == null) if (_spokeSubscriber == null)
throw new InvalidOperationException("Spoke subscriber was not initialized."); throw new InvalidOperationException("Spoke subscriber was not initialized.");
return ReadUntilAsync(_spokeSubscriber, "MSG "); return SocketTestHelper.ReadUntilAsync(_spokeSubscriber, "MSG ");
} }
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()
@@ -132,20 +131,5 @@ internal sealed class LeafProtocolTestFixture : IAsyncDisposable
return Encoding.ASCII.GetString(buf, 0, n); return Encoding.ASCII.GetString(buf, 0, n);
} }
private static async Task<string> ReadUntilAsync(Socket sock, string expected)
{
var sb = new StringBuilder();
var buf = new byte[4096];
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
while (!sb.ToString().Contains(expected, StringComparison.Ordinal))
{
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
if (n == 0)
break;
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
}
return sb.ToString();
}
} }

View File

@@ -0,0 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="coverlet.collector" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="NATS.Client.Core" />
<PackageReference Include="NSubstitute" />
<PackageReference Include="Shouldly" />
<PackageReference Include="xunit" />
<PackageReference Include="xunit.runner.visualstudio" />
</ItemGroup>
<ItemGroup>
<Using Include="Xunit" />
<Using Include="Shouldly" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Server\NATS.Server.csproj" />
<ProjectReference Include="..\NATS.Server.TestUtilities\NATS.Server.TestUtilities.csproj" />
</ItemGroup>
</Project>