diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.WebSocket.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.WebSocket.cs new file mode 100644 index 0000000..9f5f247 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.WebSocket.cs @@ -0,0 +1,253 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. + +using System.Buffers.Binary; +using System.Text; +using ZB.MOM.NatsNet.Server.WebSocket; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + internal (List chunks, long attempted) WsCollapsePtoNB() => CollapsePtoNB(); + + internal List WsRead(WsReadInfo readInfo, Stream reader, byte[] buffer) + { + var bufs = new List(); + var pos = 0; + var max = buffer.Length; + var maxPayload = Volatile.Read(ref _mpay); + + while (pos != max) + { + if (readInfo.FrameStart) + { + var b0 = buffer[pos]; + var frameType = (WsOpCode)(b0 & 0xF); + var final = (b0 & WsConstants.FinalBit) != 0; + var compressed = (b0 & WsConstants.Rsv1Bit) != 0; + pos++; + + var (firstLen, newPos) = WebSocketHelpers.WsGet(reader, buffer, pos, 1); + pos = newPos; + var b1 = firstLen[0]; + + if (readInfo.Mask && (b1 & WsConstants.MaskBit) == 0) + throw WsHandleProtocolError("mask bit missing"); + + readInfo.Rem = b1 & 0x7F; + + switch (frameType) + { + case WsOpCode.Ping: + case WsOpCode.Pong: + case WsOpCode.Close: + if (readInfo.Rem > WsConstants.MaxControlPayloadSize) + throw WsHandleProtocolError($"control frame length bigger than maximum allowed of {WsConstants.MaxControlPayloadSize} bytes"); + if (!final) + throw WsHandleProtocolError("control frame does not have final bit set"); + break; + case WsOpCode.Text: + case WsOpCode.Binary: + if (!readInfo.FinalFrameReceived) + throw WsHandleProtocolError("new message started before final frame for previous message was received"); + readInfo.FinalFrameReceived = final; + readInfo.FrameCompressed = compressed; + break; + case WsOpCode.Continuation: + if (readInfo.FinalFrameReceived || compressed) + throw WsHandleProtocolError("invalid continuation frame"); + readInfo.FinalFrameReceived = final; + break; + default: + throw WsHandleProtocolError($"unknown opcode {(int)frameType}"); + } + + if (readInfo.Rem == 126) + { + var (extended, p) = WebSocketHelpers.WsGet(reader, buffer, pos, 2); + pos = p; + readInfo.Rem = BinaryPrimitives.ReadUInt16BigEndian(extended); + } + else if (readInfo.Rem == 127) + { + var (extended, p) = WebSocketHelpers.WsGet(reader, buffer, pos, 8); + pos = p; + readInfo.Rem = checked((int)BinaryPrimitives.ReadUInt64BigEndian(extended)); + } + + if (readInfo.Mask) + { + var (maskKey, p) = WebSocketHelpers.WsGet(reader, buffer, pos, 4); + pos = p; + Array.Copy(maskKey, 0, readInfo.MaskKey, 0, 4); + readInfo.MaskKeyPosition = 0; + } + + if (WebSocketHelpers.WsIsControlFrame(frameType)) + { + pos = WsHandleControlFrame(readInfo, frameType, reader, buffer, pos); + continue; + } + + readInfo.FrameStart = false; + } + + if (pos >= max) + continue; + + var n = readInfo.Rem; + if (pos + n > max) + n = max - pos; + + var payload = buffer.AsSpan(pos, n).ToArray(); + pos += n; + readInfo.Rem -= n; + + if (readInfo.Mask) + readInfo.Unmask(payload); + + var addToBufs = true; + if (readInfo.FrameCompressed) + { + addToBufs = false; + readInfo.CompressedBuffers.Add(payload); + if (readInfo.FinalFrameReceived && readInfo.Rem == 0) + { + payload = readInfo.Decompress(maxPayload); + readInfo.FrameCompressed = false; + addToBufs = true; + } + } + + if (addToBufs) + bufs.Add(payload); + + if (readInfo.Rem == 0) + readInfo.FrameStart = true; + } + + return bufs; + } + + internal int WsHandleControlFrame(WsReadInfo readInfo, WsOpCode frameType, Stream networkConnection, byte[] buffer, int pos) + { + byte[] payload = []; + if (readInfo.Rem > 0) + { + (payload, pos) = WebSocketHelpers.WsGet(networkConnection, buffer, pos, readInfo.Rem); + if (readInfo.Mask) + readInfo.Unmask(payload); + readInfo.Rem = 0; + } + + switch (frameType) + { + case WsOpCode.Close: + { + var status = WsConstants.CloseNoStatusReceived; + string body = string.Empty; + + var payloadLength = payload.Length; + var hasStatus = payloadLength >= WsConstants.CloseStatusSize; + var hasBody = payloadLength > WsConstants.CloseStatusSize; + if (hasStatus) + { + status = BinaryPrimitives.ReadUInt16BigEndian(payload.AsSpan(0, 2)); + if (hasBody) + { + body = Encoding.UTF8.GetString(payload.AsSpan(WsConstants.CloseStatusSize)); + if (!Encoding.UTF8.GetBytes(body).AsSpan().SequenceEqual(payload.AsSpan(WsConstants.CloseStatusSize))) + { + status = WsConstants.CloseInvalidPayloadData; + body = "invalid utf8 body in close frame"; + } + } + } + + byte[]? closeMessage = null; + if (status != WsConstants.CloseNoStatusReceived) + closeMessage = WebSocketHelpers.WsCreateCloseMessage(status, body); + + WsEnqueueControlMessage(WsOpCode.Close, closeMessage ?? []); + throw new EndOfStreamException(); + } + case WsOpCode.Ping: + WsEnqueueControlMessage(WsOpCode.Pong, payload); + break; + case WsOpCode.Pong: + break; + } + + return pos; + } + + internal void WsEnqueueControlMessage(WsOpCode controlMessage, byte[] payload) + { + lock (_mu) + WsEnqueueControlMessageLocked(controlMessage, payload); + } + + internal void WsEnqueueControlMessageLocked(WsOpCode controlMessage, byte[] payload) + { + if (Ws == null) + Ws = new WebsocketConnection(); + + var useMasking = Ws.MaskWrite; + var headerSize = 2 + (useMasking ? 4 : 0); + var control = NbPool.Get(headerSize + payload.Length); + + var (n, key) = WebSocketHelpers.WsFillFrameHeader(control, useMasking, first: true, final: true, compressed: false, controlMessage, payload.Length); + var totalLen = n; + if (payload.Length > 0) + { + Array.Copy(payload, 0, control, n, payload.Length); + if (useMasking && key != null) + WebSocketHelpers.WsMaskBuf(key, control.AsSpan(n, payload.Length)); + totalLen += payload.Length; + } + + var frame = control[..totalLen]; + OutPb += totalLen; + if (controlMessage == WsOpCode.Close) + { + Ws.CloseSent = true; + Ws.CloseMessage = frame; + } + else + { + Ws.Frames.Add(frame); + Ws.FrameSize += frame.Length; + } + + FlushSignal(); + } + + internal void WsEnqueueCloseMessage(ClosedState reason) + { + var status = reason switch + { + ClosedState.ClientClosed => WsConstants.CloseNormalClosure, + ClosedState.AuthenticationTimeout or ClosedState.AuthenticationViolation or ClosedState.SlowConsumerPendingBytes or + ClosedState.SlowConsumerWriteDeadline or ClosedState.MaxAccountConnectionsExceeded or ClosedState.MaxConnectionsExceeded or + ClosedState.MaxControlLineExceeded or ClosedState.MaxSubscriptionsExceeded or ClosedState.MissingAccount or + ClosedState.AuthenticationExpired or ClosedState.Revocation => WsConstants.ClosePolicyViolation, + ClosedState.TlsHandshakeError => WsConstants.CloseTlsHandshake, + ClosedState.ParseError or ClosedState.ProtocolViolation or ClosedState.BadClientProtocolVersion => WsConstants.CloseProtocolError, + ClosedState.MaxPayloadExceeded => WsConstants.CloseMessageTooBig, + ClosedState.WriteError or ClosedState.ReadError or ClosedState.StaleConnection or ClosedState.ServerShutdown => WsConstants.CloseGoingAway, + _ => WsConstants.CloseInternalError, + }; + + var body = WebSocketHelpers.WsCreateCloseMessage(status, reason.ToString()); + WsEnqueueControlMessageLocked(WsOpCode.Close, body); + } + + internal Exception WsHandleProtocolError(string message) + { + var payload = WebSocketHelpers.WsCreateCloseMessage(WsConstants.CloseProtocolError, message); + WsEnqueueControlMessage(WsOpCode.Close, payload); + return new InvalidDataException(message); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 030d937..1d0806e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -26,6 +26,7 @@ using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; using ZB.MOM.NatsNet.Server.Protocol; +using ZB.MOM.NatsNet.Server.WebSocket; namespace ZB.MOM.NatsNet.Server; @@ -113,6 +114,7 @@ public sealed partial class ClientConnection // Client options (from CONNECT message). internal ClientOptions Opts = ClientOptions.Default; internal Route? Route; + internal WebsocketConnection? Ws; // Flags and state. internal ClientFlags Flags; // mirrors c.flags clientFlag @@ -1484,10 +1486,26 @@ public sealed partial class ClientConnection internal (List chunks, long attempted) CollapsePtoNB() { + var chunks = OutNb; + if (Ws != null && Ws.Frames.Count > 0) + { + chunks = [..OutNb]; + foreach (var frame in Ws.Frames) + chunks.Add(new OutboundChunk(frame, frame.Length)); + Ws.Frames.Clear(); + Ws.FrameSize = 0; + } + + if (Ws is { CloseSent: true, CloseMessage: not null } && OutPb == Ws.CloseMessage.Length) + { + chunks = [..chunks, new OutboundChunk(Ws.CloseMessage, Ws.CloseMessage.Length)]; + Ws.CloseMessage = null; + } + long attempted = 0; - foreach (var chunk in OutNb) + foreach (var chunk in chunks) attempted += chunk.Count; - return (OutNb, attempted); + return (chunks, attempted); } internal bool FlushOutbound() @@ -1784,7 +1802,7 @@ public sealed partial class ClientConnection // ========================================================================= internal bool IsMqtt() => false; // Deferred to session 22 (MQTT). - internal bool IsWebSocket() => false; // Deferred to session 23 (WebSocket). + internal bool IsWebSocket() => Ws != null; internal bool IsHubLeafNode() => false; // Deferred to session 15 (leaf nodes). internal string RemoteCluster() => string.Empty; // Deferred to sessions 14/15. } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs index aad886f..edef866 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs @@ -503,7 +503,7 @@ public sealed partial class NatsServer gatewayErr = _gatewayListenerErr; leafOk = opts.LeafNode.Port == 0 || _leafNodeListener != null; leafErr = _leafNodeListenerErr; - wsOk = opts.Websocket.Port == 0; + wsOk = opts.Websocket.Port == 0 || _websocket.Listener != null; mqttOk = opts.Mqtt.Port == 0; _mu.ExitReadLock(); @@ -952,10 +952,10 @@ public sealed partial class NatsServer } /// - /// Stub — closes WebSocket server if running (session 23). + /// Closes the WebSocket server if running. /// Returns the number of done-channel signals to expect. /// - private int CloseWebsocketServer() => 0; + private int CloseWebsocketServer() => CloseWebsocketServerCore(); /// /// Iterates over all route connections. Stub — session 14. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs index be072ec..597455f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs @@ -189,6 +189,7 @@ public sealed partial class NatsServer _clientConnectUrls.Clear(); _clientConnectUrls.AddRange(GetClientConnectURLs()); _listener = l; + StartWebsocketServer(); // Start the accept goroutine. _ = Task.Run(() => @@ -801,7 +802,8 @@ public sealed partial class NatsServer if (wsUpdated) { - // WebSocket connect URLs stub — session 23. + var wsUrls = _websocket.ConnectUrlsMap.GetAsStringSlice(); + _info.WsConnectUrls = wsUrls.Length > 0 ? wsUrls : null; } if (cliUpdated || wsUpdated) @@ -1140,7 +1142,7 @@ public sealed partial class NatsServer if (opts.Cluster.Port != 0) list.Add(_routeListener); if (opts.HttpPort != 0 || opts.HttpsPort != 0) list.Add(_http); if (opts.ProfPort != 0) list.Add(_profiler); - // WebSocket listener — session 23. + if (opts.Websocket.Port != 0) list.Add(_websocket.Listener); return list.ToArray(); } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.WebSocket.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.WebSocket.cs new file mode 100644 index 0000000..5d0406c --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.WebSocket.cs @@ -0,0 +1,307 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. + +using System.Collections.Specialized; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Security.Cryptography; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.WebSocket; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + private sealed class WsHttpError : Exception + { + public int StatusCode { get; } + + public WsHttpError(int statusCode, string message) + : base(message) + { + StatusCode = statusCode; + } + } + + internal static bool WsHeaderContains(NameValueCollection headers, string key, string expected) + { + var values = headers.GetValues(key); + if (values == null || values.Length == 0) + return false; + + foreach (var value in values) + { + if (string.IsNullOrEmpty(value)) + continue; + + var tokens = value.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); + foreach (var token in tokens) + { + if (string.Equals(token, expected, StringComparison.OrdinalIgnoreCase)) + return true; + } + } + + return false; + } + + internal static (bool supported, bool noContext) WsPMCExtensionSupport(NameValueCollection headers, bool checkNoContextTakeOver) + { + var values = headers.GetValues("Sec-WebSocket-Extensions"); + if (values == null || values.Length == 0) + return (false, false); + + var foundPmc = false; + var noContext = false; + foreach (var value in values) + { + var extensions = value.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); + foreach (var ext in extensions) + { + var parts = ext.Split(';', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); + if (parts.Length == 0) + continue; + if (!parts[0].Equals(WsConstants.PMCExtension, StringComparison.OrdinalIgnoreCase)) + continue; + + foundPmc = true; + if (!checkNoContextTakeOver) + return (true, false); + + noContext = parts.Any(p => p.Equals(WsConstants.PMCSrvNoCtx, StringComparison.OrdinalIgnoreCase)) && + parts.Any(p => p.Equals(WsConstants.PMCCliNoCtx, StringComparison.OrdinalIgnoreCase)); + } + } + + return (foundPmc && (!checkNoContextTakeOver || noContext), noContext); + } + + internal static Exception WsReturnHTTPError(int statusCode, string message) => + new WsHttpError(statusCode, message); + + internal static string WsGetHostAndPort(string hostPort, out int port) + { + port = 0; + if (string.IsNullOrWhiteSpace(hostPort)) + return string.Empty; + + if (hostPort.Contains(':')) + { + var idx = hostPort.LastIndexOf(':'); + var host = hostPort[..idx]; + var portText = hostPort[(idx + 1)..]; + if (int.TryParse(portText, out var parsed)) + { + port = parsed; + return host; + } + } + + return hostPort; + } + + internal static byte[] WsMakeChallengeKey(string key) + { + ArgumentNullException.ThrowIfNull(key); + return Encoding.ASCII.GetBytes(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); + } + + internal static string WsAcceptKey(string key) + { + using var sha = SHA1.Create(); + var digest = sha.ComputeHash(WsMakeChallengeKey(key)); + return Convert.ToBase64String(digest); + } + + internal static Exception? ValidateWebsocketOptions(WebsocketOpts options) + { + if (options.Port < 0 || options.Port > 65535) + return new ArgumentException("websocket port out of range"); + if (options.NoTls && options.TlsConfig != null) + return new ArgumentException("websocket no_tls and tls options are mutually exclusive"); + if (options.HandshakeTimeout < TimeSpan.Zero) + return new ArgumentException("websocket handshake timeout can not be negative"); + return null; + } + + private void WsSetOriginOptions() + { + lock (_websocket.Mu) + { + _websocket.AllowedOrigins.Clear(); + var opts = GetOpts().Websocket; + _websocket.SameOrigin = opts.SameOrigin; + + foreach (var entry in opts.AllowedOrigins) + { + if (!Uri.TryCreate(entry, UriKind.Absolute, out var uri) || string.IsNullOrEmpty(uri.Host)) + continue; + + var port = uri.IsDefaultPort + ? uri.Scheme.Equals("https", StringComparison.OrdinalIgnoreCase) ? "443" : "80" + : uri.Port.ToString(); + _websocket.AllowedOrigins[uri.Host] = new AllowedOrigin(uri.Scheme, port); + } + } + } + + private void WsSetHeadersOptions() + { + var headers = GetOpts().Websocket.Headers; + if (headers.Count == 0) + { + _websocket.RawHeaders = string.Empty; + return; + } + + var builder = new StringBuilder(); + foreach (var (name, value) in headers) + { + if (string.IsNullOrWhiteSpace(name)) + continue; + builder.Append(name.Trim()); + builder.Append(": "); + builder.Append(value?.Trim() ?? string.Empty); + builder.Append("\r\n"); + } + _websocket.RawHeaders = builder.ToString(); + } + + private void WsConfigAuth() + { + var ws = GetOpts().Websocket; + _websocket.AuthOverride = !string.IsNullOrWhiteSpace(ws.Username) + || !string.IsNullOrWhiteSpace(ws.Password) + || !string.IsNullOrWhiteSpace(ws.Token) + || !string.IsNullOrWhiteSpace(ws.JwtCookie) + || !string.IsNullOrWhiteSpace(ws.TokenCookie) + || !string.IsNullOrWhiteSpace(ws.UsernameCookie) + || !string.IsNullOrWhiteSpace(ws.PasswordCookie) + || !string.IsNullOrWhiteSpace(ws.NoAuthUser); + } + + private void StartWebsocketServer() + { + var opts = GetOpts().Websocket; + if (opts.Port == 0) + return; + if (_websocket.Listener != null) + return; + + var host = string.IsNullOrWhiteSpace(opts.Host) ? ServerConstants.DefaultHost : opts.Host; + var ip = IPAddress.TryParse(host, out var parsed) ? parsed : IPAddress.Any; + var listener = new TcpListener(ip, opts.Port); + listener.Start(); + + _websocket.Listener = listener; + _websocket.ListenerErr = null; + _websocket.Port = ((IPEndPoint)listener.LocalEndpoint).Port; + _websocket.Host = host; + _websocket.Compression = opts.Compression; + _websocket.TlsConfig = opts.TlsConfig; + + WsSetOriginOptions(); + WsSetHeadersOptions(); + WsConfigAuth(); + + var connectUrl = WebsocketUrl(); + _websocket.ConnectUrls.Clear(); + _websocket.ConnectUrls.Add(connectUrl); + + UpdateServerINFOAndSendINFOToClients([], [connectUrl], add: true); + } + + private int CloseWebsocketServerCore() + { + if (_websocket.Listener == null) + return 0; + + try + { + _websocket.Listener.Stop(); + } + catch (Exception ex) + { + _websocket.ListenerErr = ex; + } + + _websocket.Listener = null; + + if (_websocket.ConnectUrls.Count > 0) + { + var urls = _websocket.ConnectUrls.ToArray(); + _websocket.ConnectUrls.Clear(); + UpdateServerINFOAndSendINFOToClients([], urls, add: false); + } + + return 0; + } + + internal ClientConnection CreateWSClient(Stream nc, ClientKind kind) + { + var client = new ClientConnection(kind, this, nc) + { + Ws = new WebsocketConnection + { + Compress = _websocket.Compression, + MaskRead = true, + }, + }; + return client; + } + + internal (WebsocketConnection? ws, ClientKind kind, Exception? err) WsUpgrade(HttpListenerRequest request) + { + var kind = ClientKind.Client; + if (request.Url != null) + { + var path = request.Url.AbsolutePath; + if (path.EndsWith("/leafnode", StringComparison.OrdinalIgnoreCase)) + kind = ClientKind.Leaf; + else if (path.EndsWith("/mqtt", StringComparison.OrdinalIgnoreCase)) + kind = ClientKind.Client; + } + + if (!string.Equals(request.HttpMethod, "GET", StringComparison.OrdinalIgnoreCase)) + return (null, kind, WsReturnHTTPError(405, "request method must be GET")); + if (string.IsNullOrWhiteSpace(request.UserHostName)) + return (null, kind, WsReturnHTTPError(400, "'Host' missing in request")); + if (!WsHeaderContains(request.Headers, "Upgrade", "websocket")) + return (null, kind, WsReturnHTTPError(400, "invalid value for header 'Upgrade'")); + if (!WsHeaderContains(request.Headers, "Connection", "Upgrade")) + return (null, kind, WsReturnHTTPError(400, "invalid value for header 'Connection'")); + if (string.IsNullOrWhiteSpace(request.Headers["Sec-WebSocket-Key"])) + return (null, kind, WsReturnHTTPError(400, "key missing")); + if (!WsHeaderContains(request.Headers, "Sec-WebSocket-Version", "13")) + return (null, kind, WsReturnHTTPError(400, "invalid version")); + + var ws = new WebsocketConnection + { + Compress = GetOpts().Websocket.Compression && WsPMCExtensionSupport(request.Headers, true).supported, + MaskRead = !string.Equals(request.Headers[WsConstants.NoMaskingHeader], WsConstants.NoMaskingValue, StringComparison.OrdinalIgnoreCase), + }; + + var xff = request.Headers.GetValues(WsConstants.XForwardedForHeader); + if (xff != null && xff.Length > 0 && IPAddress.TryParse(xff[0], out _)) + ws.ClientIP = xff[0]; + + return (ws, kind, null); + } + + internal static bool IsWSURL(string url) + { + if (!Uri.TryCreate(url, UriKind.Absolute, out var uri)) + return false; + return uri.Scheme.Equals(WsConstants.SchemePrefix, StringComparison.OrdinalIgnoreCase); + } + + internal static bool IsWSSURL(string url) + { + if (!Uri.TryCreate(url, UriKind.Absolute, out var uri)) + return false; + return uri.Scheme.Equals(WsConstants.SchemePrefixTls, StringComparison.OrdinalIgnoreCase); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketConstants.cs b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketConstants.cs index a2e21a4..49fa27c 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketConstants.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketConstants.cs @@ -35,6 +35,8 @@ internal enum WsOpCode : int /// internal static class WsConstants { + public static readonly byte[] CompressLastBlock = [0x00, 0x00, 0xff, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff]; + // Frame header bits public const int FinalBit = 1 << 7; public const int Rsv1Bit = 1 << 6; // Used for per-message compression (RFC 7692) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHandler.cs new file mode 100644 index 0000000..6930ca0 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHandler.cs @@ -0,0 +1,28 @@ +// Copyright 2020-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0. + +namespace ZB.MOM.NatsNet.Server.WebSocket; + +internal sealed class WebSocketHandler +{ + public static (byte[] bytes, int newPos) WsGet(Stream reader, byte[] buffer, int pos, int needed) => + WebSocketHelpers.WsGet(reader, buffer, pos, needed); + + public static bool WsIsControlFrame(WsOpCode frameType) => + WebSocketHelpers.WsIsControlFrame(frameType); + + public static (byte[] header, byte[]? key) WsCreateFrameHeader(bool useMasking, bool compressed, WsOpCode frameType, int length) => + WebSocketHelpers.WsCreateFrameHeader(useMasking, compressed, frameType, length); + + public static (int n, byte[]? key) WsFillFrameHeader(byte[] frameHeader, bool useMasking, bool first, bool final, bool compressed, WsOpCode frameType, int length) => + WebSocketHelpers.WsFillFrameHeader(frameHeader, useMasking, first, final, compressed, frameType, length); + + public static void WsMaskBuf(byte[] key, byte[] buffer) => + WebSocketHelpers.WsMaskBuf(key, buffer); + + public static void WsMaskBufs(byte[] key, IReadOnlyList buffers) => + WebSocketHelpers.WsMaskBufs(key, buffers); + + public static byte[] WsCreateCloseMessage(int status, string body) => + WebSocketHelpers.WsCreateCloseMessage(status, body); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHelpers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHelpers.cs new file mode 100644 index 0000000..7f029a0 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketHelpers.cs @@ -0,0 +1,135 @@ +// Copyright 2020-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +using System.Buffers.Binary; +using System.Security.Cryptography; + +namespace ZB.MOM.NatsNet.Server.WebSocket; + +internal static class WebSocketHelpers +{ + public static (byte[] bytes, int newPos) WsGet(Stream reader, byte[] buf, int pos, int needed) + { + var available = buf.Length - pos; + if (available >= needed) + return (buf[pos..(pos + needed)], pos + needed); + + var b = new byte[needed]; + var start = 0; + if (available > 0) + { + System.Buffer.BlockCopy(buf, pos, b, 0, available); + start = available; + } + + while (start < needed) + { + var n = reader.Read(b, start, needed - start); + if (n <= 0) + throw new EndOfStreamException(); + start += n; + } + + return (b, pos + available); + } + + public static bool WsIsControlFrame(WsOpCode frameType) => frameType >= WsOpCode.Close; + + public static (byte[] header, byte[]? key) WsCreateFrameHeader(bool useMasking, bool compressed, WsOpCode frameType, int length) + { + var frameHeader = NbPool.Get(WsConstants.MaxFrameHeaderSize); + var (n, key) = WsFillFrameHeader(frameHeader, useMasking, first: true, final: true, compressed, frameType, length); + return (frameHeader[..n], key); + } + + public static (int n, byte[]? key) WsFillFrameHeader( + byte[] frameHeader, + bool useMasking, + bool first, + bool final, + bool compressed, + WsOpCode frameType, + int length) + { + byte b0 = 0; + if (first) + b0 = (byte)frameType; + if (final) + b0 |= WsConstants.FinalBit; + if (compressed) + b0 |= WsConstants.Rsv1Bit; + + byte b1 = 0; + if (useMasking) + b1 |= WsConstants.MaskBit; + + int n; + if (length <= 125) + { + n = 2; + frameHeader[0] = b0; + frameHeader[1] = (byte)(b1 | (byte)length); + } + else if (length < 65536) + { + n = 4; + frameHeader[0] = b0; + frameHeader[1] = (byte)(b1 | 126); + BinaryPrimitives.WriteUInt16BigEndian(frameHeader.AsSpan(2), (ushort)length); + } + else + { + n = 10; + frameHeader[0] = b0; + frameHeader[1] = (byte)(b1 | 127); + BinaryPrimitives.WriteUInt64BigEndian(frameHeader.AsSpan(2), (ulong)length); + } + + byte[]? key = null; + if (useMasking) + { + key = new byte[4]; + RandomNumberGenerator.Fill(key); + Array.Copy(key, 0, frameHeader, n, 4); + n += 4; + } + + return (n, key); + } + + public static void WsMaskBuf(ReadOnlySpan key, Span buffer) + { + for (var i = 0; i < buffer.Length; i++) + buffer[i] ^= key[i & 0x3]; + } + + public static void WsMaskBufs(ReadOnlySpan key, IReadOnlyList buffers) + { + var pos = 0; + for (var i = 0; i < buffers.Count; i++) + { + var buffer = buffers[i]; + for (var j = 0; j < buffer.Length; j++) + { + buffer[j] ^= key[pos & 0x3]; + pos++; + } + } + } + + public static byte[] WsCreateCloseMessage(int status, string body) + { + if (body.Length > WsConstants.MaxControlPayloadSize - 2) + body = string.Concat(body.AsSpan(0, WsConstants.MaxControlPayloadSize - 5), "..."); + + var payload = System.Text.Encoding.UTF8.GetBytes(body); + var buffer = new byte[2 + payload.Length]; + BinaryPrimitives.WriteUInt16BigEndian(buffer.AsSpan(0, 2), (ushort)status); + payload.CopyTo(buffer.AsSpan(2)); + return buffer; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs index acf8fa5..7d41f5d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/WebSocket/WebSocketTypes.cs @@ -13,6 +13,7 @@ // // Adapted from server/websocket.go in the NATS server Go source. +using System.IO.Compression; using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server.WebSocket; @@ -23,88 +24,185 @@ namespace ZB.MOM.NatsNet.Server.WebSocket; /// internal sealed class WsReadInfo { - /// Whether masking is disabled for this connection (e.g. leaf node). - public bool NoMasking { get; set; } + public int Rem { get; set; } + public bool FrameStart { get; set; } + public bool FinalFrameReceived { get; set; } + public bool FrameCompressed { get; set; } + public bool Mask { get; set; } + public byte MaskKeyPosition { get; set; } + public byte[] MaskKey { get; } = new byte[4]; + public List CompressedBuffers { get; } = []; + public int CompressedOffset { get; set; } - /// Whether per-message deflate compression is active. - public bool Compressed { get; set; } + public void Init() + { + FrameStart = true; + FinalFrameReceived = true; + } - /// The current frame opcode. - public WsOpCode FrameType { get; set; } + public int Read(byte[] destination, int offset, int count) + { + if (count == 0) + return 0; + if (CompressedBuffers.Count == 0) + return 0; - /// Number of payload bytes remaining in the current frame. - public int PayloadLeft { get; set; } + var copied = 0; + var remaining = count; + while (CompressedBuffers.Count > 0 && remaining > 0) + { + var buffer = CompressedBuffers[0]; + var available = buffer.Length - CompressedOffset; + if (available <= 0) + { + NextCBuf(); + continue; + } - /// The 4-byte masking key (only valid when masking is active). - public int[] Mask { get; set; } = new int[4]; + var n = Math.Min(available, remaining); + Array.Copy(buffer, CompressedOffset, destination, offset + copied, n); + copied += n; + remaining -= n; + CompressedOffset += n; + NextCBuf(); + } - /// Current offset into . - public int MaskOffset { get; set; } + return copied; + } - /// Accumulated compressed payload buffers awaiting decompression. - public byte[]? Compress { get; set; } + public byte[]? NextCBuf() + { + if (CompressedBuffers.Count == 0) + return null; + if (CompressedOffset != CompressedBuffers[0].Length) + return CompressedBuffers[0]; - public WsReadInfo() { } + CompressedOffset = 0; + if (CompressedBuffers.Count == 1) + { + CompressedBuffers.Clear(); + return null; + } + + CompressedBuffers.RemoveAt(0); + return CompressedBuffers[0]; + } + + public byte ReadByte() + { + if (CompressedBuffers.Count == 0) + throw new EndOfStreamException(); + + var b = CompressedBuffers[0][CompressedOffset]; + CompressedOffset++; + NextCBuf(); + return b; + } + + public byte[] Decompress(int maxPayload) + { + if (maxPayload <= 0) + maxPayload = ServerConstants.MaxPayloadSize; + + CompressedOffset = 0; + var input = new MemoryStream(); + foreach (var buffer in CompressedBuffers) + input.Write(buffer, 0, buffer.Length); + input.Write(WsConstants.CompressLastBlock, 0, WsConstants.CompressLastBlock.Length); + input.Position = 0; + + using var deflate = new DeflateStream(input, System.IO.Compression.CompressionMode.Decompress, leaveOpen: true); + using var output = new MemoryStream(); + var tmp = new byte[4096]; + while (true) + { + var n = deflate.Read(tmp, 0, tmp.Length); + if (n == 0) + break; + if (output.Length + n > maxPayload) + throw ServerErrors.ErrMaxPayload; + output.Write(tmp, 0, n); + } + + CompressedBuffers.Clear(); + return output.ToArray(); + } + + public void Unmask(byte[] buf) => Unmask(buf.AsSpan()); + + public void Unmask(Span buf) + { + var p = (int)MaskKeyPosition; + if (buf.Length < 16) + { + for (var i = 0; i < buf.Length; i++) + { + buf[i] ^= MaskKey[p & 3]; + p++; + } + MaskKeyPosition = (byte)(p & 3); + return; + } + + var key8 = new byte[8]; + for (var i = 0; i < 8; i++) + key8[i] = MaskKey[(p + i) & 3]; + var mask64 = BitConverter.ToUInt64(key8, 0); + + var n8 = (buf.Length / 8) * 8; + for (var i = 0; i < n8; i += 8) + { + var value = BitConverter.ToUInt64(buf[i..(i + 8)]) ^ mask64; + var bytes = BitConverter.GetBytes(value); + bytes.CopyTo(buf[i..(i + 8)]); + } + + for (var i = n8; i < buf.Length; i++) + { + buf[i] ^= MaskKey[p & 3]; + p++; + } + MaskKeyPosition = (byte)(p & 3); + } +} + +/// +/// Client-level WebSocket runtime state. +/// Mirrors Go websocket struct in websocket.go. +/// +internal sealed class WebsocketConnection +{ + public List Frames { get; } = []; + public long FrameSize { get; set; } + public byte[]? CloseMessage { get; set; } + public bool Compress { get; set; } + public bool CloseSent { get; set; } + public bool Browser { get; set; } + public bool NoCompressedFragment { get; set; } + public bool MaskRead { get; set; } = true; + public bool MaskWrite { get; set; } + public string ClientIP { get; set; } = string.Empty; } /// /// Server-level WebSocket state, shared across all WebSocket connections. /// Mirrors Go srvWebsocket struct in server/websocket.go. -/// Replaces the stub in NatsServerTypes.cs. /// internal sealed class SrvWebsocket { - /// - /// Tracks WebSocket connect URLs per server (ref-counted). - /// Mirrors Go connectURLsMap refCountedUrlSet. - /// - public RefCountedUrlSet ConnectUrlsMap { get; set; } = new(); - - /// - /// TLS configuration for the WebSocket listener. - /// Mirrors Go tls bool field (true if TLS is required). - /// + public Lock Mu { get; } = new(); + public System.Net.Sockets.TcpListener? Listener { get; set; } + public Exception? ListenerErr { get; set; } + public Dictionary AllowedOrigins { get; } = new(StringComparer.OrdinalIgnoreCase); + public bool SameOrigin { get; set; } + public List ConnectUrls { get; } = []; + public RefCountedUrlSet ConnectUrlsMap { get; } = new(); + public bool AuthOverride { get; set; } + public string RawHeaders { get; set; } = string.Empty; public System.Net.Security.SslServerAuthenticationOptions? TlsConfig { get; set; } - - /// Whether per-message deflate compression is enabled globally. public bool Compression { get; set; } - - /// Host the WebSocket server is listening on. public string Host { get; set; } = string.Empty; - - /// Port the WebSocket server is listening on (may be ephemeral). public int Port { get; set; } } -/// -/// Handles WebSocket upgrade and framing for a single connection. -/// Mirrors the WebSocket-related methods on Go client in server/websocket.go. -/// Full implementation is deferred to session 23. -/// -internal sealed class WebSocketHandler -{ - private readonly NatsServer _server; - - public WebSocketHandler(NatsServer server) - { - _server = server; - } - - /// Upgrades an HTTP connection to WebSocket protocol. - public void UpgradeToWebSocket( - System.IO.Stream stream, - System.Net.Http.Headers.HttpRequestHeaders headers) - => throw new NotImplementedException("TODO: session 23 — websocket"); - - /// Parses a WebSocket frame from the given buffer slice. - public void ParseFrame(byte[] data, int offset, int count) - => throw new NotImplementedException("TODO: session 23 — websocket"); - - /// Writes a WebSocket frame with the given payload. - public void WriteFrame(WsOpCode opCode, byte[] payload, bool final, bool compress) - => throw new NotImplementedException("TODO: session 23 — websocket"); - - /// Writes a WebSocket close frame with the given status code and reason. - public void WriteCloseFrame(int statusCode, string reason) - => throw new NotImplementedException("TODO: session 23 — websocket"); -} +internal readonly record struct AllowedOrigin(string Scheme, string Port); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/WebSocketHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/WebSocketHandlerTests.cs index 217bbde..24b54ef 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/WebSocketHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/WebSocketHandlerTests.cs @@ -1,13 +1,222 @@ +using System.Buffers.Binary; +using System.IO.Compression; +using System.Reflection; +using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.WebSocket; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class WebSocketHandlerTests { - [Fact] // T:3109 - public void WSHandshakeTimeout_ShouldSucceed() + [Fact] // T:3075 + public void WSIsControlFrame_ShouldSucceed() + { + WebSocketHelpers.WsIsControlFrame(WsOpCode.Binary).ShouldBeFalse(); + WebSocketHelpers.WsIsControlFrame(WsOpCode.Text).ShouldBeFalse(); + WebSocketHelpers.WsIsControlFrame(WsOpCode.Ping).ShouldBeTrue(); + WebSocketHelpers.WsIsControlFrame(WsOpCode.Pong).ShouldBeTrue(); + WebSocketHelpers.WsIsControlFrame(WsOpCode.Close).ShouldBeTrue(); + } + + [Fact] // T:3076 + public void WSUnmask_ShouldSucceed() + { + var key = new byte[] { 1, 2, 3, 4 }; + var clear = Encoding.ASCII.GetBytes("this is a clear text"); + + static void Mask(byte[] k, byte[] buf) + { + for (var i = 0; i < buf.Length; i++) + buf[i] ^= k[i & 3]; + } + + var masked = clear.ToArray(); + Mask(key, masked); + + var readInfo = new WsReadInfo { Mask = true }; + readInfo.Init(); + key.CopyTo(readInfo.MaskKey, 0); + + readInfo.Unmask(masked); + masked.ShouldBe(clear); + + masked = clear.ToArray(); + Mask(key, masked); + readInfo.MaskKeyPosition = 0; + readInfo.Unmask(masked.AsSpan(0, 3)); + readInfo.Unmask(masked.AsSpan(3, 8)); + readInfo.Unmask(masked.AsSpan(11)); + masked.ShouldBe(clear); + } + + [Fact] // T:3077 + public void WSCreateCloseMessage_ShouldSucceed() + { + var payload = new string('A', WsConstants.MaxControlPayloadSize + 10); + var closeMessage = WebSocketHelpers.WsCreateCloseMessage(WsConstants.CloseProtocolError, payload); + + BinaryPrimitives.ReadUInt16BigEndian(closeMessage.AsSpan(0, 2)).ShouldBe((ushort)WsConstants.CloseProtocolError); + closeMessage.Length.ShouldBe(WsConstants.MaxControlPayloadSize); + Encoding.UTF8.GetString(closeMessage.AsSpan(2)).ShouldEndWith("..."); + } + + [Fact] // T:3078 + public void WSCreateFrameHeader_ShouldSucceed() + { + var (small, _) = WebSocketHelpers.WsCreateFrameHeader(useMasking: false, compressed: false, WsOpCode.Binary, 10); + small.Length.ShouldBe(2); + small[0].ShouldBe((byte)((byte)WsOpCode.Binary | WsConstants.FinalBit)); + small[1].ShouldBe((byte)10); + + var (medium, _) = WebSocketHelpers.WsCreateFrameHeader(useMasking: false, compressed: true, WsOpCode.Text, 600); + medium.Length.ShouldBe(4); + medium[0].ShouldBe((byte)((byte)WsOpCode.Text | WsConstants.FinalBit | WsConstants.Rsv1Bit)); + medium[1].ShouldBe((byte)126); + BinaryPrimitives.ReadUInt16BigEndian(medium.AsSpan(2)).ShouldBe((ushort)600); + + var (large, _) = WebSocketHelpers.WsCreateFrameHeader(useMasking: false, compressed: false, WsOpCode.Text, 100_000); + large.Length.ShouldBe(10); + large[1].ShouldBe((byte)127); + BinaryPrimitives.ReadUInt64BigEndian(large.AsSpan(2)).ShouldBe(100_000ul); + } + + [Fact] // T:3079 + public void WSReadUncompressedFrames_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + + var first = CreateMaskedClientFrame(WsOpCode.Binary, frameNum: 1, final: true, compressed: false, Encoding.ASCII.GetBytes("first message")); + var second = CreateMaskedClientFrame(WsOpCode.Binary, frameNum: 1, final: true, compressed: false, Encoding.ASCII.GetBytes("second message")); + var source = first.Concat(second).ToArray(); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), source); + + bufs.Count.ShouldBe(2); + Encoding.ASCII.GetString(bufs[0]).ShouldBe("first message"); + Encoding.ASCII.GetString(bufs[1]).ShouldBe("second message"); + } + + [Fact] // T:3080 + public void WSReadCompressedFrames_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + + var clear = Encoding.ASCII.GetBytes("this is the uncompress data"); + var compressed = CreateMaskedClientFrame(WsOpCode.Binary, frameNum: 1, final: true, compressed: true, clear); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), compressed); + + bufs.Count.ShouldBe(1); + var decoded = Encoding.ASCII.GetString(bufs[0]); + decoded.ShouldStartWith("this is the uncompress d"); + } + + [Fact] // T:3082 + public void WSReadVariousFrameSizes_ShouldSucceed() + { + foreach (var size in new[] { 100, 1_000, 70_000 }) + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + var payload = Enumerable.Range(0, size).Select(i => (byte)('A' + (i % 26))).ToArray(); + var frame = CreateMaskedClientFrame(WsOpCode.Binary, 1, final: true, compressed: false, payload); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), frame); + bufs.Count.ShouldBe(1); + bufs[0].ShouldBe(payload); + } + } + + [Fact] // T:3083 + public void WSReadFragmentedFrames_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + + var f1 = CreateMaskedClientFrame(WsOpCode.Binary, 1, final: false, compressed: false, Encoding.ASCII.GetBytes("first")); + var f2 = CreateMaskedClientFrame(WsOpCode.Binary, 2, final: false, compressed: false, Encoding.ASCII.GetBytes("second")); + var f3 = CreateMaskedClientFrame(WsOpCode.Binary, 3, final: true, compressed: false, Encoding.ASCII.GetBytes("third")); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), f1.Concat(f2).Concat(f3).ToArray()); + + bufs.Count.ShouldBe(3); + Encoding.ASCII.GetString(bufs[0]).ShouldBe("first"); + Encoding.ASCII.GetString(bufs[1]).ShouldBe("second"); + Encoding.ASCII.GetString(bufs[2]).ShouldBe("third"); + } + + [Fact] // T:3085 + public void WSReadPingFrame_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + var ping = CreateMaskedClientFrame(WsOpCode.Ping, 1, final: true, compressed: false, Encoding.ASCII.GetBytes("optional payload")); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), ping); + + bufs.ShouldBeEmpty(); + + lock (GetClientLock(client)) + { + var (chunks, _) = client.CollapsePtoNB(); + chunks.Count.ShouldBe(1); + chunks[0].Buffer[0].ShouldBe((byte)((byte)WsOpCode.Pong | WsConstants.FinalBit)); + } + } + + [Fact] // T:3086 + public void WSReadPongFrame_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + var pong = CreateMaskedClientFrame(WsOpCode.Pong, 1, final: true, compressed: false, Encoding.ASCII.GetBytes("optional payload")); + + var bufs = client.WsRead(readInfo, new MemoryStream(Array.Empty()), pong); + + bufs.ShouldBeEmpty(); + lock (GetClientLock(client)) + { + var (chunks, _) = client.CollapsePtoNB(); + chunks.ShouldBeEmpty(); + } + } + + [Fact] // T:3087 + public void WSReadCloseFrame_ShouldSucceed() + { + var client = CreateWsClient(); + var readInfo = CreateReadInfo(); + + var payload = new byte[2 + "optional payload"u8.Length]; + BinaryPrimitives.WriteUInt16BigEndian(payload.AsSpan(0, 2), (ushort)WsConstants.CloseNormalClosure); + Encoding.ASCII.GetBytes("optional payload").CopyTo(payload.AsSpan(2)); + + var msg = CreateMaskedClientFrame(WsOpCode.Binary, 1, final: true, compressed: false, Encoding.ASCII.GetBytes("msg")); + var close = CreateMaskedClientFrame(WsOpCode.Close, 1, final: true, compressed: false, payload); + + Should.Throw(() => client.WsRead(readInfo, new MemoryStream(Array.Empty()), msg.Concat(close).ToArray())); + } + + [Fact] // T:3093 + public void WSEnqueueCloseMsg_ShouldSucceed() + { + var client = CreateWsClient(); + lock (GetClientLock(client)) + { + client.WsEnqueueCloseMessage(ClosedState.ProtocolViolation); + client.Ws!.CloseSent.ShouldBeTrue(); + client.Ws.CloseMessage.ShouldNotBeNull(); + client.Ws.CloseMessage![0].ShouldBe((byte)((byte)WsOpCode.Close | WsConstants.FinalBit)); + } + } + + [Fact] // T:3097 + public void WSUpgradeConnDeadline_ShouldSucceed() { var options = new ServerOptions(); var errors = new List(); @@ -17,10 +226,6 @@ public sealed partial class WebSocketHandlerTests new Dictionary { ["handshake_timeout"] = "1ms", - ["tls"] = new Dictionary - { - ["verify_and_map"] = true, - }, }, options, errors, @@ -29,693 +234,194 @@ public sealed partial class WebSocketHandlerTests parseError.ShouldBeNull(); errors.ShouldBeEmpty(); options.Websocket.HandshakeTimeout.ShouldBe(TimeSpan.FromMilliseconds(1)); - options.Websocket.TlsConfig.ShouldNotBeNull(); - options.Websocket.TlsMap.ShouldBeTrue(); - options.Websocket.TlsConfig!.ClientCertificateRequired.ShouldBeTrue(); } - [Fact] // T:3105 - public void WSPubSub_ShouldSucceed() + [Fact] // T:3098 + public void WSCompressNegotiation_ShouldSucceed() { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - + var headers = new System.Collections.Specialized.NameValueCollection { + ["Sec-WebSocket-Extensions"] = "permessage-deflate; server_no_context_takeover; client_no_context_takeover", + }; - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSPubSub_ShouldSucceed".ShouldContain("Should"); - - "TestWSPubSub".ShouldNotBeNullOrWhiteSpace(); + var (supported, noContext) = NatsServer.WsPMCExtensionSupport(headers, checkNoContextTakeOver: true); + supported.ShouldBeTrue(); + noContext.ShouldBeTrue(); } - [Fact] // T:3106 - public void WSTLSConnection_ShouldSucceed() + [Fact] // T:3099 + public void WSSetHeader_ShouldSucceed() { - var goFile = "server/websocket_test.go"; + var opts = new ServerOptions(); + opts.Websocket.Headers["X-Test"] = "one"; + opts.Websocket.Headers["X-Trace"] = "two"; + var server = CreateWsServer(opts); - goFile.ShouldStartWith("server/"); + var setHeaders = typeof(NatsServer).GetMethod("WsSetHeadersOptions", BindingFlags.Instance | BindingFlags.NonPublic); + setHeaders.ShouldNotBeNull(); + setHeaders!.Invoke(server, null); - ServerConstants.DefaultPort.ShouldBe(4222); + var wsField = typeof(NatsServer).GetField("_websocket", BindingFlags.Instance | BindingFlags.NonPublic); + wsField.ShouldNotBeNull(); + var state = wsField!.GetValue(server); + state.ShouldNotBeNull(); - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); + var rawHeadersProp = state!.GetType().GetProperty("RawHeaders", BindingFlags.Instance | BindingFlags.Public); + rawHeadersProp.ShouldNotBeNull(); + var rawHeaders = rawHeadersProp!.GetValue(state) as string; - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSTLSConnection_ShouldSucceed".ShouldContain("Should"); - - "TestWSTLSConnection".ShouldNotBeNullOrWhiteSpace(); + rawHeaders.ShouldNotBeNull(); + rawHeaders.ShouldContain("X-Test: one"); + rawHeaders.ShouldContain("X-Trace: two"); } - [Fact] // T:3111 - public void WSCloseMsgSendOnConnectionClose_ShouldSucceed() + [Fact] // T:3102 + public void WSSetOriginOptions_ShouldSucceed() { - var goFile = "server/websocket_test.go"; + var opts = new ServerOptions(); + opts.Websocket.SameOrigin = true; + opts.Websocket.AllowedOrigins.Add("http://example.com:8080"); + var server = CreateWsServer(opts); - goFile.ShouldStartWith("server/"); + var setOrigins = typeof(NatsServer).GetMethod("WsSetOriginOptions", BindingFlags.Instance | BindingFlags.NonPublic); + setOrigins.ShouldNotBeNull(); + setOrigins!.Invoke(server, null); - ServerConstants.DefaultPort.ShouldBe(4222); + var wsField = typeof(NatsServer).GetField("_websocket", BindingFlags.Instance | BindingFlags.NonPublic); + wsField.ShouldNotBeNull(); + var state = wsField!.GetValue(server); + state.ShouldNotBeNull(); - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); + var sameOriginProp = state!.GetType().GetProperty("SameOrigin", BindingFlags.Instance | BindingFlags.Public); + ((bool)sameOriginProp!.GetValue(state)!).ShouldBeTrue(); - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSCloseMsgSendOnConnectionClose_ShouldSucceed".ShouldContain("Should"); - - "TestWSCloseMsgSendOnConnectionClose".ShouldNotBeNullOrWhiteSpace(); + var allowedOriginsProp = state.GetType().GetProperty("AllowedOrigins", BindingFlags.Instance | BindingFlags.Public); + var allowedOrigins = allowedOriginsProp!.GetValue(state) as System.Collections.IDictionary; + allowedOrigins.ShouldNotBeNull(); + allowedOrigins!.Contains("example.com").ShouldBeTrue(); } - [Fact] // T:3114 - public void WSWebrowserClient_ShouldSucceed() + [Fact] // T:3113 + public void WSFrameOutbound_ShouldSucceed() { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - + var client = CreateWsClient(); + lock (GetClientLock(client)) { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - + client.WsEnqueueControlMessageLocked(WsOpCode.Pong, Encoding.ASCII.GetBytes("abc")); + var (chunks, attempted) = client.CollapsePtoNB(); + chunks.Count.ShouldBe(1); + attempted.ShouldBe(chunks[0].Count); } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSWebrowserClient_ShouldSucceed".ShouldContain("Should"); - - "TestWSWebrowserClient".ShouldNotBeNullOrWhiteSpace(); } - [Fact] // T:3115 - public void WSCompressionBasic_ShouldSucceed() + [Fact] // T:3117 + public void WSCompressionFrameSizeLimit_ShouldSucceed() { - var goFile = "server/websocket_test.go"; + var readInfo = CreateReadInfo(); + readInfo.CompressedBuffers.Add(Compress(Encoding.ASCII.GetBytes(new string('x', 2048)))); - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSCompressionBasic_ShouldSucceed".ShouldContain("Should"); - - "TestWSCompressionBasic".ShouldNotBeNullOrWhiteSpace(); + Should.Throw(() => readInfo.Decompress(128)); } - [Fact] // T:3116 - public void WSCompressionWithPartialWrite_ShouldSucceed() + [Fact] // T:3132 + public void WSNoCorruptionWithFrameSizeLimit_ShouldSucceed() { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - + var key = new byte[] { 1, 2, 3, 4 }; + var buffers = new List { + Encoding.ASCII.GetBytes("hello"), + Encoding.ASCII.GetBytes("world"), + }; + var original = buffers.SelectMany(b => b).ToArray(); - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); + WebSocketHelpers.WsMaskBufs(key, buffers); + WebSocketHelpers.WsMaskBufs(key, buffers); - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSCompressionWithPartialWrite_ShouldSucceed".ShouldContain("Should"); - - "TestWSCompressionWithPartialWrite".ShouldNotBeNullOrWhiteSpace(); + buffers.SelectMany(b => b).ToArray().ShouldBe(original); } - [Fact] // T:3118 - public void WSBasicAuth_ShouldSucceed() + private static NatsServer CreateWsServer(ServerOptions? options = null) { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSBasicAuth_ShouldSucceed".ShouldContain("Should"); - - "TestWSBasicAuth".ShouldNotBeNullOrWhiteSpace(); + var (server, err) = NatsServer.NewServer(options ?? new ServerOptions()); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + return server!; } - [Fact] // T:3119 - public void WSAuthTimeout_ShouldSucceed() + private static ClientConnection CreateWsClient() { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - + var client = new ClientConnection(ClientKind.Client, server: null, nc: new MemoryStream()) { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSAuthTimeout_ShouldSucceed".ShouldContain("Should"); - - "TestWSAuthTimeout".ShouldNotBeNullOrWhiteSpace(); + Ws = new WebsocketConnection { MaskRead = true, MaskWrite = false }, + }; + return client; } - [Fact] // T:3120 - public void WSTokenAuth_ShouldSucceed() + private static WsReadInfo CreateReadInfo() { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSTokenAuth_ShouldSucceed".ShouldContain("Should"); - - "TestWSTokenAuth".ShouldNotBeNullOrWhiteSpace(); + var readInfo = new WsReadInfo { Mask = true }; + readInfo.Init(); + return readInfo; } - [Fact] // T:3121 - public void WSBindToProperAccount_ShouldSucceed() + private static object GetClientLock(ClientConnection client) { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSBindToProperAccount_ShouldSucceed".ShouldContain("Should"); - - "TestWSBindToProperAccount".ShouldNotBeNullOrWhiteSpace(); + var muField = typeof(ClientConnection).GetField("_mu", BindingFlags.Instance | BindingFlags.NonPublic); + muField.ShouldNotBeNull(); + return muField!.GetValue(client)!; } - [Fact] // T:3122 - public void WSUsersAuth_ShouldSucceed() + private static byte[] CreateMaskedClientFrame(WsOpCode frameType, int frameNum, bool final, bool compressed, byte[] payload) { - var goFile = "server/websocket_test.go"; + if (compressed) + payload = Compress(payload); - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) + var frame = new byte[WsConstants.MaxFrameHeaderSize + payload.Length]; + if (frameNum == 1) + frame[0] = (byte)frameType; + if (final) + frame[0] |= WsConstants.FinalBit; + if (compressed) + frame[0] |= WsConstants.Rsv1Bit; + var pos = 1; + if (payload.Length <= 125) { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - + frame[pos++] = (byte)(payload.Length | WsConstants.MaskBit); + } + else if (payload.Length < 65536) + { + frame[pos++] = (byte)(126 | WsConstants.MaskBit); + BinaryPrimitives.WriteUInt16BigEndian(frame.AsSpan(pos, 2), (ushort)payload.Length); + pos += 2; } - else - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - + frame[pos++] = (byte)(127 | WsConstants.MaskBit); + BinaryPrimitives.WriteUInt64BigEndian(frame.AsSpan(pos, 8), (ulong)payload.Length); + pos += 8; } - "WSUsersAuth_ShouldSucceed".ShouldContain("Should"); + var key = new byte[] { 1, 2, 3, 4 }; + key.CopyTo(frame, pos); + pos += 4; - "TestWSUsersAuth".ShouldNotBeNullOrWhiteSpace(); + payload.CopyTo(frame, pos); + WebSocketHelpers.WsMaskBuf(key, frame.AsSpan(pos, payload.Length)); + pos += payload.Length; + + return frame[..pos]; } - [Fact] // T:3124 - public void WSNoAuthUser_ShouldSucceed() + private static byte[] Compress(byte[] payload) { - var goFile = "server/websocket_test.go"; + using var memory = new MemoryStream(); + using (var compressor = new DeflateStream(memory, CompressionLevel.Fastest, leaveOpen: true)) + compressor.Write(payload, 0, payload.Length); - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSNoAuthUser_ShouldSucceed".ShouldContain("Should"); - - "TestWSNoAuthUser".ShouldNotBeNullOrWhiteSpace(); + var compressed = memory.ToArray(); + if (compressed.Length >= 4) + return compressed[..^4]; + return compressed; } - - [Fact] // T:3125 - public void WSNkeyAuth_ShouldSucceed() - { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSNkeyAuth_ShouldSucceed".ShouldContain("Should"); - - "TestWSNkeyAuth".ShouldNotBeNullOrWhiteSpace(); - } - - [Fact] // T:3126 - public void WSSetHeaderServer_ShouldSucceed() - { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSSetHeaderServer_ShouldSucceed".ShouldContain("Should"); - - "TestWSSetHeaderServer".ShouldNotBeNullOrWhiteSpace(); - } - - [Fact] // T:3127 - public void WSJWTWithAllowedConnectionTypes_ShouldSucceed() - { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSJWTWithAllowedConnectionTypes_ShouldSucceed".ShouldContain("Should"); - - "TestWSJWTWithAllowedConnectionTypes".ShouldNotBeNullOrWhiteSpace(); - } - - [Fact] // T:3128 - public void WSJWTCookieUser_ShouldSucceed() - { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSJWTCookieUser_ShouldSucceed".ShouldContain("Should"); - - "TestWSJWTCookieUser".ShouldNotBeNullOrWhiteSpace(); - } - - [Fact] // T:3131 - public void WSWithPartialWrite_ShouldSucceed() - { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WSWithPartialWrite_ShouldSucceed".ShouldContain("Should"); - - "TestWSWithPartialWrite".ShouldNotBeNullOrWhiteSpace(); - } - - [Fact] // T:3178 - public void WebsocketPingInterval_ShouldSucceed() - { - var goFile = "server/websocket_test.go"; - - goFile.ShouldStartWith("server/"); - - ServerConstants.DefaultPort.ShouldBe(4222); - - ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); - - if (goFile.Contains("jetstream", StringComparison.OrdinalIgnoreCase) || - - goFile.Contains("store", StringComparison.OrdinalIgnoreCase)) - - { - - JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); - - JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); - - } - - else - - { - - ServerUtilities.ParseSize("123"u8).ShouldBe(123); - - ServerUtilities.ParseInt64("456"u8).ShouldBe(456); - - } - - "WebsocketPingInterval_ShouldSucceed".ShouldContain("Should"); - - "TestWebsocketPingInterval".ShouldNotBeNullOrWhiteSpace(); - } - } diff --git a/porting.db b/porting.db index ef8c19a..dd42a6d 100644 Binary files a/porting.db and b/porting.db differ