feat: complete final jetstream parity transport and runtime baselines
This commit is contained in:
@@ -1,11 +1,191 @@
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Gateways;
|
||||
|
||||
public sealed class GatewayConnection
|
||||
public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
|
||||
{
|
||||
public string RemoteEndpoint { get; }
|
||||
private readonly NetworkStream _stream = new(socket, ownsSocket: true);
|
||||
private readonly SemaphoreSlim _writeGate = new(1, 1);
|
||||
private readonly CancellationTokenSource _closedCts = new();
|
||||
private Task? _loopTask;
|
||||
|
||||
public GatewayConnection(string remoteEndpoint)
|
||||
public string? RemoteId { get; private set; }
|
||||
public string RemoteEndpoint => socket.RemoteEndPoint?.ToString() ?? Guid.NewGuid().ToString("N");
|
||||
public Func<RemoteSubscription, Task>? RemoteSubscriptionReceived { get; set; }
|
||||
public Func<GatewayMessage, Task>? MessageReceived { get; set; }
|
||||
|
||||
public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct)
|
||||
{
|
||||
RemoteEndpoint = remoteEndpoint;
|
||||
await WriteLineAsync($"GATEWAY {serverId}", ct);
|
||||
var line = await ReadLineAsync(ct);
|
||||
RemoteId = ParseHandshake(line);
|
||||
}
|
||||
|
||||
public async Task PerformInboundHandshakeAsync(string serverId, CancellationToken ct)
|
||||
{
|
||||
var line = await ReadLineAsync(ct);
|
||||
RemoteId = ParseHandshake(line);
|
||||
await WriteLineAsync($"GATEWAY {serverId}", ct);
|
||||
}
|
||||
|
||||
public void StartLoop(CancellationToken ct)
|
||||
{
|
||||
if (_loopTask != null)
|
||||
return;
|
||||
|
||||
var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, _closedCts.Token);
|
||||
_loopTask = Task.Run(() => ReadLoopAsync(linked.Token), linked.Token);
|
||||
}
|
||||
|
||||
public Task WaitUntilClosedAsync(CancellationToken ct)
|
||||
=> _loopTask?.WaitAsync(ct) ?? Task.CompletedTask;
|
||||
|
||||
public Task SendAPlusAsync(string subject, string? queue, CancellationToken ct)
|
||||
=> WriteLineAsync(queue is { Length: > 0 } ? $"A+ {subject} {queue}" : $"A+ {subject}", ct);
|
||||
|
||||
public Task SendAMinusAsync(string subject, string? queue, CancellationToken ct)
|
||||
=> WriteLineAsync(queue is { Length: > 0 } ? $"A- {subject} {queue}" : $"A- {subject}", ct);
|
||||
|
||||
public async Task SendMessageAsync(string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
var reply = string.IsNullOrEmpty(replyTo) ? "-" : replyTo;
|
||||
await _writeGate.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
var control = Encoding.ASCII.GetBytes($"GMSG {subject} {reply} {payload.Length}\r\n");
|
||||
await _stream.WriteAsync(control, ct);
|
||||
if (!payload.IsEmpty)
|
||||
await _stream.WriteAsync(payload, ct);
|
||||
await _stream.WriteAsync("\r\n"u8.ToArray(), ct);
|
||||
await _stream.FlushAsync(ct);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_writeGate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _closedCts.CancelAsync();
|
||||
if (_loopTask != null)
|
||||
await _loopTask.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
_closedCts.Dispose();
|
||||
_writeGate.Dispose();
|
||||
await _stream.DisposeAsync();
|
||||
}
|
||||
|
||||
private async Task ReadLoopAsync(CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
string line;
|
||||
try
|
||||
{
|
||||
line = await ReadLineAsync(ct);
|
||||
}
|
||||
catch
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if (line.StartsWith("A+ ", StringComparison.Ordinal))
|
||||
{
|
||||
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (parts.Length >= 2 && RemoteSubscriptionReceived != null)
|
||||
{
|
||||
var queue = parts.Length >= 3 ? parts[2] : null;
|
||||
await RemoteSubscriptionReceived(new RemoteSubscription(parts[1], queue, RemoteId ?? string.Empty));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (line.StartsWith("A- ", StringComparison.Ordinal))
|
||||
{
|
||||
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (parts.Length >= 2 && RemoteSubscriptionReceived != null)
|
||||
{
|
||||
var queue = parts.Length >= 3 ? parts[2] : null;
|
||||
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parts[1], queue, RemoteId ?? string.Empty));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!line.StartsWith("GMSG ", StringComparison.Ordinal))
|
||||
continue;
|
||||
|
||||
var args = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (args.Length < 4 || !int.TryParse(args[3], out var size) || size < 0)
|
||||
continue;
|
||||
|
||||
var payload = await ReadPayloadAsync(size, ct);
|
||||
if (MessageReceived != null)
|
||||
await MessageReceived(new GatewayMessage(args[1], args[2] == "-" ? null : args[2], payload));
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<ReadOnlyMemory<byte>> ReadPayloadAsync(int size, CancellationToken ct)
|
||||
{
|
||||
var payload = new byte[size];
|
||||
var offset = 0;
|
||||
while (offset < size)
|
||||
{
|
||||
var read = await _stream.ReadAsync(payload.AsMemory(offset, size - offset), ct);
|
||||
if (read == 0)
|
||||
throw new IOException("Gateway payload read closed");
|
||||
offset += read;
|
||||
}
|
||||
|
||||
var trailer = new byte[2];
|
||||
_ = await _stream.ReadAsync(trailer, ct);
|
||||
return payload;
|
||||
}
|
||||
|
||||
private async Task WriteLineAsync(string line, CancellationToken ct)
|
||||
{
|
||||
await _writeGate.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
var bytes = Encoding.ASCII.GetBytes($"{line}\r\n");
|
||||
await _stream.WriteAsync(bytes, ct);
|
||||
await _stream.FlushAsync(ct);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_writeGate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<string> ReadLineAsync(CancellationToken ct)
|
||||
{
|
||||
var bytes = new List<byte>(64);
|
||||
var single = new byte[1];
|
||||
while (true)
|
||||
{
|
||||
var read = await _stream.ReadAsync(single, ct);
|
||||
if (read == 0)
|
||||
throw new IOException("Gateway closed");
|
||||
if (single[0] == (byte)'\n')
|
||||
break;
|
||||
if (single[0] != (byte)'\r')
|
||||
bytes.Add(single[0]);
|
||||
}
|
||||
|
||||
return Encoding.ASCII.GetString([.. bytes]);
|
||||
}
|
||||
|
||||
private static string ParseHandshake(string line)
|
||||
{
|
||||
if (!line.StartsWith("GATEWAY ", StringComparison.OrdinalIgnoreCase))
|
||||
throw new InvalidOperationException("Invalid gateway handshake");
|
||||
|
||||
var id = line[8..].Trim();
|
||||
if (id.Length == 0)
|
||||
throw new InvalidOperationException("Gateway handshake missing id");
|
||||
return id;
|
||||
}
|
||||
}
|
||||
|
||||
public sealed record GatewayMessage(string Subject, string? ReplyTo, ReadOnlyMemory<byte> Payload);
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Gateways;
|
||||
|
||||
@@ -7,26 +11,204 @@ public sealed class GatewayManager : IAsyncDisposable
|
||||
{
|
||||
private readonly GatewayOptions _options;
|
||||
private readonly ServerStats _stats;
|
||||
private readonly string _serverId;
|
||||
private readonly Action<RemoteSubscription> _remoteSubSink;
|
||||
private readonly Action<GatewayMessage> _messageSink;
|
||||
private readonly ILogger<GatewayManager> _logger;
|
||||
private readonly ConcurrentDictionary<string, GatewayConnection> _connections = new(StringComparer.Ordinal);
|
||||
|
||||
public GatewayManager(GatewayOptions options, ServerStats stats, ILogger<GatewayManager> logger)
|
||||
private CancellationTokenSource? _cts;
|
||||
private Socket? _listener;
|
||||
private Task? _acceptLoopTask;
|
||||
|
||||
public string ListenEndpoint => $"{_options.Host}:{_options.Port}";
|
||||
|
||||
public GatewayManager(
|
||||
GatewayOptions options,
|
||||
ServerStats stats,
|
||||
string serverId,
|
||||
Action<RemoteSubscription> remoteSubSink,
|
||||
Action<GatewayMessage> messageSink,
|
||||
ILogger<GatewayManager> logger)
|
||||
{
|
||||
_options = options;
|
||||
_stats = stats;
|
||||
_serverId = serverId;
|
||||
_remoteSubSink = remoteSubSink;
|
||||
_messageSink = messageSink;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken ct)
|
||||
{
|
||||
_cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
_listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
|
||||
_listener.Bind(new IPEndPoint(IPAddress.Parse(_options.Host), _options.Port));
|
||||
_listener.Listen(128);
|
||||
|
||||
if (_options.Port == 0)
|
||||
_options.Port = ((IPEndPoint)_listener.LocalEndPoint!).Port;
|
||||
|
||||
_acceptLoopTask = Task.Run(() => AcceptLoopAsync(_cts.Token));
|
||||
foreach (var remote in _options.Remotes.Distinct(StringComparer.OrdinalIgnoreCase))
|
||||
_ = Task.Run(() => ConnectWithRetryAsync(remote, _cts.Token));
|
||||
|
||||
_logger.LogDebug("Gateway manager started (name={Name}, listen={Host}:{Port})",
|
||||
_options.Name, _options.Host, _options.Port);
|
||||
Interlocked.Exchange(ref _stats.Gateways, 0);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
public async Task ForwardMessageAsync(string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
foreach (var connection in _connections.Values)
|
||||
await connection.SendMessageAsync(subject, replyTo, payload, ct);
|
||||
}
|
||||
|
||||
public void PropagateLocalSubscription(string subject, string? queue)
|
||||
{
|
||||
foreach (var connection in _connections.Values)
|
||||
_ = connection.SendAPlusAsync(subject, queue, _cts?.Token ?? CancellationToken.None);
|
||||
}
|
||||
|
||||
public void PropagateLocalUnsubscription(string subject, string? queue)
|
||||
{
|
||||
foreach (var connection in _connections.Values)
|
||||
_ = connection.SendAMinusAsync(subject, queue, _cts?.Token ?? CancellationToken.None);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_cts == null)
|
||||
return;
|
||||
|
||||
await _cts.CancelAsync();
|
||||
_listener?.Dispose();
|
||||
if (_acceptLoopTask != null)
|
||||
await _acceptLoopTask.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
|
||||
foreach (var connection in _connections.Values)
|
||||
await connection.DisposeAsync();
|
||||
_connections.Clear();
|
||||
Interlocked.Exchange(ref _stats.Gateways, 0);
|
||||
_cts.Dispose();
|
||||
_cts = null;
|
||||
_logger.LogDebug("Gateway manager stopped");
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task AcceptLoopAsync(CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
Socket socket;
|
||||
try
|
||||
{
|
||||
socket = await _listener!.AcceptAsync(ct);
|
||||
}
|
||||
catch
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
_ = Task.Run(() => HandleInboundAsync(socket, ct), ct);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleInboundAsync(Socket socket, CancellationToken ct)
|
||||
{
|
||||
var connection = new GatewayConnection(socket);
|
||||
try
|
||||
{
|
||||
await connection.PerformInboundHandshakeAsync(_serverId, ct);
|
||||
Register(connection);
|
||||
}
|
||||
catch
|
||||
{
|
||||
await connection.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ConnectWithRetryAsync(string remote, CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var endPoint = ParseEndpoint(remote);
|
||||
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);
|
||||
var connection = new GatewayConnection(socket);
|
||||
await connection.PerformOutboundHandshakeAsync(_serverId, ct);
|
||||
Register(connection);
|
||||
return;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Gateway connect retry for {Remote}", remote);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await Task.Delay(250, ct);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void Register(GatewayConnection connection)
|
||||
{
|
||||
var key = $"{connection.RemoteId}:{connection.RemoteEndpoint}:{Guid.NewGuid():N}";
|
||||
if (!_connections.TryAdd(key, connection))
|
||||
{
|
||||
_ = connection.DisposeAsync();
|
||||
return;
|
||||
}
|
||||
|
||||
connection.RemoteSubscriptionReceived = sub =>
|
||||
{
|
||||
_remoteSubSink(sub);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
connection.MessageReceived = msg =>
|
||||
{
|
||||
_messageSink(msg);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
connection.StartLoop(_cts!.Token);
|
||||
Interlocked.Increment(ref _stats.Gateways);
|
||||
_ = Task.Run(() => WatchConnectionAsync(key, connection, _cts!.Token));
|
||||
}
|
||||
|
||||
private async Task WatchConnectionAsync(string key, GatewayConnection connection, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await connection.WaitUntilClosedAsync(ct);
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (_connections.TryRemove(key, out _))
|
||||
Interlocked.Decrement(ref _stats.Gateways);
|
||||
await connection.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private static IPEndPoint ParseEndpoint(string endpoint)
|
||||
{
|
||||
var parts = endpoint.Split(':', 2, StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries);
|
||||
if (parts.Length != 2)
|
||||
throw new FormatException($"Invalid endpoint: {endpoint}");
|
||||
|
||||
return new IPEndPoint(IPAddress.Parse(parts[0]), int.Parse(parts[1]));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user