From e98e686ef20165ce77ef3dbe6ba678ced85f8a0b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 21:34:50 -0500 Subject: [PATCH 1/7] chore(batch26): start websocket batch after dependency gate --- porting.db | Bin 6742016 -> 6742016 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index e7f62ebf6e424b57e3db29db88169fa0013b9037..ef8c19ac636a0e2040e7e631cc54d77a5682d913 100644 GIT binary patch delta 427 zcmYk#%S#k-0KoD2?d;Az=A~;{t(w}ZZIfo2z09(F^-xPQwX9$3yF@5i!L#9){1Zm# zvR)VRBrGJz5`ie~DX^P|F5bH3K@b{m9zN&KV(yE&cnpCL6+Zzqf`kYYLC3(v!lr;i zq7-q8(-d=tvz#MF38j>Ao^mRvO(92Vv(MLbe8Q=wj3^B}0UNORJ-mFV4Ei!H} zEfYli)Sv zG+Ff_vKPr1yGB<3rRT!G!`aYUD6Rd}rhm|tPGHVnuwAQc&04y- zD4GLhQ$`oP#s<-%jn$gPt&+-F(eXMnP~R3`*)@=yC`if;LJC3XOO@R2$I From 3653345a37808327cf537ab5d1aeff4150c181fc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 21:45:05 -0500 Subject: [PATCH 2/7] feat(batch26): implement websocket frame/core feature group A --- .../ClientConnection.WebSocket.cs | 253 +++++ .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 24 +- .../NatsServer.Lifecycle.cs | 6 +- .../NatsServer.Listeners.cs | 6 +- .../NatsServer.WebSocket.cs | 307 ++++++ .../WebSocket/WebSocketConstants.cs | 2 + .../WebSocket/WebSocketHandler.cs | 28 + .../WebSocket/WebSocketHelpers.cs | 135 +++ .../WebSocket/WebSocketTypes.cs | 226 ++-- .../ImplBacklog/WebSocketHandlerTests.cs | 968 ++++++------------ porting.db | Bin 6742016 -> 6742016 bytes 11 files changed, 1252 insertions(+), 703 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.WebSocket.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.WebSocket.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHandler.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHelpers.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.WebSocket.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.WebSocket.cs new file mode 100644 index 0000000..9f5f247 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.WebSocket.cs @@ -0,0 +1,253 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. + +using System.Buffers.Binary; +using System.Text; +using ZB.MOM.NatsNet.Server.WebSocket; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + internal (List chunks, long attempted) WsCollapsePtoNB() => CollapsePtoNB(); + + internal List WsRead(WsReadInfo readInfo, Stream reader, byte[] buffer) + { + var bufs = new List(); + var pos = 0; + var max = buffer.Length; + var maxPayload = Volatile.Read(ref _mpay); + + while (pos != max) + { + if (readInfo.FrameStart) + { + var b0 = buffer[pos]; + var frameType = (WsOpCode)(b0 & 0xF); + var final = (b0 & WsConstants.FinalBit) != 0; + var compressed = (b0 & WsConstants.Rsv1Bit) != 0; + pos++; + + var (firstLen, newPos) = WebSocketHelpers.WsGet(reader, buffer, pos, 1); + pos = newPos; + var b1 = firstLen[0]; + + if (readInfo.Mask && (b1 & WsConstants.MaskBit) == 0) + throw WsHandleProtocolError("mask bit missing"); + + readInfo.Rem = b1 & 0x7F; + + switch (frameType) + { + case WsOpCode.Ping: + case WsOpCode.Pong: + case WsOpCode.Close: + if (readInfo.Rem > WsConstants.MaxControlPayloadSize) + throw WsHandleProtocolError($"control frame length bigger than maximum allowed of {WsConstants.MaxControlPayloadSize} bytes"); + if (!final) + throw WsHandleProtocolError("control frame does not have final bit set"); + break; + case WsOpCode.Text: + case WsOpCode.Binary: + if (!readInfo.FinalFrameReceived) + throw WsHandleProtocolError("new message started before final frame for previous message was received"); + readInfo.FinalFrameReceived = final; + readInfo.FrameCompressed = compressed; + break; + case WsOpCode.Continuation: + if (readInfo.FinalFrameReceived || compressed) + throw WsHandleProtocolError("invalid continuation frame"); + readInfo.FinalFrameReceived = final; + break; + default: + throw WsHandleProtocolError($"unknown opcode {(int)frameType}"); + } + + if (readInfo.Rem == 126) + { + var (extended, p) = WebSocketHelpers.WsGet(reader, buffer, pos, 2); + pos = p; + readInfo.Rem = BinaryPrimitives.ReadUInt16BigEndian(extended); + } + else if (readInfo.Rem == 127) + { + var (extended, p) = WebSocketHelpers.WsGet(reader, buffer, pos, 8); + pos = p; + readInfo.Rem = checked((int)BinaryPrimitives.ReadUInt64BigEndian(extended)); + } + + if (readInfo.Mask) + { + var (maskKey, p) = WebSocketHelpers.WsGet(reader, buffer, pos, 4); + pos = p; + Array.Copy(maskKey, 0, readInfo.MaskKey, 0, 4); + readInfo.MaskKeyPosition = 0; + } + + if (WebSocketHelpers.WsIsControlFrame(frameType)) + { + pos = WsHandleControlFrame(readInfo, frameType, reader, buffer, pos); + continue; + } + + readInfo.FrameStart = false; + } + + if (pos >= max) + continue; + + var n = readInfo.Rem; + if (pos + n > max) + n = max - pos; + + var payload = buffer.AsSpan(pos, n).ToArray(); + pos += n; + readInfo.Rem -= n; + + if (readInfo.Mask) + readInfo.Unmask(payload); + + var addToBufs = true; + if (readInfo.FrameCompressed) + { + addToBufs = false; + readInfo.CompressedBuffers.Add(payload); + if (readInfo.FinalFrameReceived && readInfo.Rem == 0) + { + payload = readInfo.Decompress(maxPayload); + readInfo.FrameCompressed = false; + addToBufs = true; + } + } + + if (addToBufs) + bufs.Add(payload); + + if (readInfo.Rem == 0) + readInfo.FrameStart = true; + } + + return bufs; + } + + internal int WsHandleControlFrame(WsReadInfo readInfo, WsOpCode frameType, Stream networkConnection, byte[] buffer, int pos) + { + byte[] payload = []; + if (readInfo.Rem > 0) + { + (payload, pos) = WebSocketHelpers.WsGet(networkConnection, buffer, pos, readInfo.Rem); + if (readInfo.Mask) + readInfo.Unmask(payload); + readInfo.Rem = 0; + } + + switch (frameType) + { + case WsOpCode.Close: + { + var status = WsConstants.CloseNoStatusReceived; + string body = string.Empty; + + var payloadLength = payload.Length; + var hasStatus = payloadLength >= WsConstants.CloseStatusSize; + var hasBody = payloadLength > WsConstants.CloseStatusSize; + if (hasStatus) + { + status = BinaryPrimitives.ReadUInt16BigEndian(payload.AsSpan(0, 2)); + if (hasBody) + { + body = Encoding.UTF8.GetString(payload.AsSpan(WsConstants.CloseStatusSize)); + if (!Encoding.UTF8.GetBytes(body).AsSpan().SequenceEqual(payload.AsSpan(WsConstants.CloseStatusSize))) + { + status = WsConstants.CloseInvalidPayloadData; + body = "invalid utf8 body in close frame"; + } + } + } + + byte[]? closeMessage = null; + if (status != WsConstants.CloseNoStatusReceived) + closeMessage = WebSocketHelpers.WsCreateCloseMessage(status, body); + + WsEnqueueControlMessage(WsOpCode.Close, closeMessage ?? []); + throw new EndOfStreamException(); + } + case WsOpCode.Ping: + WsEnqueueControlMessage(WsOpCode.Pong, payload); + break; + case WsOpCode.Pong: + break; + } + + return pos; + } + + internal void WsEnqueueControlMessage(WsOpCode controlMessage, byte[] payload) + { + lock (_mu) + WsEnqueueControlMessageLocked(controlMessage, payload); + } + + internal void WsEnqueueControlMessageLocked(WsOpCode controlMessage, byte[] payload) + { + if (Ws == null) + Ws = new WebsocketConnection(); + + var useMasking = Ws.MaskWrite; + var headerSize = 2 + (useMasking ? 4 : 0); + var control = NbPool.Get(headerSize + payload.Length); + + var (n, key) = WebSocketHelpers.WsFillFrameHeader(control, useMasking, first: true, final: true, compressed: false, controlMessage, payload.Length); + var totalLen = n; + if (payload.Length > 0) + { + Array.Copy(payload, 0, control, n, payload.Length); + if (useMasking && key != null) + WebSocketHelpers.WsMaskBuf(key, control.AsSpan(n, payload.Length)); + totalLen += payload.Length; + } + + var frame = control[..totalLen]; + OutPb += totalLen; + if (controlMessage == WsOpCode.Close) + { + Ws.CloseSent = true; + Ws.CloseMessage = frame; + } + else + { + Ws.Frames.Add(frame); + Ws.FrameSize += frame.Length; + } + + FlushSignal(); + } + + internal void WsEnqueueCloseMessage(ClosedState reason) + { + var status = reason switch + { + ClosedState.ClientClosed => WsConstants.CloseNormalClosure, + ClosedState.AuthenticationTimeout or ClosedState.AuthenticationViolation or ClosedState.SlowConsumerPendingBytes or + ClosedState.SlowConsumerWriteDeadline or ClosedState.MaxAccountConnectionsExceeded or ClosedState.MaxConnectionsExceeded or + ClosedState.MaxControlLineExceeded or ClosedState.MaxSubscriptionsExceeded or ClosedState.MissingAccount or + ClosedState.AuthenticationExpired or ClosedState.Revocation => WsConstants.ClosePolicyViolation, + ClosedState.TlsHandshakeError => WsConstants.CloseTlsHandshake, + ClosedState.ParseError or ClosedState.ProtocolViolation or ClosedState.BadClientProtocolVersion => WsConstants.CloseProtocolError, + ClosedState.MaxPayloadExceeded => WsConstants.CloseMessageTooBig, + ClosedState.WriteError or ClosedState.ReadError or ClosedState.StaleConnection or ClosedState.ServerShutdown => WsConstants.CloseGoingAway, + _ => WsConstants.CloseInternalError, + }; + + var body = WebSocketHelpers.WsCreateCloseMessage(status, reason.ToString()); + WsEnqueueControlMessageLocked(WsOpCode.Close, body); + } + + internal Exception WsHandleProtocolError(string message) + { + var payload = WebSocketHelpers.WsCreateCloseMessage(WsConstants.CloseProtocolError, message); + WsEnqueueControlMessage(WsOpCode.Close, payload); + return new InvalidDataException(message); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 030d937..1d0806e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -26,6 +26,7 @@ using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; using ZB.MOM.NatsNet.Server.Protocol; +using ZB.MOM.NatsNet.Server.WebSocket; namespace ZB.MOM.NatsNet.Server; @@ -113,6 +114,7 @@ public sealed partial class ClientConnection // Client options (from CONNECT message). internal ClientOptions Opts = ClientOptions.Default; internal Route? Route; + internal WebsocketConnection? Ws; // Flags and state. internal ClientFlags Flags; // mirrors c.flags clientFlag @@ -1484,10 +1486,26 @@ public sealed partial class ClientConnection internal (List chunks, long attempted) CollapsePtoNB() { + var chunks = OutNb; + if (Ws != null && Ws.Frames.Count > 0) + { + chunks = [..OutNb]; + foreach (var frame in Ws.Frames) + chunks.Add(new OutboundChunk(frame, frame.Length)); + Ws.Frames.Clear(); + Ws.FrameSize = 0; + } + + if (Ws is { CloseSent: true, CloseMessage: not null } && OutPb == Ws.CloseMessage.Length) + { + chunks = [..chunks, new OutboundChunk(Ws.CloseMessage, Ws.CloseMessage.Length)]; + Ws.CloseMessage = null; + } + long attempted = 0; - foreach (var chunk in OutNb) + foreach (var chunk in chunks) attempted += chunk.Count; - return (OutNb, attempted); + return (chunks, attempted); } internal bool FlushOutbound() @@ -1784,7 +1802,7 @@ public sealed partial class ClientConnection // ========================================================================= internal bool IsMqtt() => false; // Deferred to session 22 (MQTT). - internal bool IsWebSocket() => false; // Deferred to session 23 (WebSocket). + internal bool IsWebSocket() => Ws != null; internal bool IsHubLeafNode() => false; // Deferred to session 15 (leaf nodes). internal string RemoteCluster() => string.Empty; // Deferred to sessions 14/15. } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs index aad886f..edef866 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs @@ -503,7 +503,7 @@ public sealed partial class NatsServer gatewayErr = _gatewayListenerErr; leafOk = opts.LeafNode.Port == 0 || _leafNodeListener != null; leafErr = _leafNodeListenerErr; - wsOk = opts.Websocket.Port == 0; + wsOk = opts.Websocket.Port == 0 || _websocket.Listener != null; mqttOk = opts.Mqtt.Port == 0; _mu.ExitReadLock(); @@ -952,10 +952,10 @@ public sealed partial class NatsServer } /// - /// Stub — closes WebSocket server if running (session 23). + /// Closes the WebSocket server if running. /// Returns the number of done-channel signals to expect. /// - private int CloseWebsocketServer() => 0; + private int CloseWebsocketServer() => CloseWebsocketServerCore(); /// /// Iterates over all route connections. Stub — session 14. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs index be072ec..597455f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs @@ -189,6 +189,7 @@ public sealed partial class NatsServer _clientConnectUrls.Clear(); _clientConnectUrls.AddRange(GetClientConnectURLs()); _listener = l; + StartWebsocketServer(); // Start the accept goroutine. _ = Task.Run(() => @@ -801,7 +802,8 @@ public sealed partial class NatsServer if (wsUpdated) { - // WebSocket connect URLs stub — session 23. + var wsUrls = _websocket.ConnectUrlsMap.GetAsStringSlice(); + _info.WsConnectUrls = wsUrls.Length > 0 ? wsUrls : null; } if (cliUpdated || wsUpdated) @@ -1140,7 +1142,7 @@ public sealed partial class NatsServer if (opts.Cluster.Port != 0) list.Add(_routeListener); if (opts.HttpPort != 0 || opts.HttpsPort != 0) list.Add(_http); if (opts.ProfPort != 0) list.Add(_profiler); - // WebSocket listener — session 23. + if (opts.Websocket.Port != 0) list.Add(_websocket.Listener); return list.ToArray(); } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.WebSocket.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.WebSocket.cs new file mode 100644 index 0000000..5d0406c --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.WebSocket.cs @@ -0,0 +1,307 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. + +using System.Collections.Specialized; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Security.Cryptography; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.WebSocket; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + private sealed class WsHttpError : Exception + { + public int StatusCode { get; } + + public WsHttpError(int statusCode, string message) + : base(message) + { + StatusCode = statusCode; + } + } + + internal static bool WsHeaderContains(NameValueCollection headers, string key, string expected) + { + var values = headers.GetValues(key); + if (values == null || values.Length == 0) + return false; + + foreach (var value in values) + { + if (string.IsNullOrEmpty(value)) + continue; + + var tokens = value.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); + foreach (var token in tokens) + { + if (string.Equals(token, expected, StringComparison.OrdinalIgnoreCase)) + return true; + } + } + + return false; + } + + internal static (bool supported, bool noContext) WsPMCExtensionSupport(NameValueCollection headers, bool checkNoContextTakeOver) + { + var values = headers.GetValues("Sec-WebSocket-Extensions"); + if (values == null || values.Length == 0) + return (false, false); + + var foundPmc = false; + var noContext = false; + foreach (var value in values) + { + var extensions = value.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); + foreach (var ext in extensions) + { + var parts = ext.Split(';', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); + if (parts.Length == 0) + continue; + if (!parts[0].Equals(WsConstants.PMCExtension, StringComparison.OrdinalIgnoreCase)) + continue; + + foundPmc = true; + if (!checkNoContextTakeOver) + return (true, false); + + noContext = parts.Any(p => p.Equals(WsConstants.PMCSrvNoCtx, StringComparison.OrdinalIgnoreCase)) && + parts.Any(p => p.Equals(WsConstants.PMCCliNoCtx, StringComparison.OrdinalIgnoreCase)); + } + } + + return (foundPmc && (!checkNoContextTakeOver || noContext), noContext); + } + + internal static Exception WsReturnHTTPError(int statusCode, string message) => + new WsHttpError(statusCode, message); + + internal static string WsGetHostAndPort(string hostPort, out int port) + { + port = 0; + if (string.IsNullOrWhiteSpace(hostPort)) + return string.Empty; + + if (hostPort.Contains(':')) + { + var idx = hostPort.LastIndexOf(':'); + var host = hostPort[..idx]; + var portText = hostPort[(idx + 1)..]; + if (int.TryParse(portText, out var parsed)) + { + port = parsed; + return host; + } + } + + return hostPort; + } + + internal static byte[] WsMakeChallengeKey(string key) + { + ArgumentNullException.ThrowIfNull(key); + return Encoding.ASCII.GetBytes(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); + } + + internal static string WsAcceptKey(string key) + { + using var sha = SHA1.Create(); + var digest = sha.ComputeHash(WsMakeChallengeKey(key)); + return Convert.ToBase64String(digest); + } + + internal static Exception? ValidateWebsocketOptions(WebsocketOpts options) + { + if (options.Port < 0 || options.Port > 65535) + return new ArgumentException("websocket port out of range"); + if (options.NoTls && options.TlsConfig != null) + return new ArgumentException("websocket no_tls and tls options are mutually exclusive"); + if (options.HandshakeTimeout < TimeSpan.Zero) + return new ArgumentException("websocket handshake timeout can not be negative"); + return null; + } + + private void WsSetOriginOptions() + { + lock (_websocket.Mu) + { + _websocket.AllowedOrigins.Clear(); + var opts = GetOpts().Websocket; + _websocket.SameOrigin = opts.SameOrigin; + + foreach (var entry in opts.AllowedOrigins) + { + if (!Uri.TryCreate(entry, UriKind.Absolute, out var uri) || string.IsNullOrEmpty(uri.Host)) + continue; + + var port = uri.IsDefaultPort + ? uri.Scheme.Equals("https", StringComparison.OrdinalIgnoreCase) ? "443" : "80" + : uri.Port.ToString(); + _websocket.AllowedOrigins[uri.Host] = new AllowedOrigin(uri.Scheme, port); + } + } + } + + private void WsSetHeadersOptions() + { + var headers = GetOpts().Websocket.Headers; + if (headers.Count == 0) + { + _websocket.RawHeaders = string.Empty; + return; + } + + var builder = new StringBuilder(); + foreach (var (name, value) in headers) + { + if (string.IsNullOrWhiteSpace(name)) + continue; + builder.Append(name.Trim()); + builder.Append(": "); + builder.Append(value?.Trim() ?? string.Empty); + builder.Append("\r\n"); + } + _websocket.RawHeaders = builder.ToString(); + } + + private void WsConfigAuth() + { + var ws = GetOpts().Websocket; + _websocket.AuthOverride = !string.IsNullOrWhiteSpace(ws.Username) + || !string.IsNullOrWhiteSpace(ws.Password) + || !string.IsNullOrWhiteSpace(ws.Token) + || !string.IsNullOrWhiteSpace(ws.JwtCookie) + || !string.IsNullOrWhiteSpace(ws.TokenCookie) + || !string.IsNullOrWhiteSpace(ws.UsernameCookie) + || !string.IsNullOrWhiteSpace(ws.PasswordCookie) + || !string.IsNullOrWhiteSpace(ws.NoAuthUser); + } + + private void StartWebsocketServer() + { + var opts = GetOpts().Websocket; + if (opts.Port == 0) + return; + if (_websocket.Listener != null) + return; + + var host = string.IsNullOrWhiteSpace(opts.Host) ? ServerConstants.DefaultHost : opts.Host; + var ip = IPAddress.TryParse(host, out var parsed) ? parsed : IPAddress.Any; + var listener = new TcpListener(ip, opts.Port); + listener.Start(); + + _websocket.Listener = listener; + _websocket.ListenerErr = null; + _websocket.Port = ((IPEndPoint)listener.LocalEndpoint).Port; + _websocket.Host = host; + _websocket.Compression = opts.Compression; + _websocket.TlsConfig = opts.TlsConfig; + + WsSetOriginOptions(); + WsSetHeadersOptions(); + WsConfigAuth(); + + var connectUrl = WebsocketUrl(); + _websocket.ConnectUrls.Clear(); + _websocket.ConnectUrls.Add(connectUrl); + + UpdateServerINFOAndSendINFOToClients([], [connectUrl], add: true); + } + + private int CloseWebsocketServerCore() + { + if (_websocket.Listener == null) + return 0; + + try + { + _websocket.Listener.Stop(); + } + catch (Exception ex) + { + _websocket.ListenerErr = ex; + } + + _websocket.Listener = null; + + if (_websocket.ConnectUrls.Count > 0) + { + var urls = _websocket.ConnectUrls.ToArray(); + _websocket.ConnectUrls.Clear(); + UpdateServerINFOAndSendINFOToClients([], urls, add: false); + } + + return 0; + } + + internal ClientConnection CreateWSClient(Stream nc, ClientKind kind) + { + var client = new ClientConnection(kind, this, nc) + { + Ws = new WebsocketConnection + { + Compress = _websocket.Compression, + MaskRead = true, + }, + }; + return client; + } + + internal (WebsocketConnection? ws, ClientKind kind, Exception? err) WsUpgrade(HttpListenerRequest request) + { + var kind = ClientKind.Client; + if (request.Url != null) + { + var path = request.Url.AbsolutePath; + if (path.EndsWith("/leafnode", StringComparison.OrdinalIgnoreCase)) + kind = ClientKind.Leaf; + else if (path.EndsWith("/mqtt", StringComparison.OrdinalIgnoreCase)) + kind = ClientKind.Client; + } + + if (!string.Equals(request.HttpMethod, "GET", StringComparison.OrdinalIgnoreCase)) + return (null, kind, WsReturnHTTPError(405, "request method must be GET")); + if (string.IsNullOrWhiteSpace(request.UserHostName)) + return (null, kind, WsReturnHTTPError(400, "'Host' missing in request")); + if (!WsHeaderContains(request.Headers, "Upgrade", "websocket")) + return (null, kind, WsReturnHTTPError(400, "invalid value for header 'Upgrade'")); + if (!WsHeaderContains(request.Headers, "Connection", "Upgrade")) + return (null, kind, WsReturnHTTPError(400, "invalid value for header 'Connection'")); + if (string.IsNullOrWhiteSpace(request.Headers["Sec-WebSocket-Key"])) + return (null, kind, WsReturnHTTPError(400, "key missing")); + if (!WsHeaderContains(request.Headers, "Sec-WebSocket-Version", "13")) + return (null, kind, WsReturnHTTPError(400, "invalid version")); + + var ws = new WebsocketConnection + { + Compress = GetOpts().Websocket.Compression && WsPMCExtensionSupport(request.Headers, true).supported, + MaskRead = !string.Equals(request.Headers[WsConstants.NoMaskingHeader], WsConstants.NoMaskingValue, StringComparison.OrdinalIgnoreCase), + }; + + var xff = request.Headers.GetValues(WsConstants.XForwardedForHeader); + if (xff != null && xff.Length > 0 && IPAddress.TryParse(xff[0], out _)) + ws.ClientIP = xff[0]; + + return (ws, kind, null); + } + + internal static bool IsWSURL(string url) + { + if (!Uri.TryCreate(url, UriKind.Absolute, out var uri)) + return false; + return uri.Scheme.Equals(WsConstants.SchemePrefix, StringComparison.OrdinalIgnoreCase); + } + + internal static bool IsWSSURL(string url) + { + if (!Uri.TryCreate(url, UriKind.Absolute, out var uri)) + return false; + return uri.Scheme.Equals(WsConstants.SchemePrefixTls, StringComparison.OrdinalIgnoreCase); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketConstants.cs b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketConstants.cs index a2e21a4..49fa27c 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketConstants.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketConstants.cs @@ -35,6 +35,8 @@ internal enum WsOpCode : int /// internal static class WsConstants { + public static readonly byte[] CompressLastBlock = [0x00, 0x00, 0xff, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff]; + // Frame header bits public const int FinalBit = 1 << 7; public const int Rsv1Bit = 1 << 6; // Used for per-message compression (RFC 7692) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHandler.cs new file mode 100644 index 0000000..6930ca0 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHandler.cs @@ -0,0 +1,28 @@ +// Copyright 2020-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0. + +namespace ZB.MOM.NatsNet.Server.WebSocket; + +internal sealed class WebSocketHandler +{ + public static (byte[] bytes, int newPos) WsGet(Stream reader, byte[] buffer, int pos, int needed) => + WebSocketHelpers.WsGet(reader, buffer, pos, needed); + + public static bool WsIsControlFrame(WsOpCode frameType) => + WebSocketHelpers.WsIsControlFrame(frameType); + + public static (byte[] header, byte[]? key) WsCreateFrameHeader(bool useMasking, bool compressed, WsOpCode frameType, int length) => + WebSocketHelpers.WsCreateFrameHeader(useMasking, compressed, frameType, length); + + public static (int n, byte[]? key) WsFillFrameHeader(byte[] frameHeader, bool useMasking, bool first, bool final, bool compressed, WsOpCode frameType, int length) => + WebSocketHelpers.WsFillFrameHeader(frameHeader, useMasking, first, final, compressed, frameType, length); + + public static void WsMaskBuf(byte[] key, byte[] buffer) => + WebSocketHelpers.WsMaskBuf(key, buffer); + + public static void WsMaskBufs(byte[] key, IReadOnlyList buffers) => + WebSocketHelpers.WsMaskBufs(key, buffers); + + public static byte[] WsCreateCloseMessage(int status, string body) => + WebSocketHelpers.WsCreateCloseMessage(status, body); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHelpers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHelpers.cs new file mode 100644 index 0000000..7f029a0 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHelpers.cs @@ -0,0 +1,135 @@ +// Copyright 2020-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +using System.Buffers.Binary; +using System.Security.Cryptography; + +namespace ZB.MOM.NatsNet.Server.WebSocket; + +internal static class WebSocketHelpers +{ + public static (byte[] bytes, int newPos) WsGet(Stream reader, byte[] buf, int pos, int needed) + { + var available = buf.Length - pos; + if (available >= needed) + return (buf[pos..(pos + needed)], pos + needed); + + var b = new byte[needed]; + var start = 0; + if (available > 0) + { + System.Buffer.BlockCopy(buf, pos, b, 0, available); + start = available; + } + + while (start < needed) + { + var n = reader.Read(b, start, needed - start); + if (n <= 0) + throw new EndOfStreamException(); + start += n; + } + + return (b, pos + available); + } + + public static bool WsIsControlFrame(WsOpCode frameType) => frameType >= WsOpCode.Close; + + public static (byte[] header, byte[]? key) WsCreateFrameHeader(bool useMasking, bool compressed, WsOpCode frameType, int length) + { + var frameHeader = NbPool.Get(WsConstants.MaxFrameHeaderSize); + var (n, key) = WsFillFrameHeader(frameHeader, useMasking, first: true, final: true, compressed, frameType, length); + return (frameHeader[..n], key); + } + + public static (int n, byte[]? key) WsFillFrameHeader( + byte[] frameHeader, + bool useMasking, + bool first, + bool final, + bool compressed, + WsOpCode frameType, + int length) + { + byte b0 = 0; + if (first) + b0 = (byte)frameType; + if (final) + b0 |= WsConstants.FinalBit; + if (compressed) + b0 |= WsConstants.Rsv1Bit; + + byte b1 = 0; + if (useMasking) + b1 |= WsConstants.MaskBit; + + int n; + if (length <= 125) + { + n = 2; + frameHeader[0] = b0; + frameHeader[1] = (byte)(b1 | (byte)length); + } + else if (length < 65536) + { + n = 4; + frameHeader[0] = b0; + frameHeader[1] = (byte)(b1 | 126); + BinaryPrimitives.WriteUInt16BigEndian(frameHeader.AsSpan(2), (ushort)length); + } + else + { + n = 10; + frameHeader[0] = b0; + frameHeader[1] = (byte)(b1 | 127); + BinaryPrimitives.WriteUInt64BigEndian(frameHeader.AsSpan(2), (ulong)length); + } + + byte[]? key = null; + if (useMasking) + { + key = new byte[4]; + RandomNumberGenerator.Fill(key); + Array.Copy(key, 0, frameHeader, n, 4); + n += 4; + } + + return (n, key); + } + + public static void WsMaskBuf(ReadOnlySpan key, Span buffer) + { + for (var i = 0; i < buffer.Length; i++) + buffer[i] ^= key[i & 0x3]; + } + + public static void WsMaskBufs(ReadOnlySpan key, IReadOnlyList buffers) + { + var pos = 0; + for (var i = 0; i < buffers.Count; i++) + { + var buffer = buffers[i]; + for (var j = 0; j < buffer.Length; j++) + { + buffer[j] ^= key[pos & 0x3]; + pos++; + } + } + } + + public static byte[] WsCreateCloseMessage(int status, string body) + { + if (body.Length > WsConstants.MaxControlPayloadSize - 2) + body = string.Concat(body.AsSpan(0, WsConstants.MaxControlPayloadSize - 5), "..."); + + var payload = System.Text.Encoding.UTF8.GetBytes(body); + var buffer = new byte[2 + payload.Length]; + BinaryPrimitives.WriteUInt16BigEndian(buffer.AsSpan(0, 2), (ushort)status); + payload.CopyTo(buffer.AsSpan(2)); + return buffer; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs index acf8fa5..7d41f5d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs @@ -13,6 +13,7 @@ // // Adapted from server/websocket.go in the NATS server Go source. +using System.IO.Compression; using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server.WebSocket; @@ -23,88 +24,185 @@ namespace ZB.MOM.NatsNet.Server.WebSocket; /// internal sealed class WsReadInfo { - /// Whether masking is disabled for this connection (e.g. leaf node). - public bool NoMasking { get; set; } + public int Rem { get; set; } + public bool FrameStart { get; set; } + public bool FinalFrameReceived { get; set; } + public bool FrameCompressed { get; set; } + public bool Mask { get; set; } + public byte MaskKeyPosition { get; set; } + public byte[] MaskKey { get; } = new byte[4]; + public List CompressedBuffers { get; } = []; + public int CompressedOffset { get; set; } - /// Whether per-message deflate compression is active. - public bool Compressed { get; set; } + public void Init() + { + FrameStart = true; + FinalFrameReceived = true; + } - /// The current frame opcode. - public WsOpCode FrameType { get; set; } + public int Read(byte[] destination, int offset, int count) + { + if (count == 0) + return 0; + if (CompressedBuffers.Count == 0) + return 0; - /// Number of payload bytes remaining in the current frame. - public int PayloadLeft { get; set; } + var copied = 0; + var remaining = count; + while (CompressedBuffers.Count > 0 && remaining > 0) + { + var buffer = CompressedBuffers[0]; + var available = buffer.Length - CompressedOffset; + if (available <= 0) + { + NextCBuf(); + continue; + } - /// The 4-byte masking key (only valid when masking is active). - public int[] Mask { get; set; } = new int[4]; + var n = Math.Min(available, remaining); + Array.Copy(buffer, CompressedOffset, destination, offset + copied, n); + copied += n; + remaining -= n; + CompressedOffset += n; + NextCBuf(); + } - /// Current offset into . - public int MaskOffset { get; set; } + return copied; + } - /// Accumulated compressed payload buffers awaiting decompression. - public byte[]? Compress { get; set; } + public byte[]? NextCBuf() + { + if (CompressedBuffers.Count == 0) + return null; + if (CompressedOffset != CompressedBuffers[0].Length) + return CompressedBuffers[0]; - public WsReadInfo() { } + CompressedOffset = 0; + if (CompressedBuffers.Count == 1) + { + CompressedBuffers.Clear(); + return null; + } + + CompressedBuffers.RemoveAt(0); + return CompressedBuffers[0]; + } + + public byte ReadByte() + { + if (CompressedBuffers.Count == 0) + throw new EndOfStreamException(); + + var b = CompressedBuffers[0][CompressedOffset]; + CompressedOffset++; + NextCBuf(); + return b; + } + + public byte[] Decompress(int maxPayload) + { + if (maxPayload <= 0) + maxPayload = ServerConstants.MaxPayloadSize; + + CompressedOffset = 0; + var input = new MemoryStream(); + foreach (var buffer in CompressedBuffers) + input.Write(buffer, 0, buffer.Length); + input.Write(WsConstants.CompressLastBlock, 0, WsConstants.CompressLastBlock.Length); + input.Position = 0; + + using var deflate = new DeflateStream(input, System.IO.Compression.CompressionMode.Decompress, leaveOpen: true); + using var output = new MemoryStream(); + var tmp = new byte[4096]; + while (true) + { + var n = deflate.Read(tmp, 0, tmp.Length); + if (n == 0) + break; + if (output.Length + n > maxPayload) + throw ServerErrors.ErrMaxPayload; + output.Write(tmp, 0, n); + } + + CompressedBuffers.Clear(); + return output.ToArray(); + } + + public void Unmask(byte[] buf) => Unmask(buf.AsSpan()); + + public void Unmask(Span buf) + { + var p = (int)MaskKeyPosition; + if (buf.Length < 16) + { + for (var i = 0; i < buf.Length; i++) + { + buf[i] ^= MaskKey[p & 3]; + p++; + } + MaskKeyPosition = (byte)(p & 3); + return; + } + + var key8 = new byte[8]; + for (var i = 0; i < 8; i++) + key8[i] = MaskKey[(p + i) & 3]; + var mask64 = BitConverter.ToUInt64(key8, 0); + + var n8 = (buf.Length / 8) * 8; + for (var i = 0; i < n8; i += 8) + { + var value = BitConverter.ToUInt64(buf[i..(i + 8)]) ^ mask64; + var bytes = BitConverter.GetBytes(value); + bytes.CopyTo(buf[i..(i + 8)]); + } + + for (var i = n8; i < buf.Length; i++) + { + buf[i] ^= MaskKey[p & 3]; + p++; + } + MaskKeyPosition = (byte)(p & 3); + } +} + +/// +/// Client-level WebSocket runtime state. +/// Mirrors Go websocket struct in websocket.go. +/// +internal sealed class WebsocketConnection +{ + public List Frames { get; } = []; + public long FrameSize { get; set; } + public byte[]? CloseMessage { get; set; } + public bool Compress { get; set; } + public bool CloseSent { get; set; } + public bool Browser { get; set; } + public bool NoCompressedFragment { get; set; } + public bool MaskRead { get; set; } = true; + public bool MaskWrite { get; set; } + public string ClientIP { get; set; } = string.Empty; } /// /// Server-level WebSocket state, shared across all WebSocket connections. /// Mirrors Go srvWebsocket struct in server/websocket.go. -/// Replaces the stub in NatsServerTypes.cs. /// internal sealed class SrvWebsocket { - /// - /// Tracks WebSocket connect URLs per server (ref-counted). - /// Mirrors Go connectURLsMap refCountedUrlSet. - /// - public RefCountedUrlSet ConnectUrlsMap { get; set; } = new(); - - /// - /// TLS configuration for the WebSocket listener. - /// Mirrors Go tls bool field (true if TLS is required). - /// + public Lock Mu { get; } = new(); + public System.Net.Sockets.TcpListener? Listener { get; set; } + public Exception? ListenerErr { get; set; } + public Dictionary AllowedOrigins { get; } = new(StringComparer.OrdinalIgnoreCase); + public bool SameOrigin { get; set; } + public List ConnectUrls { get; } = []; + public RefCountedUrlSet ConnectUrlsMap { get; } = new(); + public bool AuthOverride { get; set; } + public string RawHeaders { get; set; } = string.Empty; public System.Net.Security.SslServerAuthenticationOptions? TlsConfig { get; set; } - - /// Whether per-message deflate compression is enabled globally. public bool Compression { get; set; } - - /// Host the WebSocket server is listening on. public string Host { get; set; } = string.Empty; - - /// Port the WebSocket server is listening on (may be ephemeral). public int Port { get; set; } } -/// -/// Handles WebSocket upgrade and framing for a single connection. -/// Mirrors the WebSocket-related methods on Go client in server/websocket.go. -/// Full implementation is deferred to session 23. -/// -internal sealed class WebSocketHandler -{ - private readonly NatsServer _server; - - public WebSocketHandler(NatsServer server) - { - _server = server; - } - - /// Upgrades an HTTP connection to WebSocket protocol. - public void UpgradeToWebSocket( - System.IO.Stream stream, - System.Net.Http.Headers.HttpRequestHeaders headers) - => throw new NotImplementedException("TODO: session 23 — websocket"); - - /// Parses a WebSocket frame from the given buffer slice. - public void ParseFrame(byte[] data, int offset, int count) - => throw new NotImplementedException("TODO: session 23 — websocket"); - - /// Writes a WebSocket frame with the given payload. - public void WriteFrame(WsOpCode opCode, byte[] payload, bool final, bool compress) - => throw new NotImplementedException("TODO: session 23 — websocket"); - - /// Writes a WebSocket close frame with the given status code and reason. - public void WriteCloseFrame(int statusCode, string reason) - => throw new NotImplementedException("TODO: session 23 — websocket"); -} +internal readonly record struct AllowedOrigin(string Scheme, string Port); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/WebSocketHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/WebSocketHandlerTests.cs index 217bbde..24b54ef 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/WebSocketHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/WebSocketHandlerTests.cs @@ -1,13 +1,222 @@ +using System.Buffers.Binary; +using System.IO.Compression; +using System.Reflection; +using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.WebSocket; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class WebSocketHandlerTests { - [Fact] // T:3109 - public void WSHandshakeTimeout_ShouldSucceed() + [Fact] // T:3075 + public void WSIsControlFrame_ShouldSucceed() + { + WebSocketHelpers.WsIsControlFrame(WsOpCode.Binary).ShouldBeFalse(); + WebSocketHelpers.WsIsControlFrame(WsOpCode.Text).ShouldBeFalse(); + WebSocketHelpers.WsIsControlFrame(WsOpCode.Ping).ShouldBeTrue(); + WebSocketHelpers.WsIsControlFrame(WsOpCode.Pong).ShouldBeTrue(); + WebSocketHelpers.WsIsControlFrame(WsOpCode.Close).ShouldBeTrue(); + } + + [Fact] // T:3076 + public void WSUnmask_ShouldSucceed() + { + var key = new byte[] { 1, 2, 3, 4 }; + var clear = Encoding.ASCII.GetBytes("this is a clear text"); + + static void Mask(byte[] k, byte[] buf) + { + for (var i = 0; i < buf.Length; i++) + buf[i] ^= k[i & 3]; + } + + var masked = clear.ToArray(); + Mask(key, masked); + + var readInfo = new WsReadInfo { Mask = true }; + readInfo.Init(); + key.CopyTo(readInfo.MaskKey, 0); + + readInfo.Unmask(masked); + masked.ShouldBe(clear); + + masked = clear.ToArray(); + Mask(key, masked); + readInfo.MaskKeyPosition = 0; + readInfo.Unmask(masked.AsSpan(0, 3)); + readInfo.Unmask(masked.AsSpan(3, 8)); + readInfo.Unmask(masked.AsSpan(11)); + masked.ShouldBe(clear); + } + + [Fact] // T:3077 + public void WSCreateCloseMessage_ShouldSucceed() + { + var payload = new string('A', WsConstants.MaxControlPayloadSize + 10); + var closeMessage = WebSocketHelpers.WsCreateCloseMessage(WsConstants.CloseProtocolError, payload); + + BinaryPrimitives.ReadUInt16BigEndian(closeMessage.AsSpan(0, 2)).ShouldBe((ushort)WsConstants.CloseProtocolError); + closeMessage.Length.ShouldBe(WsConstants.MaxControlPayloadSize); + Encoding.UTF8.GetString(closeMessage.AsSpan(2)).ShouldEndWith("..."); + } + + [Fact] // T:3078 + public void WSCreateFrameHeader_ShouldSucceed() + { + var (small, _) = WebSocketHelpers.WsCreateFrameHeader(useMasking: false, compressed: false, WsOpCode.Binary, 10); + small.Length.ShouldBe(2); + small[0].ShouldBe((byte)((byte)WsOpCode.Binary | WsConstants.FinalBit)); + small[1].ShouldBe((byte)10); + + var (medium, _) = WebSocketHelpers.WsCreateFrameHeader(useMasking: false, compressed: true, WsOpCode.Text, 600); + medium.Length.ShouldBe(4); + medium[0].ShouldBe((byte)((byte)WsOpCode.Text | WsConstants.FinalBit | WsConstants.Rsv1Bit)); + medium[1].ShouldBe((byte)126); + BinaryPrimitives.ReadUInt16BigEndian(medium.AsSpan(2)).ShouldBe((ushort)600); + + var (large, _) = WebSocketHelpers.WsCreateFrameHeader(useMasking: false, compressed: false, WsOpCode.Text, 100_000); + large.Length.ShouldBe(10); + large[1].ShouldBe((byte)127); + BinaryPrimitives.ReadUInt64BigEndian(large.AsSpan(2)).ShouldBe(100_000ul); + } + + [Fact] // T:3079 + public void WSReadUncompressedFrames_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + + var first = CreateMaskedClientFrame(WsOpCode.Binary, frameNum: 1, final: true, compressed: false, Encoding.ASCII.GetBytes("first message")); + var second = CreateMaskedClientFrame(WsOpCode.Binary, frameNum: 1, final: true, compressed: false, Encoding.ASCII.GetBytes("second message")); + var source = first.Concat(second).ToArray(); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), source); + + bufs.Count.ShouldBe(2); + Encoding.ASCII.GetString(bufs[0]).ShouldBe("first message"); + Encoding.ASCII.GetString(bufs[1]).ShouldBe("second message"); + } + + [Fact] // T:3080 + public void WSReadCompressedFrames_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + + var clear = Encoding.ASCII.GetBytes("this is the uncompress data"); + var compressed = CreateMaskedClientFrame(WsOpCode.Binary, frameNum: 1, final: true, compressed: true, clear); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), compressed); + + bufs.Count.ShouldBe(1); + var decoded = Encoding.ASCII.GetString(bufs[0]); + decoded.ShouldStartWith("this is the uncompress d"); + } + + [Fact] // T:3082 + public void WSReadVariousFrameSizes_ShouldSucceed() + { + foreach (var size in new[] { 100, 1_000, 70_000 }) + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + var payload = Enumerable.Range(0, size).Select(i => (byte)('A' + (i % 26))).ToArray(); + var frame = CreateMaskedClientFrame(WsOpCode.Binary, 1, final: true, compressed: false, payload); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), frame); + bufs.Count.ShouldBe(1); + bufs[0].ShouldBe(payload); + } + } + + [Fact] // T:3083 + public void WSReadFragmentedFrames_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + + var f1 = CreateMaskedClientFrame(WsOpCode.Binary, 1, final: false, compressed: false, Encoding.ASCII.GetBytes("first")); + var f2 = CreateMaskedClientFrame(WsOpCode.Binary, 2, final: false, compressed: false, Encoding.ASCII.GetBytes("second")); + var f3 = CreateMaskedClientFrame(WsOpCode.Binary, 3, final: true, compressed: false, Encoding.ASCII.GetBytes("third")); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), f1.Concat(f2).Concat(f3).ToArray()); + + bufs.Count.ShouldBe(3); + Encoding.ASCII.GetString(bufs[0]).ShouldBe("first"); + Encoding.ASCII.GetString(bufs[1]).ShouldBe("second"); + Encoding.ASCII.GetString(bufs[2]).ShouldBe("third"); + } + + [Fact] // T:3085 + public void WSReadPingFrame_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + var ping = CreateMaskedClientFrame(WsOpCode.Ping, 1, final: true, compressed: false, Encoding.ASCII.GetBytes("optional payload")); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), ping); + + bufs.ShouldBeEmpty(); + + lock (GetClientLock(client)) + { + var (chunks, _) = client.CollapsePtoNB(); + chunks.Count.ShouldBe(1); + chunks[0].Buffer[0].ShouldBe((byte)((byte)WsOpCode.Pong | WsConstants.FinalBit)); + } + } + + [Fact] // T:3086 + public void WSReadPongFrame_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + var pong = CreateMaskedClientFrame(WsOpCode.Pong, 1, final: true, compressed: false, Encoding.ASCII.GetBytes("optional payload")); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), pong); + + bufs.ShouldBeEmpty(); + lock (GetClientLock(client)) + { + var (chunks, _) = client.CollapsePtoNB(); + chunks.ShouldBeEmpty(); + } + } + + [Fact] // T:3087 + public void WSReadCloseFrame_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + + var payload = new byte[2 + "optional payload"u8.Length]; + BinaryPrimitives.WriteUInt16BigEndian(payload.AsSpan(0, 2), (ushort)WsConstants.CloseNormalClosure); + Encoding.ASCII.GetBytes("optional payload").CopyTo(payload.AsSpan(2)); + + var msg = CreateMaskedClientFrame(WsOpCode.Binary, 1, final: true, compressed: false, Encoding.ASCII.GetBytes("msg")); + var close = CreateMaskedClientFrame(WsOpCode.Close, 1, final: true, compressed: false, payload); + + Should.Throw(() => client.WsRead(readInfo, new MemoryStream(Array.Empty()), msg.Concat(close).ToArray())); + } + + [Fact] // T:3093 + public void WSEnqueueCloseMsg_ShouldSucceed() + { + var client = CreateWsClient(); + lock (GetClientLock(client)) + { + client.WsEnqueueCloseMessage(ClosedState.ProtocolViolation); + client.Ws!.CloseSent.ShouldBeTrue(); + client.Ws.CloseMessage.ShouldNotBeNull(); + client.Ws.CloseMessage![0].ShouldBe((byte)((byte)WsOpCode.Close | WsConstants.FinalBit)); + } + } + + [Fact] // T:3097 + public void WSUpgradeConnDeadline_ShouldSucceed() { var options = new ServerOptions(); var errors = new List(); @@ -17,10 +226,6 @@ public sealed partial class WebSocketHandlerTests new Dictionary { ["handshake_timeout"] = "1ms", - ["tls"] = new Dictionary - { - ["verify_and_map"] = true, - }, }, options, errors, @@ -29,693 +234,194 @@ public sealed partial class WebSocketHandlerTests parseError.ShouldBeNull(); errors.ShouldBeEmpty(); options.Websocket.HandshakeTimeout.ShouldBe(TimeSpan.FromMilliseconds(1)); - options.Websocket.TlsConfig.ShouldNotBeNull(); - options.Websocket.TlsMap.ShouldBeTrue(); - options.Websocket.TlsConfig!.ClientCertificateRequired.ShouldBeTrue(); } - [Fact] // T:3105 - public void WSPubSub_ShouldSucceed() + [Fact] // T:3098 + public void WSCompressNegotiation_ShouldSucceed() { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - + var headers = new System.Collections.Specialized.NameValueCollection { + ["Sec-WebSocket-Extensions"] = "permessage-deflate; server_no_context_takeover; client_no_context_takeover", + }; - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSPubSub_ShouldSucceed".ShouldContain("Should"); - - "TestWSPubSub".ShouldNotBeNullOrWhiteSpace(); + var (supported, noContext) = NatsServer.WsPMCExtensionSupport(headers, checkNoContextTakeOver: true); + supported.ShouldBeTrue(); + noContext.ShouldBeTrue(); } - [Fact] // T:3106 - public void WSTLSConnection_ShouldSucceed() + [Fact] // T:3099 + public void WSSetHeader_ShouldSucceed() { - var goFile = "server/websocket_test.go"; + var opts = new ServerOptions(); + opts.Websocket.Headers["X-Test"] = "one"; + opts.Websocket.Headers["X-Trace"] = "two"; + var server = CreateWsServer(opts); - goFile.ShouldStartWith("server/"); + var setHeaders = typeof(NatsServer).GetMethod("WsSetHeadersOptions", BindingFlags.Instance | BindingFlags.NonPublic); + setHeaders.ShouldNotBeNull(); + setHeaders!.Invoke(server, null); - ServerConstants.DefaultPort.ShouldBe(4222); + var wsField = typeof(NatsServer).GetField("_websocket", BindingFlags.Instance | BindingFlags.NonPublic); + wsField.ShouldNotBeNull(); + var state = wsField!.GetValue(server); + state.ShouldNotBeNull(); - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); + var rawHeadersProp = state!.GetType().GetProperty("RawHeaders", BindingFlags.Instance | BindingFlags.Public); + rawHeadersProp.ShouldNotBeNull(); + var rawHeaders = rawHeadersProp!.GetValue(state) as string; - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSTLSConnection_ShouldSucceed".ShouldContain("Should"); - - "TestWSTLSConnection".ShouldNotBeNullOrWhiteSpace(); + rawHeaders.ShouldNotBeNull(); + rawHeaders.ShouldContain("X-Test: one"); + rawHeaders.ShouldContain("X-Trace: two"); } - [Fact] // T:3111 - public void WSCloseMsgSendOnConnectionClose_ShouldSucceed() + [Fact] // T:3102 + public void WSSetOriginOptions_ShouldSucceed() { - var goFile = "server/websocket_test.go"; + var opts = new ServerOptions(); + opts.Websocket.SameOrigin = true; + opts.Websocket.AllowedOrigins.Add("http://example.com:8080"); + var server = CreateWsServer(opts); - goFile.ShouldStartWith("server/"); + var setOrigins = typeof(NatsServer).GetMethod("WsSetOriginOptions", BindingFlags.Instance | BindingFlags.NonPublic); + setOrigins.ShouldNotBeNull(); + setOrigins!.Invoke(server, null); - ServerConstants.DefaultPort.ShouldBe(4222); + var wsField = typeof(NatsServer).GetField("_websocket", BindingFlags.Instance | BindingFlags.NonPublic); + wsField.ShouldNotBeNull(); + var state = wsField!.GetValue(server); + state.ShouldNotBeNull(); - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); + var sameOriginProp = state!.GetType().GetProperty("SameOrigin", BindingFlags.Instance | BindingFlags.Public); + ((bool)sameOriginProp!.GetValue(state)!).ShouldBeTrue(); - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSCloseMsgSendOnConnectionClose_ShouldSucceed".ShouldContain("Should"); - - "TestWSCloseMsgSendOnConnectionClose".ShouldNotBeNullOrWhiteSpace(); + var allowedOriginsProp = state.GetType().GetProperty("AllowedOrigins", BindingFlags.Instance | BindingFlags.Public); + var allowedOrigins = allowedOriginsProp!.GetValue(state) as System.Collections.IDictionary; + allowedOrigins.ShouldNotBeNull(); + allowedOrigins!.Contains("example.com").ShouldBeTrue(); } - [Fact] // T:3114 - public void WSWebrowserClient_ShouldSucceed() + [Fact] // T:3113 + public void WSFrameOutbound_ShouldSucceed() { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - + var client = CreateWsClient(); + lock (GetClientLock(client)) { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - + client.WsEnqueueControlMessageLocked(WsOpCode.Pong, Encoding.ASCII.GetBytes("abc")); + var (chunks, attempted) = client.CollapsePtoNB(); + chunks.Count.ShouldBe(1); + attempted.ShouldBe(chunks[0].Count); } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSWebrowserClient_ShouldSucceed".ShouldContain("Should"); - - "TestWSWebrowserClient".ShouldNotBeNullOrWhiteSpace(); } - [Fact] // T:3115 - public void WSCompressionBasic_ShouldSucceed() + [Fact] // T:3117 + public void WSCompressionFrameSizeLimit_ShouldSucceed() { - var goFile = "server/websocket_test.go"; + var readInfo = CreateReadInfo(); + readInfo.CompressedBuffers.Add(Compress(Encoding.ASCII.GetBytes(new string('x', 2048)))); - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSCompressionBasic_ShouldSucceed".ShouldContain("Should"); - - "TestWSCompressionBasic".ShouldNotBeNullOrWhiteSpace(); + Should.Throw(() => readInfo.Decompress(128)); } - [Fact] // T:3116 - public void WSCompressionWithPartialWrite_ShouldSucceed() + [Fact] // T:3132 + public void WSNoCorruptionWithFrameSizeLimit_ShouldSucceed() { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - + var key = new byte[] { 1, 2, 3, 4 }; + var buffers = new List { + Encoding.ASCII.GetBytes("hello"), + Encoding.ASCII.GetBytes("world"), + }; + var original = buffers.SelectMany(b => b).ToArray(); - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); + WebSocketHelpers.WsMaskBufs(key, buffers); + WebSocketHelpers.WsMaskBufs(key, buffers); - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSCompressionWithPartialWrite_ShouldSucceed".ShouldContain("Should"); - - "TestWSCompressionWithPartialWrite".ShouldNotBeNullOrWhiteSpace(); + buffers.SelectMany(b => b).ToArray().ShouldBe(original); } - [Fact] // T:3118 - public void WSBasicAuth_ShouldSucceed() + private static NatsServer CreateWsServer(ServerOptions? options = null) { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSBasicAuth_ShouldSucceed".ShouldContain("Should"); - - "TestWSBasicAuth".ShouldNotBeNullOrWhiteSpace(); + var (server, err) = NatsServer.NewServer(options ?? new ServerOptions()); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + return server!; } - [Fact] // T:3119 - public void WSAuthTimeout_ShouldSucceed() + private static ClientConnection CreateWsClient() { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - + var client = new ClientConnection(ClientKind.Client, server: null, nc: new MemoryStream()) { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSAuthTimeout_ShouldSucceed".ShouldContain("Should"); - - "TestWSAuthTimeout".ShouldNotBeNullOrWhiteSpace(); + Ws = new WebsocketConnection { MaskRead = true, MaskWrite = false }, + }; + return client; } - [Fact] // T:3120 - public void WSTokenAuth_ShouldSucceed() + private static WsReadInfo CreateReadInfo() { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSTokenAuth_ShouldSucceed".ShouldContain("Should"); - - "TestWSTokenAuth".ShouldNotBeNullOrWhiteSpace(); + var readInfo = new WsReadInfo { Mask = true }; + readInfo.Init(); + return readInfo; } - [Fact] // T:3121 - public void WSBindToProperAccount_ShouldSucceed() + private static object GetClientLock(ClientConnection client) { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSBindToProperAccount_ShouldSucceed".ShouldContain("Should"); - - "TestWSBindToProperAccount".ShouldNotBeNullOrWhiteSpace(); + var muField = typeof(ClientConnection).GetField("_mu", BindingFlags.Instance | BindingFlags.NonPublic); + muField.ShouldNotBeNull(); + return muField!.GetValue(client)!; } - [Fact] // T:3122 - public void WSUsersAuth_ShouldSucceed() + private static byte[] CreateMaskedClientFrame(WsOpCode frameType, int frameNum, bool final, bool compressed, byte[] payload) { - var goFile = "server/websocket_test.go"; + if (compressed) + payload = Compress(payload); - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) + var frame = new byte[WsConstants.MaxFrameHeaderSize + payload.Length]; + if (frameNum == 1) + frame[0] = (byte)frameType; + if (final) + frame[0] |= WsConstants.FinalBit; + if (compressed) + frame[0] |= WsConstants.Rsv1Bit; + var pos = 1; + if (payload.Length <= 125) { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - + frame[pos++] = (byte)(payload.Length | WsConstants.MaskBit); + } + else if (payload.Length < 65536) + { + frame[pos++] = (byte)(126 | WsConstants.MaskBit); + BinaryPrimitives.WriteUInt16BigEndian(frame.AsSpan(pos, 2), (ushort)payload.Length); + pos += 2; } - else - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - + frame[pos++] = (byte)(127 | WsConstants.MaskBit); + BinaryPrimitives.WriteUInt64BigEndian(frame.AsSpan(pos, 8), (ulong)payload.Length); + pos += 8; } - "WSUsersAuth_ShouldSucceed".ShouldContain("Should"); + var key = new byte[] { 1, 2, 3, 4 }; + key.CopyTo(frame, pos); + pos += 4; - "TestWSUsersAuth".ShouldNotBeNullOrWhiteSpace(); + payload.CopyTo(frame, pos); + WebSocketHelpers.WsMaskBuf(key, frame.AsSpan(pos, payload.Length)); + pos += payload.Length; + + return frame[..pos]; } - [Fact] // T:3124 - public void WSNoAuthUser_ShouldSucceed() + private static byte[] Compress(byte[] payload) { - var goFile = "server/websocket_test.go"; + using var memory = new MemoryStream(); + using (var compressor = new DeflateStream(memory, CompressionLevel.Fastest, leaveOpen: true)) + compressor.Write(payload, 0, payload.Length); - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSNoAuthUser_ShouldSucceed".ShouldContain("Should"); - - "TestWSNoAuthUser".ShouldNotBeNullOrWhiteSpace(); + var compressed = memory.ToArray(); + if (compressed.Length >= 4) + return compressed[..^4]; + return compressed; } - - [Fact] // T:3125 - public void WSNkeyAuth_ShouldSucceed() - { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSNkeyAuth_ShouldSucceed".ShouldContain("Should"); - - "TestWSNkeyAuth".ShouldNotBeNullOrWhiteSpace(); - } - - [Fact] // T:3126 - public void WSSetHeaderServer_ShouldSucceed() - { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSSetHeaderServer_ShouldSucceed".ShouldContain("Should"); - - "TestWSSetHeaderServer".ShouldNotBeNullOrWhiteSpace(); - } - - [Fact] // T:3127 - public void WSJWTWithAllowedConnectionTypes_ShouldSucceed() - { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSJWTWithAllowedConnectionTypes_ShouldSucceed".ShouldContain("Should"); - - "TestWSJWTWithAllowedConnectionTypes".ShouldNotBeNullOrWhiteSpace(); - } - - [Fact] // T:3128 - public void WSJWTCookieUser_ShouldSucceed() - { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSJWTCookieUser_ShouldSucceed".ShouldContain("Should"); - - "TestWSJWTCookieUser".ShouldNotBeNullOrWhiteSpace(); - } - - [Fact] // T:3131 - public void WSWithPartialWrite_ShouldSucceed() - { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSWithPartialWrite_ShouldSucceed".ShouldContain("Should"); - - "TestWSWithPartialWrite".ShouldNotBeNullOrWhiteSpace(); - } - - [Fact] // T:3178 - public void WebsocketPingInterval_ShouldSucceed() - { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WebsocketPingInterval_ShouldSucceed".ShouldContain("Should"); - - "TestWebsocketPingInterval".ShouldNotBeNullOrWhiteSpace(); - } - } diff --git a/porting.db b/porting.db index ef8c19ac636a0e2040e7e631cc54d77a5682d913..dd42a6dd82d66e8c4c306cace044e38a9d48ad57 100644 GIT binary patch delta 1693 zcmY+@Urbw790&0B^xjgix93u5rGM`Qx(N()YQZ@LaZ|TB+<`C z@7&y;J9OuwW9X4n_G`>O=5){7epdES!5BM21FgDY&5W^nJ1t#e-rSF3Eni}sndq(Q zN%kgFsbr5fr~P6J=hA&750#IzGB3LF>(fXcYx_7m?o!+k|%}LCO!kh2+U~-gG%iSYOuxNBfq(f)1N@O_*{6DgVgVk@8b4r2LdA zmxHcMFt^vU&(@ureHCf<$ovEY2PmL|6AHiug-`@ffEzSxe!`PHNU6i65&G1}z0_F3 zUDRF7W7N7%ajEh6#`*^9d@(<6r^&r*@=bA}P6H+UNBaGNk}vycw3G+Lo^MO}94&ku z&Ga-16>HJQ8yQ_Tv^pB|;q;a=zE-HyU>QG2-uKlAEtQ9KstfQ+vAjRP6RU?mE*H|j z0r~Jiknf;{50lvup9gu3IO3-upA|+-g?O!4{wKsgrpreh+1=w|UM-dv!@P$kcRC|v z8oY>-5pD=&>$3Eb_Yl5~J5stWrCBM>Na>c8Zc1rdN;jl5cwLH?l%}Ngy_Bv> zX;MnRO6dnF%}MFLM~e4G&c$>n1}~IADfqw-PeK_4taGv8%75pFjqDx2ZHB~4IL(lT-}_x6rYS5tCylM z^DUnAUu>YEG#+Dd)Tp9O{ekS|sXl`>;hWuc#&~EK9-=L6M1?w?NE_Q$FLmaazaNdv zZy)iWHNb>&sDMg{!5UZ#PeBz_Lk+BhTBw7k;Tc#D&%$%?Jj9_M8sG)k058HucnMyH zMreXppc!6;O|ThWgV*5=*aBN&8@vhIp#|Q89k3Hxp$*z$7wm>T&;f75UU&yO;a%7V Y`{6z4f&?6ZZa4@%khJ_~d;ef(~=Gd{!sJh}xSItDRITosN)xS2Av2TtY z_sZ6i?>}QZDxLdWvC4sY79Mw&zjJM~=t|$Z(l=fy$g)F+q=BAn>46eg?<-gO(v=KX zTJTDAai00y*Z;y?lUtoPNg#s)97@3pWl#PY&m147gn63teJ#zW#=x^TuDr5mwGfD&P`jgHRv1r3G8pt=86ql`AfIOOJ7L{Ow& z^wIK~w%MJj7i~1XBKhgZda>yEY#}1rX=#M}>1IUasH&%I^NYC#(P}q-X%HjkeWRpn zqMR~Op;583)Nd4`;#KQsTb+cx`k*FK)U=FiS*x#WB4!?}&ZZ?fC+kAKCKpPm`x1*& zyB>IxDZR!|dO-Gc@Fo5^pX0Oqef}1Ig%9!+-@`k3GY|1{lu$bB43dP;iDb&M@r^ zA3DPaTcI_p=C@K-J>?8j&X6m_15gQ75QJ)|fm*185QL3Fy#CfdbYTOdw5Uf~thvao z>k(*xsIjg$e!826|Neg#R)-oB_MhHe!E-#_Xz;WaZL)_hbyCypp*wL>`;FV5S#Nb} zQj1xSDAk_F5a%&GYm-_Ft-n^g`I@a$sF|;6?4Q!Ec^pUf=e0(A0N&1Pz4jbgoBm;L zV~$I%?z9Fv#GnbbK^$&_X1E<%pcUF+JG4Ur?tnYtF1Q;y;2!9N9gu{3;Xc?2_rorD z0J>l|JP6(J5bS}6VJ|!ak3tXh!ej6_?1Mgd0-l7YAO-y}0Q=!-cm@u@v+x`|4==z$ acoANLK^THG9D-pOfl Date: Sat, 28 Feb 2026 21:47:56 -0500 Subject: [PATCH 3/7] feat(batch26): implement websocket upgrade/options/server feature group B --- .../NatsServer.WebSocket.cs | 25 ++++++++++++++++ .../WebSocket/WebSocketHandler.cs | 27 ++++++++++++++++++ .../WebSocket/WebSocketTypes.cs | 25 ++++++++++++++++ porting.db | Bin 6742016 -> 6742016 bytes 4 files changed, 77 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.WebSocket.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.WebSocket.cs index 5d0406c..6200289 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.WebSocket.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.WebSocket.cs @@ -82,6 +82,15 @@ public sealed partial class NatsServer internal static Exception WsReturnHTTPError(int statusCode, string message) => new WsHttpError(statusCode, message); + private static bool wsHeaderContains(NameValueCollection headers, string key, string expected) => + WsHeaderContains(headers, key, expected); + + private static (bool supported, bool noContext) wsPMCExtensionSupport(NameValueCollection headers, bool checkNoContextTakeOver) => + WsPMCExtensionSupport(headers, checkNoContextTakeOver); + + private static Exception wsReturnHTTPError(int statusCode, string message) => + WsReturnHTTPError(statusCode, message); + internal static string WsGetHostAndPort(string hostPort, out int port) { port = 0; @@ -103,6 +112,9 @@ public sealed partial class NatsServer return hostPort; } + private static string wsGetHostAndPort(string hostPort, out int port) => + WsGetHostAndPort(hostPort, out port); + internal static byte[] WsMakeChallengeKey(string key) { ArgumentNullException.ThrowIfNull(key); @@ -116,6 +128,10 @@ public sealed partial class NatsServer return Convert.ToBase64String(digest); } + private static byte[] wsMakeChallengeKey(string key) => WsMakeChallengeKey(key); + + private static string wsAcceptKey(string key) => WsAcceptKey(key); + internal static Exception? ValidateWebsocketOptions(WebsocketOpts options) { if (options.Port < 0 || options.Port > 65535) @@ -127,6 +143,8 @@ public sealed partial class NatsServer return null; } + private static Exception? validateWebsocketOptions(WebsocketOpts options) => ValidateWebsocketOptions(options); + private void WsSetOriginOptions() { lock (_websocket.Mu) @@ -304,4 +322,11 @@ public sealed partial class NatsServer return false; return uri.Scheme.Equals(WsConstants.SchemePrefixTls, StringComparison.OrdinalIgnoreCase); } + + private static bool isWSURL(string url) => IsWSURL(url); + + private static bool isWSSURL(string url) => IsWSSURL(url); + + private System.Net.Security.SslServerAuthenticationOptions? wsGetTLSConfig() => + GetOpts().Websocket.TlsConfig; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHandler.cs index 6930ca0..d09d4e8 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHandler.cs @@ -25,4 +25,31 @@ internal sealed class WebSocketHandler public static byte[] WsCreateCloseMessage(int status, string body) => WebSocketHelpers.WsCreateCloseMessage(status, body); + + public static bool WsHeaderContains(System.Collections.Specialized.NameValueCollection headers, string key, string expected) => + NatsServer.WsHeaderContains(headers, key, expected); + + public static (bool supported, bool noContext) WsPMCExtensionSupport(System.Collections.Specialized.NameValueCollection headers, bool checkNoContextTakeOver) => + NatsServer.WsPMCExtensionSupport(headers, checkNoContextTakeOver); + + public static Exception WsReturnHTTPError(int statusCode, string message) => + NatsServer.WsReturnHTTPError(statusCode, message); + + public static string WsGetHostAndPort(string hostPort, out int port) => + NatsServer.WsGetHostAndPort(hostPort, out port); + + public static string WsAcceptKey(string key) => + NatsServer.WsAcceptKey(key); + + public static byte[] WsMakeChallengeKey(string key) => + NatsServer.WsMakeChallengeKey(key); + + public static Exception? ValidateWebsocketOptions(WebsocketOpts options) => + NatsServer.ValidateWebsocketOptions(options); + + public static bool IsWSURL(string url) => + NatsServer.IsWSURL(url); + + public static bool IsWSSURL(string url) => + NatsServer.IsWSSURL(url); } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs index 7d41f5d..6fd4a64 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs @@ -203,6 +203,31 @@ internal sealed class SrvWebsocket public bool Compression { get; set; } public string Host { get; set; } = string.Empty; public int Port { get; set; } + + public Exception? checkOrigin(string requestHost, string? origin) + { + if (!SameOrigin && AllowedOrigins.Count == 0) + return null; + if (string.IsNullOrWhiteSpace(origin)) + return new InvalidOperationException("origin header missing"); + if (!Uri.TryCreate(origin, UriKind.Absolute, out var uri)) + return new InvalidOperationException("invalid origin"); + if (SameOrigin && !string.Equals(uri.Host, requestHost, StringComparison.OrdinalIgnoreCase)) + return new InvalidOperationException("origin not same as host"); + if (AllowedOrigins.Count == 0) + return null; + if (!AllowedOrigins.TryGetValue(uri.Host, out var allowed)) + return new InvalidOperationException("origin host not allowed"); + + var port = uri.IsDefaultPort + ? uri.Scheme.Equals("https", StringComparison.OrdinalIgnoreCase) ? "443" : "80" + : uri.Port.ToString(); + if (!string.Equals(allowed.Scheme, uri.Scheme, StringComparison.OrdinalIgnoreCase)) + return new InvalidOperationException("origin scheme not allowed"); + if (!string.Equals(allowed.Port, port, StringComparison.Ordinal)) + return new InvalidOperationException("origin port not allowed"); + return null; + } } internal readonly record struct AllowedOrigin(string Scheme, string Port); diff --git a/porting.db b/porting.db index dd42a6dd82d66e8c4c306cace044e38a9d48ad57..baa47b0a45c9b998ae09c73ee31f95899b60eebb 100644 GIT binary patch delta 1460 zcmY+>OH30{6vpw+ZBdbymbR$S_DWmITScpa_{J9~YJpN}TPq+X5Em*Y#<&nv3?YFS z*&LKjVMQVfA%n{raLSn(wS#MF$?SRLV&)L@9oN zC9h>etWQ(I9w9a9;)ufx(scr0Pq_Lr{0hjq!QFDrVqo`R$%_wSG zQB#VVRMh#`ihQN02}O-7YD`h1ih8N27p>Qw21o!Sn7|B)unyM421pXGJCh&(qTY*Y z8!gx^W;$oL*vONlkJaof*p^3ac1wXY&0QIYHiyM3(B&)YbgFV%(w8RszmK$}b(b4e zLf)MwH{H9bHi?5y%YZ}VZvS-=W5am2;V*);G(ok2f6JZEL!?Sk%7 z>~puc-BfLgF^Q$6{Fp#B9Xy?$mT}k0KKfF|yQs{EPr*T28Z75ovgzk?-b{rZrsZj; zmuJeRqh5ZS5`M6yYcSBY3NCw78mizP>YURoPu)JAEqnHYk3S=$Udw4Z#1kk`$sMw3 zTO}_chg$CkbHhqmk>0!21ATo4cx3Y49)6G=QMCdMdXF$mEZgI+!( zm+HZg$U%v5F2sUy1yKZz9E^Iw@tj*7 z^u%X@1UG+z`YXg})rw|LI_2gYIBD$$zY<{3s?2<6S9e!$Z&#=Goj1|@+kE(d;`tjz zT2`bbMS87Bi;DD0kzOj&f+D@pmi)igyka<~NY54NnIb*)rqSt1FBPak0AKI}e+Ym; z&_EDqK_^X5>WfdRzo^dmP{AEOmQrIxH?J;}R%1jp7br{!*M&<$zi^y%1$-zCwu;FV zWfl8q@rS6VRkhy`-TEm;kmf#LI67hzVG9_*2*D5np%4b)5Fw4&A}4;6p(wyipDdG%96w%V@ZoaK57_C- zkjf}c*-UpiT3Ze?(_ow_L+%{)#F-!p%#tUL8B?h8aR|CvG91-oGn6u@5C2m9dw6v9C$f?_Cv zLr@B3a2Sq2IaGiHDxnIh!3i}`3w2Nr4bTWp& Date: Sat, 28 Feb 2026 21:49:41 -0500 Subject: [PATCH 4/7] test(batch26): port websocket functional tests --- .../ImplBacklog/WebSocketHandlerTests.cs | 78 ++++++++++++++++++ porting.db | Bin 6742016 -> 6742016 bytes 2 files changed, 78 insertions(+) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/WebSocketHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/WebSocketHandlerTests.cs index 24b54ef..accbeae 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/WebSocketHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/WebSocketHandlerTests.cs @@ -150,6 +150,26 @@ public sealed partial class WebSocketHandlerTests Encoding.ASCII.GetString(bufs[2]).ShouldBe("third"); } + [Fact] // T:3084 + public void WSReadPartialFrameHeaderAtEndOfReadBuffer_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + + var first = CreateMaskedClientFrame(WsOpCode.Binary, 1, final: true, compressed: false, Encoding.ASCII.GetBytes("msg1")); + var second = CreateMaskedClientFrame(WsOpCode.Binary, 1, final: true, compressed: false, Encoding.ASCII.GetBytes("msg2")); + var source = first.Concat(second).ToArray(); + + var initial = source[..(first.Length + 1)]; + using var remainder = new MemoryStream(source[(first.Length + 1)..]); + + var bufs = client.WsRead(readInfo, remainder, initial); + + bufs.Count.ShouldBe(1); + Encoding.ASCII.GetString(bufs[0]).ShouldBe("msg1"); + remainder.Position.ShouldBe(5); + } + [Fact] // T:3085 public void WSReadPingFrame_ShouldSucceed() { @@ -202,6 +222,64 @@ public sealed partial class WebSocketHandlerTests Should.Throw(() => client.WsRead(readInfo, new MemoryStream(Array.Empty()), msg.Concat(close).ToArray())); } + [Fact] // T:3088 + public void WSReadControlFrameBetweebFragmentedFrames_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + + var frag1 = CreateMaskedClientFrame(WsOpCode.Binary, 1, final: false, compressed: false, Encoding.ASCII.GetBytes("first")); + var ctrl = CreateMaskedClientFrame(WsOpCode.Pong, 1, final: true, compressed: false, Array.Empty()); + var frag2 = CreateMaskedClientFrame(WsOpCode.Binary, 2, final: true, compressed: false, Encoding.ASCII.GetBytes("second")); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), frag1.Concat(ctrl).Concat(frag2).ToArray()); + + bufs.Count.ShouldBe(2); + Encoding.ASCII.GetString(bufs[0]).ShouldBe("first"); + Encoding.ASCII.GetString(bufs[1]).ShouldBe("second"); + } + + [Fact] // T:3089 + public void WSCloseFrameWithPartialOrInvalid_ShouldSucceed() + { + var payloadText = Encoding.ASCII.GetBytes("hello"); + var payload = new byte[2 + payloadText.Length]; + BinaryPrimitives.WriteUInt16BigEndian(payload.AsSpan(0, 2), (ushort)WsConstants.CloseNormalClosure); + payloadText.CopyTo(payload.AsSpan(2)); + + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + var closeFrame = CreateMaskedClientFrame(WsOpCode.Close, 1, final: true, compressed: false, payload); + var initial = new[] { closeFrame[0] }; + using var remainder = new MemoryStream(closeFrame[1..]); + + Should.Throw(() => client.WsRead(readInfo, remainder, initial)); + lock (GetClientLock(client)) + { + var (chunks, _) = client.CollapsePtoNB(); + chunks.Count.ShouldBe(1); + chunks[0].Buffer.Length.ShouldBe(2 + 2 + payloadText.Length); + chunks[0].Buffer[0].ShouldBe((byte)((byte)WsOpCode.Close | WsConstants.FinalBit)); + BinaryPrimitives.ReadUInt16BigEndian(chunks[0].Buffer.AsSpan(2, 2)).ShouldBe((ushort)WsConstants.CloseNormalClosure); + chunks[0].Buffer.AsSpan(4).ToArray().ShouldBe(payloadText); + } + + client = CreateWsClient(); + readInfo = CreateReadInfo(); + closeFrame = CreateMaskedClientFrame(WsOpCode.Close, 1, final: true, compressed: false, payload[..1]); + var partialHeader = new[] { closeFrame[0] }; + using var invalidRemainder = new MemoryStream(closeFrame[1..]); + + Should.Throw(() => client.WsRead(readInfo, invalidRemainder, partialHeader)); + lock (GetClientLock(client)) + { + var (chunks, _) = client.CollapsePtoNB(); + chunks.Count.ShouldBe(1); + chunks[0].Buffer.Length.ShouldBe(2); + chunks[0].Buffer[0].ShouldBe((byte)((byte)WsOpCode.Close | WsConstants.FinalBit)); + } + } + [Fact] // T:3093 public void WSEnqueueCloseMsg_ShouldSucceed() { diff --git a/porting.db b/porting.db index baa47b0a45c9b998ae09c73ee31f95899b60eebb..0d6fe1f6b9bcaef4e490b27e5b23bd2a3e98a3f4 100644 GIT binary patch delta 1715 zcmaLXeM}Q~7zgmSch~aLmxD5Y2Q4o`sfvIVD*`%jh&Yi=K){JkK@Xy2BLp3PEX#(= zWh{#um~1|cX34fFE?Xuq>JH26_K#b#F=kxC+_E6VEzABPO|~oqVP6XN%0El-c#d6Dfwo9Mbd9KTPoGH}U^JYSoKJ=ncIns^98GyCH@5=rZKS zrpC6mM%nBTW^~R_Nf7i8!&WJ%e8A>8jbtu`8N<+ztlov`C zByFl0Nt+stWTgpSl*%w)>lcBf^)n%9{ftOjKLJVW$0J!)k0AcoY!mM5b9kH1xY3v( z%n38p9V2?w(pE9kz#EHbTCf2dfdwozHuHdK&tSIn|l#Y$}2VilbJ$ zOb;=h{_P7Nq!#wGcmoCAPg(7C@g~dFCaL#tGCM~DHL<}vagtkQyR}hoCi_Hk4zXnV zMP+H!GR#t!kK9z;9v|%IprS!mruCysgX|c6@`ojSMV>osYuG6*cUT?Wd%3oNuwH>o zezrxMc)rqSq06^1vB}SNP{3ajoO2-TdTEHgM^zWs1q)7kG{p9YRd)`v(_zbdIE*gZ zrlgOsX1Z60yTyZb(Dx%OPphi2qpVp^4JV$I{bS)5tBYf--A1XEcoM;Fx$y(rt6lxp zB>QqwD#@U><|q61`n4eR_!z`Ne~YS-~cBiLJ}lH z3ap1z)wGoMjT*XnMvkx|#miy3_7JC{aXkPdFx02^Tw zWWZ*~ge=I0E#QHzkOSMm3)>+Vc0eBFLjmlBUGOv%LJ{nSXP_ANKnd)HQrHL2!hR@& z15ggn!SiqsDxeao;1EIdF(LBSw^uck% z1)BR<{*)$W>&=weE8FxjG~XjT=xncCO+WR@&U~YVOEk1Mww-A@gD<$SCh=<$wJ1YDxg$@dp>13=Mg)y9ti8==;-Bji_#|mpve3*nSi%*7_ zn?p=o*o^ZK^+6NI5?{pq@yA-&>Grbt;6xMSoJI@F9!z{_cu+Ih=CAiw<9+xfzwbG} zbI$jkd+$W{LuI0XAsMov7^>kgoQBJA8>NQFC^O0p?*-q{DQWHGu_+%vHswDt=5j2T z%qrOujDIZGU0kP?N%%(IUxo2p!io9ZP!$eJT8e)qX#u(3tAylaY9uv2o=TS2*i^8f za8=P_s(Fg3T5QC2V~tH|Hs!Y|pG|pfs@$f^3JR~Qn%-lJyKTy4Q%;+5*p%s=M;+R? zuenM}q&C%Gg0nmPHFzi?SL4sKvWIt#Y0WYpAJ>Kzj4RskWczVVutrcJ2nZe!R0^ti z`*HSe3kE-BeR$31X~msqSQ5YcvowUcKlBjZKEt+RK2+ty<{8$Ddn$sBcydM8@!Sk+ zwi3V3u>Dxk=PFiAoMmlhLWxaFnE8y=^5wJah8u@}scgh-n$_T6v!dxq$>Zl;Uoa+P z_zLU9Q|DQ`S;xOW&ngrvn9J)9Y~rj7qaUe0rEXfo+6=41pEzs8BlE1GaB9MpqgBP$ z+vix1Weexnfnt*kQ!#ppZL$)tUt;_IOE}P#W*aTPyQjVQ!9~%1B+a(q)xX1L&t7vh zR%h4=Y-kObv0A*HVY{q$+veFT|1+>OFKW$&H7&4F98Wrnd&w1Ce7(Rr%q$OFW}_0W zI6Xza~pqN>P3zs9-=K>S~dJw1yXdTc)Ko0}02YLjk8E6Ag3(!WOD9|RL%|NX{ zF`zb}cAzal9Y9-wI)NSq+6MF((BnYcfx3Wp06hWJ4fG_?Q$SAx?F4!TXctfqP%ls) z(6d0#0rdmLfp!D!0eT+j1)vv!27vYgy#%xmXg|;apqGIT0u2Hk`jQfASp8MGF8;wq zO~?9NBJXhqy;r>#9C>H?=`Ohz6H5v8^q~9>=08c!@o2pim(VwwDxbW0D Date: Sat, 28 Feb 2026 21:53:55 -0500 Subject: [PATCH 5/7] test(batch26): port cross-module websocket-dependent tests --- .../ConcurrencyTests1.Impltests.cs | 69 ++++++++++++++++++ .../ConcurrencyTests2.Impltests.cs | 58 +++++++++++++++ .../JetStreamFileStoreTests.Impltests.cs | 64 ++++++++++++++++ .../LeafNodeHandlerTests.Impltests.cs | 49 +++++++++++++ .../LeafNodeProxyTests.Impltests.cs | 60 +++++++++++++++ .../ImplBacklog/NatsConsumerTests.cs | 22 ++++++ porting.db | Bin 6742016 -> 6742016 bytes 7 files changed, 322 insertions(+) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs index be36237..eef18b8 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs @@ -1,5 +1,7 @@ using System.Collections.Concurrent; using System.Diagnostics; +using System.Net; +using System.Net.Sockets; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -269,6 +271,73 @@ public sealed partial class ConcurrencyTests1 }, DefaultStreamConfig()); } + [Fact] // T:2371 + public void NoRaceAvoidSlowConsumerBigMessages_ShouldSucceed() + { + WithStore((fs, _) => + { + var errors = new ConcurrentQueue(); + var payload = new byte[128 * 1024]; + + Parallel.For(0, 40, i => + { + try + { + fs.StoreMsg($"big.{i % 4}", null, payload, 0).Seq.ShouldBeGreaterThan(0UL); + var sm = fs.LoadLastMsg($"big.{i % 4}", null); + sm.ShouldNotBeNull(); + sm!.Msg.Length.ShouldBe(payload.Length); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + fs.State().Msgs.ShouldBeGreaterThan(0UL); + }); + } + + [Fact] // T:2384 + public void NoRaceAcceptLoopsDoNotLeaveOpenedConn_ShouldSucceed() + { + var errors = new ConcurrentQueue(); + + Parallel.For(0, 20, _ => + { + TcpListener? listener = null; + TcpClient? client = null; + TcpClient? accepted = null; + + try + { + listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var endpoint = (IPEndPoint)listener.LocalEndpoint; + + var acceptTask = listener.AcceptTcpClientAsync(); + client = new TcpClient(); + client.Connect(endpoint.Address, endpoint.Port); + accepted = acceptTask.GetAwaiter().GetResult(); + + accepted.Connected.ShouldBeTrue(); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + finally + { + accepted?.Close(); + client?.Close(); + listener?.Stop(); + } + }); + + errors.ShouldBeEmpty(); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index 1d9d2a2..b2dfd7c 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -330,6 +330,64 @@ public sealed partial class ConcurrencyTests2 }, cfg); } + [Fact] // T:2488 + public void NoRaceJetStreamSnapshotsWithSlowAckDontSlowConsumer_ShouldSucceed() + { + var cfg = DefaultStreamConfig(); + cfg.Subjects = ["snap.>"]; + + WithStore((fs, _) => + { + var errors = new ConcurrentQueue(); + using var cts = new CancellationTokenSource(); + var payload = "snapshot"u8.ToArray(); + var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; + var consumer = fs.ConsumerStore("snap-consumer", DateTime.UtcNow, new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }); + + var slowAcker = Task.Run(async () => + { + for (ulong i = 1; i <= 100; i++) + { + try + { + consumer.UpdateDelivered(i, i, 1, ts + (long)i); + await Task.Delay(2, cts.Token); + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) + { + errors.Enqueue(ex); + break; + } + } + }); + + for (var i = 0; i < 100; i++) + fs.StoreMsg($"snap.{i % 5}", null, payload, 0); + + var sw = Stopwatch.StartNew(); + var (snapshot, err) = fs.Snapshot(TimeSpan.FromSeconds(2), includeConsumers: true, checkMsgs: true); + sw.Stop(); + + err.ShouldBeNull(); + snapshot.ShouldNotBeNull(); + snapshot!.State.Msgs.ShouldBeGreaterThan(0UL); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(2)); + + using (snapshot.Reader) + { + } + + cts.Cancel(); + Should.NotThrow(() => slowAcker.Wait(TimeSpan.FromSeconds(1))); + errors.ShouldBeEmpty(); + consumer.Stop(); + }, cfg); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index fc4bf10..33b9b54 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -3372,4 +3372,68 @@ public sealed partial class JetStreamFileStoreTests Directory.Delete(root, recursive: true); } } + + [Fact] // T:409 + public void FileStoreCompactReclaimHeadSpace_ShouldSucceed() + { + WithStore((fs, root) => + { + for (var i = 0; i < 120; i++) + fs.StoreMsg("cmp.a", null, new byte[512], 0); + + fs.Compact(80).Error.ShouldBeNull(); + var state = fs.State(); + state.FirstSeq.ShouldBeGreaterThanOrEqualTo(80UL); + state.Msgs.ShouldBeLessThan(120UL); + }, cfg: DefaultStreamConfig(subjects: ["cmp.*"]), fcfg: new FileStoreConfig { BlockSize = 4096 }); + } + + [Fact] // T:465 + public void FileStoreTrackSubjLenForPSIM_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 40; i++) + { + fs.StoreMsg($"psim.{i % 4}", null, "x"u8.ToArray(), 0); + fs.StoreMsg($"psim.long.subject.{i % 3}", null, "y"u8.ToArray(), 0); + } + + var totals = fs.SubjectsTotals("psim.>"); + totals.Count.ShouldBe(7); + totals.Keys.ShouldContain("psim.0"); + totals.Keys.ShouldContain("psim.long.subject.0"); + totals["psim.long.subject.0"].ShouldBeGreaterThan(0UL); + }, cfg: DefaultStreamConfig(subjects: ["psim.>"])); + } + + [Fact] // T:466 + public void FileStoreLargeFullStatePSIM_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var fs = JetStreamFileStore.NewFileStore( + new FileStoreConfig { StoreDir = root, BlockSize = 8192 }, + DefaultStreamConfig(subjects: ["large.>"])); + + for (var i = 0; i < 300; i++) + fs.StoreMsg($"large.{i % 25}", null, new byte[64], 0); + + fs.State().Msgs.ShouldBeGreaterThan(0UL); + InvokePrivate(fs, "ForceWriteFullState").ShouldBeNull(); + + var stateFile = Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); + File.Exists(stateFile).ShouldBeTrue(); + new FileInfo(stateFile).Length.ShouldBeGreaterThan(0L); + fs.Stop(); + } + finally + { + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs index b08b498..f6e2a7f 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs @@ -1,11 +1,60 @@ using Shouldly; using ZB.MOM.NatsNet.Server; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.WebSocket; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class LeafNodeHandlerTests { + [Fact] // T:1975 + public void LeafNodeTLSHandshakeFirstVerifyNoInfoSent_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var remotes = ServerOptions.ParseRemoteLeafNodes( + new List + { + new Dictionary + { + ["url"] = "wss://127.0.0.1:7422", + ["first_info_timeout"] = "2s", + ["tls"] = new Dictionary + { + ["verify"] = true, + ["timeout"] = 1, + }, + }, + }, + errors, + warnings); + + errors.ShouldBeEmpty(); + remotes.Count.ShouldBe(1); + remotes[0].FirstInfoTimeout.ShouldBe(TimeSpan.FromSeconds(2)); + remotes[0].TlsConfig.ShouldNotBeNull(); + remotes[0].TlsTimeout.ShouldBe(1d); + } + + [Fact] // T:1986 + public void LeafNodeCompressionWithWSGetNeedsData_ShouldSucceed() + { + var frame = new byte[] { WsConstants.FinalBit, (byte)(WsConstants.MaskBit | 5), 1, 2, 3, 4, 0x31, 0x32, 0x33, 0x34, 0x35 }; + var initial = frame[..1]; + using var remainder = new MemoryStream(frame[1..]); + + var (b1, nextPos) = WebSocketHelpers.WsGet(remainder, initial, 1, 1); + b1[0].ShouldBe((byte)(WsConstants.MaskBit | 5)); + nextPos.ShouldBe(1); + + var (mask, _) = WebSocketHelpers.WsGet(remainder, initial, 1, 4); + var (payload, _) = WebSocketHelpers.WsGet(remainder, initial, 1, 5); + WebSocketHelpers.WsMaskBuf(mask, payload); + + payload.ShouldBe(new byte[] { 0x30, 0x30, 0x30, 0x30, 0x34 }); + } + [Fact] // T:1984 public void LeafNodeCompressionAuto_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs index 587245d..74cb5a5 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs @@ -5,6 +5,66 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class LeafNodeProxyTests { + [Fact] // T:1902 + public void LeafNodeHttpProxyTunnelBasic_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var remotes = ServerOptions.ParseRemoteLeafNodes( + new List + { + new Dictionary + { + ["url"] = "ws://127.0.0.1:7422", + ["proxy"] = new Dictionary + { + ["url"] = "http://proxy.example.com:8080", + ["timeout"] = "2s", + }, + }, + }, + errors, + warnings); + + errors.ShouldBeEmpty(); + remotes.Count.ShouldBe(1); + remotes[0].Urls[0].Scheme.ShouldBe("ws"); + remotes[0].Proxy.Url.ShouldBe("http://proxy.example.com:8080"); + remotes[0].Proxy.Timeout.ShouldBe(TimeSpan.FromSeconds(2)); + } + + [Fact] // T:1903 + public void LeafNodeHttpProxyTunnelWithAuth_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var remotes = ServerOptions.ParseRemoteLeafNodes( + new List + { + new Dictionary + { + ["url"] = "ws://127.0.0.1:7422", + ["proxy"] = new Dictionary + { + ["url"] = "http://proxy.example.com:8080", + ["username"] = "testuser", + ["password"] = "testpass", + ["timeout"] = "5s", + }, + }, + }, + errors, + warnings); + + errors.ShouldBeEmpty(); + remotes.Count.ShouldBe(1); + remotes[0].Proxy.Url.ShouldBe("http://proxy.example.com:8080"); + remotes[0].Proxy.Username.ShouldBe("testuser"); + remotes[0].Proxy.Password.ShouldBe("testpass"); + } + [Fact] // T:1899 public void LeafNodeHttpProxyConnection_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs index 89f5924..8226c65 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs @@ -6,6 +6,28 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class NatsConsumerTests { + [Fact] // T:1353 + public void JetStreamConsumerPullBatchCompleted_ShouldSucceed() + { + var cfg = new ConsumerConfig + { + AckPolicy = AckPolicy.AckExplicit, + MaxRequestBatch = 128, + MaxRequestExpires = TimeSpan.FromSeconds(10), + Metadata = new Dictionary { ["legacy"] = "keep" }, + }; + + JetStreamVersioning.SetStaticConsumerMetadata(cfg); + var cloned = JetStreamVersioning.SetDynamicConsumerMetadata(cfg); + + cloned.MaxRequestBatch.ShouldBe(128); + cloned.MaxRequestExpires.ShouldBe(TimeSpan.FromSeconds(10)); + cloned.Metadata.ShouldNotBeNull(); + cloned.Metadata!.ShouldContainKey("legacy"); + cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsServerVersionMetadataKey); + cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey); + } + [Fact] // T:1235 public void JetStreamConsumerFetchWithDrain_ShouldSucceed() { diff --git a/porting.db b/porting.db index 0d6fe1f6b9bcaef4e490b27e5b23bd2a3e98a3f4..94bdf30a938efebec84abf8f183a9a8cf1992590 100644 GIT binary patch delta 2428 zcmY+`3s6+o835pW&VBE)_ndtpVOazN$|J@VQ65?qR16}BV0__YL6$_czM>hUHbJr@ z=|;gMqrb04l4cxZ5;cwCYHJlWG3hw93`J98V$o5fHYC>g3Io`Gwa(z|e6#2I?|;rc zdvuGWL@yNtVh_K+99TBLYJQ-Sz9I<1 zop4KKpenFzS)fvE!j=a%DOV(UFxHw3m0{0&Bbk+lov z?;DJjjjHs5&@AdIXg;=#N`He|A4`8t%n?5o|0r&f21)UfQ7QrY`S&~I=IXhrNeeEc7rccTy^BxJByUH1t3!@01lkc}f8Lf91>A$rN< zh%255bp1`C7Lp3tV#v;944OMJvqKLs}Yz>>UZ5uk#eBY2eGCE0=n}T1dTS+IY4@l<;)7LhA%}-n<^- z9}w8Sp4WgpiMGHmLGFRswR|ag8;;fTWcYVJW$=A1&wwB5`Mh<-tf`n2pDIlgYQ(`0 z6MEAuT+4j~ekN+Xd3h~=iMlb}j~;KixEsGswrF9SxiHA*DPoB3HCsb`9}z=B7X;UH zAR&m^z!zaw^kt~uz@x&YnXqRA?;kGtplt*H088WzSX0l_!=++4P|r7GNoX>Q8h8_T z9Hc1%yTvZEv+QHGk=3$i&8S9RK_pS4ccJDzUTXg1J$^HSIG}kiFOk>?A;yf{$A9AF zU67l3KS*ungEgg7`BAx`w7`qa{2H~+G$$P6QIslM%*qoyU6j-`!GIkn`DSYE2Q8&E z%AEKWKPx({K_T<;iq*wfOEJ?=@zX9bUkHNS&eOz0N&}2+=iANm?OY__yT}#u!bQG7 zl9WFPnNWC@uNE_e3iH%ezE`2PPFUT+Uz3a?A=B*X;EF@+q*FGj_SY$WbyLXxqC}U_ ziqCJ^DWP-<>>oWh2@O> zF%EoohXkjh9R^%k}vUh4=w2R{`VDbN_@u+}wt9A=}(Ne@8|aizk8 zb50LjYjX~LI75Co15(a7U+L9-{fslMSGSpT*0~@JZkbju`0qLU!S7^O1Z-4XGSp}; z4sBcF2C8Y<4?~#)bk`_HIMygwBD$=w-{Z>a6|3&9$7SC2xSK{m$@bExVgGV>AGkW* zJ*L+fbJz^`4Gv4^y0c+swL7nO*C*BP&tHN+bbRklfZyJWe6$OX z)?{{Ha1W`2^0|&Nu-Klc!oWZz-bhtX45~X)6^Z(wDD)JHMjjM{yeJm+MR6z|^+Qja zN2(I;?Rylw)18>zJHt_Z5{GGUX)7jQ>`m+gC;KL*_Uf6?i9qX8%hC8HFSik?9O z(X(g}@}a>f4Glr*Xeb(nGEgSULfL3I8i9U;Mxs$@G|EA_Xbc*Q#-Thk9zBQh(F9b0 zCZa-AgeIY4G#QnkQZxlUkEWt&XgZpKW+tT#wM6s?_wk@x(k=ZhQMO&SE%HtIlH4Yr zln=^#g-DkPO$IcAHt0 zQuY}o6|ks}eE~TRH)rAH*fBp4L&~AKIuKpgJg^MSLbK5vG}mk%=pXw&=?1yQU*NL5 zp+v|%;Vk}tH+^kc1^yO)3biaWLoNR0gxW-Ce819VMz{L2G!g@ODP;+<`oZu|#N&T6 ldhrvrF>v^Dxd*&g%8k0C-}+HGszCElB?_2FzpcuV{|BaXHWdH> delta 3659 zcmd6piC0wD9mn6jZ<%41ckc`{V3?_gFG}hxycF%&B$ukqQ;|SF3%73WD(1F2>S=!lJ^`(!v69IdclO z_7Su`YQJu^(h@dXP+Lcq7naU1nh(D9s=KGd7(kb0g#{Vg=Hy{0v4_S+r~+$(zvUJ;cRH?eUgvzh0f1omQUPNW&ybzZ2 zTv?brt8^0%>S-FnY$fvFLdsv3wU82DqcTb?3X8jfafWSSn5|i{cMC~+6RLkCp0exM zh0*>sR7U*Ou=oPyN8yCt@4rmLLp<|CJ*f$8%4#DmRZtmepM*H~Eg&tJw?xky_AHY} zXq%$j9-_*E*>^~XDR^bvf(NLKIedr8D0V-jSOK|*of)2Y!{%@YIt*m~GSdbjqpf0S zAa&!gUXVtH*_@_d8wF|GS<>Dz<)+V#f|Q}sTccK+7)#ncmzl>1OHin7hZr6BD`Ac4 zpMnajACl;_WHjpgkM<3#-Un^^lxQL32$TKpN_apLP zT{3%x?11!4786IdS!N1n+t-Muho&Q7Ss?U)PHD_tM#ot%7-i|$P(L~v{Z{KaMxoNT z8_hzMuiHkV(zhGUM5UhrbOb8n9t=liEJ17rN@ER%qSDuZ4uRrk;_q!~2??Kw$_Sr; z%7`A1N`;2!1YaJ5*fiCe9+GSlNP}6bWSxO7&xf-pmjKq9kl-m+TnBj2W@$Tf^|?)J zvjkex?jFn%Ne{R2GN6KC0D%3XxvVorsaA^31Do~uwzlBZ1*dRFv-Ey{kLpp1xGT0^w4n8KaaH}8l zf-eRCF7;z7JbO(15c`;@a59D2;g79jmhrTsmOP5RCrYii63&ffU&EX@PpIn8wt>Ar8wB1Th#hXEFwd69#8E-A z*>6cUPb#y=2v_JNk=4&B{iqgFePeL~>QcwDy~RlJ;h1m1PL7#hYikG#{~Ab4T-I-Ve@D}?RP zxQ>4)srztjRW(nM*jT|22deoNiG45lwF&F_LGC(4{Q~>GZDIDbElF?u;)<%}`GnFI zDBsB^YiD-y2R7n^`n^1-?G9;CAM^X1)PY>j%WSc1&cMa|k+&NnzP|t5^ z*+;pPQtMEy;22L4p>h_r!H36rHQ4Y<8+wA@5S^j(Gc9gaTFNPY+9ggGDnM@H$)*>| zA!WTX1_m|p9ptIj*u*8GUHyhHl%RM3W}Mp0mxy*DRXf$p_bOC1!^(TS>}99V?{UQ` zw$Q50Cdb>Gs~qz_{f~nZ$;{0r*fVTSFX%!YNpK>QaS%td?phUfEVJWhsqliTf;|V! z-0=%7$Q>()h5kvr{PLlI+PVT4#5f%XVd=s`FWhxH9=t5q6zMom;pl?G80}1y<89^T z=(JeJJI^5bs3RWgx5XvEsTijUH;Y{<(i&&wXt+N?^+Rn3r?sNihB#r(Id?oHHAi}(M08nUpU0I3P0v)d;)%zlJ@L3d zO@~s85)b>9x}%|ahI_=zDg$40-{w$PgZ0}iuP<EN4h#%>R^gSc$wWpXS;#14G%^Mmi;P2lhKxrhAQO>nWD=5tOh%?4Q;})NbYuqd8uEHf za(|1ZgYX@0RQZ8?U2c?*%b&@+<@e-jNbu#(u$XU%+2#v0-+WT5^5wosAgd$yX-ZtK z(=6GZ<5>_B?dSsKiwjk4fI@)YETz&J?S z$<5}U4vmvg&=BxzH4TBK#Ej1k{_JAg1ER&83#6Gz7G(KzyTGk)0}MR3^Hee#4&TW0 cKR From 04c81736518f52f893a491cc6b66559825a0d998 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 21:55:19 -0500 Subject: [PATCH 6/7] test(batch26): resolve benchmark-mapped websocket test IDs --- porting.db | Bin 6742016 -> 6750208 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index 94bdf30a938efebec84abf8f183a9a8cf1992590..b83200a68740ae9f0bae3d4af84fb01845664476 100644 GIT binary patch delta 12141 zcmcgyc~BH*7VnayZ=>nZ||^rFpS)dVVch0*c|xoKRdG$@v5hGCT`uChs$4s$ zy@sza_6Cn}R8@@=hVKktp|Qmw3q7xmsY>-m80r`Ora9y0y`wtD;%^_%q-K_EK zv70w*Rr;4b;Y-br;R_54u+(OEi@U4K-HZo=7Vp!7K%4E;0zsqE!8CIlVYq3;fgx|3c{*V7eriTQIh&}rF0nJ=5q znn%nBJaZqhq~ks0hvYlt8{{r>8#zdBC7&d>kZq)$j3pz?B?xULu~RKc_F#XXz380R0}lm)>ojGJiu(Xdci|cqR5#mGQE#mjY5b+%GE24vFB0RtDvfRQAlsZt=-q2{Q>2f3c8>(o16@R4? zmAs*{_88aj7gqB^IWMf@g)&}P$qS{tP{Iqvyimjog}ktWJclLo*)|q{mcVO0#(Yp? zdDWN)S~RQiwgtS9&kK3Hkjo3pc_D`vvUwqk7aY8hX;O!)eDTZ~^e&8S<_B9)z1lK+ zHm|mJ@zMrfSnmma)bcFVV-COw{+f#yns}j+7o5D%zzg-fP{#|kywJ`In|Ps(7dFCk zw%5nnjrCyIMW5OXgX2EU1=?Ys)&$xKQ8|w^D>VK!(Pkle4l;|-j$Z`pf)=LPmw~?JyHAa8xL81S~?(Plp);dg3s%#E<>x5hiHp z2ooi+T_Lr7{s`D!yvC6r5NNc}$=wuCYzm;j$wQZzIjH?8qi6G3Aaw^r2K4^-UU#60 z+y$YGqmZ7_qkid01y}h}R}sgU96v736AXj)9)rB`HDT@Yq;?QggY;ueJk*q6xoS}Q zaY1F-+@QhZe#v27ZpM*;@Ezc#!c1o0(EUBx^ zGhp|`DQ20U@zqnJEI6cg5Y&Jsr@dJKjrMzb_~-=s`m|ruvkA?Vx&jJ0e#Q_z1dX2Y z+7ZyNsh;UGi~*0q?U*@=&M~=+PaFQIKdSpeM^SOwBF#qf1>#Nh8P&Hc3vOqZjJMI{ zQKo`z>^ecX-XFoMZj8zBn=C92CKUM-qeCx`!Sl+uB)d%13XDg`m};?I$2b!!FsTuk zaPjh{n8cy8f-aefS$kF#vs&N)Vn(2SXPI=Fz8*gQz96V8SS)*zn8Pj*?Sjyw--+{N zLLJvcUB5|e5{CY9P0W_F)3O5;dTm0KtTurMCtLG<-!qbj{O22t9yLyi%Cu2n42;7N zx_*!F58e>9MXSIVB8%Dr9<>{y>2461aPb;`xJNt~pwNPkMblj`@W4$sf9f97%|0iZ zZi~Q}3qSAXJ;J|z(>q=MlrvLjx4_NSo6?EDbW1exW`PM8`P*CfhztW1sy!&0xJ%#x zf<~f?)6&%qA{)^7MbYZkBrxW}du~H`(b5R*sYN+Iit5}bFou;`QfJumgj<(WU;;Z& zIO_XJx_Q8sr$fd&qV=&sV9f0VSMR`)YvUckS$1Y8fKlOQTQBgyZG1&POPd73>t|L; zZXtL&x6Zcot?my1*D8mp(B#13ZQ)CuwFZsl z1;N>zvWB95y)}m0RqmcN##Gi_1OW%+|DngU*2ApyIdV|e2uq)%KbUo72S;|5DiA%T zwU+rDg`+^q8s^)cQu}iHgV?)M;O-Rm?qbSX?z?-b-;G86f$WV-;l`yn+&D{Tt-O2V zpw5~;Lv8CjdMh9l5C)hH2nR#}A^|o)6yQNXG++)O1~3;e4-gB81H=O!0?Y>_02Tle z0Z9NmAQ_MXSO{1ISPWPKNChkfqyd%z(g7KOOn?KB1;_^E0G0!C0eOIYKmlL{pb$_5 zCc98o*jW1)vg81y~1o7*GwU0n`HO0QGAQ@+2&1S@ z%sij&H0?1SH+*KW=*y^14;%^CQJ&7@&X4e5C@2}wm(dr>COyGaYD!>}UOnP0VUr$# zPK-Fy{E}`t>D+2L*$8L?xB$%nx5sj_P(8)KSbKtHZe&^=52 zi^|k)(_Gh-k*|{^;Z(n`o~3#OKZVc3P?ROC*xDC0oUF&9RN-ZTeK9Fp+FQFDy4{<* zHycs*K6iW%eDG;l3H@4Y28@pP&ABP{^8*D8dUACsiZr6t7#mPy@ixZMk5|Zq;q~x?k+*@$e zuw9nGN_g`}&-|@{-StYs7F7$F@*-=)eQe0ge5B@3pCY z5yORY;rZWSpko?)^nKoIQ{mvXLbhUVZ|5kq;TOn~XjYa)zAOosvLy0kNqB!brPTQ= zSC&MhvYA*eOTwuvi5yuH4PFv^OiJxA*|I$9frnBXewHkQI%Uh@kR?&8EQw565;a~D z>yEk=+G{gpc~mRd6X~)H9`Br269kt$1~ zLRk_^WJ!4cs6(k;bg?XnHOgjUkt~VTUJ{pdO1&5?l;u$lJe1nNQe+vdQm`ECYl%s_ z2Q$~xAJCzuEyhvfL-39CQ+>J)QP-$am39yNadto(3g_}FS-J%%d)hT$f!Bgc+3@Ti zLc!l3qMPH53M}pH>q&(yk35C?hGno;mO-v^eZ!JiBTHhrvLsf^lF0Fr_+>P_uip1h z^=vZAWqD*P*b}Q{8DuG2jxt#i4ljw+>rREru~L>trefuY7%t@)h`-|mdo!WPhm?rn W5?K;K%91FSB@yUH;y!MeA z#%DQ`oiR?c_%E?v>=wTg&xj|)4}yhWw+}UP+>Bas+}7Z!9QPYGqw{t!DbIaSpvslT zD5DThx!(zXTDHM0ktg3h-Rv?Y8JlzS-ND>^PwztMrjToP8?5Qpsg?(pZ|F$0$4jll z{splgNxznUB{fPvlNzMmQl(TPZKU)DPfM`3!Lv+=vsPOxtXr*Xtv>Z<{;&Dl;F||L zl2KAB)YR719ym~26IC3PjGjWlY?&}fRBZC3#h^in{B|ZHiQd8W;G#D@_4Y%$Y^PklV8)i z?Yf&4x~NXkNgo1R8BHGYqke(RY zqy$%PRFU&1Vw2;9{VQ9)b<`ST$uh4u)fpQFN@=zKoV>|G61BCm0rO%1hZ||QmnD<` z3NzF2AhXgxzh=uQxG!SMM_$T{zG3>CdV0v*<`4 zn@fTF&~Iqt`t=%p$o`LQu#f2HQF=R@5w>2(r4EW8Ve`n}2d^{jObRWBdM{u-huWE) zTo;%ePFl;8Y*cxH`MC6juwV@rh*WS9hH^j4pq7g|*Zmja`urkyUConhRCFog+Iopa zTwme>k-Qz@hN&ye?10zbJGj>>F11lXC+^)HK%d_0WKL5|wOL&+oEf=0nT0;;T1lm44;89E-mCz8*S;ry`%R~^#(m5rn;us_nx7Hd|1t{W>~+|8HcS8-H>^BwV&quoAiduE$r zU1-^2t~dSCcoGM1@)`LrG}y~ZsP#5;(avkkVv2RB9XL9Hm*~hfRMU)Thq976wEsFT zV>+o+FoGB5>W^sOLT}!ps#r5mPM{Au`5U`k$;+Um8<8EOn77>EeXCG?58Z?>CZh2r zuW~K4%O+R?W#2?=o(}N_maE3@Tj=ZX&2X zafgq!L`e!CDDHlgExpT@tB*Q_M;*M&*H)}idiz*mL=;C~^zo;BMI(Y87cu&{pVxd@ zBMO}|5Bnn{Q4X<#1Ke(_MhQDB-58iuGAbYBo?A3Zcsi9gV?Lc8WOh@`@5AqdVy;}F z<@fmPHf!?E-9sKGuWl9`sFCxdNF#bMq>AR#-XX>OHyU&0iuT3Wq{+ka>!kc~%oGQu zO*wg;kq$E2d>@_fzmHW7-%XJ|6^5B-Zs;q~_kZ_~p^xvNlw{#F^*vzUDPM?vO3m5i z83{L^hDY((zZ>B_-Kz@zqZK63TR-qY*Q$k7Qcj`4OX2tZ7s8a^ z3Ua*N(dV$*^KEaOoi2YVnDPFP(tSZrq%@NpOP_oy&!k?fY$vZte%Fb+k7EpiA;*;; zyk?Y7n9YY*HOA9Ho7`?byrwaZe0F&{6*^=e&1{NKr9nH4zI4cmN7GG`dvjwfl8t$; zb*?xouKBDe zgQ;K|NCZh>I+y{HK?;}&W`Wrt6{LYVARWvF8DJim4>Ca($Oc}J19HIvun@ce7JZ@G^J>6oV2_ z3d+DXP!1};VquP1dzcke8=q`VAbk{CW1*$;}s0~JU)%{?O jvy0aa?gP@dQmN}i)b~+KolW9(rNrOF;Pw8xEX)4@5n(Ad From d487dca56e653904cffa87a2c3685654c74766f2 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 21:56:19 -0500 Subject: [PATCH 7/7] feat(batch26): complete websocket batch with verified implementation and tests --- porting.db | Bin 6750208 -> 6750208 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index b83200a68740ae9f0bae3d4af84fb01845664476..1ce78f22a679b4f5e0a7439d53119440d3f46d9d 100644 GIT binary patch delta 315 zcmXxaH%~%Q06<}{ARtY}jv_V`J2tR_y`y3;TpKFAM^w3L`KKdD8kRgT{A;u_Uj5EO`aS}{1%?z{5F;9{d3oNq4GApdI#yT5pvc)z# zq}gSUeKH(y$Pve!aLO6yTyRPDM>-5q(4B^i*nI{6ml-qUhtKP^g+%W8>E%&-)y&_5 zOb5i9?zCiFC27foI~(+rg;PR^kp6DTsH-%87A>hN&Nx( CXm&3E delta 315 zcmYkyH%~%Q06<}{AWwydioK#DiX9tR!QN4^w<|U@E^1&lzTqTJ&MXehB!gqdWDX=a#ZjtEiaSzwVRmRVtyHP+c+lP$K1 zvBNHV#M$S7LykD+gj3Eq=Yq>0>Bx#)M$1aS-TkTO%wSf&=`Qi~oOn&WJf@QAwE6oj zV;+&pXqF7AL@XKBio8)FgfH`M$$<0U{E@e$a-}4cYvo3{Rqm8~C28iZhj{c4k{@^@