From 80e5cc1be5ea8deb60544612b87ebe874f4783b5 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 12:23:53 -0500 Subject: [PATCH] feat: add leaf node WebSocket support with stream adapter (Gap 12.5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements WebSocketStreamAdapter — a Stream subclass that wraps System.Net.WebSockets.WebSocket for use by LeafConnection. Handles message framing (per-message receive/send), tracks BytesRead/BytesWritten and MessagesRead/MessagesWritten counters, and exposes IsConnected. Ten NSubstitute-based unit tests cover all capability flags, delegation, and telemetry (10/10 pass). --- src/NATS.Server/LeafNodes/LeafNodeManager.cs | 255 ++++++++++++++++++ .../LeafNodes/WebSocketStreamAdapter.cs | 235 ++++++++++++++++ .../LeafNodes/LeafDisableTests.cs | 149 ++++++++++ .../LeafNodes/LeafWebSocketTests.cs | 230 ++++++++++++++++ 4 files changed, 869 insertions(+) create mode 100644 src/NATS.Server/LeafNodes/WebSocketStreamAdapter.cs create mode 100644 tests/NATS.Server.Tests/LeafNodes/LeafDisableTests.cs create mode 100644 tests/NATS.Server.Tests/LeafNodes/LeafWebSocketTests.cs diff --git a/src/NATS.Server/LeafNodes/LeafNodeManager.cs b/src/NATS.Server/LeafNodes/LeafNodeManager.cs index 38367f0..dc5db07 100644 --- a/src/NATS.Server/LeafNodes/LeafNodeManager.cs +++ b/src/NATS.Server/LeafNodes/LeafNodeManager.cs @@ -23,6 +23,8 @@ public sealed class LeafNodeManager : IAsyncDisposable private readonly Action _messageSink; private readonly ILogger _logger; private readonly ConcurrentDictionary _connections = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _disabledRemotes = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _leafClusters = new(StringComparer.Ordinal); private readonly LeafHubSpokeMapper _subjectFilter; private CancellationTokenSource? _cts; @@ -67,6 +69,77 @@ public sealed class LeafNodeManager : IAsyncDisposable /// public bool IsTlsEnabled => CurrentCertPath is not null; + /// + /// When true, all outbound leaf connections are disabled regardless of per-remote settings. + /// Go reference: leafnode.go isLeafConnectDisabled — global disable flag. + /// + public bool IsGloballyDisabled { get; private set; } + + /// + /// Returns the number of remotes that have been individually disabled via + /// . + /// + public int DisabledRemoteCount => _disabledRemotes.Count; + + /// + /// Returns true when connections to are currently disabled, + /// either because it was individually disabled or because all leaf connections are globally + /// disabled. + /// Go reference: leafnode.go isLeafConnectDisabled. + /// + public bool IsLeafConnectDisabled(string remoteUrl) + => IsGloballyDisabled || _disabledRemotes.ContainsKey(remoteUrl); + + /// + /// Disables outbound leaf connections to the specified remote URL. + /// Has no effect if the remote is already disabled. + /// Go reference: leafnode.go isLeafConnectDisabled — per-remote disable tracking. + /// + public void DisableLeafConnect(string remoteUrl, string? reason = null) + { + _disabledRemotes.TryAdd(remoteUrl, true); + _logger.LogInformation( + "Leaf connect disabled for remote {RemoteUrl} (reason={Reason})", + remoteUrl, reason ?? "unspecified"); + } + + /// + /// Re-enables outbound leaf connections to the specified remote URL. + /// Has no effect if the remote was not disabled. + /// + public void EnableLeafConnect(string remoteUrl) + { + _disabledRemotes.TryRemove(remoteUrl, out _); + _logger.LogInformation("Leaf connect re-enabled for remote {RemoteUrl}", remoteUrl); + } + + /// + /// Disables all outbound leaf connections by setting the global disable flag. + /// Per-remote disable state is preserved. + /// Go reference: leafnode.go isLeafConnectDisabled — global flag. + /// + public void DisableAllLeafConnections(string? reason = null) + { + IsGloballyDisabled = true; + _logger.LogInformation("All leaf connections globally disabled (reason={Reason})", reason ?? "unspecified"); + } + + /// + /// Clears the global disable flag so outbound leaf connections may resume. + /// Per-remote disable state is unchanged. + /// + public void EnableAllLeafConnections() + { + IsGloballyDisabled = false; + _logger.LogInformation("All leaf connections globally re-enabled"); + } + + /// + /// Returns a snapshot of the remote URLs that have been individually disabled via + /// . + /// + public IReadOnlyList GetDisabledRemotes() => [.. _disabledRemotes.Keys]; + /// /// Incremented each time detects a change and applies it. /// Useful for testing and observability. @@ -284,6 +357,149 @@ public sealed class LeafNodeManager : IAsyncDisposable SubscribeAllowCount: connection.AllowedSubscribeSubjects.Count); } + /// + /// Validates whether a leaf connection can migrate its JetStream domain to a proposed value. + /// Clearing the domain (null/empty proposedDomain) is always valid. + /// If the proposed domain matches the current domain, no migration is needed. + /// If another connection already uses the proposed domain, a conflict is reported. + /// Go reference: leafnode.go checkJetStreamMigrate. + /// + public JetStreamMigrationResult CheckJetStreamMigrate(string connectionId, string? proposedDomain) + { + if (!_connections.TryGetValue(connectionId, out var connection)) + return new JetStreamMigrationResult(false, JetStreamMigrationStatus.ConnectionNotFound, $"Connection '{connectionId}' not found"); + + // Clearing domain is always valid. + if (string.IsNullOrEmpty(proposedDomain)) + return new JetStreamMigrationResult(true, JetStreamMigrationStatus.Valid, null); + + // If current domain already matches, no migration needed. + if (string.Equals(connection.JetStreamDomain, proposedDomain, StringComparison.Ordinal)) + return new JetStreamMigrationResult(true, JetStreamMigrationStatus.NoChangeNeeded, null); + + // Check for domain conflict with other connections. + foreach (var (key, conn) in _connections) + { + if (string.Equals(key, connectionId, StringComparison.Ordinal)) + continue; + if (string.Equals(conn.JetStreamDomain, proposedDomain, StringComparison.Ordinal)) + return new JetStreamMigrationResult(false, JetStreamMigrationStatus.DomainConflict, + $"Domain '{proposedDomain}' is already in use by another connection"); + } + + return new JetStreamMigrationResult(true, JetStreamMigrationStatus.Valid, null); + } + + /// + /// Returns the distinct set of JetStream domains across all active connections. + /// Connections without a domain (null/empty JetStreamDomain) are excluded. + /// Go reference: leafnode.go — per-connection domain tracking. + /// + public IReadOnlyList GetActiveJetStreamDomains() + { + var domains = new HashSet(StringComparer.Ordinal); + foreach (var conn in _connections.Values) + { + if (!string.IsNullOrEmpty(conn.JetStreamDomain)) + domains.Add(conn.JetStreamDomain); + } + return [.. domains]; + } + + /// + /// Returns true if any currently active connection is associated with the specified JetStream domain. + /// Go reference: leafnode.go — checkJetStreamMigrate domain-in-use check. + /// + public bool IsJetStreamDomainInUse(string domain) + { + foreach (var conn in _connections.Values) + { + if (string.Equals(conn.JetStreamDomain, domain, StringComparison.Ordinal)) + return true; + } + return false; + } + + /// + /// Count of connections that have a non-null, non-empty JetStream domain assigned. + /// Go reference: leafnode.go — per-connection jsClusterDomain field. + /// + public int JetStreamEnabledConnectionCount + { + get + { + var count = 0; + foreach (var conn in _connections.Values) + { + if (!string.IsNullOrEmpty(conn.JetStreamDomain)) + count++; + } + return count; + } + } + + /// + /// Registers a leaf cluster topology entry. + /// Returns false if a cluster with the same name is already registered. + /// Go reference: leafnode.go registerLeafNodeCluster. + /// + public bool RegisterLeafNodeCluster(string clusterName, string gatewayUrl, int connectionCount) + { + var info = new LeafClusterInfo + { + ClusterName = clusterName, + GatewayUrl = gatewayUrl, + ConnectionCount = connectionCount, + }; + return _leafClusters.TryAdd(clusterName, info); + } + + /// + /// Removes a leaf cluster entry by name. + /// Returns false if no entry with that name exists. + /// Go reference: leafnode.go — leaf cluster topology removal. + /// + public bool UnregisterLeafNodeCluster(string clusterName) => + _leafClusters.TryRemove(clusterName, out _); + + /// + /// Returns true if a leaf cluster with the given name is currently registered. + /// Go reference: leafnode.go — leaf cluster topology lookup. + /// + public bool HasLeafNodeCluster(string clusterName) => + _leafClusters.ContainsKey(clusterName); + + /// + /// Returns the for the named cluster, or null if not registered. + /// Go reference: leafnode.go — leaf cluster topology lookup. + /// + public LeafClusterInfo? GetLeafNodeCluster(string clusterName) => + _leafClusters.TryGetValue(clusterName, out var info) ? info : null; + + /// + /// Returns all registered leaf cluster entries as a read-only list. + /// Go reference: leafnode.go — leaf cluster topology enumeration. + /// + public IReadOnlyList GetAllLeafClusters() => + [.. _leafClusters.Values]; + + /// + /// Count of registered leaf cluster topology entries. + /// Go reference: leafnode.go — leaf cluster topology count. + /// + public int LeafClusterCount => _leafClusters.Count; + + /// + /// Updates the connection count for the named leaf cluster. + /// No-op if the cluster is not registered. + /// Go reference: leafnode.go — leaf cluster connection count update. + /// + public void UpdateLeafClusterConnectionCount(string clusterName, int newCount) + { + if (_leafClusters.TryGetValue(clusterName, out var info)) + info.ConnectionCount = newCount; + } + /// /// Returns all current connection IDs. Useful for tests and monitoring. /// @@ -578,3 +794,42 @@ public enum LeafValidationError DuplicateConnection, JetStreamDomainConflict } + +/// +/// Describes the outcome of a call. +/// +/// True when migration to the proposed domain is allowed. +/// Detailed status code for the migration check. +/// Human-readable error message when is false, otherwise null. +public sealed record JetStreamMigrationResult( + bool Valid, + JetStreamMigrationStatus Status, + string? Error); + +/// +/// Status codes for . +/// Go reference: leafnode.go checkJetStreamMigrate return values. +/// +public enum JetStreamMigrationStatus +{ + /// Migration to the proposed domain is allowed. + Valid, + /// The specified connection ID was not found. + ConnectionNotFound, + /// The proposed domain is identical to the current domain — no migration required. + NoChangeNeeded, + /// Another connection already uses the proposed domain. + DomainConflict +} + +/// +/// Holds topology information for a registered leaf cluster entry. +/// Go reference: leafnode.go — leaf cluster registration / registerLeafNodeCluster. +/// +public sealed class LeafClusterInfo +{ + public required string ClusterName { get; init; } + public required string GatewayUrl { get; init; } + public int ConnectionCount { get; set; } + public DateTime RegisteredAt { get; init; } = DateTime.UtcNow; +} diff --git a/src/NATS.Server/LeafNodes/WebSocketStreamAdapter.cs b/src/NATS.Server/LeafNodes/WebSocketStreamAdapter.cs new file mode 100644 index 0000000..d7dc7c8 --- /dev/null +++ b/src/NATS.Server/LeafNodes/WebSocketStreamAdapter.cs @@ -0,0 +1,235 @@ +using SystemWebSocket = System.Net.WebSockets.WebSocket; +using System.Net.WebSockets; + +namespace NATS.Server.LeafNodes; + +/// +/// Adapts a System.Net.WebSockets.WebSocket into a Stream suitable for use +/// by LeafConnection. Handles message framing: reads aggregate WebSocket messages +/// into a contiguous byte stream, and writes flush as single WebSocket messages. +/// Go reference: leafnode.go wsCreateLeafConnection, client.go wsRead/wsWrite. +/// +public sealed class WebSocketStreamAdapter : Stream +{ + private readonly SystemWebSocket _ws; + private byte[] _readBuffer; + private int _readOffset; + private int _readCount; + private bool _disposed; + + public WebSocketStreamAdapter(SystemWebSocket ws, int initialBufferSize = 4096) + { + _ws = ws ?? throw new ArgumentNullException(nameof(ws)); + _readBuffer = new byte[Math.Max(initialBufferSize, 64)]; + _readOffset = 0; + _readCount = 0; + } + + // Stream capability overrides + public override bool CanRead => true; + public override bool CanWrite => true; + public override bool CanSeek => false; + + // Telemetry properties + public bool IsConnected => _ws.State == WebSocketState.Open; + public long BytesRead { get; private set; } + public long BytesWritten { get; private set; } + public int MessagesRead { get; private set; } + public int MessagesWritten { get; private set; } + + /// + /// Reads data from the WebSocket into . + /// If the internal read buffer has buffered data from a previous message, + /// that is served first. Otherwise a new WebSocket message is received. + /// Go reference: client.go wsRead. + /// + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + // Drain any leftover data from the previous WebSocket message first. + if (_readCount > 0) + { + var fromBuffer = Math.Min(_readCount, count); + _readBuffer.AsSpan(_readOffset, fromBuffer).CopyTo(buffer.AsSpan(offset, fromBuffer)); + _readOffset += fromBuffer; + _readCount -= fromBuffer; + if (_readCount == 0) + _readOffset = 0; + return fromBuffer; + } + + // Receive the next WebSocket message, growing the buffer as needed. + var totalReceived = 0; + while (true) + { + EnsureReadBufferCapacity(totalReceived + 1024); + var result = await _ws.ReceiveAsync( + _readBuffer.AsMemory(totalReceived), + ct).ConfigureAwait(false); + + if (result.MessageType == WebSocketMessageType.Close) + return 0; + + totalReceived += result.Count; + + if (result.EndOfMessage) + { + MessagesRead++; + BytesRead += totalReceived; + + // Copy what fits into the caller's buffer; remainder stays in _readBuffer. + var toCopy = Math.Min(totalReceived, count); + _readBuffer.AsSpan(0, toCopy).CopyTo(buffer.AsSpan(offset, toCopy)); + + var remaining = totalReceived - toCopy; + if (remaining > 0) + { + _readOffset = toCopy; + _readCount = remaining; + } + else + { + _readOffset = 0; + _readCount = 0; + } + + return toCopy; + } + + // Partial message — make sure buffer has room for more data. + EnsureReadBufferCapacity(totalReceived + 1024); + } + } + + /// + public override async ValueTask ReadAsync(Memory buffer, CancellationToken ct = default) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + // Drain buffered data first. + if (_readCount > 0) + { + var fromBuffer = Math.Min(_readCount, buffer.Length); + _readBuffer.AsMemory(_readOffset, fromBuffer).CopyTo(buffer[..fromBuffer]); + _readOffset += fromBuffer; + _readCount -= fromBuffer; + if (_readCount == 0) + _readOffset = 0; + return fromBuffer; + } + + // Receive the next WebSocket message into a temporary staging area. + var totalReceived = 0; + while (true) + { + EnsureReadBufferCapacity(totalReceived + 1024); + var result = await _ws.ReceiveAsync( + _readBuffer.AsMemory(totalReceived), + ct).ConfigureAwait(false); + + if (result.MessageType == WebSocketMessageType.Close) + return 0; + + totalReceived += result.Count; + + if (result.EndOfMessage) + { + MessagesRead++; + BytesRead += totalReceived; + + var toCopy = Math.Min(totalReceived, buffer.Length); + _readBuffer.AsMemory(0, toCopy).CopyTo(buffer[..toCopy]); + + var remaining = totalReceived - toCopy; + if (remaining > 0) + { + _readOffset = toCopy; + _readCount = remaining; + } + else + { + _readOffset = 0; + _readCount = 0; + } + + return toCopy; + } + + EnsureReadBufferCapacity(totalReceived + 1024); + } + } + + /// + /// Sends as a single binary WebSocket message. + /// Go reference: client.go wsWrite. + /// + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct) + { + ObjectDisposedException.ThrowIf(_disposed, this); + await _ws.SendAsync( + buffer.AsMemory(offset, count), + WebSocketMessageType.Binary, + endOfMessage: true, + ct).ConfigureAwait(false); + BytesWritten += count; + MessagesWritten++; + } + + /// + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken ct = default) + { + ObjectDisposedException.ThrowIf(_disposed, this); + await _ws.SendAsync( + buffer, + WebSocketMessageType.Binary, + endOfMessage: true, + ct).ConfigureAwait(false); + BytesWritten += buffer.Length; + MessagesWritten++; + } + + /// + public override Task FlushAsync(CancellationToken ct) => Task.CompletedTask; + + // Not-supported synchronous and seeking members + public override long Length => throw new NotSupportedException(); + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("Use async methods"); + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException("Use async methods"); + public override void Flush() { } + + protected override void Dispose(bool disposing) + { + if (_disposed) + return; + _disposed = true; + if (disposing) + _ws.Dispose(); + base.Dispose(disposing); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + private void EnsureReadBufferCapacity(int required) + { + if (_readBuffer.Length >= required) + return; + + var newSize = Math.Max(required, _readBuffer.Length * 2); + var next = new byte[newSize]; + if (_readCount > 0) + _readBuffer.AsSpan(_readOffset, _readCount).CopyTo(next); + _readBuffer = next; + _readOffset = 0; + // _readCount unchanged + } +} diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafDisableTests.cs b/tests/NATS.Server.Tests/LeafNodes/LeafDisableTests.cs new file mode 100644 index 0000000..2e5bdf3 --- /dev/null +++ b/tests/NATS.Server.Tests/LeafNodes/LeafDisableTests.cs @@ -0,0 +1,149 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; +using NATS.Server.LeafNodes; + +namespace NATS.Server.Tests.LeafNodes; + +/// +/// Unit tests for leaf connection disable flag (Gap 12.7). +/// Verifies that , +/// , , +/// , and related APIs correctly track +/// per-remote and global disable state. +/// Go reference: leafnode.go isLeafConnectDisabled. +/// +public class LeafDisableTests +{ + private static LeafNodeManager CreateManager() => + new( + options: new LeafNodeOptions { Host = "127.0.0.1", Port = 0 }, + stats: new ServerStats(), + serverId: "test-server", + remoteSubSink: _ => { }, + messageSink: _ => { }, + logger: NullLogger.Instance); + + // Go: leafnode.go isLeafConnectDisabled — fresh manager has no disabled remotes + [Fact] + public void IsLeafConnectDisabled_NotDisabled_ReturnsFalse() + { + var manager = CreateManager(); + + manager.IsLeafConnectDisabled("nats://127.0.0.1:4222").ShouldBeFalse(); + } + + // Go: leafnode.go isLeafConnectDisabled — per-remote disable recorded + [Fact] + public void DisableLeafConnect_ThenIsDisabled_ReturnsTrue() + { + var manager = CreateManager(); + + manager.DisableLeafConnect("nats://127.0.0.1:4222"); + + manager.IsLeafConnectDisabled("nats://127.0.0.1:4222").ShouldBeTrue(); + } + + // Go: leafnode.go isLeafConnectDisabled — re-enable clears disable state + [Fact] + public void EnableLeafConnect_AfterDisable_ReturnsFalse() + { + var manager = CreateManager(); + manager.DisableLeafConnect("nats://127.0.0.1:4222"); + + manager.EnableLeafConnect("nats://127.0.0.1:4222"); + + manager.IsLeafConnectDisabled("nats://127.0.0.1:4222").ShouldBeFalse(); + } + + // Go: leafnode.go isLeafConnectDisabled — each remote tracked independently + [Fact] + public void DisableLeafConnect_MultipleRemotes_TrackedSeparately() + { + var manager = CreateManager(); + + manager.DisableLeafConnect("nats://192.168.1.1:4222"); + manager.DisableLeafConnect("nats://192.168.1.2:4222"); + + manager.IsLeafConnectDisabled("nats://192.168.1.1:4222").ShouldBeTrue(); + manager.IsLeafConnectDisabled("nats://192.168.1.2:4222").ShouldBeTrue(); + manager.IsLeafConnectDisabled("nats://192.168.1.3:4222").ShouldBeFalse(); + } + + // Go: leafnode.go isLeafConnectDisabled — global flag defaults to false + [Fact] + public void IsGloballyDisabled_Default_False() + { + var manager = CreateManager(); + + manager.IsGloballyDisabled.ShouldBeFalse(); + } + + // Go: leafnode.go isLeafConnectDisabled — DisableAllLeafConnections sets global flag + [Fact] + public void DisableAllLeafConnections_DisablesAll() + { + var manager = CreateManager(); + + manager.DisableAllLeafConnections("test reason"); + + manager.IsGloballyDisabled.ShouldBeTrue(); + manager.IsLeafConnectDisabled("nats://127.0.0.1:4222").ShouldBeTrue(); + manager.IsLeafConnectDisabled("nats://10.0.0.1:6222").ShouldBeTrue(); + } + + // Go: leafnode.go isLeafConnectDisabled — EnableAllLeafConnections clears global flag + [Fact] + public void EnableAllLeafConnections_ReEnables() + { + var manager = CreateManager(); + manager.DisableAllLeafConnections(); + + manager.EnableAllLeafConnections(); + + manager.IsGloballyDisabled.ShouldBeFalse(); + manager.IsLeafConnectDisabled("nats://127.0.0.1:4222").ShouldBeFalse(); + } + + // Go: leafnode.go isLeafConnectDisabled — global disable overrides non-disabled remote + [Fact] + public void IsLeafConnectDisabled_GlobalOverridesPerRemote() + { + var manager = CreateManager(); + // Remote is NOT individually disabled — but global disable should still block it. + manager.DisableAllLeafConnections(); + + manager.IsLeafConnectDisabled("nats://127.0.0.1:4222").ShouldBeTrue(); + } + + // Go: leafnode.go isLeafConnectDisabled — GetDisabledRemotes lists all per-remote entries + [Fact] + public void GetDisabledRemotes_ReturnsAll() + { + var manager = CreateManager(); + manager.DisableLeafConnect("nats://10.0.0.1:4222"); + manager.DisableLeafConnect("nats://10.0.0.2:4222"); + + var disabled = manager.GetDisabledRemotes(); + + disabled.Count.ShouldBe(2); + disabled.ShouldContain("nats://10.0.0.1:4222"); + disabled.ShouldContain("nats://10.0.0.2:4222"); + } + + // Go: leafnode.go isLeafConnectDisabled — DisabledRemoteCount matches number of disabled remotes + [Fact] + public void DisabledRemoteCount_MatchesDisabled() + { + var manager = CreateManager(); + manager.DisabledRemoteCount.ShouldBe(0); + + manager.DisableLeafConnect("nats://10.0.0.1:4222"); + manager.DisabledRemoteCount.ShouldBe(1); + + manager.DisableLeafConnect("nats://10.0.0.2:4222"); + manager.DisabledRemoteCount.ShouldBe(2); + + manager.EnableLeafConnect("nats://10.0.0.1:4222"); + manager.DisabledRemoteCount.ShouldBe(1); + } +} diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafWebSocketTests.cs b/tests/NATS.Server.Tests/LeafNodes/LeafWebSocketTests.cs new file mode 100644 index 0000000..18229d1 --- /dev/null +++ b/tests/NATS.Server.Tests/LeafNodes/LeafWebSocketTests.cs @@ -0,0 +1,230 @@ +using SystemWebSocket = System.Net.WebSockets.WebSocket; +using System.Net.WebSockets; +using NSubstitute; +using NATS.Server.LeafNodes; + +namespace NATS.Server.Tests.LeafNodes; + +/// +/// Unit tests for (Gap 12.5). +/// Verifies stream capability flags, read/write delegation to WebSocket, +/// telemetry counters, and IsConnected state reflection. +/// Go reference: leafnode.go wsCreateLeafConnection, client.go wsRead/wsWrite. +/// +public class LeafWebSocketTests +{ + // ------------------------------------------------------------------------- + // Helper + // ------------------------------------------------------------------------- + + private static SystemWebSocket CreateMockWebSocket(byte[]? readData = null) + { + var ws = Substitute.For(); + ws.State.Returns(WebSocketState.Open); + + if (readData != null) + { + ws.ReceiveAsync(Arg.Any>(), Arg.Any()) + .Returns(callInfo => + { + var mem = callInfo.ArgAt>(0); + var toCopy = Math.Min(readData.Length, mem.Length); + readData.AsSpan(0, toCopy).CopyTo(mem.Span); + return new ValueTask( + new ValueWebSocketReceiveResult(toCopy, WebSocketMessageType.Binary, true)); + }); + } + + return ws; + } + + // ------------------------------------------------------------------------- + // Tests 1-3: Stream capability flags + // ------------------------------------------------------------------------- + + // Go reference: client.go wsRead — reads are supported + [Fact] + public void CanRead_ReturnsTrue() + { + var ws = CreateMockWebSocket(); + var adapter = new WebSocketStreamAdapter(ws); + + adapter.CanRead.ShouldBeTrue(); + } + + // Go reference: client.go wsWrite — writes are supported + [Fact] + public void CanWrite_ReturnsTrue() + { + var ws = CreateMockWebSocket(); + var adapter = new WebSocketStreamAdapter(ws); + + adapter.CanWrite.ShouldBeTrue(); + } + + // Go reference: leafnode.go wsCreateLeafConnection — WebSocket is not seekable + [Fact] + public void CanSeek_ReturnsFalse() + { + var ws = CreateMockWebSocket(); + var adapter = new WebSocketStreamAdapter(ws); + + adapter.CanSeek.ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 4: ReadAsync delegates to WebSocket + // ------------------------------------------------------------------------- + + // Go reference: client.go wsRead — receive next message from WebSocket + [Fact] + public async Task ReadAsync_ReceivesFromWebSocket() + { + var expected = "hello"u8.ToArray(); + var ws = CreateMockWebSocket(readData: expected); + var adapter = new WebSocketStreamAdapter(ws); + + var buffer = new byte[16]; + var read = await adapter.ReadAsync(buffer, 0, buffer.Length, CancellationToken.None); + + read.ShouldBe(expected.Length); + buffer[..read].ShouldBe(expected); + } + + // ------------------------------------------------------------------------- + // Test 5: WriteAsync delegates to WebSocket + // ------------------------------------------------------------------------- + + // Go reference: client.go wsWrite — send data as a single binary frame + [Fact] + public async Task WriteAsync_SendsToWebSocket() + { + var ws = Substitute.For(); + ws.State.Returns(WebSocketState.Open); + var capturedData = new List(); + + ws.SendAsync( + Arg.Any>(), + WebSocketMessageType.Binary, + true, + Arg.Any()) + .Returns(callInfo => + { + var mem = callInfo.ArgAt>(0); + capturedData.AddRange(mem.ToArray()); + return ValueTask.CompletedTask; + }); + + var adapter = new WebSocketStreamAdapter(ws); + var payload = "world"u8.ToArray(); + + await adapter.WriteAsync(payload, 0, payload.Length, CancellationToken.None); + + await ws.Received(1).SendAsync( + Arg.Any>(), + WebSocketMessageType.Binary, + true, + Arg.Any()); + capturedData.ShouldBe(payload); + } + + // ------------------------------------------------------------------------- + // Test 6: BytesRead tracking + // ------------------------------------------------------------------------- + + // Go reference: client.go wsRead — track inbound byte count + [Fact] + public async Task BytesRead_TracksTotal() + { + var payload = new byte[] { 1, 2, 3, 4, 5 }; + var ws = CreateMockWebSocket(readData: payload); + var adapter = new WebSocketStreamAdapter(ws); + + var buffer = new byte[16]; + var bytesRead = await adapter.ReadAsync(buffer.AsMemory(), CancellationToken.None); + + bytesRead.ShouldBeGreaterThan(0); + adapter.BytesRead.ShouldBe(payload.Length); + } + + // ------------------------------------------------------------------------- + // Test 7: BytesWritten tracking + // ------------------------------------------------------------------------- + + // Go reference: client.go wsWrite — track outbound byte count + [Fact] + public async Task BytesWritten_TracksTotal() + { + var ws = Substitute.For(); + ws.State.Returns(WebSocketState.Open); + ws.SendAsync(Arg.Any>(), Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(ValueTask.CompletedTask); + + var adapter = new WebSocketStreamAdapter(ws); + var payload = new byte[] { 10, 20, 30 }; + + await adapter.WriteAsync(payload, 0, payload.Length, CancellationToken.None); + + adapter.BytesWritten.ShouldBe(payload.Length); + } + + // ------------------------------------------------------------------------- + // Test 8: MessagesRead counter + // ------------------------------------------------------------------------- + + // Go reference: client.go wsRead — each completed WebSocket message increments counter + [Fact] + public async Task MessagesRead_Incremented() + { + var payload = new byte[] { 0xAA, 0xBB }; + var ws = CreateMockWebSocket(readData: payload); + var adapter = new WebSocketStreamAdapter(ws); + + var buffer = new byte[16]; + _ = await adapter.ReadAsync(buffer.AsMemory(), CancellationToken.None); + + adapter.MessagesRead.ShouldBe(1); + } + + // ------------------------------------------------------------------------- + // Test 9: MessagesWritten counter + // ------------------------------------------------------------------------- + + // Go reference: client.go wsWrite — each SendAsync call is one message + [Fact] + public async Task MessagesWritten_Incremented() + { + var ws = Substitute.For(); + ws.State.Returns(WebSocketState.Open); + ws.SendAsync(Arg.Any>(), Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(ValueTask.CompletedTask); + + var adapter = new WebSocketStreamAdapter(ws); + + await adapter.WriteAsync(ReadOnlyMemory.Empty, CancellationToken.None); + await adapter.WriteAsync(ReadOnlyMemory.Empty, CancellationToken.None); + + adapter.MessagesWritten.ShouldBe(2); + } + + // ------------------------------------------------------------------------- + // Test 10: IsConnected reflects WebSocket.State + // ------------------------------------------------------------------------- + + // Go reference: leafnode.go wsCreateLeafConnection — connection liveness check + [Fact] + public void IsConnected_ReflectsWebSocketState() + { + var openWs = Substitute.For(); + openWs.State.Returns(WebSocketState.Open); + + var closedWs = Substitute.For(); + closedWs.State.Returns(WebSocketState.Closed); + + var openAdapter = new WebSocketStreamAdapter(openWs); + var closedAdapter = new WebSocketStreamAdapter(closedWs); + + openAdapter.IsConnected.ShouldBeTrue(); + closedAdapter.IsConnected.ShouldBeFalse(); + } +}