diff --git a/tests/NATS.E2E.Tests/Infrastructure/WebSocketServerFixture.cs b/tests/NATS.E2E.Tests/Infrastructure/WebSocketServerFixture.cs new file mode 100644 index 0000000..f03df9d --- /dev/null +++ b/tests/NATS.E2E.Tests/Infrastructure/WebSocketServerFixture.cs @@ -0,0 +1,37 @@ +using NATS.Client.Core; + +namespace NATS.E2E.Tests.Infrastructure; + +public sealed class WebSocketServerFixture : IAsyncLifetime +{ + private NatsServerProcess _server = null!; + + public int Port => _server.Port; + public int WsPort { get; private set; } + + public async Task InitializeAsync() + { + WsPort = NatsServerProcess.AllocateFreePort(); + + var config = $$""" + websocket { + listen: 127.0.0.1:{{WsPort}} + no_tls: true + } + """; + + _server = NatsServerProcess.WithConfig(config); + await _server.StartAsync(); + } + + public async Task DisposeAsync() + { + await _server.DisposeAsync(); + } + + public NatsConnection CreateNatsClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{Port}" }); +} + +[CollectionDefinition("E2E-WebSocket")] +public class WebSocketCollection : ICollectionFixture; diff --git a/tests/NATS.E2E.Tests/WebSocketTests.cs b/tests/NATS.E2E.Tests/WebSocketTests.cs new file mode 100644 index 0000000..158e80d --- /dev/null +++ b/tests/NATS.E2E.Tests/WebSocketTests.cs @@ -0,0 +1,99 @@ +using System.Net.WebSockets; +using System.Text; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +[Collection("E2E-WebSocket")] +public class WebSocketTests(WebSocketServerFixture fixture) +{ + [Fact] + public async Task WebSocket_ConnectAndReceiveInfo() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + using var ws = new ClientWebSocket(); + + await ws.ConnectAsync(new Uri($"ws://127.0.0.1:{fixture.WsPort}"), cts.Token); + ws.State.ShouldBe(WebSocketState.Open); + + var buffer = new byte[4096]; + var result = await ws.ReceiveAsync(buffer, cts.Token); + var info = Encoding.ASCII.GetString(buffer, 0, result.Count); + info.ShouldStartWith("INFO"); + + await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cts.Token); + } + + [Fact] + public async Task WebSocket_PubSub_RoundTrip() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + using var ws = new ClientWebSocket(); + + await ws.ConnectAsync(new Uri($"ws://127.0.0.1:{fixture.WsPort}"), cts.Token); + + var reader = new WsLineReader(ws); + + // Read the initial INFO frame + await reader.ReadLineAsync(cts.Token); + + await WsSend(ws, "CONNECT {\"verbose\":false,\"protocol\":1}\r\n", cts.Token); + await WsSend(ws, "SUB e2e.ws.test 1\r\n", cts.Token); + await WsSend(ws, "PING\r\n", cts.Token); + var pong = await reader.ReadLineAsync(cts.Token); + pong.ShouldBe("PONG"); + + await using var natsClient = fixture.CreateNatsClient(); + await natsClient.ConnectAsync(); + await natsClient.PublishAsync("e2e.ws.test", "ws-hello"); + await natsClient.PingAsync(); + + var msgLine = await reader.ReadLineAsync(cts.Token); + msgLine.ShouldStartWith("MSG e2e.ws.test 1"); + + var payload = await reader.ReadLineAsync(cts.Token); + payload.ShouldBe("ws-hello"); + + await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cts.Token); + } + + private static async Task WsSend(ClientWebSocket ws, string data, CancellationToken ct) + { + var bytes = Encoding.ASCII.GetBytes(data); + await ws.SendAsync(bytes, WebSocketMessageType.Binary, true, ct); + } + + /// + /// Buffers incoming WebSocket frames and returns one NATS protocol line at a time. + /// Handles the case where a single WebSocket frame contains multiple protocol lines + /// (e.g., MSG header + payload delivered in one frame). + /// + private sealed class WsLineReader(ClientWebSocket ws) + { + private readonly byte[] _recvBuffer = new byte[4096]; + private readonly StringBuilder _pending = new(); + + public async Task ReadLineAsync(CancellationToken ct) + { + while (true) + { + var full = _pending.ToString(); + var crlfIdx = full.IndexOf("\r\n", StringComparison.Ordinal); + if (crlfIdx >= 0) + { + var line = full[..crlfIdx]; + _pending.Clear(); + _pending.Append(full[(crlfIdx + 2)..]); + return line; + } + + var result = await ws.ReceiveAsync(_recvBuffer, ct); + if (result.MessageType == WebSocketMessageType.Close) + throw new InvalidOperationException("WebSocket closed unexpectedly while reading"); + + var chunk = Encoding.ASCII.GetString(_recvBuffer, 0, result.Count); + _pending.Append(chunk); + } + } + } +}