feat(batch26): merge websocket

This commit is contained in:
Joseph Doherty
2026-02-28 21:57:21 -05:00
18 changed files with 1734 additions and 708 deletions

View File

@@ -0,0 +1,253 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
using System.Buffers.Binary;
using System.Text;
using ZB.MOM.NatsNet.Server.WebSocket;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class ClientConnection
{
internal (List<OutboundChunk> chunks, long attempted) WsCollapsePtoNB() => CollapsePtoNB();
internal List<byte[]> WsRead(WsReadInfo readInfo, Stream reader, byte[] buffer)
{
var bufs = new List<byte[]>();
var pos = 0;
var max = buffer.Length;
var maxPayload = Volatile.Read(ref _mpay);
while (pos != max)
{
if (readInfo.FrameStart)
{
var b0 = buffer[pos];
var frameType = (WsOpCode)(b0 & 0xF);
var final = (b0 & WsConstants.FinalBit) != 0;
var compressed = (b0 & WsConstants.Rsv1Bit) != 0;
pos++;
var (firstLen, newPos) = WebSocketHelpers.WsGet(reader, buffer, pos, 1);
pos = newPos;
var b1 = firstLen[0];
if (readInfo.Mask && (b1 & WsConstants.MaskBit) == 0)
throw WsHandleProtocolError("mask bit missing");
readInfo.Rem = b1 & 0x7F;
switch (frameType)
{
case WsOpCode.Ping:
case WsOpCode.Pong:
case WsOpCode.Close:
if (readInfo.Rem > WsConstants.MaxControlPayloadSize)
throw WsHandleProtocolError($"control frame length bigger than maximum allowed of {WsConstants.MaxControlPayloadSize} bytes");
if (!final)
throw WsHandleProtocolError("control frame does not have final bit set");
break;
case WsOpCode.Text:
case WsOpCode.Binary:
if (!readInfo.FinalFrameReceived)
throw WsHandleProtocolError("new message started before final frame for previous message was received");
readInfo.FinalFrameReceived = final;
readInfo.FrameCompressed = compressed;
break;
case WsOpCode.Continuation:
if (readInfo.FinalFrameReceived || compressed)
throw WsHandleProtocolError("invalid continuation frame");
readInfo.FinalFrameReceived = final;
break;
default:
throw WsHandleProtocolError($"unknown opcode {(int)frameType}");
}
if (readInfo.Rem == 126)
{
var (extended, p) = WebSocketHelpers.WsGet(reader, buffer, pos, 2);
pos = p;
readInfo.Rem = BinaryPrimitives.ReadUInt16BigEndian(extended);
}
else if (readInfo.Rem == 127)
{
var (extended, p) = WebSocketHelpers.WsGet(reader, buffer, pos, 8);
pos = p;
readInfo.Rem = checked((int)BinaryPrimitives.ReadUInt64BigEndian(extended));
}
if (readInfo.Mask)
{
var (maskKey, p) = WebSocketHelpers.WsGet(reader, buffer, pos, 4);
pos = p;
Array.Copy(maskKey, 0, readInfo.MaskKey, 0, 4);
readInfo.MaskKeyPosition = 0;
}
if (WebSocketHelpers.WsIsControlFrame(frameType))
{
pos = WsHandleControlFrame(readInfo, frameType, reader, buffer, pos);
continue;
}
readInfo.FrameStart = false;
}
if (pos >= max)
continue;
var n = readInfo.Rem;
if (pos + n > max)
n = max - pos;
var payload = buffer.AsSpan(pos, n).ToArray();
pos += n;
readInfo.Rem -= n;
if (readInfo.Mask)
readInfo.Unmask(payload);
var addToBufs = true;
if (readInfo.FrameCompressed)
{
addToBufs = false;
readInfo.CompressedBuffers.Add(payload);
if (readInfo.FinalFrameReceived && readInfo.Rem == 0)
{
payload = readInfo.Decompress(maxPayload);
readInfo.FrameCompressed = false;
addToBufs = true;
}
}
if (addToBufs)
bufs.Add(payload);
if (readInfo.Rem == 0)
readInfo.FrameStart = true;
}
return bufs;
}
internal int WsHandleControlFrame(WsReadInfo readInfo, WsOpCode frameType, Stream networkConnection, byte[] buffer, int pos)
{
byte[] payload = [];
if (readInfo.Rem > 0)
{
(payload, pos) = WebSocketHelpers.WsGet(networkConnection, buffer, pos, readInfo.Rem);
if (readInfo.Mask)
readInfo.Unmask(payload);
readInfo.Rem = 0;
}
switch (frameType)
{
case WsOpCode.Close:
{
var status = WsConstants.CloseNoStatusReceived;
string body = string.Empty;
var payloadLength = payload.Length;
var hasStatus = payloadLength >= WsConstants.CloseStatusSize;
var hasBody = payloadLength > WsConstants.CloseStatusSize;
if (hasStatus)
{
status = BinaryPrimitives.ReadUInt16BigEndian(payload.AsSpan(0, 2));
if (hasBody)
{
body = Encoding.UTF8.GetString(payload.AsSpan(WsConstants.CloseStatusSize));
if (!Encoding.UTF8.GetBytes(body).AsSpan().SequenceEqual(payload.AsSpan(WsConstants.CloseStatusSize)))
{
status = WsConstants.CloseInvalidPayloadData;
body = "invalid utf8 body in close frame";
}
}
}
byte[]? closeMessage = null;
if (status != WsConstants.CloseNoStatusReceived)
closeMessage = WebSocketHelpers.WsCreateCloseMessage(status, body);
WsEnqueueControlMessage(WsOpCode.Close, closeMessage ?? []);
throw new EndOfStreamException();
}
case WsOpCode.Ping:
WsEnqueueControlMessage(WsOpCode.Pong, payload);
break;
case WsOpCode.Pong:
break;
}
return pos;
}
internal void WsEnqueueControlMessage(WsOpCode controlMessage, byte[] payload)
{
lock (_mu)
WsEnqueueControlMessageLocked(controlMessage, payload);
}
internal void WsEnqueueControlMessageLocked(WsOpCode controlMessage, byte[] payload)
{
if (Ws == null)
Ws = new WebsocketConnection();
var useMasking = Ws.MaskWrite;
var headerSize = 2 + (useMasking ? 4 : 0);
var control = NbPool.Get(headerSize + payload.Length);
var (n, key) = WebSocketHelpers.WsFillFrameHeader(control, useMasking, first: true, final: true, compressed: false, controlMessage, payload.Length);
var totalLen = n;
if (payload.Length > 0)
{
Array.Copy(payload, 0, control, n, payload.Length);
if (useMasking && key != null)
WebSocketHelpers.WsMaskBuf(key, control.AsSpan(n, payload.Length));
totalLen += payload.Length;
}
var frame = control[..totalLen];
OutPb += totalLen;
if (controlMessage == WsOpCode.Close)
{
Ws.CloseSent = true;
Ws.CloseMessage = frame;
}
else
{
Ws.Frames.Add(frame);
Ws.FrameSize += frame.Length;
}
FlushSignal();
}
internal void WsEnqueueCloseMessage(ClosedState reason)
{
var status = reason switch
{
ClosedState.ClientClosed => WsConstants.CloseNormalClosure,
ClosedState.AuthenticationTimeout or ClosedState.AuthenticationViolation or ClosedState.SlowConsumerPendingBytes or
ClosedState.SlowConsumerWriteDeadline or ClosedState.MaxAccountConnectionsExceeded or ClosedState.MaxConnectionsExceeded or
ClosedState.MaxControlLineExceeded or ClosedState.MaxSubscriptionsExceeded or ClosedState.MissingAccount or
ClosedState.AuthenticationExpired or ClosedState.Revocation => WsConstants.ClosePolicyViolation,
ClosedState.TlsHandshakeError => WsConstants.CloseTlsHandshake,
ClosedState.ParseError or ClosedState.ProtocolViolation or ClosedState.BadClientProtocolVersion => WsConstants.CloseProtocolError,
ClosedState.MaxPayloadExceeded => WsConstants.CloseMessageTooBig,
ClosedState.WriteError or ClosedState.ReadError or ClosedState.StaleConnection or ClosedState.ServerShutdown => WsConstants.CloseGoingAway,
_ => WsConstants.CloseInternalError,
};
var body = WebSocketHelpers.WsCreateCloseMessage(status, reason.ToString());
WsEnqueueControlMessageLocked(WsOpCode.Close, body);
}
internal Exception WsHandleProtocolError(string message)
{
var payload = WebSocketHelpers.WsCreateCloseMessage(WsConstants.CloseProtocolError, message);
WsEnqueueControlMessage(WsOpCode.Close, payload);
return new InvalidDataException(message);
}
}

View File

@@ -26,6 +26,7 @@ using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
using ZB.MOM.NatsNet.Server.Protocol;
using ZB.MOM.NatsNet.Server.WebSocket;
namespace ZB.MOM.NatsNet.Server;
@@ -113,6 +114,7 @@ public sealed partial class ClientConnection
// Client options (from CONNECT message).
internal ClientOptions Opts = ClientOptions.Default;
internal Route? Route;
internal WebsocketConnection? Ws;
// Flags and state.
internal ClientFlags Flags; // mirrors c.flags clientFlag
@@ -1484,10 +1486,26 @@ public sealed partial class ClientConnection
internal (List<OutboundChunk> chunks, long attempted) CollapsePtoNB()
{
var chunks = OutNb;
if (Ws != null && Ws.Frames.Count > 0)
{
chunks = [..OutNb];
foreach (var frame in Ws.Frames)
chunks.Add(new OutboundChunk(frame, frame.Length));
Ws.Frames.Clear();
Ws.FrameSize = 0;
}
if (Ws is { CloseSent: true, CloseMessage: not null } && OutPb == Ws.CloseMessage.Length)
{
chunks = [..chunks, new OutboundChunk(Ws.CloseMessage, Ws.CloseMessage.Length)];
Ws.CloseMessage = null;
}
long attempted = 0;
foreach (var chunk in OutNb)
foreach (var chunk in chunks)
attempted += chunk.Count;
return (OutNb, attempted);
return (chunks, attempted);
}
internal bool FlushOutbound()
@@ -1784,7 +1802,7 @@ public sealed partial class ClientConnection
// =========================================================================
internal bool IsMqtt() => false; // Deferred to session 22 (MQTT).
internal bool IsWebSocket() => false; // Deferred to session 23 (WebSocket).
internal bool IsWebSocket() => Ws != null;
internal bool IsHubLeafNode() => false; // Deferred to session 15 (leaf nodes).
internal string RemoteCluster() => string.Empty; // Deferred to sessions 14/15.
}

View File

@@ -503,7 +503,7 @@ public sealed partial class NatsServer
gatewayErr = _gatewayListenerErr;
leafOk = opts.LeafNode.Port == 0 || _leafNodeListener != null;
leafErr = _leafNodeListenerErr;
wsOk = opts.Websocket.Port == 0;
wsOk = opts.Websocket.Port == 0 || _websocket.Listener != null;
mqttOk = opts.Mqtt.Port == 0;
_mu.ExitReadLock();
@@ -952,10 +952,10 @@ public sealed partial class NatsServer
}
/// <summary>
/// Stub — closes WebSocket server if running (session 23).
/// Closes the WebSocket server if running.
/// Returns the number of done-channel signals to expect.
/// </summary>
private int CloseWebsocketServer() => 0;
private int CloseWebsocketServer() => CloseWebsocketServerCore();
/// <summary>
/// Iterates over all route connections. Stub — session 14.

View File

@@ -189,6 +189,7 @@ public sealed partial class NatsServer
_clientConnectUrls.Clear();
_clientConnectUrls.AddRange(GetClientConnectURLs());
_listener = l;
StartWebsocketServer();
// Start the accept goroutine.
_ = Task.Run(() =>
@@ -801,7 +802,8 @@ public sealed partial class NatsServer
if (wsUpdated)
{
// WebSocket connect URLs stub — session 23.
var wsUrls = _websocket.ConnectUrlsMap.GetAsStringSlice();
_info.WsConnectUrls = wsUrls.Length > 0 ? wsUrls : null;
}
if (cliUpdated || wsUpdated)
@@ -1140,7 +1142,7 @@ public sealed partial class NatsServer
if (opts.Cluster.Port != 0) list.Add(_routeListener);
if (opts.HttpPort != 0 || opts.HttpsPort != 0) list.Add(_http);
if (opts.ProfPort != 0) list.Add(_profiler);
// WebSocket listener — session 23.
if (opts.Websocket.Port != 0) list.Add(_websocket.Listener);
return list.ToArray();
}

View File

@@ -0,0 +1,332 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
using System.Collections.Specialized;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Security.Cryptography;
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.WebSocket;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
private sealed class WsHttpError : Exception
{
public int StatusCode { get; }
public WsHttpError(int statusCode, string message)
: base(message)
{
StatusCode = statusCode;
}
}
internal static bool WsHeaderContains(NameValueCollection headers, string key, string expected)
{
var values = headers.GetValues(key);
if (values == null || values.Length == 0)
return false;
foreach (var value in values)
{
if (string.IsNullOrEmpty(value))
continue;
var tokens = value.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
foreach (var token in tokens)
{
if (string.Equals(token, expected, StringComparison.OrdinalIgnoreCase))
return true;
}
}
return false;
}
internal static (bool supported, bool noContext) WsPMCExtensionSupport(NameValueCollection headers, bool checkNoContextTakeOver)
{
var values = headers.GetValues("Sec-WebSocket-Extensions");
if (values == null || values.Length == 0)
return (false, false);
var foundPmc = false;
var noContext = false;
foreach (var value in values)
{
var extensions = value.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
foreach (var ext in extensions)
{
var parts = ext.Split(';', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
if (parts.Length == 0)
continue;
if (!parts[0].Equals(WsConstants.PMCExtension, StringComparison.OrdinalIgnoreCase))
continue;
foundPmc = true;
if (!checkNoContextTakeOver)
return (true, false);
noContext = parts.Any(p => p.Equals(WsConstants.PMCSrvNoCtx, StringComparison.OrdinalIgnoreCase)) &&
parts.Any(p => p.Equals(WsConstants.PMCCliNoCtx, StringComparison.OrdinalIgnoreCase));
}
}
return (foundPmc && (!checkNoContextTakeOver || noContext), noContext);
}
internal static Exception WsReturnHTTPError(int statusCode, string message) =>
new WsHttpError(statusCode, message);
private static bool wsHeaderContains(NameValueCollection headers, string key, string expected) =>
WsHeaderContains(headers, key, expected);
private static (bool supported, bool noContext) wsPMCExtensionSupport(NameValueCollection headers, bool checkNoContextTakeOver) =>
WsPMCExtensionSupport(headers, checkNoContextTakeOver);
private static Exception wsReturnHTTPError(int statusCode, string message) =>
WsReturnHTTPError(statusCode, message);
internal static string WsGetHostAndPort(string hostPort, out int port)
{
port = 0;
if (string.IsNullOrWhiteSpace(hostPort))
return string.Empty;
if (hostPort.Contains(':'))
{
var idx = hostPort.LastIndexOf(':');
var host = hostPort[..idx];
var portText = hostPort[(idx + 1)..];
if (int.TryParse(portText, out var parsed))
{
port = parsed;
return host;
}
}
return hostPort;
}
private static string wsGetHostAndPort(string hostPort, out int port) =>
WsGetHostAndPort(hostPort, out port);
internal static byte[] WsMakeChallengeKey(string key)
{
ArgumentNullException.ThrowIfNull(key);
return Encoding.ASCII.GetBytes(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
}
internal static string WsAcceptKey(string key)
{
using var sha = SHA1.Create();
var digest = sha.ComputeHash(WsMakeChallengeKey(key));
return Convert.ToBase64String(digest);
}
private static byte[] wsMakeChallengeKey(string key) => WsMakeChallengeKey(key);
private static string wsAcceptKey(string key) => WsAcceptKey(key);
internal static Exception? ValidateWebsocketOptions(WebsocketOpts options)
{
if (options.Port < 0 || options.Port > 65535)
return new ArgumentException("websocket port out of range");
if (options.NoTls && options.TlsConfig != null)
return new ArgumentException("websocket no_tls and tls options are mutually exclusive");
if (options.HandshakeTimeout < TimeSpan.Zero)
return new ArgumentException("websocket handshake timeout can not be negative");
return null;
}
private static Exception? validateWebsocketOptions(WebsocketOpts options) => ValidateWebsocketOptions(options);
private void WsSetOriginOptions()
{
lock (_websocket.Mu)
{
_websocket.AllowedOrigins.Clear();
var opts = GetOpts().Websocket;
_websocket.SameOrigin = opts.SameOrigin;
foreach (var entry in opts.AllowedOrigins)
{
if (!Uri.TryCreate(entry, UriKind.Absolute, out var uri) || string.IsNullOrEmpty(uri.Host))
continue;
var port = uri.IsDefaultPort
? uri.Scheme.Equals("https", StringComparison.OrdinalIgnoreCase) ? "443" : "80"
: uri.Port.ToString();
_websocket.AllowedOrigins[uri.Host] = new AllowedOrigin(uri.Scheme, port);
}
}
}
private void WsSetHeadersOptions()
{
var headers = GetOpts().Websocket.Headers;
if (headers.Count == 0)
{
_websocket.RawHeaders = string.Empty;
return;
}
var builder = new StringBuilder();
foreach (var (name, value) in headers)
{
if (string.IsNullOrWhiteSpace(name))
continue;
builder.Append(name.Trim());
builder.Append(": ");
builder.Append(value?.Trim() ?? string.Empty);
builder.Append("\r\n");
}
_websocket.RawHeaders = builder.ToString();
}
private void WsConfigAuth()
{
var ws = GetOpts().Websocket;
_websocket.AuthOverride = !string.IsNullOrWhiteSpace(ws.Username)
|| !string.IsNullOrWhiteSpace(ws.Password)
|| !string.IsNullOrWhiteSpace(ws.Token)
|| !string.IsNullOrWhiteSpace(ws.JwtCookie)
|| !string.IsNullOrWhiteSpace(ws.TokenCookie)
|| !string.IsNullOrWhiteSpace(ws.UsernameCookie)
|| !string.IsNullOrWhiteSpace(ws.PasswordCookie)
|| !string.IsNullOrWhiteSpace(ws.NoAuthUser);
}
private void StartWebsocketServer()
{
var opts = GetOpts().Websocket;
if (opts.Port == 0)
return;
if (_websocket.Listener != null)
return;
var host = string.IsNullOrWhiteSpace(opts.Host) ? ServerConstants.DefaultHost : opts.Host;
var ip = IPAddress.TryParse(host, out var parsed) ? parsed : IPAddress.Any;
var listener = new TcpListener(ip, opts.Port);
listener.Start();
_websocket.Listener = listener;
_websocket.ListenerErr = null;
_websocket.Port = ((IPEndPoint)listener.LocalEndpoint).Port;
_websocket.Host = host;
_websocket.Compression = opts.Compression;
_websocket.TlsConfig = opts.TlsConfig;
WsSetOriginOptions();
WsSetHeadersOptions();
WsConfigAuth();
var connectUrl = WebsocketUrl();
_websocket.ConnectUrls.Clear();
_websocket.ConnectUrls.Add(connectUrl);
UpdateServerINFOAndSendINFOToClients([], [connectUrl], add: true);
}
private int CloseWebsocketServerCore()
{
if (_websocket.Listener == null)
return 0;
try
{
_websocket.Listener.Stop();
}
catch (Exception ex)
{
_websocket.ListenerErr = ex;
}
_websocket.Listener = null;
if (_websocket.ConnectUrls.Count > 0)
{
var urls = _websocket.ConnectUrls.ToArray();
_websocket.ConnectUrls.Clear();
UpdateServerINFOAndSendINFOToClients([], urls, add: false);
}
return 0;
}
internal ClientConnection CreateWSClient(Stream nc, ClientKind kind)
{
var client = new ClientConnection(kind, this, nc)
{
Ws = new WebsocketConnection
{
Compress = _websocket.Compression,
MaskRead = true,
},
};
return client;
}
internal (WebsocketConnection? ws, ClientKind kind, Exception? err) WsUpgrade(HttpListenerRequest request)
{
var kind = ClientKind.Client;
if (request.Url != null)
{
var path = request.Url.AbsolutePath;
if (path.EndsWith("/leafnode", StringComparison.OrdinalIgnoreCase))
kind = ClientKind.Leaf;
else if (path.EndsWith("/mqtt", StringComparison.OrdinalIgnoreCase))
kind = ClientKind.Client;
}
if (!string.Equals(request.HttpMethod, "GET", StringComparison.OrdinalIgnoreCase))
return (null, kind, WsReturnHTTPError(405, "request method must be GET"));
if (string.IsNullOrWhiteSpace(request.UserHostName))
return (null, kind, WsReturnHTTPError(400, "'Host' missing in request"));
if (!WsHeaderContains(request.Headers, "Upgrade", "websocket"))
return (null, kind, WsReturnHTTPError(400, "invalid value for header 'Upgrade'"));
if (!WsHeaderContains(request.Headers, "Connection", "Upgrade"))
return (null, kind, WsReturnHTTPError(400, "invalid value for header 'Connection'"));
if (string.IsNullOrWhiteSpace(request.Headers["Sec-WebSocket-Key"]))
return (null, kind, WsReturnHTTPError(400, "key missing"));
if (!WsHeaderContains(request.Headers, "Sec-WebSocket-Version", "13"))
return (null, kind, WsReturnHTTPError(400, "invalid version"));
var ws = new WebsocketConnection
{
Compress = GetOpts().Websocket.Compression && WsPMCExtensionSupport(request.Headers, true).supported,
MaskRead = !string.Equals(request.Headers[WsConstants.NoMaskingHeader], WsConstants.NoMaskingValue, StringComparison.OrdinalIgnoreCase),
};
var xff = request.Headers.GetValues(WsConstants.XForwardedForHeader);
if (xff != null && xff.Length > 0 && IPAddress.TryParse(xff[0], out _))
ws.ClientIP = xff[0];
return (ws, kind, null);
}
internal static bool IsWSURL(string url)
{
if (!Uri.TryCreate(url, UriKind.Absolute, out var uri))
return false;
return uri.Scheme.Equals(WsConstants.SchemePrefix, StringComparison.OrdinalIgnoreCase);
}
internal static bool IsWSSURL(string url)
{
if (!Uri.TryCreate(url, UriKind.Absolute, out var uri))
return false;
return uri.Scheme.Equals(WsConstants.SchemePrefixTls, StringComparison.OrdinalIgnoreCase);
}
private static bool isWSURL(string url) => IsWSURL(url);
private static bool isWSSURL(string url) => IsWSSURL(url);
private System.Net.Security.SslServerAuthenticationOptions? wsGetTLSConfig() =>
GetOpts().Websocket.TlsConfig;
}

View File

@@ -35,6 +35,8 @@ internal enum WsOpCode : int
/// </summary>
internal static class WsConstants
{
public static readonly byte[] CompressLastBlock = [0x00, 0x00, 0xff, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff];
// Frame header bits
public const int FinalBit = 1 << 7;
public const int Rsv1Bit = 1 << 6; // Used for per-message compression (RFC 7692)

View File

@@ -0,0 +1,55 @@
// Copyright 2020-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0.
namespace ZB.MOM.NatsNet.Server.WebSocket;
internal sealed class WebSocketHandler
{
public static (byte[] bytes, int newPos) WsGet(Stream reader, byte[] buffer, int pos, int needed) =>
WebSocketHelpers.WsGet(reader, buffer, pos, needed);
public static bool WsIsControlFrame(WsOpCode frameType) =>
WebSocketHelpers.WsIsControlFrame(frameType);
public static (byte[] header, byte[]? key) WsCreateFrameHeader(bool useMasking, bool compressed, WsOpCode frameType, int length) =>
WebSocketHelpers.WsCreateFrameHeader(useMasking, compressed, frameType, length);
public static (int n, byte[]? key) WsFillFrameHeader(byte[] frameHeader, bool useMasking, bool first, bool final, bool compressed, WsOpCode frameType, int length) =>
WebSocketHelpers.WsFillFrameHeader(frameHeader, useMasking, first, final, compressed, frameType, length);
public static void WsMaskBuf(byte[] key, byte[] buffer) =>
WebSocketHelpers.WsMaskBuf(key, buffer);
public static void WsMaskBufs(byte[] key, IReadOnlyList<byte[]> buffers) =>
WebSocketHelpers.WsMaskBufs(key, buffers);
public static byte[] WsCreateCloseMessage(int status, string body) =>
WebSocketHelpers.WsCreateCloseMessage(status, body);
public static bool WsHeaderContains(System.Collections.Specialized.NameValueCollection headers, string key, string expected) =>
NatsServer.WsHeaderContains(headers, key, expected);
public static (bool supported, bool noContext) WsPMCExtensionSupport(System.Collections.Specialized.NameValueCollection headers, bool checkNoContextTakeOver) =>
NatsServer.WsPMCExtensionSupport(headers, checkNoContextTakeOver);
public static Exception WsReturnHTTPError(int statusCode, string message) =>
NatsServer.WsReturnHTTPError(statusCode, message);
public static string WsGetHostAndPort(string hostPort, out int port) =>
NatsServer.WsGetHostAndPort(hostPort, out port);
public static string WsAcceptKey(string key) =>
NatsServer.WsAcceptKey(key);
public static byte[] WsMakeChallengeKey(string key) =>
NatsServer.WsMakeChallengeKey(key);
public static Exception? ValidateWebsocketOptions(WebsocketOpts options) =>
NatsServer.ValidateWebsocketOptions(options);
public static bool IsWSURL(string url) =>
NatsServer.IsWSURL(url);
public static bool IsWSSURL(string url) =>
NatsServer.IsWSSURL(url);
}

View File

@@ -0,0 +1,135 @@
// Copyright 2020-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
using System.Buffers.Binary;
using System.Security.Cryptography;
namespace ZB.MOM.NatsNet.Server.WebSocket;
internal static class WebSocketHelpers
{
public static (byte[] bytes, int newPos) WsGet(Stream reader, byte[] buf, int pos, int needed)
{
var available = buf.Length - pos;
if (available >= needed)
return (buf[pos..(pos + needed)], pos + needed);
var b = new byte[needed];
var start = 0;
if (available > 0)
{
System.Buffer.BlockCopy(buf, pos, b, 0, available);
start = available;
}
while (start < needed)
{
var n = reader.Read(b, start, needed - start);
if (n <= 0)
throw new EndOfStreamException();
start += n;
}
return (b, pos + available);
}
public static bool WsIsControlFrame(WsOpCode frameType) => frameType >= WsOpCode.Close;
public static (byte[] header, byte[]? key) WsCreateFrameHeader(bool useMasking, bool compressed, WsOpCode frameType, int length)
{
var frameHeader = NbPool.Get(WsConstants.MaxFrameHeaderSize);
var (n, key) = WsFillFrameHeader(frameHeader, useMasking, first: true, final: true, compressed, frameType, length);
return (frameHeader[..n], key);
}
public static (int n, byte[]? key) WsFillFrameHeader(
byte[] frameHeader,
bool useMasking,
bool first,
bool final,
bool compressed,
WsOpCode frameType,
int length)
{
byte b0 = 0;
if (first)
b0 = (byte)frameType;
if (final)
b0 |= WsConstants.FinalBit;
if (compressed)
b0 |= WsConstants.Rsv1Bit;
byte b1 = 0;
if (useMasking)
b1 |= WsConstants.MaskBit;
int n;
if (length <= 125)
{
n = 2;
frameHeader[0] = b0;
frameHeader[1] = (byte)(b1 | (byte)length);
}
else if (length < 65536)
{
n = 4;
frameHeader[0] = b0;
frameHeader[1] = (byte)(b1 | 126);
BinaryPrimitives.WriteUInt16BigEndian(frameHeader.AsSpan(2), (ushort)length);
}
else
{
n = 10;
frameHeader[0] = b0;
frameHeader[1] = (byte)(b1 | 127);
BinaryPrimitives.WriteUInt64BigEndian(frameHeader.AsSpan(2), (ulong)length);
}
byte[]? key = null;
if (useMasking)
{
key = new byte[4];
RandomNumberGenerator.Fill(key);
Array.Copy(key, 0, frameHeader, n, 4);
n += 4;
}
return (n, key);
}
public static void WsMaskBuf(ReadOnlySpan<byte> key, Span<byte> buffer)
{
for (var i = 0; i < buffer.Length; i++)
buffer[i] ^= key[i & 0x3];
}
public static void WsMaskBufs(ReadOnlySpan<byte> key, IReadOnlyList<byte[]> buffers)
{
var pos = 0;
for (var i = 0; i < buffers.Count; i++)
{
var buffer = buffers[i];
for (var j = 0; j < buffer.Length; j++)
{
buffer[j] ^= key[pos & 0x3];
pos++;
}
}
}
public static byte[] WsCreateCloseMessage(int status, string body)
{
if (body.Length > WsConstants.MaxControlPayloadSize - 2)
body = string.Concat(body.AsSpan(0, WsConstants.MaxControlPayloadSize - 5), "...");
var payload = System.Text.Encoding.UTF8.GetBytes(body);
var buffer = new byte[2 + payload.Length];
BinaryPrimitives.WriteUInt16BigEndian(buffer.AsSpan(0, 2), (ushort)status);
payload.CopyTo(buffer.AsSpan(2));
return buffer;
}
}

View File

@@ -13,6 +13,7 @@
//
// Adapted from server/websocket.go in the NATS server Go source.
using System.IO.Compression;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server.WebSocket;
@@ -23,88 +24,210 @@ namespace ZB.MOM.NatsNet.Server.WebSocket;
/// </summary>
internal sealed class WsReadInfo
{
/// <summary>Whether masking is disabled for this connection (e.g. leaf node).</summary>
public bool NoMasking { get; set; }
public int Rem { get; set; }
public bool FrameStart { get; set; }
public bool FinalFrameReceived { get; set; }
public bool FrameCompressed { get; set; }
public bool Mask { get; set; }
public byte MaskKeyPosition { get; set; }
public byte[] MaskKey { get; } = new byte[4];
public List<byte[]> CompressedBuffers { get; } = [];
public int CompressedOffset { get; set; }
/// <summary>Whether per-message deflate compression is active.</summary>
public bool Compressed { get; set; }
public void Init()
{
FrameStart = true;
FinalFrameReceived = true;
}
/// <summary>The current frame opcode.</summary>
public WsOpCode FrameType { get; set; }
public int Read(byte[] destination, int offset, int count)
{
if (count == 0)
return 0;
if (CompressedBuffers.Count == 0)
return 0;
/// <summary>Number of payload bytes remaining in the current frame.</summary>
public int PayloadLeft { get; set; }
var copied = 0;
var remaining = count;
while (CompressedBuffers.Count > 0 && remaining > 0)
{
var buffer = CompressedBuffers[0];
var available = buffer.Length - CompressedOffset;
if (available <= 0)
{
NextCBuf();
continue;
}
/// <summary>The 4-byte masking key (only valid when masking is active).</summary>
public int[] Mask { get; set; } = new int[4];
var n = Math.Min(available, remaining);
Array.Copy(buffer, CompressedOffset, destination, offset + copied, n);
copied += n;
remaining -= n;
CompressedOffset += n;
NextCBuf();
}
/// <summary>Current offset into <see cref="Mask"/>.</summary>
public int MaskOffset { get; set; }
return copied;
}
/// <summary>Accumulated compressed payload buffers awaiting decompression.</summary>
public byte[]? Compress { get; set; }
public byte[]? NextCBuf()
{
if (CompressedBuffers.Count == 0)
return null;
if (CompressedOffset != CompressedBuffers[0].Length)
return CompressedBuffers[0];
public WsReadInfo() { }
CompressedOffset = 0;
if (CompressedBuffers.Count == 1)
{
CompressedBuffers.Clear();
return null;
}
CompressedBuffers.RemoveAt(0);
return CompressedBuffers[0];
}
public byte ReadByte()
{
if (CompressedBuffers.Count == 0)
throw new EndOfStreamException();
var b = CompressedBuffers[0][CompressedOffset];
CompressedOffset++;
NextCBuf();
return b;
}
public byte[] Decompress(int maxPayload)
{
if (maxPayload <= 0)
maxPayload = ServerConstants.MaxPayloadSize;
CompressedOffset = 0;
var input = new MemoryStream();
foreach (var buffer in CompressedBuffers)
input.Write(buffer, 0, buffer.Length);
input.Write(WsConstants.CompressLastBlock, 0, WsConstants.CompressLastBlock.Length);
input.Position = 0;
using var deflate = new DeflateStream(input, System.IO.Compression.CompressionMode.Decompress, leaveOpen: true);
using var output = new MemoryStream();
var tmp = new byte[4096];
while (true)
{
var n = deflate.Read(tmp, 0, tmp.Length);
if (n == 0)
break;
if (output.Length + n > maxPayload)
throw ServerErrors.ErrMaxPayload;
output.Write(tmp, 0, n);
}
CompressedBuffers.Clear();
return output.ToArray();
}
public void Unmask(byte[] buf) => Unmask(buf.AsSpan());
public void Unmask(Span<byte> buf)
{
var p = (int)MaskKeyPosition;
if (buf.Length < 16)
{
for (var i = 0; i < buf.Length; i++)
{
buf[i] ^= MaskKey[p & 3];
p++;
}
MaskKeyPosition = (byte)(p & 3);
return;
}
var key8 = new byte[8];
for (var i = 0; i < 8; i++)
key8[i] = MaskKey[(p + i) & 3];
var mask64 = BitConverter.ToUInt64(key8, 0);
var n8 = (buf.Length / 8) * 8;
for (var i = 0; i < n8; i += 8)
{
var value = BitConverter.ToUInt64(buf[i..(i + 8)]) ^ mask64;
var bytes = BitConverter.GetBytes(value);
bytes.CopyTo(buf[i..(i + 8)]);
}
for (var i = n8; i < buf.Length; i++)
{
buf[i] ^= MaskKey[p & 3];
p++;
}
MaskKeyPosition = (byte)(p & 3);
}
}
/// <summary>
/// Client-level WebSocket runtime state.
/// Mirrors Go <c>websocket</c> struct in websocket.go.
/// </summary>
internal sealed class WebsocketConnection
{
public List<byte[]> Frames { get; } = [];
public long FrameSize { get; set; }
public byte[]? CloseMessage { get; set; }
public bool Compress { get; set; }
public bool CloseSent { get; set; }
public bool Browser { get; set; }
public bool NoCompressedFragment { get; set; }
public bool MaskRead { get; set; } = true;
public bool MaskWrite { get; set; }
public string ClientIP { get; set; } = string.Empty;
}
/// <summary>
/// Server-level WebSocket state, shared across all WebSocket connections.
/// Mirrors Go <c>srvWebsocket</c> struct in server/websocket.go.
/// Replaces the stub in NatsServerTypes.cs.
/// </summary>
internal sealed class SrvWebsocket
{
/// <summary>
/// Tracks WebSocket connect URLs per server (ref-counted).
/// Mirrors Go <c>connectURLsMap refCountedUrlSet</c>.
/// </summary>
public RefCountedUrlSet ConnectUrlsMap { get; set; } = new();
/// <summary>
/// TLS configuration for the WebSocket listener.
/// Mirrors Go <c>tls bool</c> field (true if TLS is required).
/// </summary>
public Lock Mu { get; } = new();
public System.Net.Sockets.TcpListener? Listener { get; set; }
public Exception? ListenerErr { get; set; }
public Dictionary<string, AllowedOrigin> AllowedOrigins { get; } = new(StringComparer.OrdinalIgnoreCase);
public bool SameOrigin { get; set; }
public List<string> ConnectUrls { get; } = [];
public RefCountedUrlSet ConnectUrlsMap { get; } = new();
public bool AuthOverride { get; set; }
public string RawHeaders { get; set; } = string.Empty;
public System.Net.Security.SslServerAuthenticationOptions? TlsConfig { get; set; }
/// <summary>Whether per-message deflate compression is enabled globally.</summary>
public bool Compression { get; set; }
/// <summary>Host the WebSocket server is listening on.</summary>
public string Host { get; set; } = string.Empty;
/// <summary>Port the WebSocket server is listening on (may be ephemeral).</summary>
public int Port { get; set; }
}
/// <summary>
/// Handles WebSocket upgrade and framing for a single connection.
/// Mirrors the WebSocket-related methods on Go <c>client</c> in server/websocket.go.
/// Full implementation is deferred to session 23.
/// </summary>
internal sealed class WebSocketHandler
{
private readonly NatsServer _server;
public WebSocketHandler(NatsServer server)
public Exception? checkOrigin(string requestHost, string? origin)
{
_server = server;
if (!SameOrigin && AllowedOrigins.Count == 0)
return null;
if (string.IsNullOrWhiteSpace(origin))
return new InvalidOperationException("origin header missing");
if (!Uri.TryCreate(origin, UriKind.Absolute, out var uri))
return new InvalidOperationException("invalid origin");
if (SameOrigin && !string.Equals(uri.Host, requestHost, StringComparison.OrdinalIgnoreCase))
return new InvalidOperationException("origin not same as host");
if (AllowedOrigins.Count == 0)
return null;
if (!AllowedOrigins.TryGetValue(uri.Host, out var allowed))
return new InvalidOperationException("origin host not allowed");
var port = uri.IsDefaultPort
? uri.Scheme.Equals("https", StringComparison.OrdinalIgnoreCase) ? "443" : "80"
: uri.Port.ToString();
if (!string.Equals(allowed.Scheme, uri.Scheme, StringComparison.OrdinalIgnoreCase))
return new InvalidOperationException("origin scheme not allowed");
if (!string.Equals(allowed.Port, port, StringComparison.Ordinal))
return new InvalidOperationException("origin port not allowed");
return null;
}
/// <summary>Upgrades an HTTP connection to WebSocket protocol.</summary>
public void UpgradeToWebSocket(
System.IO.Stream stream,
System.Net.Http.Headers.HttpRequestHeaders headers)
=> throw new NotImplementedException("TODO: session 23 — websocket");
/// <summary>Parses a WebSocket frame from the given buffer slice.</summary>
public void ParseFrame(byte[] data, int offset, int count)
=> throw new NotImplementedException("TODO: session 23 — websocket");
/// <summary>Writes a WebSocket frame with the given payload.</summary>
public void WriteFrame(WsOpCode opCode, byte[] payload, bool final, bool compress)
=> throw new NotImplementedException("TODO: session 23 — websocket");
/// <summary>Writes a WebSocket close frame with the given status code and reason.</summary>
public void WriteCloseFrame(int statusCode, string reason)
=> throw new NotImplementedException("TODO: session 23 — websocket");
}
internal readonly record struct AllowedOrigin(string Scheme, string Port);

View File

@@ -1,5 +1,7 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using Shouldly;
using ZB.MOM.NatsNet.Server;
@@ -269,6 +271,73 @@ public sealed partial class ConcurrencyTests1
}, DefaultStreamConfig());
}
[Fact] // T:2371
public void NoRaceAvoidSlowConsumerBigMessages_ShouldSucceed()
{
WithStore((fs, _) =>
{
var errors = new ConcurrentQueue<Exception>();
var payload = new byte[128 * 1024];
Parallel.For(0, 40, i =>
{
try
{
fs.StoreMsg($"big.{i % 4}", null, payload, 0).Seq.ShouldBeGreaterThan(0UL);
var sm = fs.LoadLastMsg($"big.{i % 4}", null);
sm.ShouldNotBeNull();
sm!.Msg.Length.ShouldBe(payload.Length);
}
catch (Exception ex)
{
errors.Enqueue(ex);
}
});
errors.ShouldBeEmpty();
fs.State().Msgs.ShouldBeGreaterThan(0UL);
});
}
[Fact] // T:2384
public void NoRaceAcceptLoopsDoNotLeaveOpenedConn_ShouldSucceed()
{
var errors = new ConcurrentQueue<Exception>();
Parallel.For(0, 20, _ =>
{
TcpListener? listener = null;
TcpClient? client = null;
TcpClient? accepted = null;
try
{
listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
var endpoint = (IPEndPoint)listener.LocalEndpoint;
var acceptTask = listener.AcceptTcpClientAsync();
client = new TcpClient();
client.Connect(endpoint.Address, endpoint.Port);
accepted = acceptTask.GetAwaiter().GetResult();
accepted.Connected.ShouldBeTrue();
}
catch (Exception ex)
{
errors.Enqueue(ex);
}
finally
{
accepted?.Close();
client?.Close();
listener?.Stop();
}
});
errors.ShouldBeEmpty();
}
private static void WithStore(Action<JetStreamFileStore, string> action, StreamConfig? cfg = null)
{
var root = NewRoot();

View File

@@ -330,6 +330,64 @@ public sealed partial class ConcurrencyTests2
}, cfg);
}
[Fact] // T:2488
public void NoRaceJetStreamSnapshotsWithSlowAckDontSlowConsumer_ShouldSucceed()
{
var cfg = DefaultStreamConfig();
cfg.Subjects = ["snap.>"];
WithStore((fs, _) =>
{
var errors = new ConcurrentQueue<Exception>();
using var cts = new CancellationTokenSource();
var payload = "snapshot"u8.ToArray();
var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L;
var consumer = fs.ConsumerStore("snap-consumer", DateTime.UtcNow, new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit });
var slowAcker = Task.Run(async () =>
{
for (ulong i = 1; i <= 100; i++)
{
try
{
consumer.UpdateDelivered(i, i, 1, ts + (long)i);
await Task.Delay(2, cts.Token);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
errors.Enqueue(ex);
break;
}
}
});
for (var i = 0; i < 100; i++)
fs.StoreMsg($"snap.{i % 5}", null, payload, 0);
var sw = Stopwatch.StartNew();
var (snapshot, err) = fs.Snapshot(TimeSpan.FromSeconds(2), includeConsumers: true, checkMsgs: true);
sw.Stop();
err.ShouldBeNull();
snapshot.ShouldNotBeNull();
snapshot!.State.Msgs.ShouldBeGreaterThan(0UL);
sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(2));
using (snapshot.Reader)
{
}
cts.Cancel();
Should.NotThrow(() => slowAcker.Wait(TimeSpan.FromSeconds(1)));
errors.ShouldBeEmpty();
consumer.Stop();
}, cfg);
}
private static void WithStore(Action<JetStreamFileStore, string> action, StreamConfig? cfg = null)
{
var root = NewRoot();

View File

@@ -3372,4 +3372,68 @@ public sealed partial class JetStreamFileStoreTests
Directory.Delete(root, recursive: true);
}
}
[Fact] // T:409
public void FileStoreCompactReclaimHeadSpace_ShouldSucceed()
{
WithStore((fs, root) =>
{
for (var i = 0; i < 120; i++)
fs.StoreMsg("cmp.a", null, new byte[512], 0);
fs.Compact(80).Error.ShouldBeNull();
var state = fs.State();
state.FirstSeq.ShouldBeGreaterThanOrEqualTo(80UL);
state.Msgs.ShouldBeLessThan(120UL);
}, cfg: DefaultStreamConfig(subjects: ["cmp.*"]), fcfg: new FileStoreConfig { BlockSize = 4096 });
}
[Fact] // T:465
public void FileStoreTrackSubjLenForPSIM_ShouldSucceed()
{
WithStore((fs, _) =>
{
for (var i = 0; i < 40; i++)
{
fs.StoreMsg($"psim.{i % 4}", null, "x"u8.ToArray(), 0);
fs.StoreMsg($"psim.long.subject.{i % 3}", null, "y"u8.ToArray(), 0);
}
var totals = fs.SubjectsTotals("psim.>");
totals.Count.ShouldBe(7);
totals.Keys.ShouldContain("psim.0");
totals.Keys.ShouldContain("psim.long.subject.0");
totals["psim.long.subject.0"].ShouldBeGreaterThan(0UL);
}, cfg: DefaultStreamConfig(subjects: ["psim.>"]));
}
[Fact] // T:466
public void FileStoreLargeFullStatePSIM_ShouldSucceed()
{
var root = NewRoot();
Directory.CreateDirectory(root);
try
{
var fs = JetStreamFileStore.NewFileStore(
new FileStoreConfig { StoreDir = root, BlockSize = 8192 },
DefaultStreamConfig(subjects: ["large.>"]));
for (var i = 0; i < 300; i++)
fs.StoreMsg($"large.{i % 25}", null, new byte[64], 0);
fs.State().Msgs.ShouldBeGreaterThan(0UL);
InvokePrivate<Exception?>(fs, "ForceWriteFullState").ShouldBeNull();
var stateFile = Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile);
File.Exists(stateFile).ShouldBeTrue();
new FileInfo(stateFile).Length.ShouldBeGreaterThan(0L);
fs.Stop();
}
finally
{
if (Directory.Exists(root))
Directory.Delete(root, recursive: true);
}
}
}

View File

@@ -1,11 +1,60 @@
using Shouldly;
using ZB.MOM.NatsNet.Server;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.WebSocket;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed partial class LeafNodeHandlerTests
{
[Fact] // T:1975
public void LeafNodeTLSHandshakeFirstVerifyNoInfoSent_ShouldSucceed()
{
var errors = new List<Exception>();
var warnings = new List<Exception>();
var remotes = ServerOptions.ParseRemoteLeafNodes(
new List<object?>
{
new Dictionary<string, object?>
{
["url"] = "wss://127.0.0.1:7422",
["first_info_timeout"] = "2s",
["tls"] = new Dictionary<string, object?>
{
["verify"] = true,
["timeout"] = 1,
},
},
},
errors,
warnings);
errors.ShouldBeEmpty();
remotes.Count.ShouldBe(1);
remotes[0].FirstInfoTimeout.ShouldBe(TimeSpan.FromSeconds(2));
remotes[0].TlsConfig.ShouldNotBeNull();
remotes[0].TlsTimeout.ShouldBe(1d);
}
[Fact] // T:1986
public void LeafNodeCompressionWithWSGetNeedsData_ShouldSucceed()
{
var frame = new byte[] { WsConstants.FinalBit, (byte)(WsConstants.MaskBit | 5), 1, 2, 3, 4, 0x31, 0x32, 0x33, 0x34, 0x35 };
var initial = frame[..1];
using var remainder = new MemoryStream(frame[1..]);
var (b1, nextPos) = WebSocketHelpers.WsGet(remainder, initial, 1, 1);
b1[0].ShouldBe((byte)(WsConstants.MaskBit | 5));
nextPos.ShouldBe(1);
var (mask, _) = WebSocketHelpers.WsGet(remainder, initial, 1, 4);
var (payload, _) = WebSocketHelpers.WsGet(remainder, initial, 1, 5);
WebSocketHelpers.WsMaskBuf(mask, payload);
payload.ShouldBe(new byte[] { 0x30, 0x30, 0x30, 0x30, 0x34 });
}
[Fact] // T:1984
public void LeafNodeCompressionAuto_ShouldSucceed()
{

View File

@@ -5,6 +5,66 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed partial class LeafNodeProxyTests
{
[Fact] // T:1902
public void LeafNodeHttpProxyTunnelBasic_ShouldSucceed()
{
var errors = new List<Exception>();
var warnings = new List<Exception>();
var remotes = ServerOptions.ParseRemoteLeafNodes(
new List<object?>
{
new Dictionary<string, object?>
{
["url"] = "ws://127.0.0.1:7422",
["proxy"] = new Dictionary<string, object?>
{
["url"] = "http://proxy.example.com:8080",
["timeout"] = "2s",
},
},
},
errors,
warnings);
errors.ShouldBeEmpty();
remotes.Count.ShouldBe(1);
remotes[0].Urls[0].Scheme.ShouldBe("ws");
remotes[0].Proxy.Url.ShouldBe("http://proxy.example.com:8080");
remotes[0].Proxy.Timeout.ShouldBe(TimeSpan.FromSeconds(2));
}
[Fact] // T:1903
public void LeafNodeHttpProxyTunnelWithAuth_ShouldSucceed()
{
var errors = new List<Exception>();
var warnings = new List<Exception>();
var remotes = ServerOptions.ParseRemoteLeafNodes(
new List<object?>
{
new Dictionary<string, object?>
{
["url"] = "ws://127.0.0.1:7422",
["proxy"] = new Dictionary<string, object?>
{
["url"] = "http://proxy.example.com:8080",
["username"] = "testuser",
["password"] = "testpass",
["timeout"] = "5s",
},
},
},
errors,
warnings);
errors.ShouldBeEmpty();
remotes.Count.ShouldBe(1);
remotes[0].Proxy.Url.ShouldBe("http://proxy.example.com:8080");
remotes[0].Proxy.Username.ShouldBe("testuser");
remotes[0].Proxy.Password.ShouldBe("testpass");
}
[Fact] // T:1899
public void LeafNodeHttpProxyConnection_ShouldSucceed()
{

View File

@@ -6,6 +6,28 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed class NatsConsumerTests
{
[Fact] // T:1353
public void JetStreamConsumerPullBatchCompleted_ShouldSucceed()
{
var cfg = new ConsumerConfig
{
AckPolicy = AckPolicy.AckExplicit,
MaxRequestBatch = 128,
MaxRequestExpires = TimeSpan.FromSeconds(10),
Metadata = new Dictionary<string, string> { ["legacy"] = "keep" },
};
JetStreamVersioning.SetStaticConsumerMetadata(cfg);
var cloned = JetStreamVersioning.SetDynamicConsumerMetadata(cfg);
cloned.MaxRequestBatch.ShouldBe(128);
cloned.MaxRequestExpires.ShouldBe(TimeSpan.FromSeconds(10));
cloned.Metadata.ShouldNotBeNull();
cloned.Metadata!.ShouldContainKey("legacy");
cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsServerVersionMetadataKey);
cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey);
}
[Fact] // T:1235
public void JetStreamConsumerFetchWithDrain_ShouldSucceed()
{

Binary file not shown.

View File

@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
Generated: 2026-03-01 02:46:10 UTC
Generated: 2026-03-01 02:57:22 UTC
## Modules (12 total)
@@ -13,18 +13,18 @@ Generated: 2026-03-01 02:46:10 UTC
| Status | Count |
|--------|-------|
| complete | 22 |
| deferred | 1467 |
| deferred | 1430 |
| n_a | 24 |
| stub | 1 |
| verified | 2159 |
| verified | 2196 |
## Unit Tests (3257 total)
| Status | Count |
|--------|-------|
| deferred | 1590 |
| n_a | 254 |
| verified | 1413 |
| deferred | 1504 |
| n_a | 307 |
| verified | 1446 |
## Library Mappings (36 total)
@@ -35,4 +35,4 @@ Generated: 2026-03-01 02:46:10 UTC
## Overall Progress
**3884/6942 items complete (55.9%)**
**4007/6942 items complete (57.7%)**