Files
Joseph Doherty 5c608f07e3 Move shared fixtures and parity utilities to TestUtilities project
- git mv JetStreamApiFixture, JetStreamClusterFixture, LeafFixture,
  Parity utilities, and TestData from NATS.Server.Tests to
  NATS.Server.TestUtilities
- Update namespaces to NATS.Server.TestUtilities (and .Parity sub-ns)
- Make fixture classes public for cross-project access
- Add PollHelper to replace Task.Delay polling with SemaphoreSlim waits
- Refactor all fixture polling loops to use PollHelper
- Add 'using NATS.Server.TestUtilities;' to ~75 consuming test files
- Rename local fixture duplicates (MetaGroupTestFixture,
  LeafProtocolTestFixture) to avoid shadowing shared fixtures
- Remove TestData entry from NATS.Server.Tests.csproj (moved to
  TestUtilities)
2026-03-12 14:45:21 -04:00

97 lines
2.8 KiB
C#

using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
namespace NATS.Server.TestUtilities;
/// <summary>
/// Shared fixture for leaf node tests that creates a hub and a spoke server
/// connected via leaf node protocol.
/// </summary>
/// <remarks>
/// Instances are created through <see cref="StartAsync"/> and must be released with
/// <see cref="DisposeAsync"/>, which cancels and disposes both servers.
/// </remarks>
public sealed class LeafFixture : IAsyncDisposable
{
    /// <summary>Timeout, in milliseconds, passed to every <c>PollHelper</c> wait in this fixture.</summary>
    private const int WaitTimeoutMs = 5000;

    /// <summary>Polling interval, in milliseconds, passed to every <c>PollHelper</c> wait in this fixture.</summary>
    private const int PollIntervalMs = 50;

    private readonly CancellationTokenSource _hubCts;
    private readonly CancellationTokenSource _spokeCts;

    private LeafFixture(NatsServer hub, NatsServer spoke, CancellationTokenSource hubCts, CancellationTokenSource spokeCts)
    {
        Hub = hub;
        Spoke = spoke;
        _hubCts = hubCts;
        _spokeCts = spokeCts;
    }

    /// <summary>The hub server; the spoke is configured with this server's leaf listen address as its remote.</summary>
    public NatsServer Hub { get; }

    /// <summary>The spoke server, configured with the hub as its only leaf remote.</summary>
    public NatsServer Spoke { get; }

    /// <summary>
    /// Starts a hub server, then a spoke server whose leaf remotes point at the hub,
    /// and polls until both report at least one leaf connection or the wait times out.
    /// </summary>
    /// <returns>A fixture owning both running servers and their cancellation sources.</returns>
    /// <remarks>
    /// If any step throws, everything created so far is cancelled and disposed before
    /// the exception is rethrown, so a failed start does not leave servers behind.
    /// </remarks>
    public static async Task<LeafFixture> StartAsync()
    {
        NatsServer? hub = null;
        NatsServer? spoke = null;
        CancellationTokenSource? hubCts = null;
        CancellationTokenSource? spokeCts = null;

        try
        {
            // Port 0 on both listeners — presumably lets the OS pick free ports; confirm against NatsOptions.
            var hubOptions = new NatsOptions
            {
                Host = "127.0.0.1",
                Port = 0,
                LeafNode = new LeafNodeOptions
                {
                    Host = "127.0.0.1",
                    Port = 0,
                },
            };

            hub = new NatsServer(hubOptions, NullLoggerFactory.Instance);
            hubCts = new CancellationTokenSource();

            // The start task is intentionally not awaited; readiness is observed via WaitForReadyAsync.
            _ = hub.StartAsync(hubCts.Token);
            await hub.WaitForReadyAsync();

            var spokeOptions = new NatsOptions
            {
                Host = "127.0.0.1",
                Port = 0,
                LeafNode = new LeafNodeOptions
                {
                    Host = "127.0.0.1",
                    Port = 0,
                    // The hub has reported ready, so its leaf listen address is expected to be set here.
                    Remotes = [hub.LeafListen!],
                },
            };

            spoke = new NatsServer(spokeOptions, NullLoggerFactory.Instance);
            spokeCts = new CancellationTokenSource();
            _ = spoke.StartAsync(spokeCts.Token);
            await spoke.WaitForReadyAsync();

            // NOTE(review): the outcome of WaitUntilAsync is not checked, so a timeout here does not
            // fail fixture creation — presumably deliberate best-effort; confirm against PollHelper.
            await PollHelper.WaitUntilAsync(
                () => hub.Stats.Leafs > 0 && spoke.Stats.Leafs > 0,
                timeoutMs: WaitTimeoutMs,
                intervalMs: PollIntervalMs);

            return new LeafFixture(hub, spoke, hubCts, spokeCts);
        }
        catch
        {
            // Startup failed part-way: tear down whatever was created, then surface the original error.
            await ShutdownAsync(hub, hubCts, spoke, spokeCts);
            throw;
        }
    }

    /// <summary>
    /// Polls <see cref="Hub"/> until it reports remote interest for <paramref name="subject"/>.
    /// </summary>
    /// <param name="subject">The subject passed to <c>HasRemoteInterest</c> on the hub.</param>
    /// <remarks>
    /// Delegates to <c>PollHelper.WaitOrThrowAsync</c> with a timed-out message; the exception
    /// type raised on timeout is defined by that helper.
    /// </remarks>
    public async Task WaitForRemoteInterestOnHubAsync(string subject)
    {
        await PollHelper.WaitOrThrowAsync(
            () => Hub.HasRemoteInterest(subject),
            $"Timed out waiting for remote interest on hub for '{subject}'.",
            timeoutMs: WaitTimeoutMs,
            intervalMs: PollIntervalMs);
    }

    /// <summary>
    /// Polls <see cref="Spoke"/> until it reports remote interest for <paramref name="subject"/>.
    /// </summary>
    /// <param name="subject">The subject passed to <c>HasRemoteInterest</c> on the spoke.</param>
    /// <remarks>
    /// Delegates to <c>PollHelper.WaitOrThrowAsync</c> with a timed-out message; the exception
    /// type raised on timeout is defined by that helper.
    /// </remarks>
    public async Task WaitForRemoteInterestOnSpokeAsync(string subject)
    {
        await PollHelper.WaitOrThrowAsync(
            () => Spoke.HasRemoteInterest(subject),
            $"Timed out waiting for remote interest on spoke for '{subject}'.",
            timeoutMs: WaitTimeoutMs,
            intervalMs: PollIntervalMs);
    }

    /// <summary>
    /// Cancels both servers' tokens, then disposes the servers and the token sources.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        await ShutdownAsync(Hub, _hubCts, Spoke, _spokeCts);
    }

    /// <summary>
    /// Tears down whichever of the given servers and token sources exist, in the order
    /// cancel spoke, cancel hub, dispose spoke, dispose hub, dispose token sources.
    /// Null arguments (from a start that failed part-way) are skipped.
    /// </summary>
    private static async Task ShutdownAsync(
        NatsServer? hub,
        CancellationTokenSource? hubCts,
        NatsServer? spoke,
        CancellationTokenSource? spokeCts)
    {
        // Signal cancellation to both servers before disposing either of them.
        if (spokeCts is not null)
        {
            await spokeCts.CancelAsync();
        }

        if (hubCts is not null)
        {
            await hubCts.CancelAsync();
        }

        spoke?.Dispose();
        hub?.Dispose();
        spokeCts?.Dispose();
        hubCts?.Dispose();
    }
}