Validates the server against the official NATS .NET client library with tests for basic pub/sub, wildcard (* and >) matching, fan-out to multiple subscribers, and PING/PONG keepalive. All 5 tests pass without requiring any server changes.
180 lines
4.9 KiB
C#
180 lines
4.9 KiB
C#
using System.Net;
|
|
using System.Net.Sockets;
|
|
using System.Text;
|
|
using NATS.Client.Core;
|
|
using NATS.Server;
|
|
|
|
namespace NATS.Server.Tests;
|
|
|
|
/// <summary>
/// End-to-end tests that run the official NATS .NET client against an in-process
/// <see cref="NatsServer"/> listening on a loopback port chosen per test-class instance.
/// </summary>
/// <remarks>
/// xUnit creates one instance of this class per test, so every test gets its own server.
/// Synchronisation never relies on fixed delays: server readiness is detected by probing the
/// listening port, and subscription registration is confirmed with a PING/PONG round trip.
/// </remarks>
public class IntegrationTests : IAsyncDisposable
{
    /// <summary>Upper bound on how long the server may take to start accepting connections.</summary>
    private static readonly TimeSpan StartupTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Pause between two readiness probes while the server is starting.</summary>
    private static readonly TimeSpan StartupPollInterval = TimeSpan.FromMilliseconds(10);

    /// <summary>Upper bound on any single client operation (receive, ping) in a test.</summary>
    private static readonly TimeSpan OperationTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Upper bound on how long teardown waits for the server task to finish.</summary>
    private static readonly TimeSpan ShutdownTimeout = TimeSpan.FromSeconds(2);

    private readonly NatsServer _server;
    private readonly int _port;
    private readonly CancellationTokenSource _cts = new();
    private readonly Task _serverTask;

    /// <summary>
    /// Starts a server on a free loopback port and blocks until it accepts TCP connections.
    /// </summary>
    /// <exception cref="InvalidOperationException">The server task faulted during startup.</exception>
    /// <exception cref="TimeoutException">The server did not start listening in time.</exception>
    public IntegrationTests()
    {
        _port = GetFreePort();
        _server = new NatsServer(new NatsOptions { Port = _port });
        _serverTask = _server.StartAsync(_cts.Token);

        try
        {
            WaitUntilListening();
        }
        catch
        {
            // xUnit does not call DisposeAsync when the constructor throws, so release
            // the server and the token source here before propagating the failure.
            _cts.Cancel();
            _server.Dispose();
            _cts.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Stops the server, observes its background task and releases the cancellation source.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        await _cts.CancelAsync();
        _server.Dispose();

        try
        {
            // Observe the server task so that its outcome is not silently dropped.
            await _serverTask.WaitAsync(ShutdownTimeout);
        }
        catch (Exception ex) when (ex is OperationCanceledException
                                       or ObjectDisposedException
                                       or SocketException
                                       or TimeoutException)
        {
            // Expected while shutting down: the accept loop was cancelled or its socket
            // was closed underneath it. Any other exception is a server fault and propagates.
        }
        finally
        {
            _cts.Dispose();
        }
    }

    /// <summary>
    /// Asks the OS for an unused loopback TCP port by binding to port 0.
    /// </summary>
    /// <remarks>
    /// The probe socket is closed before the server binds, so another process could in
    /// principle grab the port in between; this is accepted for test purposes.
    /// </remarks>
    private static int GetFreePort()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)sock.LocalEndPoint!).Port;
    }

    /// <summary>
    /// Polls the server port until a TCP connection succeeds, the server task faults,
    /// or <see cref="StartupTimeout"/> elapses.
    /// </summary>
    private void WaitUntilListening()
    {
        var deadline = Environment.TickCount64 + (long)StartupTimeout.TotalMilliseconds;

        while (true)
        {
            if (_serverTask.IsFaulted)
            {
                throw new InvalidOperationException(
                    $"NATS server failed to start on port {_port}.", _serverTask.Exception);
            }

            try
            {
                using var probe = new TcpClient();
                probe.Connect(IPAddress.Loopback, _port);
                return;
            }
            catch (SocketException ex)
            {
                if (Environment.TickCount64 >= deadline)
                {
                    throw new TimeoutException(
                        $"NATS server did not accept connections on port {_port} within {StartupTimeout.TotalSeconds:0} s.",
                        ex);
                }

                Thread.Sleep(StartupPollInterval);
            }
        }
    }

    /// <summary>Creates an unconnected client pointed at this instance's server.</summary>
    private NatsConnection CreateClient()
    {
        var opts = new NatsOpts { Url = $"nats://127.0.0.1:{_port}" };
        return new NatsConnection(opts);
    }

    /// <summary>
    /// Subscribes to <paramref name="subject"/> and returns only after the server has
    /// processed the subscription.
    /// </summary>
    /// <remarks>
    /// The SUB command is sent before the PING on the same connection, and the server
    /// handles a connection's commands in order, so receiving the PONG proves that the
    /// subscription is registered. This replaces guessing with a fixed delay.
    /// </remarks>
    private static async Task<INatsSub<string>> SubscribeAndFlushAsync(NatsConnection connection, string subject)
    {
        using var timeout = new CancellationTokenSource(OperationTimeout);
        var subscription = await connection.SubscribeCoreAsync<string>(subject, cancellationToken: timeout.Token);
        await connection.PingAsync(timeout.Token);
        return subscription;
    }

    /// <summary>
    /// Reads the next message from <paramref name="subscription"/>, failing with an
    /// <see cref="OperationCanceledException"/> if none arrives within <see cref="OperationTimeout"/>.
    /// </summary>
    private static async Task<NatsMsg<string>> ReadOneAsync(INatsSub<string> subscription)
    {
        using var timeout = new CancellationTokenSource(OperationTimeout);
        return await subscription.Msgs.ReadAsync(timeout.Token);
    }

    /// <summary>A message published on a literal subject reaches a subscriber of that subject.</summary>
    [Fact]
    public async Task PubSub_basic()
    {
        await using var pub = CreateClient();
        await using var sub = CreateClient();
        await pub.ConnectAsync();
        await sub.ConnectAsync();
        await using var subscription = await SubscribeAndFlushAsync(sub, "test.subject");

        await pub.PublishAsync("test.subject", "Hello NATS!");
        var msg = await ReadOneAsync(subscription);

        Assert.Equal("Hello NATS!", msg.Data);
    }

    /// <summary>The <c>*</c> wildcard matches exactly one subject token.</summary>
    [Fact]
    public async Task PubSub_wildcard_star()
    {
        await using var pub = CreateClient();
        await using var sub = CreateClient();
        await pub.ConnectAsync();
        await sub.ConnectAsync();
        await using var subscription = await SubscribeAndFlushAsync(sub, "test.*");

        await pub.PublishAsync("test.hello", "data");
        var msg = await ReadOneAsync(subscription);

        Assert.Equal("test.hello", msg.Subject);
    }

    /// <summary>The <c>&gt;</c> wildcard matches one or more trailing subject tokens.</summary>
    [Fact]
    public async Task PubSub_wildcard_gt()
    {
        await using var pub = CreateClient();
        await using var sub = CreateClient();
        await pub.ConnectAsync();
        await sub.ConnectAsync();
        await using var subscription = await SubscribeAndFlushAsync(sub, "test.>");

        await pub.PublishAsync("test.foo.bar.baz", "data");
        var msg = await ReadOneAsync(subscription);

        Assert.Equal("test.foo.bar.baz", msg.Subject);
    }

    /// <summary>A single publish is delivered to every subscriber of the subject.</summary>
    [Fact]
    public async Task PubSub_fan_out()
    {
        await using var pub = CreateClient();
        await using var sub1 = CreateClient();
        await using var sub2 = CreateClient();
        await pub.ConnectAsync();
        await sub1.ConnectAsync();
        await sub2.ConnectAsync();
        await using var first = await SubscribeAndFlushAsync(sub1, "fanout");
        await using var second = await SubscribeAndFlushAsync(sub2, "fanout");

        await pub.PublishAsync("fanout", "hello");

        // Each subscriber must receive its own copy; WhenAll also surfaces a failure
        // from either read instead of hiding it behind a timeout.
        var deliveries = await Task.WhenAll(ReadOneAsync(first), ReadOneAsync(second));

        Assert.Equal(2, deliveries.Length);
        Assert.All(deliveries, delivery => Assert.Equal("hello", delivery.Data));
    }

    /// <summary>An explicit PING is answered with a PONG within the operation timeout.</summary>
    [Fact]
    public async Task PingPong()
    {
        await using var client = CreateClient();
        await client.ConnectAsync();

        // Bound the wait so that a missing PONG fails the test rather than hanging the run.
        using var timeout = new CancellationTokenSource(OperationTimeout);
        await client.PingAsync(timeout.Token);
    }
}
|