diff --git a/tests/NATS.Server.Tests/IntegrationTests.cs b/tests/NATS.Server.Tests/IntegrationTests.cs new file mode 100644 index 0000000..0c0acfb --- /dev/null +++ b/tests/NATS.Server.Tests/IntegrationTests.cs @@ -0,0 +1,179 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using NATS.Client.Core; +using NATS.Server; + +namespace NATS.Server.Tests; + +public class IntegrationTests : IAsyncDisposable +{ + private readonly NatsServer _server; + private readonly int _port; + private readonly CancellationTokenSource _cts = new(); + private readonly Task _serverTask; + + public IntegrationTests() + { + _port = GetFreePort(); + _server = new NatsServer(new NatsOptions { Port = _port }); + _serverTask = _server.StartAsync(_cts.Token); + Thread.Sleep(200); // Let server start + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + _server.Dispose(); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + private NatsConnection CreateClient() + { + var opts = new NatsOpts { Url = $"nats://127.0.0.1:{_port}" }; + return new NatsConnection(opts); + } + + [Fact] + public async Task PubSub_basic() + { + await using var pub = CreateClient(); + await using var sub = CreateClient(); + + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + var received = new TaskCompletionSource(); + + var subscription = Task.Run(async () => + { + await foreach (var msg in sub.SubscribeAsync("test.subject")) + { + received.TrySetResult(msg.Data!); + break; + } + }); + + await Task.Delay(100); // let subscription register + + await pub.PublishAsync("test.subject", "Hello NATS!"); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal("Hello NATS!", result); + } + + [Fact] + public async Task PubSub_wildcard_star() + { + await using var pub = CreateClient(); + await using var sub = CreateClient(); + + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + var received = new TaskCompletionSource(); + + var subscription = Task.Run(async () => + { + await foreach (var msg in sub.SubscribeAsync("test.*")) + { + received.TrySetResult(msg.Subject); + break; + } + }); + + await Task.Delay(100); + await pub.PublishAsync("test.hello", "data"); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal("test.hello", result); + } + + [Fact] + public async Task PubSub_wildcard_gt() + { + await using var pub = CreateClient(); + await using var sub = CreateClient(); + + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + var received = new TaskCompletionSource(); + + var subscription = Task.Run(async () => + { + await foreach (var msg in sub.SubscribeAsync("test.>")) + { + received.TrySetResult(msg.Subject); + break; + } + }); + + await Task.Delay(100); + await pub.PublishAsync("test.foo.bar.baz", "data"); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal("test.foo.bar.baz", result); + } + + [Fact] + public async Task PubSub_fan_out() + { + await using var pub = CreateClient(); + await using var sub1 = CreateClient(); + await using var sub2 = CreateClient(); + + await pub.ConnectAsync(); + await sub1.ConnectAsync(); + await sub2.ConnectAsync(); + + var count1 = 0; + var count2 = 0; + var done = new TaskCompletionSource(); + + var s1 = Task.Run(async () => + { + await foreach (var msg in sub1.SubscribeAsync("fanout")) + { + Interlocked.Increment(ref count1); + if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 2) + done.TrySetResult(); + break; + } + }); + + var s2 = Task.Run(async () => + { + await foreach (var msg in sub2.SubscribeAsync("fanout")) + { + Interlocked.Increment(ref count2); + if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 2) + done.TrySetResult(); + break; + } + }); + + await Task.Delay(100); + await pub.PublishAsync("fanout", "hello"); + + await done.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal(1, count1); + Assert.Equal(1, count2); + } + + [Fact] + public async Task PingPong() + { + await using var client = CreateClient(); + await client.ConnectAsync(); + + // If we got here, the connection is alive and PING/PONG works + await client.PingAsync(); + } +} diff --git a/tests/NATS.Server.Tests/NATS.Server.Tests.csproj b/tests/NATS.Server.Tests/NATS.Server.Tests.csproj index c0253bc..dba85ed 100644 --- a/tests/NATS.Server.Tests/NATS.Server.Tests.csproj +++ b/tests/NATS.Server.Tests/NATS.Server.Tests.csproj @@ -7,6 +7,7 @@ +