using System.Net; using System.Net.Sockets; using System.Text; using Microsoft.Extensions.Logging.Abstractions; using NATS.Server; namespace NATS.Server.Tests; public class ServerTests : IAsyncLifetime { private readonly NatsServer _server; private readonly int _port; private readonly CancellationTokenSource _cts = new(); public ServerTests() { // Use random port _port = GetFreePort(); _server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance); } public async Task InitializeAsync() { _ = _server.StartAsync(_cts.Token); await _server.WaitForReadyAsync(); } public async Task DisposeAsync() { await _cts.CancelAsync(); _server.Dispose(); } private static int GetFreePort() { using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); return ((IPEndPoint)sock.LocalEndPoint!).Port; } private async Task ConnectClientAsync() { var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await sock.ConnectAsync(IPAddress.Loopback, _port); return sock; } private static async Task ReadLineAsync(Socket sock, int bufSize = 4096) { var buf = new byte[bufSize]; var n = await sock.ReceiveAsync(buf, SocketFlags.None); return Encoding.ASCII.GetString(buf, 0, n); } /// /// Reads from a socket until the accumulated data contains the expected substring. /// private static async Task ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000) { using var cts = new CancellationTokenSource(timeoutMs); var sb = new StringBuilder(); var buf = new byte[4096]; while (!sb.ToString().Contains(expected)) { var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token); if (n == 0) break; sb.Append(Encoding.ASCII.GetString(buf, 0, n)); } return sb.ToString(); } [Fact] public async Task Server_accepts_connection_and_sends_INFO() { using var client = await ConnectClientAsync(); var response = await ReadLineAsync(client); response.ShouldStartWith("INFO "); } [Fact] public async Task Server_basic_pubsub() { using var pub = await ConnectClientAsync(); using var sub = await ConnectClientAsync(); // Read INFO from both await ReadLineAsync(pub); await ReadLineAsync(sub); // CONNECT + SUB on subscriber, then PING to flush await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo 1\r\nPING\r\n")); var pong = await ReadLineAsync(sub); pong.ShouldContain("PONG"); // CONNECT + PUB on publisher await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo 5\r\nHello\r\n")); // Read MSG from subscriber (may arrive across multiple TCP segments) var msg = await ReadUntilAsync(sub, "Hello\r\n"); msg.ShouldContain("MSG foo 1 5\r\nHello\r\n"); } [Fact] public async Task Server_wildcard_matching() { using var pub = await ConnectClientAsync(); using var sub = await ConnectClientAsync(); await ReadLineAsync(pub); await ReadLineAsync(sub); // CONNECT + SUB on subscriber, then PING to flush await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\nPING\r\n")); var pong = await ReadLineAsync(sub); pong.ShouldContain("PONG"); await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo.bar 5\r\nHello\r\n")); var buf = new byte[4096]; var n = await sub.ReceiveAsync(buf, SocketFlags.None); var msg = Encoding.ASCII.GetString(buf, 0, n); msg.ShouldContain("MSG foo.bar 1 5\r\n"); } }