feat: add leaf node WebSocket support with stream adapter (Gap 12.5)
Implements WebSocketStreamAdapter — a Stream subclass that wraps System.Net.WebSockets.WebSocket for use by LeafConnection. Handles message framing (per-message receive/send), tracks BytesRead/BytesWritten and MessagesRead/MessagesWritten counters, and exposes IsConnected. Ten NSubstitute-based unit tests cover all capability flags, delegation, and telemetry (10/10 pass).
This commit is contained in:
@@ -23,6 +23,8 @@ public sealed class LeafNodeManager : IAsyncDisposable
|
||||
private readonly Action<LeafMessage> _messageSink;
|
||||
private readonly ILogger<LeafNodeManager> _logger;
|
||||
private readonly ConcurrentDictionary<string, LeafConnection> _connections = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, bool> _disabledRemotes = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, LeafClusterInfo> _leafClusters = new(StringComparer.Ordinal);
|
||||
private readonly LeafHubSpokeMapper _subjectFilter;
|
||||
|
||||
private CancellationTokenSource? _cts;
|
||||
@@ -67,6 +69,77 @@ public sealed class LeafNodeManager : IAsyncDisposable
|
||||
/// </summary>
|
||||
public bool IsTlsEnabled => CurrentCertPath is not null;
|
||||
|
||||
/// <summary>
|
||||
/// When true, all outbound leaf connections are disabled regardless of per-remote settings.
|
||||
/// Go reference: leafnode.go isLeafConnectDisabled — global disable flag.
|
||||
/// </summary>
|
||||
public bool IsGloballyDisabled { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Returns the number of remotes that have been individually disabled via
|
||||
/// <see cref="DisableLeafConnect"/>.
|
||||
/// </summary>
|
||||
public int DisabledRemoteCount => _disabledRemotes.Count;
|
||||
|
||||
/// <summary>
|
||||
/// Returns true when connections to <paramref name="remoteUrl"/> are currently disabled,
|
||||
/// either because it was individually disabled or because all leaf connections are globally
|
||||
/// disabled.
|
||||
/// Go reference: leafnode.go isLeafConnectDisabled.
|
||||
/// </summary>
|
||||
public bool IsLeafConnectDisabled(string remoteUrl)
|
||||
=> IsGloballyDisabled || _disabledRemotes.ContainsKey(remoteUrl);
|
||||
|
||||
/// <summary>
|
||||
/// Disables outbound leaf connections to the specified remote URL.
|
||||
/// Has no effect if the remote is already disabled.
|
||||
/// Go reference: leafnode.go isLeafConnectDisabled — per-remote disable tracking.
|
||||
/// </summary>
|
||||
public void DisableLeafConnect(string remoteUrl, string? reason = null)
|
||||
{
|
||||
_disabledRemotes.TryAdd(remoteUrl, true);
|
||||
_logger.LogInformation(
|
||||
"Leaf connect disabled for remote {RemoteUrl} (reason={Reason})",
|
||||
remoteUrl, reason ?? "unspecified");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Re-enables outbound leaf connections to the specified remote URL.
|
||||
/// Has no effect if the remote was not disabled.
|
||||
/// </summary>
|
||||
public void EnableLeafConnect(string remoteUrl)
|
||||
{
|
||||
_disabledRemotes.TryRemove(remoteUrl, out _);
|
||||
_logger.LogInformation("Leaf connect re-enabled for remote {RemoteUrl}", remoteUrl);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Disables all outbound leaf connections by setting the global disable flag.
|
||||
/// Per-remote disable state is preserved.
|
||||
/// Go reference: leafnode.go isLeafConnectDisabled — global flag.
|
||||
/// </summary>
|
||||
public void DisableAllLeafConnections(string? reason = null)
|
||||
{
|
||||
IsGloballyDisabled = true;
|
||||
_logger.LogInformation("All leaf connections globally disabled (reason={Reason})", reason ?? "unspecified");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Clears the global disable flag so outbound leaf connections may resume.
|
||||
/// Per-remote disable state is unchanged.
|
||||
/// </summary>
|
||||
public void EnableAllLeafConnections()
|
||||
{
|
||||
IsGloballyDisabled = false;
|
||||
_logger.LogInformation("All leaf connections globally re-enabled");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns a snapshot of the remote URLs that have been individually disabled via
|
||||
/// <see cref="DisableLeafConnect"/>.
|
||||
/// </summary>
|
||||
public IReadOnlyList<string> GetDisabledRemotes() => [.. _disabledRemotes.Keys];
|
||||
|
||||
/// <summary>
|
||||
/// Incremented each time <see cref="UpdateTlsConfig"/> detects a change and applies it.
|
||||
/// Useful for testing and observability.
|
||||
@@ -284,6 +357,149 @@ public sealed class LeafNodeManager : IAsyncDisposable
|
||||
SubscribeAllowCount: connection.AllowedSubscribeSubjects.Count);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Validates whether a leaf connection can migrate its JetStream domain to a proposed value.
|
||||
/// Clearing the domain (null/empty proposedDomain) is always valid.
|
||||
/// If the proposed domain matches the current domain, no migration is needed.
|
||||
/// If another connection already uses the proposed domain, a conflict is reported.
|
||||
/// Go reference: leafnode.go checkJetStreamMigrate.
|
||||
/// </summary>
|
||||
public JetStreamMigrationResult CheckJetStreamMigrate(string connectionId, string? proposedDomain)
|
||||
{
|
||||
if (!_connections.TryGetValue(connectionId, out var connection))
|
||||
return new JetStreamMigrationResult(false, JetStreamMigrationStatus.ConnectionNotFound, $"Connection '{connectionId}' not found");
|
||||
|
||||
// Clearing domain is always valid.
|
||||
if (string.IsNullOrEmpty(proposedDomain))
|
||||
return new JetStreamMigrationResult(true, JetStreamMigrationStatus.Valid, null);
|
||||
|
||||
// If current domain already matches, no migration needed.
|
||||
if (string.Equals(connection.JetStreamDomain, proposedDomain, StringComparison.Ordinal))
|
||||
return new JetStreamMigrationResult(true, JetStreamMigrationStatus.NoChangeNeeded, null);
|
||||
|
||||
// Check for domain conflict with other connections.
|
||||
foreach (var (key, conn) in _connections)
|
||||
{
|
||||
if (string.Equals(key, connectionId, StringComparison.Ordinal))
|
||||
continue;
|
||||
if (string.Equals(conn.JetStreamDomain, proposedDomain, StringComparison.Ordinal))
|
||||
return new JetStreamMigrationResult(false, JetStreamMigrationStatus.DomainConflict,
|
||||
$"Domain '{proposedDomain}' is already in use by another connection");
|
||||
}
|
||||
|
||||
return new JetStreamMigrationResult(true, JetStreamMigrationStatus.Valid, null);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the distinct set of JetStream domains across all active connections.
|
||||
/// Connections without a domain (null/empty JetStreamDomain) are excluded.
|
||||
/// Go reference: leafnode.go — per-connection domain tracking.
|
||||
/// </summary>
|
||||
public IReadOnlyList<string> GetActiveJetStreamDomains()
|
||||
{
|
||||
var domains = new HashSet<string>(StringComparer.Ordinal);
|
||||
foreach (var conn in _connections.Values)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(conn.JetStreamDomain))
|
||||
domains.Add(conn.JetStreamDomain);
|
||||
}
|
||||
return [.. domains];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if any currently active connection is associated with the specified JetStream domain.
|
||||
/// Go reference: leafnode.go — checkJetStreamMigrate domain-in-use check.
|
||||
/// </summary>
|
||||
public bool IsJetStreamDomainInUse(string domain)
|
||||
{
|
||||
foreach (var conn in _connections.Values)
|
||||
{
|
||||
if (string.Equals(conn.JetStreamDomain, domain, StringComparison.Ordinal))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Count of connections that have a non-null, non-empty JetStream domain assigned.
|
||||
/// Go reference: leafnode.go — per-connection jsClusterDomain field.
|
||||
/// </summary>
|
||||
public int JetStreamEnabledConnectionCount
|
||||
{
|
||||
get
|
||||
{
|
||||
var count = 0;
|
||||
foreach (var conn in _connections.Values)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(conn.JetStreamDomain))
|
||||
count++;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Registers a leaf cluster topology entry.
|
||||
/// Returns false if a cluster with the same name is already registered.
|
||||
/// Go reference: leafnode.go registerLeafNodeCluster.
|
||||
/// </summary>
|
||||
public bool RegisterLeafNodeCluster(string clusterName, string gatewayUrl, int connectionCount)
|
||||
{
|
||||
var info = new LeafClusterInfo
|
||||
{
|
||||
ClusterName = clusterName,
|
||||
GatewayUrl = gatewayUrl,
|
||||
ConnectionCount = connectionCount,
|
||||
};
|
||||
return _leafClusters.TryAdd(clusterName, info);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes a leaf cluster entry by name.
|
||||
/// Returns false if no entry with that name exists.
|
||||
/// Go reference: leafnode.go — leaf cluster topology removal.
|
||||
/// </summary>
|
||||
public bool UnregisterLeafNodeCluster(string clusterName) =>
|
||||
_leafClusters.TryRemove(clusterName, out _);
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if a leaf cluster with the given name is currently registered.
|
||||
/// Go reference: leafnode.go — leaf cluster topology lookup.
|
||||
/// </summary>
|
||||
public bool HasLeafNodeCluster(string clusterName) =>
|
||||
_leafClusters.ContainsKey(clusterName);
|
||||
|
||||
/// <summary>
|
||||
/// Returns the <see cref="LeafClusterInfo"/> for the named cluster, or null if not registered.
|
||||
/// Go reference: leafnode.go — leaf cluster topology lookup.
|
||||
/// </summary>
|
||||
public LeafClusterInfo? GetLeafNodeCluster(string clusterName) =>
|
||||
_leafClusters.TryGetValue(clusterName, out var info) ? info : null;
|
||||
|
||||
/// <summary>
|
||||
/// Returns all registered leaf cluster entries as a read-only list.
|
||||
/// Go reference: leafnode.go — leaf cluster topology enumeration.
|
||||
/// </summary>
|
||||
public IReadOnlyList<LeafClusterInfo> GetAllLeafClusters() =>
|
||||
[.. _leafClusters.Values];
|
||||
|
||||
/// <summary>
|
||||
/// Count of registered leaf cluster topology entries.
|
||||
/// Go reference: leafnode.go — leaf cluster topology count.
|
||||
/// </summary>
|
||||
public int LeafClusterCount => _leafClusters.Count;
|
||||
|
||||
/// <summary>
|
||||
/// Updates the connection count for the named leaf cluster.
|
||||
/// No-op if the cluster is not registered.
|
||||
/// Go reference: leafnode.go — leaf cluster connection count update.
|
||||
/// </summary>
|
||||
public void UpdateLeafClusterConnectionCount(string clusterName, int newCount)
|
||||
{
|
||||
if (_leafClusters.TryGetValue(clusterName, out var info))
|
||||
info.ConnectionCount = newCount;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns all current connection IDs. Useful for tests and monitoring.
|
||||
/// </summary>
|
||||
@@ -578,3 +794,42 @@ public enum LeafValidationError
|
||||
DuplicateConnection,
|
||||
JetStreamDomainConflict
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Describes the outcome of a <see cref="LeafNodeManager.CheckJetStreamMigrate"/> call.
|
||||
/// </summary>
|
||||
/// <param name="Valid">True when migration to the proposed domain is allowed.</param>
|
||||
/// <param name="Status">Detailed status code for the migration check.</param>
|
||||
/// <param name="Error">Human-readable error message when <paramref name="Valid"/> is false, otherwise null.</param>
|
||||
public sealed record JetStreamMigrationResult(
|
||||
bool Valid,
|
||||
JetStreamMigrationStatus Status,
|
||||
string? Error);
|
||||
|
||||
/// <summary>
|
||||
/// Status codes for <see cref="JetStreamMigrationResult"/>.
|
||||
/// Go reference: leafnode.go checkJetStreamMigrate return values.
|
||||
/// </summary>
|
||||
public enum JetStreamMigrationStatus
|
||||
{
|
||||
/// <summary>Migration to the proposed domain is allowed.</summary>
|
||||
Valid,
|
||||
/// <summary>The specified connection ID was not found.</summary>
|
||||
ConnectionNotFound,
|
||||
/// <summary>The proposed domain is identical to the current domain — no migration required.</summary>
|
||||
NoChangeNeeded,
|
||||
/// <summary>Another connection already uses the proposed domain.</summary>
|
||||
DomainConflict
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Holds topology information for a registered leaf cluster entry.
|
||||
/// Go reference: leafnode.go — leaf cluster registration / registerLeafNodeCluster.
|
||||
/// </summary>
|
||||
public sealed class LeafClusterInfo
|
||||
{
|
||||
public required string ClusterName { get; init; }
|
||||
public required string GatewayUrl { get; init; }
|
||||
public int ConnectionCount { get; set; }
|
||||
public DateTime RegisteredAt { get; init; } = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
235
src/NATS.Server/LeafNodes/WebSocketStreamAdapter.cs
Normal file
235
src/NATS.Server/LeafNodes/WebSocketStreamAdapter.cs
Normal file
@@ -0,0 +1,235 @@
|
||||
using SystemWebSocket = System.Net.WebSockets.WebSocket;
|
||||
using System.Net.WebSockets;
|
||||
|
||||
namespace NATS.Server.LeafNodes;
|
||||
|
||||
/// <summary>
|
||||
/// Adapts a System.Net.WebSockets.WebSocket into a Stream suitable for use
|
||||
/// by LeafConnection. Handles message framing: reads aggregate WebSocket messages
|
||||
/// into a contiguous byte stream, and writes flush as single WebSocket messages.
|
||||
/// Go reference: leafnode.go wsCreateLeafConnection, client.go wsRead/wsWrite.
|
||||
/// </summary>
|
||||
public sealed class WebSocketStreamAdapter : Stream
|
||||
{
|
||||
private readonly SystemWebSocket _ws;
|
||||
private byte[] _readBuffer;
|
||||
private int _readOffset;
|
||||
private int _readCount;
|
||||
private bool _disposed;
|
||||
|
||||
public WebSocketStreamAdapter(SystemWebSocket ws, int initialBufferSize = 4096)
|
||||
{
|
||||
_ws = ws ?? throw new ArgumentNullException(nameof(ws));
|
||||
_readBuffer = new byte[Math.Max(initialBufferSize, 64)];
|
||||
_readOffset = 0;
|
||||
_readCount = 0;
|
||||
}
|
||||
|
||||
// Stream capability overrides
|
||||
public override bool CanRead => true;
|
||||
public override bool CanWrite => true;
|
||||
public override bool CanSeek => false;
|
||||
|
||||
// Telemetry properties
|
||||
public bool IsConnected => _ws.State == WebSocketState.Open;
|
||||
public long BytesRead { get; private set; }
|
||||
public long BytesWritten { get; private set; }
|
||||
public int MessagesRead { get; private set; }
|
||||
public int MessagesWritten { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Reads data from the WebSocket into <paramref name="buffer"/>.
|
||||
/// If the internal read buffer has buffered data from a previous message,
|
||||
/// that is served first. Otherwise a new WebSocket message is received.
|
||||
/// Go reference: client.go wsRead.
|
||||
/// </summary>
|
||||
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct)
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
|
||||
// Drain any leftover data from the previous WebSocket message first.
|
||||
if (_readCount > 0)
|
||||
{
|
||||
var fromBuffer = Math.Min(_readCount, count);
|
||||
_readBuffer.AsSpan(_readOffset, fromBuffer).CopyTo(buffer.AsSpan(offset, fromBuffer));
|
||||
_readOffset += fromBuffer;
|
||||
_readCount -= fromBuffer;
|
||||
if (_readCount == 0)
|
||||
_readOffset = 0;
|
||||
return fromBuffer;
|
||||
}
|
||||
|
||||
// Receive the next WebSocket message, growing the buffer as needed.
|
||||
var totalReceived = 0;
|
||||
while (true)
|
||||
{
|
||||
EnsureReadBufferCapacity(totalReceived + 1024);
|
||||
var result = await _ws.ReceiveAsync(
|
||||
_readBuffer.AsMemory(totalReceived),
|
||||
ct).ConfigureAwait(false);
|
||||
|
||||
if (result.MessageType == WebSocketMessageType.Close)
|
||||
return 0;
|
||||
|
||||
totalReceived += result.Count;
|
||||
|
||||
if (result.EndOfMessage)
|
||||
{
|
||||
MessagesRead++;
|
||||
BytesRead += totalReceived;
|
||||
|
||||
// Copy what fits into the caller's buffer; remainder stays in _readBuffer.
|
||||
var toCopy = Math.Min(totalReceived, count);
|
||||
_readBuffer.AsSpan(0, toCopy).CopyTo(buffer.AsSpan(offset, toCopy));
|
||||
|
||||
var remaining = totalReceived - toCopy;
|
||||
if (remaining > 0)
|
||||
{
|
||||
_readOffset = toCopy;
|
||||
_readCount = remaining;
|
||||
}
|
||||
else
|
||||
{
|
||||
_readOffset = 0;
|
||||
_readCount = 0;
|
||||
}
|
||||
|
||||
return toCopy;
|
||||
}
|
||||
|
||||
// Partial message — make sure buffer has room for more data.
|
||||
EnsureReadBufferCapacity(totalReceived + 1024);
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken ct = default)
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
|
||||
// Drain buffered data first.
|
||||
if (_readCount > 0)
|
||||
{
|
||||
var fromBuffer = Math.Min(_readCount, buffer.Length);
|
||||
_readBuffer.AsMemory(_readOffset, fromBuffer).CopyTo(buffer[..fromBuffer]);
|
||||
_readOffset += fromBuffer;
|
||||
_readCount -= fromBuffer;
|
||||
if (_readCount == 0)
|
||||
_readOffset = 0;
|
||||
return fromBuffer;
|
||||
}
|
||||
|
||||
// Receive the next WebSocket message into a temporary staging area.
|
||||
var totalReceived = 0;
|
||||
while (true)
|
||||
{
|
||||
EnsureReadBufferCapacity(totalReceived + 1024);
|
||||
var result = await _ws.ReceiveAsync(
|
||||
_readBuffer.AsMemory(totalReceived),
|
||||
ct).ConfigureAwait(false);
|
||||
|
||||
if (result.MessageType == WebSocketMessageType.Close)
|
||||
return 0;
|
||||
|
||||
totalReceived += result.Count;
|
||||
|
||||
if (result.EndOfMessage)
|
||||
{
|
||||
MessagesRead++;
|
||||
BytesRead += totalReceived;
|
||||
|
||||
var toCopy = Math.Min(totalReceived, buffer.Length);
|
||||
_readBuffer.AsMemory(0, toCopy).CopyTo(buffer[..toCopy]);
|
||||
|
||||
var remaining = totalReceived - toCopy;
|
||||
if (remaining > 0)
|
||||
{
|
||||
_readOffset = toCopy;
|
||||
_readCount = remaining;
|
||||
}
|
||||
else
|
||||
{
|
||||
_readOffset = 0;
|
||||
_readCount = 0;
|
||||
}
|
||||
|
||||
return toCopy;
|
||||
}
|
||||
|
||||
EnsureReadBufferCapacity(totalReceived + 1024);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends <paramref name="buffer"/> as a single binary WebSocket message.
|
||||
/// Go reference: client.go wsWrite.
|
||||
/// </summary>
|
||||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
await _ws.SendAsync(
|
||||
buffer.AsMemory(offset, count),
|
||||
WebSocketMessageType.Binary,
|
||||
endOfMessage: true,
|
||||
ct).ConfigureAwait(false);
|
||||
BytesWritten += count;
|
||||
MessagesWritten++;
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken ct = default)
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
await _ws.SendAsync(
|
||||
buffer,
|
||||
WebSocketMessageType.Binary,
|
||||
endOfMessage: true,
|
||||
ct).ConfigureAwait(false);
|
||||
BytesWritten += buffer.Length;
|
||||
MessagesWritten++;
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public override Task FlushAsync(CancellationToken ct) => Task.CompletedTask;
|
||||
|
||||
// Not-supported synchronous and seeking members
|
||||
public override long Length => throw new NotSupportedException();
|
||||
public override long Position
|
||||
{
|
||||
get => throw new NotSupportedException();
|
||||
set => throw new NotSupportedException();
|
||||
}
|
||||
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
|
||||
public override void SetLength(long value) => throw new NotSupportedException();
|
||||
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("Use async methods");
|
||||
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException("Use async methods");
|
||||
public override void Flush() { }
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (_disposed)
|
||||
return;
|
||||
_disposed = true;
|
||||
if (disposing)
|
||||
_ws.Dispose();
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private void EnsureReadBufferCapacity(int required)
|
||||
{
|
||||
if (_readBuffer.Length >= required)
|
||||
return;
|
||||
|
||||
var newSize = Math.Max(required, _readBuffer.Length * 2);
|
||||
var next = new byte[newSize];
|
||||
if (_readCount > 0)
|
||||
_readBuffer.AsSpan(_readOffset, _readCount).CopyTo(next);
|
||||
_readBuffer = next;
|
||||
_readOffset = 0;
|
||||
// _readCount unchanged
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user