diff --git a/NatsDotNet.slnx b/NatsDotNet.slnx
index 23becec..8c9e61e 100644
--- a/NatsDotNet.slnx
+++ b/NatsDotNet.slnx
@@ -9,6 +9,7 @@
+
diff --git a/src/NATS.Server/NATS.Server.csproj b/src/NATS.Server/NATS.Server.csproj
index e0b9a71..253b7bc 100644
--- a/src/NATS.Server/NATS.Server.csproj
+++ b/src/NATS.Server/NATS.Server.csproj
@@ -4,6 +4,7 @@
+
diff --git a/tests/NATS.Server.Tests/LeafAdvancedSemanticsTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafAdvancedSemanticsTests.cs
similarity index 98%
rename from tests/NATS.Server.Tests/LeafAdvancedSemanticsTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafAdvancedSemanticsTests.cs
index a180099..ddb92ec 100644
--- a/tests/NATS.Server.Tests/LeafAdvancedSemanticsTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafAdvancedSemanticsTests.cs
@@ -4,7 +4,7 @@ using System.Text;
using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions;
-namespace NATS.Server.Tests;
+namespace NATS.Server.LeafNodes.Tests;
public class LeafAdvancedSemanticsTests
{
diff --git a/tests/NATS.Server.Tests/LeafNode/LeafNodeGoParityTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNode/LeafNodeGoParityTests.cs
similarity index 97%
rename from tests/NATS.Server.Tests/LeafNode/LeafNodeGoParityTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNode/LeafNodeGoParityTests.cs
index 30ca4f9..09fabe9 100644
--- a/tests/NATS.Server.Tests/LeafNode/LeafNodeGoParityTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNode/LeafNodeGoParityTests.cs
@@ -7,8 +7,9 @@ using NATS.Server.Auth;
using NATS.Server.Configuration;
using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions;
+using NATS.Server.TestUtilities;
-namespace NATS.Server.Tests.LeafNode;
+namespace NATS.Server.LeafNodes.Tests.LeafNode;
///
/// Go-parity tests for leaf node functionality.
@@ -330,12 +331,9 @@ public class LeafNodeGoParityTests
// Wait for all three leaf connections to be established.
// B has TWO leaf connections: one outbound to A, one inbound from C.
- using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
- while (!waitTimeout.IsCancellationRequested
- && (serverA.Stats.Leafs == 0
- || Interlocked.Read(ref serverB.Stats.Leafs) < 2
- || serverC.Stats.Leafs == 0))
- await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => serverA.Stats.Leafs > 0
+ && Interlocked.Read(ref serverB.Stats.Leafs) >= 2
+ && serverC.Stats.Leafs > 0, timeoutMs: 10000);
// Verify the connection counts match the expected topology
Interlocked.Read(ref serverA.Stats.Leafs).ShouldBe(1); // A has 1 inbound from B
@@ -542,13 +540,13 @@ public class LeafNodeGoParityTests
await conn.ConnectAsync();
var sub = await conn.SubscribeCoreAsync($"concurrent.leaf.{i}");
await conn.PingAsync();
- await Task.Delay(30);
+ await PollHelper.YieldForAsync(30);
await sub.DisposeAsync();
await conn.PingAsync();
})).ToList();
await Task.WhenAll(tasks);
- await Task.Delay(200);
+ await PollHelper.YieldForAsync(200);
// All subs should be gone from hub's perspective
for (var i = 0; i < 8; i++)
@@ -831,7 +829,7 @@ public class LeafNodeGoParityTests
using var cts = new CancellationTokenSource();
await manager.StartAsync(cts.Token);
- await Task.Delay(300);
+ await PollHelper.YieldForAsync(300);
stats.Leafs.ShouldBe(0);
await cts.CancelAsync();
@@ -857,7 +855,7 @@ public class LeafNodeGoParityTests
using var cts = new CancellationTokenSource();
await manager.StartAsync(cts.Token);
- await Task.Delay(150);
+ await PollHelper.YieldForAsync(150);
await cts.CancelAsync();
await manager.DisposeAsync(); // Must not hang
@@ -1678,14 +1676,7 @@ public class LeafNodeGoParityTests
private static async Task WaitForConditionAsync(Func predicate, int timeoutMs = 5000)
{
- using var cts = new CancellationTokenSource(timeoutMs);
- while (!cts.IsCancellationRequested)
- {
- if (predicate()) return;
- await Task.Delay(20, cts.Token).ContinueWith(_ => { }, TaskScheduler.Default);
- }
-
- throw new TimeoutException($"Condition not met within {timeoutMs}ms.");
+ await PollHelper.WaitOrThrowAsync(predicate, $"Condition not met within {timeoutMs}ms.", timeoutMs: timeoutMs);
}
}
@@ -1739,36 +1730,19 @@ internal sealed class LeafGoFixture : IAsyncDisposable
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested
- && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
return new LeafGoFixture(hub, spoke, hubCts, spokeCts);
}
public async Task WaitForRemoteInterestOnHubAsync(string subject)
{
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested)
- {
- if (Hub.HasRemoteInterest(subject)) return;
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
- }
-
- throw new TimeoutException($"Timed out waiting for hub remote interest on '{subject}'.");
+ await PollHelper.WaitOrThrowAsync(() => Hub.HasRemoteInterest(subject), $"Timed out waiting for hub remote interest on '{subject}'.", timeoutMs: 5000);
}
public async Task WaitForRemoteInterestOnSpokeAsync(string subject)
{
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested)
- {
- if (Spoke.HasRemoteInterest(subject)) return;
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
- }
-
- throw new TimeoutException($"Timed out waiting for spoke remote interest on '{subject}'.");
+ await PollHelper.WaitOrThrowAsync(() => Spoke.HasRemoteInterest(subject), $"Timed out waiting for spoke remote interest on '{subject}'.", timeoutMs: 5000);
}
public async ValueTask DisposeAsync()
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs
similarity index 84%
rename from tests/NATS.Server.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs
index 854dc02..a67e120 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs
@@ -2,8 +2,9 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core;
using NATS.Server.Auth;
using NATS.Server.Configuration;
+using NATS.Server.TestUtilities;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafAccountScopedDeliveryTests
{
@@ -95,9 +96,7 @@ internal sealed class LeafAccountDeliveryFixture : IAsyncDisposable
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
return new LeafAccountDeliveryFixture(hub, spoke, hubCts, spokeCts);
}
@@ -114,16 +113,7 @@ internal sealed class LeafAccountDeliveryFixture : IAsyncDisposable
public async Task WaitForRemoteInterestOnHubAsync(string account, string subject)
{
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested)
- {
- if (Hub.HasRemoteInterest(account, subject))
- return;
-
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
- }
-
- throw new TimeoutException($"Timed out waiting for remote interest {account}:{subject}.");
+ await PollHelper.WaitOrThrowAsync(() => Hub.HasRemoteInterest(account, subject), $"Timed out waiting for remote interest {account}:{subject}.", timeoutMs: 5000);
}
public async ValueTask DisposeAsync()
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafBasicTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafBasicTests.cs
similarity index 81%
rename from tests/NATS.Server.Tests/LeafNodes/LeafBasicTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafBasicTests.cs
index cff7715..43b0dec 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafBasicTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafBasicTests.cs
@@ -1,8 +1,9 @@
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core;
using NATS.Server.Configuration;
+using NATS.Server.TestUtilities;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Basic leaf node hub-spoke connectivity tests.
@@ -133,39 +134,19 @@ internal sealed class LeafBasicFixture : IAsyncDisposable
await spoke.WaitForReadyAsync();
// Wait for the leaf node connection to be established on both sides
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
return new LeafBasicFixture(hub, spoke, hubCts, spokeCts);
}
public async Task WaitForRemoteInterestOnHubAsync(string subject)
{
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested)
- {
- if (Hub.HasRemoteInterest(subject))
- return;
-
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
- }
-
- throw new TimeoutException($"Timed out waiting for remote interest on hub for '{subject}'.");
+ await PollHelper.WaitOrThrowAsync(() => Hub.HasRemoteInterest(subject), $"Timed out waiting for remote interest on hub for '{subject}'.", timeoutMs: 5000);
}
public async Task WaitForRemoteInterestOnSpokeAsync(string subject)
{
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested)
- {
- if (Spoke.HasRemoteInterest(subject))
- return;
-
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
- }
-
- throw new TimeoutException($"Timed out waiting for remote interest on spoke for '{subject}'.");
+ await PollHelper.WaitOrThrowAsync(() => Spoke.HasRemoteInterest(subject), $"Timed out waiting for remote interest on spoke for '{subject}'.", timeoutMs: 5000);
}
public async ValueTask DisposeAsync()
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafClusterRegistrationTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafClusterRegistrationTests.cs
similarity index 99%
rename from tests/NATS.Server.Tests/LeafNodes/LeafClusterRegistrationTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafClusterRegistrationTests.cs
index b49b0ad..8f5934e 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafClusterRegistrationTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafClusterRegistrationTests.cs
@@ -2,7 +2,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Unit tests for leaf cluster topology registration (Gap 12.6).
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafConnectionAndRemoteConfigParityBatch1Tests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafConnectionAndRemoteConfigParityBatch1Tests.cs
similarity index 98%
rename from tests/NATS.Server.Tests/LeafNodes/LeafConnectionAndRemoteConfigParityBatch1Tests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafConnectionAndRemoteConfigParityBatch1Tests.cs
index 9b9d0cd..47080b4 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafConnectionAndRemoteConfigParityBatch1Tests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafConnectionAndRemoteConfigParityBatch1Tests.cs
@@ -4,7 +4,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafConnectionAndRemoteConfigParityBatch1Tests
{
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafConnectionParityBatch3Tests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafConnectionParityBatch3Tests.cs
similarity index 99%
rename from tests/NATS.Server.Tests/LeafNodes/LeafConnectionParityBatch3Tests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafConnectionParityBatch3Tests.cs
index 9f7f72f..2b9f326 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafConnectionParityBatch3Tests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafConnectionParityBatch3Tests.cs
@@ -5,7 +5,7 @@ using System.Text.Json;
using NATS.Server.Configuration;
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafConnectionParityBatch3Tests
{
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafConnectionParityBatch4Tests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafConnectionParityBatch4Tests.cs
similarity index 99%
rename from tests/NATS.Server.Tests/LeafNodes/LeafConnectionParityBatch4Tests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafConnectionParityBatch4Tests.cs
index bc09979..d2827a1 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafConnectionParityBatch4Tests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafConnectionParityBatch4Tests.cs
@@ -4,7 +4,7 @@ using System.Text;
using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafConnectionParityBatch4Tests
{
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafDisableTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafDisableTests.cs
similarity index 99%
rename from tests/NATS.Server.Tests/LeafNodes/LeafDisableTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafDisableTests.cs
index 2e5bdf3..c7812d4 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafDisableTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafDisableTests.cs
@@ -2,7 +2,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Unit tests for leaf connection disable flag (Gap 12.7).
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafHubSpokeMappingParityTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafHubSpokeMappingParityTests.cs
similarity index 91%
rename from tests/NATS.Server.Tests/LeafNodes/LeafHubSpokeMappingParityTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafHubSpokeMappingParityTests.cs
index 72eb238..88441bb 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafHubSpokeMappingParityTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafHubSpokeMappingParityTests.cs
@@ -1,6 +1,6 @@
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafHubSpokeMappingParityTests
{
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafInterestIdempotencyTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafInterestIdempotencyTests.cs
similarity index 89%
rename from tests/NATS.Server.Tests/LeafNodes/LeafInterestIdempotencyTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafInterestIdempotencyTests.cs
index 13e27f4..85cebe0 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafInterestIdempotencyTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafInterestIdempotencyTests.cs
@@ -3,8 +3,9 @@ using System.Net.Sockets;
using System.Text;
using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions;
+using NATS.Server.TestUtilities;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafInterestIdempotencyTests
{
@@ -45,7 +46,7 @@ public class LeafInterestIdempotencyTests
await WaitForAsync(() => subList.HasRemoteInterest("A", "orders.created"), timeout.Token);
await WriteLineAsync(remoteSocket, "LS+ A orders.*", timeout.Token);
- await Task.Delay(100, timeout.Token);
+ await PollHelper.YieldForAsync(100);
subList.MatchRemote("A", "orders.created").Count.ShouldBe(1);
remoteAdded.ShouldBe(1);
@@ -74,14 +75,6 @@ public class LeafInterestIdempotencyTests
private static async Task WaitForAsync(Func predicate, CancellationToken ct)
{
- while (!ct.IsCancellationRequested)
- {
- if (predicate())
- return;
-
- await Task.Delay(20, ct);
- }
-
- throw new TimeoutException("Timed out waiting for condition.");
+ await PollHelper.WaitOrThrowAsync(predicate, "Timed out waiting for condition.");
}
}
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafJetStreamMigrationTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafJetStreamMigrationTests.cs
similarity index 99%
rename from tests/NATS.Server.Tests/LeafNodes/LeafJetStreamMigrationTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafJetStreamMigrationTests.cs
index 6c5ce2f..77d388d 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafJetStreamMigrationTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafJetStreamMigrationTests.cs
@@ -4,7 +4,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Unit tests for JetStream migration checks on leaf node connections (Gap 12.4).
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafLoopTransparencyRuntimeTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafLoopTransparencyRuntimeTests.cs
similarity index 90%
rename from tests/NATS.Server.Tests/LeafNodes/LeafLoopTransparencyRuntimeTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafLoopTransparencyRuntimeTests.cs
index 81a244f..f1c484d 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafLoopTransparencyRuntimeTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafLoopTransparencyRuntimeTests.cs
@@ -1,6 +1,6 @@
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafLoopTransparencyRuntimeTests
{
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafNodeAdvancedTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeAdvancedTests.cs
similarity index 90%
rename from tests/NATS.Server.Tests/LeafNodes/LeafNodeAdvancedTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeAdvancedTests.cs
index 0e4c3ba..9b60301 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafNodeAdvancedTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeAdvancedTests.cs
@@ -9,7 +9,7 @@ using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions;
using NATS.Server.TestUtilities;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Advanced leaf node behavior tests: daisy chains, account scoping, concurrency,
@@ -68,10 +68,7 @@ public class LeafNodeAdvancedTests
await serverC.WaitForReadyAsync();
// Wait for leaf connections
- using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!waitTimeout.IsCancellationRequested
- && (serverA.Stats.Leafs == 0 || Interlocked.Read(ref serverB.Stats.Leafs) < 2 || serverC.Stats.Leafs == 0))
- await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((serverA.Stats.Leafs == 0 || Interlocked.Read(ref serverB.Stats.Leafs) < 2 || serverC.Stats.Leafs == 0)), timeoutMs: 5000);
Interlocked.Read(ref serverA.Stats.Leafs).ShouldBe(1);
Interlocked.Read(ref serverB.Stats.Leafs).ShouldBeGreaterThanOrEqualTo(2);
@@ -171,9 +168,7 @@ public class LeafNodeAdvancedTests
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!waitTimeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
// Subscribe with account A on spoke
await using var connA = new NatsConnection(new NatsOpts
@@ -195,9 +190,7 @@ public class LeafNodeAdvancedTests
await connB.PingAsync();
// Wait for account A interest to propagate
- using var interestTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!interestTimeout.IsCancellationRequested && !hub.HasRemoteInterest("ACCT_A", "acct.test"))
- await Task.Delay(50, interestTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !(!hub.HasRemoteInterest("ACCT_A", "acct.test")), timeoutMs: 5000);
// Publish from account A on hub
await using var pubA = new NatsConnection(new NatsOpts
@@ -245,7 +238,7 @@ public class LeafNodeAdvancedTests
var sub = await conn.SubscribeCoreAsync($"concurrent.{index}");
await conn.PingAsync();
- await Task.Delay(50);
+ await PollHelper.YieldForAsync(50);
await sub.DisposeAsync();
await conn.PingAsync();
}));
@@ -254,7 +247,7 @@ public class LeafNodeAdvancedTests
await Task.WhenAll(tasks);
// After all subs are unsubscribed, interest should be gone
- await Task.Delay(200);
+ await PollHelper.YieldForAsync(200);
for (var i = 0; i < 10; i++)
fixture.Hub.HasRemoteInterest($"concurrent.{i}").ShouldBeFalse();
}
@@ -383,9 +376,7 @@ public class LeafNodeAdvancedTests
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!waitTimeout.IsCancellationRequested && hub.Stats.Leafs == 0)
- await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !(hub.Stats.Leafs == 0), timeoutMs: 5000);
Interlocked.Read(ref hub.Stats.Leafs).ShouldBe(1);
@@ -393,9 +384,7 @@ public class LeafNodeAdvancedTests
spoke.Dispose();
// After spoke disconnects, wait for count to drop
- using var disconnTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!disconnTimeout.IsCancellationRequested && Interlocked.Read(ref hub.Stats.Leafs) > 0)
- await Task.Delay(50, disconnTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !(Interlocked.Read(ref hub.Stats.Leafs) > 0), timeoutMs: 5000);
Interlocked.Read(ref hub.Stats.Leafs).ShouldBe(0);
@@ -469,9 +458,7 @@ public class LeafNodeAdvancedTests
await conn1.PingAsync();
await conn2.PingAsync();
- using var interestTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!interestTimeout.IsCancellationRequested && !fixture.Hub.HasRemoteInterest("dist.test"))
- await Task.Delay(50, interestTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !(!fixture.Hub.HasRemoteInterest("dist.test")), timeoutMs: 5000);
// Hub should have remote interest from at least one spoke
fixture.Hub.HasRemoteInterest("dist.test").ShouldBeTrue();
@@ -643,9 +630,7 @@ public class LeafNodeAdvancedTests
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!waitTimeout.IsCancellationRequested && hub.Stats.Leafs == 0)
- await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !(hub.Stats.Leafs == 0), timeoutMs: 5000);
Interlocked.Read(ref hub.Stats.Leafs).ShouldBe(1);
@@ -653,9 +638,7 @@ public class LeafNodeAdvancedTests
await spokeCts.CancelAsync();
spoke.Dispose();
- using var disconnTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!disconnTimeout.IsCancellationRequested && Interlocked.Read(ref hub.Stats.Leafs) > 0)
- await Task.Delay(50, disconnTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !(Interlocked.Read(ref hub.Stats.Leafs) > 0), timeoutMs: 5000);
Interlocked.Read(ref hub.Stats.Leafs).ShouldBe(0);
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafNodeConnectionTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeConnectionTests.cs
similarity index 96%
rename from tests/NATS.Server.Tests/LeafNodes/LeafNodeConnectionTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeConnectionTests.cs
index f707d2f..5826f84 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafNodeConnectionTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeConnectionTests.cs
@@ -9,7 +9,7 @@ using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions;
using NATS.Server.TestUtilities;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Tests for leaf node connection establishment, authentication, and lifecycle.
@@ -60,9 +60,7 @@ public class LeafNodeConnectionTests
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
hub.Stats.Leafs.ShouldBeGreaterThan(0);
spoke.Stats.Leafs.ShouldBeGreaterThan(0);
@@ -103,9 +101,7 @@ public class LeafNodeConnectionTests
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
hub.Stats.Leafs.ShouldBeGreaterThan(0);
@@ -527,12 +523,6 @@ public class LeafNodeConnectionTests
private static async Task WaitForAsync(Func predicate, CancellationToken ct)
{
- while (!ct.IsCancellationRequested)
- {
- if (predicate()) return;
- await Task.Delay(20, ct);
- }
-
- throw new TimeoutException("Timed out waiting for condition.");
+ await PollHelper.WaitOrThrowAsync(predicate, "Timed out waiting for condition.");
}
}
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafNodeForwardingTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeForwardingTests.cs
similarity index 95%
rename from tests/NATS.Server.Tests/LeafNodes/LeafNodeForwardingTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeForwardingTests.cs
index 974c143..e5382ec 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafNodeForwardingTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeForwardingTests.cs
@@ -3,7 +3,7 @@ using NATS.Client.Core;
using NATS.Server.Configuration;
using NATS.Server.TestUtilities;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Tests for message forwarding through leaf node connections (hub-to-leaf, leaf-to-hub, leaf-to-leaf).
@@ -155,10 +155,7 @@ public class LeafNodeForwardingTests
await spoke2Conn.PingAsync();
// Both spokes' interests should propagate to the hub
- using var waitCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!waitCts.IsCancellationRequested
- && (!fixture.Hub.HasRemoteInterest("spoke1.interest") || !fixture.Hub.HasRemoteInterest("spoke2.interest")))
- await Task.Delay(50, waitCts.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((!fixture.Hub.HasRemoteInterest("spoke1.interest") || !fixture.Hub.HasRemoteInterest("spoke2.interest"))), timeoutMs: 5000);
fixture.Hub.HasRemoteInterest("spoke1.interest").ShouldBeTrue();
fixture.Hub.HasRemoteInterest("spoke2.interest").ShouldBeTrue();
@@ -280,7 +277,7 @@ public class LeafNodeForwardingTests
await hubConn.ConnectAsync();
await hubConn.PublishAsync("no.subscriber", "lost");
- await Task.Delay(200);
+ await PollHelper.YieldForAsync(200);
true.ShouldBeTrue(); // No crash = success
}
@@ -364,12 +361,9 @@ internal sealed class TwoSpokeFixture : IAsyncDisposable
_ = spoke2.StartAsync(spoke2Cts.Token);
await spoke2.WaitForReadyAsync();
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested
- && (Interlocked.Read(ref hub.Stats.Leafs) < 2
- || spoke1.Stats.Leafs == 0
- || spoke2.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => Interlocked.Read(ref hub.Stats.Leafs) >= 2
+ && spoke1.Stats.Leafs > 0
+ && spoke2.Stats.Leafs > 0);
return new TwoSpokeFixture(hub, spoke1, spoke2, hubCts, spoke1Cts, spoke2Cts);
}
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafNodeJetStreamTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeJetStreamTests.cs
similarity index 82%
rename from tests/NATS.Server.Tests/LeafNodes/LeafNodeJetStreamTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeJetStreamTests.cs
index ada0e3a..dd17788 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafNodeJetStreamTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeJetStreamTests.cs
@@ -1,8 +1,9 @@
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core;
using NATS.Server.Configuration;
+using NATS.Server.TestUtilities;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Tests for JetStream behavior over leaf node connections.
@@ -44,9 +45,7 @@ public class LeafNodeJetStreamTests
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!waitTimeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
hub.Stats.JetStreamEnabled.ShouldBeTrue();
@@ -100,9 +99,7 @@ public class LeafNodeJetStreamTests
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!waitTimeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
// Subscribe on hub for a subject
await using var hubConn = new NatsConnection(new NatsOpts
@@ -115,9 +112,7 @@ public class LeafNodeJetStreamTests
await hubConn.PingAsync();
// Wait for interest propagation
- using var interestTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!interestTimeout.IsCancellationRequested && !spoke.HasRemoteInterest("js.leaf.test"))
- await Task.Delay(50, interestTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !(!spoke.HasRemoteInterest("js.leaf.test")), timeoutMs: 5000);
// Publish from spoke
await using var spokeConn = new NatsConnection(new NatsOpts
@@ -178,9 +173,7 @@ public class LeafNodeJetStreamTests
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!waitTimeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
hub.Stats.JetStreamEnabled.ShouldBeTrue();
spoke.Stats.JetStreamEnabled.ShouldBeFalse();
@@ -191,9 +184,7 @@ public class LeafNodeJetStreamTests
await using var sub = await hubConn.SubscribeCoreAsync("njs.forward");
await hubConn.PingAsync();
- using var interestTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!interestTimeout.IsCancellationRequested && !spoke.HasRemoteInterest("njs.forward"))
- await Task.Delay(50, interestTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !(!spoke.HasRemoteInterest("njs.forward")), timeoutMs: 5000);
// Publish from spoke
await using var spokeConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" });
@@ -253,9 +244,7 @@ public class LeafNodeJetStreamTests
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!waitTimeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
hub.Stats.JetStreamEnabled.ShouldBeTrue();
spoke.Stats.JetStreamEnabled.ShouldBeTrue();
@@ -309,9 +298,7 @@ public class LeafNodeJetStreamTests
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var waitTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!waitTimeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, waitTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
// Regular pub/sub should still work alongside JS
await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" });
@@ -322,9 +309,7 @@ public class LeafNodeJetStreamTests
await using var sub = await leafConn.SubscribeCoreAsync("combo.test");
await leafConn.PingAsync();
- using var interestTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!interestTimeout.IsCancellationRequested && !hub.HasRemoteInterest("combo.test"))
- await Task.Delay(50, interestTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !(!hub.HasRemoteInterest("combo.test")), timeoutMs: 5000);
await hubConn.PublishAsync("combo.test", "js-combo");
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafNodeLoopDetectionTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeLoopDetectionTests.cs
similarity index 99%
rename from tests/NATS.Server.Tests/LeafNodes/LeafNodeLoopDetectionTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeLoopDetectionTests.cs
index 754b65b..a6cbc6c 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafNodeLoopDetectionTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeLoopDetectionTests.cs
@@ -1,6 +1,6 @@
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Tests for leaf node loop detection via $LDS. prefix.
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafNodeManagerParityBatch5Tests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeManagerParityBatch5Tests.cs
similarity index 99%
rename from tests/NATS.Server.Tests/LeafNodes/LeafNodeManagerParityBatch5Tests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeManagerParityBatch5Tests.cs
index 481a31e..a9caf58 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafNodeManagerParityBatch5Tests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeManagerParityBatch5Tests.cs
@@ -5,7 +5,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafNodeManagerParityBatch5Tests
{
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafNodeSubjectFilterTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeSubjectFilterTests.cs
similarity index 97%
rename from tests/NATS.Server.Tests/LeafNodes/LeafNodeSubjectFilterTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeSubjectFilterTests.cs
index cade619..36545a5 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafNodeSubjectFilterTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafNodeSubjectFilterTests.cs
@@ -1,7 +1,7 @@
using NATS.Client.Core;
using NATS.Server.TestUtilities;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Tests for subject filter propagation through leaf nodes.
@@ -105,9 +105,7 @@ public class LeafNodeSubjectFilterTests
await sub.DisposeAsync();
await leafConn.PingAsync();
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested && fixture.Hub.HasRemoteInterest("unsub.test"))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !(fixture.Hub.HasRemoteInterest("unsub.test")), timeoutMs: 5000);
fixture.Hub.HasRemoteInterest("unsub.test").ShouldBeFalse();
}
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafPermissionSyncTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafPermissionSyncTests.cs
similarity index 99%
rename from tests/NATS.Server.Tests/LeafNodes/LeafPermissionSyncTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafPermissionSyncTests.cs
index 3259eee..434265d 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafPermissionSyncTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafPermissionSyncTests.cs
@@ -5,7 +5,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Unit tests for leaf node permission and account syncing (Gap 12.2).
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafSolicitedConnectionTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafSolicitedConnectionTests.cs
similarity index 95%
rename from tests/NATS.Server.Tests/LeafNodes/LeafSolicitedConnectionTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafSolicitedConnectionTests.cs
index 989a792..612c4f2 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafSolicitedConnectionTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafSolicitedConnectionTests.cs
@@ -5,8 +5,9 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions;
+using NATS.Server.TestUtilities;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Tests for solicited (outbound) leaf node connections with retry logic,
@@ -55,9 +56,7 @@ public class LeafSolicitedConnectionTests
await spoke.WaitForReadyAsync();
// Wait for leaf connections to establish
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
hub.Stats.Leafs.ShouldBeGreaterThan(0);
spoke.Stats.Leafs.ShouldBeGreaterThan(0);
@@ -93,7 +92,7 @@ public class LeafSolicitedConnectionTests
await manager.StartAsync(cts.Token);
// Give it some time to attempt connections
- await Task.Delay(500);
+ await PollHelper.YieldForAsync(500);
// No connections should have succeeded
stats.Leafs.ShouldBe(0);
@@ -185,7 +184,7 @@ public class LeafSolicitedConnectionTests
await manager.StartAsync(cts.Token);
// Let it attempt at least one retry
- await Task.Delay(200);
+ await PollHelper.YieldForAsync(200);
// Cancel — the retry loop should stop promptly
await cts.CancelAsync();
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafSubKeyParityBatch2Tests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafSubKeyParityBatch2Tests.cs
similarity index 96%
rename from tests/NATS.Server.Tests/LeafNodes/LeafSubKeyParityBatch2Tests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafSubKeyParityBatch2Tests.cs
index 5a8759c..cc59c7f 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafSubKeyParityBatch2Tests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafSubKeyParityBatch2Tests.cs
@@ -1,7 +1,7 @@
using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
public class LeafSubKeyParityBatch2Tests
{
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafSubjectFilterTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafSubjectFilterTests.cs
similarity index 94%
rename from tests/NATS.Server.Tests/LeafNodes/LeafSubjectFilterTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafSubjectFilterTests.cs
index 9e9e4ed..41f6971 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafSubjectFilterTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafSubjectFilterTests.cs
@@ -6,8 +6,9 @@ using NATS.Client.Core;
using NATS.Server.Configuration;
using NATS.Server.LeafNodes;
using NATS.Server.Subscriptions;
+using NATS.Server.TestUtilities;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Tests for leaf node subject filtering via DenyExports/DenyImports (deny-lists) and
@@ -196,9 +197,7 @@ public class LeafSubjectFilterTests
try
{
// Wait for leaf connection
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" });
await leafConn.ConnectAsync();
@@ -211,7 +210,7 @@ public class LeafSubjectFilterTests
await leafConn.PingAsync();
// Wait for interest propagation
- await Task.Delay(500);
+ await PollHelper.YieldForAsync(500);
// Publish from hub
await hubConn.PublishAsync("public.data", "allowed-msg");
@@ -285,9 +284,7 @@ public class LeafSubjectFilterTests
try
{
// Wait for leaf connection
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
await using var hubConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{hub.Port}" });
await hubConn.ConnectAsync();
@@ -300,7 +297,7 @@ public class LeafSubjectFilterTests
await hubConn.PingAsync();
// Wait for interest propagation
- await Task.Delay(500);
+ await PollHelper.YieldForAsync(500);
// Publish from spoke (leaf)
await leafConn.PublishAsync("public.data", "allowed-msg");
@@ -372,9 +369,7 @@ public class LeafSubjectFilterTests
try
{
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" });
await leafConn.ConnectAsync();
@@ -385,7 +380,7 @@ public class LeafSubjectFilterTests
await using var blockedSub = await leafConn.SubscribeCoreAsync("admin.users");
await using var allowedSub = await leafConn.SubscribeCoreAsync("admin.deep.nested");
await leafConn.PingAsync();
- await Task.Delay(500);
+ await PollHelper.YieldForAsync(500);
await hubConn.PublishAsync("admin.users", "blocked");
await hubConn.PublishAsync("admin.deep.nested", "allowed");
@@ -451,11 +446,11 @@ public class LeafSubjectFilterTests
var line = await ReadLineAsync(remoteSocket, cts.Token);
line.ShouldStartWith("LEAF ");
- await Task.Delay(200);
+ await PollHelper.YieldForAsync(200);
// Propagate allowed subscription
manager.PropagateLocalSubscription("$G", "public.data", null);
- await Task.Delay(100);
+ await PollHelper.YieldForAsync(100);
var lsLine = await ReadLineAsync(remoteSocket, cts.Token);
lsLine.ShouldBe("LS+ $G public.data");
@@ -464,7 +459,7 @@ public class LeafSubjectFilterTests
// Send a PING to verify nothing else was sent
manager.PropagateLocalSubscription("$G", "allowed.check", null);
- await Task.Delay(100);
+ await PollHelper.YieldForAsync(100);
var nextLine = await ReadLineAsync(remoteSocket, cts.Token);
nextLine.ShouldBe("LS+ $G allowed.check");
}
@@ -687,9 +682,7 @@ public class LeafSubjectFilterTests
try
{
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
await using var leafConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{spoke.Port}" });
await leafConn.ConnectAsync();
@@ -699,7 +692,7 @@ public class LeafSubjectFilterTests
await using var allowedSub = await leafConn.SubscribeCoreAsync("allowed.data");
await using var blockedSub = await leafConn.SubscribeCoreAsync("blocked.data");
await leafConn.PingAsync();
- await Task.Delay(500);
+ await PollHelper.YieldForAsync(500);
await hubConn.PublishAsync("allowed.data", "yes");
await hubConn.PublishAsync("blocked.data", "no");
@@ -768,9 +761,7 @@ public class LeafSubjectFilterTests
try
{
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => !((hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)), timeoutMs: 5000);
await using var hubConn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{hub.Port}" });
await hubConn.ConnectAsync();
@@ -780,7 +771,7 @@ public class LeafSubjectFilterTests
await using var allowedSub = await hubConn.SubscribeCoreAsync("allowed.data");
await using var blockedSub = await hubConn.SubscribeCoreAsync("blocked.data");
await hubConn.PingAsync();
- await Task.Delay(500);
+ await PollHelper.YieldForAsync(500);
await leafConn.PublishAsync("allowed.data", "yes");
await leafConn.PublishAsync("blocked.data", "no");
@@ -839,11 +830,11 @@ public class LeafSubjectFilterTests
var line = await ReadLineAsync(remoteSocket, cts.Token);
line.ShouldStartWith("LEAF ");
- await Task.Delay(200);
+ await PollHelper.YieldForAsync(200);
// Propagate allowed subscription
manager.PropagateLocalSubscription("$G", "allowed.data", null);
- await Task.Delay(100);
+ await PollHelper.YieldForAsync(100);
var lsLine = await ReadLineAsync(remoteSocket, cts.Token);
lsLine.ShouldBe("LS+ $G allowed.data");
@@ -852,7 +843,7 @@ public class LeafSubjectFilterTests
// Verify by sending another allowed subscription
manager.PropagateLocalSubscription("$G", "allowed.check", null);
- await Task.Delay(100);
+ await PollHelper.YieldForAsync(100);
var nextLine = await ReadLineAsync(remoteSocket, cts.Token);
nextLine.ShouldBe("LS+ $G allowed.check");
}
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafTlsReloadTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafTlsReloadTests.cs
similarity index 98%
rename from tests/NATS.Server.Tests/LeafNodes/LeafTlsReloadTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafTlsReloadTests.cs
index a793d21..a96cf01 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafTlsReloadTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafTlsReloadTests.cs
@@ -2,7 +2,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Unit tests for leaf node TLS certificate hot-reload (Gap 12.1).
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafValidationTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafValidationTests.cs
similarity index 99%
rename from tests/NATS.Server.Tests/LeafNodes/LeafValidationTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafValidationTests.cs
index c8381d6..be768cd 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafValidationTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafValidationTests.cs
@@ -4,7 +4,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Unit tests for leaf node reconnect state validation (Gap 12.3).
diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafWebSocketTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafWebSocketTests.cs
similarity index 99%
rename from tests/NATS.Server.Tests/LeafNodes/LeafWebSocketTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafWebSocketTests.cs
index 18229d1..40d6b98 100644
--- a/tests/NATS.Server.Tests/LeafNodes/LeafWebSocketTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafNodes/LeafWebSocketTests.cs
@@ -3,7 +3,7 @@ using System.Net.WebSockets;
using NSubstitute;
using NATS.Server.LeafNodes;
-namespace NATS.Server.Tests.LeafNodes;
+namespace NATS.Server.LeafNodes.Tests.LeafNodes;
///
/// Unit tests for (Gap 12.5).
diff --git a/tests/NATS.Server.Tests/LeafProtocolTests.cs b/tests/NATS.Server.LeafNodes.Tests/LeafProtocolTests.cs
similarity index 79%
rename from tests/NATS.Server.Tests/LeafProtocolTests.cs
rename to tests/NATS.Server.LeafNodes.Tests/LeafProtocolTests.cs
index 90da14e..f74ddec 100644
--- a/tests/NATS.Server.Tests/LeafProtocolTests.cs
+++ b/tests/NATS.Server.LeafNodes.Tests/LeafProtocolTests.cs
@@ -3,8 +3,9 @@ using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
+using NATS.Server.TestUtilities;
-namespace NATS.Server.Tests;
+namespace NATS.Server.LeafNodes.Tests;
public class LeafProtocolTests
{
@@ -70,9 +71,7 @@ internal sealed class LeafProtocolTestFixture : IAsyncDisposable
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
- using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0))
- await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+ await PollHelper.WaitUntilAsync(() => hub.Stats.Leafs > 0 && spoke.Stats.Leafs > 0);
return new LeafProtocolTestFixture(hub, spoke, hubCts, spokeCts);
}
@@ -85,7 +84,7 @@ internal sealed class LeafProtocolTestFixture : IAsyncDisposable
_ = await ReadLineAsync(sock); // INFO
await sock.SendAsync(Encoding.ASCII.GetBytes($"CONNECT {{}}\r\nSUB {subject} 1\r\nPING\r\n"));
- await ReadUntilAsync(sock, "PONG");
+ await SocketTestHelper.ReadUntilAsync(sock, "PONG");
}
public async Task PublishHubAsync(string subject, string payload)
@@ -98,11 +97,11 @@ internal sealed class LeafProtocolTestFixture : IAsyncDisposable
_hubPublisher = sock;
_ = await ReadLineAsync(sock); // INFO
await sock.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n"));
- await ReadUntilAsync(sock, "PONG");
+ await SocketTestHelper.ReadUntilAsync(sock, "PONG");
}
await sock.SendAsync(Encoding.ASCII.GetBytes($"PUB {subject} {payload.Length}\r\n{payload}\r\nPING\r\n"));
- await ReadUntilAsync(sock, "PONG");
+ await SocketTestHelper.ReadUntilAsync(sock, "PONG");
}
public Task ReadSpokeMessageAsync()
@@ -110,7 +109,7 @@ internal sealed class LeafProtocolTestFixture : IAsyncDisposable
if (_spokeSubscriber == null)
throw new InvalidOperationException("Spoke subscriber was not initialized.");
- return ReadUntilAsync(_spokeSubscriber, "MSG ");
+ return SocketTestHelper.ReadUntilAsync(_spokeSubscriber, "MSG ");
}
public async ValueTask DisposeAsync()
@@ -132,20 +131,5 @@ internal sealed class LeafProtocolTestFixture : IAsyncDisposable
return Encoding.ASCII.GetString(buf, 0, n);
}
- private static async Task ReadUntilAsync(Socket sock, string expected)
- {
- var sb = new StringBuilder();
- var buf = new byte[4096];
- using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- while (!sb.ToString().Contains(expected, StringComparison.Ordinal))
- {
- var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
- if (n == 0)
- break;
- sb.Append(Encoding.ASCII.GetString(buf, 0, n));
- }
-
- return sb.ToString();
- }
}
diff --git a/tests/NATS.Server.LeafNodes.Tests/NATS.Server.LeafNodes.Tests.csproj b/tests/NATS.Server.LeafNodes.Tests/NATS.Server.LeafNodes.Tests.csproj
new file mode 100644
index 0000000..81a4151
--- /dev/null
+++ b/tests/NATS.Server.LeafNodes.Tests/NATS.Server.LeafNodes.Tests.csproj
@@ -0,0 +1,25 @@
+
+
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+