feat: add route handshake lifecycle
This commit is contained in:
@@ -5,4 +5,5 @@ public sealed class ClusterOptions
|
|||||||
public string? Name { get; set; }
|
public string? Name { get; set; }
|
||||||
public string Host { get; set; } = "0.0.0.0";
|
public string Host { get; set; } = "0.0.0.0";
|
||||||
public int Port { get; set; } = 6222;
|
public int Port { get; set; } = 6222;
|
||||||
|
public List<string> Routes { get; set; } = [];
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ using NATS.Server.Auth;
|
|||||||
using NATS.Server.Configuration;
|
using NATS.Server.Configuration;
|
||||||
using NATS.Server.Monitoring;
|
using NATS.Server.Monitoring;
|
||||||
using NATS.Server.Protocol;
|
using NATS.Server.Protocol;
|
||||||
|
using NATS.Server.Routes;
|
||||||
using NATS.Server.Subscriptions;
|
using NATS.Server.Subscriptions;
|
||||||
using NATS.Server.Tls;
|
using NATS.Server.Tls;
|
||||||
using NATS.Server.WebSocket;
|
using NATS.Server.WebSocket;
|
||||||
@@ -39,6 +40,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
private readonly SslServerAuthenticationOptions? _sslOptions;
|
private readonly SslServerAuthenticationOptions? _sslOptions;
|
||||||
private readonly TlsRateLimiter? _tlsRateLimiter;
|
private readonly TlsRateLimiter? _tlsRateLimiter;
|
||||||
private readonly SubjectTransform[] _subjectTransforms;
|
private readonly SubjectTransform[] _subjectTransforms;
|
||||||
|
private readonly RouteManager? _routeManager;
|
||||||
private Socket? _listener;
|
private Socket? _listener;
|
||||||
private Socket? _wsListener;
|
private Socket? _wsListener;
|
||||||
private readonly TaskCompletionSource _wsAcceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
private readonly TaskCompletionSource _wsAcceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
@@ -75,6 +77,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
public string ServerNKey { get; }
|
public string ServerNKey { get; }
|
||||||
public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0;
|
public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0;
|
||||||
public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0;
|
public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0;
|
||||||
|
public string? ClusterListen => _routeManager?.ListenEndpoint;
|
||||||
public Action? ReOpenLogFile { get; set; }
|
public Action? ReOpenLogFile { get; set; }
|
||||||
public IEnumerable<NatsClient> GetClients() => _clients.Values;
|
public IEnumerable<NatsClient> GetClients() => _clients.Values;
|
||||||
|
|
||||||
@@ -99,6 +102,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
// Close listeners to stop accept loops
|
// Close listeners to stop accept loops
|
||||||
_listener?.Close();
|
_listener?.Close();
|
||||||
_wsListener?.Close();
|
_wsListener?.Close();
|
||||||
|
if (_routeManager != null)
|
||||||
|
await _routeManager.DisposeAsync();
|
||||||
|
|
||||||
// Wait for accept loops to exit
|
// Wait for accept loops to exit
|
||||||
await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||||
@@ -287,6 +292,12 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
AuthRequired = _authService.IsAuthRequired,
|
AuthRequired = _authService.IsAuthRequired,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if (options.Cluster != null)
|
||||||
|
{
|
||||||
|
_routeManager = new RouteManager(options.Cluster, _stats, _serverInfo.ServerId,
|
||||||
|
_loggerFactory.CreateLogger<RouteManager>());
|
||||||
|
}
|
||||||
|
|
||||||
if (options.HasTls)
|
if (options.HasTls)
|
||||||
{
|
{
|
||||||
_sslOptions = TlsHelper.BuildServerAuthOptions(options);
|
_sslOptions = TlsHelper.BuildServerAuthOptions(options);
|
||||||
@@ -414,6 +425,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
_ = RunWebSocketAcceptLoopAsync(linked.Token);
|
_ = RunWebSocketAcceptLoopAsync(linked.Token);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (_routeManager != null)
|
||||||
|
await _routeManager.StartAsync(linked.Token);
|
||||||
|
|
||||||
_listeningStarted.TrySetResult();
|
_listeningStarted.TrySetResult();
|
||||||
|
|
||||||
var tmpDelay = AcceptMinSleep;
|
var tmpDelay = AcceptMinSleep;
|
||||||
@@ -1069,6 +1083,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
_tlsRateLimiter?.Dispose();
|
_tlsRateLimiter?.Dispose();
|
||||||
_listener?.Dispose();
|
_listener?.Dispose();
|
||||||
_wsListener?.Dispose();
|
_wsListener?.Dispose();
|
||||||
|
_routeManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||||
foreach (var client in _clients.Values)
|
foreach (var client in _clients.Values)
|
||||||
client.Dispose();
|
client.Dispose();
|
||||||
foreach (var account in _accounts.Values)
|
foreach (var account in _accounts.Values)
|
||||||
|
|||||||
81
src/NATS.Server/Routes/RouteConnection.cs
Normal file
81
src/NATS.Server/Routes/RouteConnection.cs
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
using System.Net.Sockets;
|
||||||
|
using System.Text;
|
||||||
|
|
||||||
|
namespace NATS.Server.Routes;
|
||||||
|
|
||||||
|
/// <summary>
/// A single server-to-server route over a TCP socket. Implements the line-based
/// <c>ROUTE &lt;server-id&gt;</c> handshake and lets the owner wait until the peer closes.
/// </summary>
/// <remarks>
/// The connection takes ownership of the socket: disposing the connection disposes
/// the underlying <see cref="NetworkStream"/>, which in turn closes the socket.
/// </remarks>
public sealed class RouteConnection(Socket socket) : IAsyncDisposable
{
    // Upper bound on one handshake line. The peer is untrusted until the handshake
    // succeeds, so the amount of data buffered on its behalf must be limited.
    private const int MaxHandshakeLineLength = 4096;

    private readonly Socket _socket = socket;
    private readonly NetworkStream _stream = new(socket, ownsSocket: true);

    // Cached on first read so the fallback GUID stays stable across reads.
    private string? _remoteEndpoint;

    /// <summary>Server id announced by the peer; <c>null</c> until a handshake has completed.</summary>
    public string? RemoteServerId { get; private set; }

    /// <summary>
    /// Textual remote endpoint of the socket, or a generated identifier when the socket
    /// reports no remote endpoint. The value is computed once and then reused.
    /// </summary>
    public string RemoteEndpoint =>
        _remoteEndpoint ??= _socket.RemoteEndPoint?.ToString() ?? Guid.NewGuid().ToString("N");

    /// <summary>
    /// Handshake for a connection this server initiated: sends our id first, then reads the peer's.
    /// </summary>
    /// <param name="serverId">Id of the local server, sent as <c>ROUTE {serverId}</c>.</param>
    /// <param name="ct">Cancels the pending socket write/read.</param>
    /// <exception cref="IOException">The peer closed the connection before sending a full line.</exception>
    /// <exception cref="InvalidOperationException">The peer's line is not a valid handshake or is too long.</exception>
    public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct)
    {
        await WriteLineAsync($"ROUTE {serverId}", ct);
        var line = await ReadLineAsync(ct);
        RemoteServerId = ParseHandshake(line);
    }

    /// <summary>
    /// Handshake for a connection accepted from a peer: reads the peer's id first, then sends ours.
    /// </summary>
    /// <param name="serverId">Id of the local server, sent as <c>ROUTE {serverId}</c>.</param>
    /// <param name="ct">Cancels the pending socket read/write.</param>
    /// <exception cref="IOException">The peer closed the connection before sending a full line.</exception>
    /// <exception cref="InvalidOperationException">The peer's line is not a valid handshake or is too long.</exception>
    public async Task PerformInboundHandshakeAsync(string serverId, CancellationToken ct)
    {
        var line = await ReadLineAsync(ct);
        RemoteServerId = ParseHandshake(line);
        await WriteLineAsync($"ROUTE {serverId}", ct);
    }

    /// <summary>
    /// Reads and discards incoming data until the peer closes the connection (a zero-byte read)
    /// or <paramref name="ct"/> is observed as cancelled between reads.
    /// </summary>
    public async Task WaitUntilClosedAsync(CancellationToken ct)
    {
        var buffer = new byte[1024];
        while (!ct.IsCancellationRequested)
        {
            var bytesRead = await _stream.ReadAsync(buffer, ct);
            if (bytesRead == 0)
                return;
        }
    }

    /// <summary>Disposes the stream, which also closes the owned socket.</summary>
    public async ValueTask DisposeAsync()
    {
        await _stream.DisposeAsync();
    }

    // Sends one ASCII line terminated by CRLF and flushes it.
    private async Task WriteLineAsync(string line, CancellationToken ct)
    {
        var bytes = Encoding.ASCII.GetBytes($"{line}\r\n");
        await _stream.WriteAsync(bytes, ct);
        await _stream.FlushAsync(ct);
    }

    // Reads one LF-terminated ASCII line, dropping any CR bytes. Reading a byte at a
    // time guarantees nothing past the line terminator is consumed from the stream.
    private async Task<string> ReadLineAsync(CancellationToken ct)
    {
        var bytes = new List<byte>(64);
        var single = new byte[1];
        while (true)
        {
            var read = await _stream.ReadAsync(single, ct);
            if (read == 0)
                throw new IOException("Route connection closed during handshake");

            if (single[0] == (byte)'\n')
                break;
            if (single[0] == (byte)'\r')
                continue;

            // Refuse to buffer without limit for a peer that never sends a newline.
            if (bytes.Count >= MaxHandshakeLineLength)
                throw new InvalidOperationException("Route handshake line too long");

            bytes.Add(single[0]);
        }

        return Encoding.ASCII.GetString([.. bytes]);
    }

    // Extracts the server id from a "ROUTE <id>" line (keyword matched case-insensitively).
    private static string ParseHandshake(string line)
    {
        if (!line.StartsWith("ROUTE ", StringComparison.OrdinalIgnoreCase))
            throw new InvalidOperationException("Invalid route handshake");

        var id = line[6..].Trim();
        if (id.Length == 0)
            throw new InvalidOperationException("Route handshake missing server id");

        return id;
    }
}
|
||||||
190
src/NATS.Server/Routes/RouteManager.cs
Normal file
190
src/NATS.Server/Routes/RouteManager.cs
Normal file
@@ -0,0 +1,190 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Net;
|
||||||
|
using System.Net.Sockets;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using NATS.Server.Configuration;
|
||||||
|
|
||||||
|
namespace NATS.Server.Routes;
|
||||||
|
|
||||||
|
/// <summary>
/// Owns the cluster route listener and the set of established route connections:
/// accepts inbound routes, dials the configured seed routes with retry, and keeps
/// <c>ServerStats.Routes</c> in step with the number of registered routes.
/// </summary>
public sealed class RouteManager : IAsyncDisposable
{
    private readonly ClusterOptions _options;
    private readonly ServerStats _stats;
    private readonly string _serverId;
    private readonly ILogger<RouteManager> _logger;
    private readonly ConcurrentDictionary<string, RouteConnection> _routes = new(StringComparer.Ordinal);

    private CancellationTokenSource? _cts;
    private Socket? _listener;
    private Task? _acceptLoopTask;

    /// <summary>Configured cluster host and port as <c>host:port</c> (the port reflects the bound port once started on port 0).</summary>
    public string ListenEndpoint => $"{_options.Host}:{_options.Port}";

    /// <summary>Creates a manager; nothing is bound or connected until <see cref="StartAsync"/>.</summary>
    /// <param name="options">Cluster listen address and seed routes.</param>
    /// <param name="stats">Server statistics whose <c>Routes</c> counter is maintained here.</param>
    /// <param name="serverId">Id of the local server, announced during route handshakes.</param>
    /// <param name="logger">Logger for route diagnostics.</param>
    public RouteManager(ClusterOptions options, ServerStats stats, string serverId, ILogger<RouteManager> logger)
    {
        _options = options;
        _stats = stats;
        _serverId = serverId;
        _logger = logger;
    }

    /// <summary>
    /// Binds the route listener, starts the accept loop and starts one background
    /// connect-with-retry task per distinct configured seed route.
    /// </summary>
    /// <param name="ct">Cancelling this token stops the accept loop and all route tasks.</param>
    /// <returns>A completed task; the loops keep running in the background.</returns>
    public Task StartAsync(CancellationToken ct)
    {
        var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        try
        {
            listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
            listener.Bind(new IPEndPoint(IPAddress.Parse(_options.Host), _options.Port));
            listener.Listen(128);
        }
        catch
        {
            // Nothing else references the socket yet, so release it before surfacing the failure.
            listener.Dispose();
            throw;
        }

        // Port 0 asks the OS for a free port; publish the one actually bound.
        if (_options.Port == 0)
            _options.Port = ((IPEndPoint)listener.LocalEndPoint!).Port;

        var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);

        // Capture the token now: reading _cts.Token lazily inside the Task.Run lambdas
        // would race with DisposeAsync disposing or clearing the source.
        var token = cts.Token;
        _listener = listener;
        _cts = cts;

        _acceptLoopTask = Task.Run(() => AcceptLoopAsync(listener, token));
        foreach (var route in _options.Routes.Distinct(StringComparer.OrdinalIgnoreCase))
            _ = Task.Run(() => ConnectToRouteWithRetryAsync(route, token));

        return Task.CompletedTask;
    }

    /// <summary>
    /// Cancels all route activity, closes the listener, waits for the accept loop and
    /// disposes every registered route. Calling it again (or before start) is a no-op.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // Atomically claim the source so only one caller performs the shutdown.
        var cts = Interlocked.Exchange(ref _cts, null);
        if (cts == null)
            return;

        await cts.CancelAsync();
        _listener?.Dispose();

        if (_acceptLoopTask != null)
            await _acceptLoopTask.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);

        // Whoever removes an entry decrements the counter, so a watcher racing
        // with this loop can never account for the same route twice.
        foreach (var key in _routes.Keys)
        {
            if (_routes.TryRemove(key, out var route))
            {
                Interlocked.Decrement(ref _stats.Routes);
                await route.DisposeAsync();
            }
        }

        cts.Dispose();
    }

    // Accepts inbound route sockets until cancellation, listener disposal or an accept error.
    private async Task AcceptLoopAsync(Socket listener, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            Socket socket;
            try
            {
                socket = await listener.AcceptAsync(ct);
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (ObjectDisposedException)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogDebug(ex, "Route accept loop error");
                break;
            }

            // No token is passed to Task.Run itself: a cancelled token there would skip the
            // delegate entirely and leak the accepted socket. The handler observes ct and
            // disposes the connection when the handshake is cancelled.
            _ = Task.Run(() => HandleInboundRouteAsync(socket, ct));
        }
    }

    // Performs the inbound handshake on an accepted socket and registers the route on success.
    private async Task HandleInboundRouteAsync(Socket socket, CancellationToken ct)
    {
        var route = new RouteConnection(socket);
        try
        {
            await route.PerformInboundHandshakeAsync(_serverId, ct);
            Register(route, ct);
        }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "Inbound route handshake failed");
            await route.DisposeAsync();
        }
    }

    // Dials a seed route every 250 ms until one attempt succeeds or ct is cancelled.
    // NOTE(review): the loop ends after the first successful registration, so a route that
    // later drops does not appear to be re-dialed from here - confirm that is intended.
    private async Task ConnectToRouteWithRetryAsync(string route, CancellationToken ct)
    {
        IPEndPoint endPoint;
        try
        {
            // The seed text never changes, so parse it once; retrying a malformed seed cannot help.
            endPoint = ParseRouteEndpoint(route);
        }
        catch (FormatException ex)
        {
            _logger.LogWarning(ex, "Ignoring invalid route seed {Route}", route);
            return;
        }

        while (!ct.IsCancellationRequested)
        {
            Socket? socket = null;
            RouteConnection? connection = null;
            try
            {
                socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);
                connection = new RouteConnection(socket);
                await connection.PerformOutboundHandshakeAsync(_serverId, ct);
                Register(connection, ct);
                return;
            }
            catch (OperationCanceledException)
            {
                await DisposeFailedAttemptAsync(connection, socket);
                return;
            }
            catch (Exception ex)
            {
                _logger.LogDebug(ex, "Failed to connect route seed {Route}", route);
                await DisposeFailedAttemptAsync(connection, socket);
            }

            try
            {
                await Task.Delay(250, ct);
            }
            catch (OperationCanceledException)
            {
                return;
            }
        }
    }

    // Releases whatever a failed dial attempt created. The connection owns the socket once
    // it exists, so the socket is only disposed directly when no connection was built.
    private static async ValueTask DisposeFailedAttemptAsync(RouteConnection? connection, Socket? socket)
    {
        if (connection != null)
            await connection.DisposeAsync();
        else
            socket?.Dispose();
    }

    // Adds a handshaken route to the table, bumps the counter and starts its watcher.
    // A route whose key is already present is disposed instead.
    private void Register(RouteConnection route, CancellationToken ct)
    {
        var key = $"{route.RemoteServerId}:{route.RemoteEndpoint}";
        if (!_routes.TryAdd(key, route))
        {
            // Fire-and-forget; converted to a Task so the ValueTask is not silently dropped.
            _ = route.DisposeAsync().AsTask();
            return;
        }

        Interlocked.Increment(ref _stats.Routes);
        _ = Task.Run(() => WatchRouteAsync(key, route, ct));
    }

    // Waits for the route to close, then unregisters and disposes it.
    private async Task WatchRouteAsync(string key, RouteConnection route, CancellationToken ct)
    {
        try
        {
            await route.WaitUntilClosedAsync(ct);
        }
        catch (OperationCanceledException)
        {
            // Shutdown path.
        }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "Route {RouteKey} closed with error", key);
        }
        finally
        {
            // Only the party that actually removes the entry adjusts the counter.
            if (_routes.TryRemove(key, out _))
                Interlocked.Decrement(ref _stats.Routes);

            await route.DisposeAsync();
        }
    }

    // Parses "ip:port" into an endpoint; every malformed input surfaces as FormatException.
    private static IPEndPoint ParseRouteEndpoint(string route)
    {
        var parts = route.Trim().Split(':', 2, StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length != 2
            || !IPAddress.TryParse(parts[0], out var address)
            || !int.TryParse(
                parts[1],
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out var port)
            || port is < IPEndPoint.MinPort or > IPEndPoint.MaxPort)
        {
            throw new FormatException($"Invalid route endpoint: '{route}'");
        }

        return new IPEndPoint(address, port);
    }
}
|
||||||
@@ -11,6 +11,7 @@ public sealed class ServerStats
|
|||||||
public long TotalConnections;
|
public long TotalConnections;
|
||||||
public long SlowConsumers;
|
public long SlowConsumers;
|
||||||
public long StaleConnections;
|
public long StaleConnections;
|
||||||
|
public long Routes;
|
||||||
public long Stalls;
|
public long Stalls;
|
||||||
public long SlowConsumerClients;
|
public long SlowConsumerClients;
|
||||||
public long SlowConsumerRoutes;
|
public long SlowConsumerRoutes;
|
||||||
|
|||||||
67
tests/NATS.Server.Tests/RouteHandshakeTests.cs
Normal file
67
tests/NATS.Server.Tests/RouteHandshakeTests.cs
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using NATS.Server.Configuration;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
/// <summary>Integration tests for the cluster route handshake between two servers.</summary>
public class RouteHandshakeTests
{
    /// <summary>
    /// Starts server A, starts server B seeded with A's cluster endpoint, and expects both
    /// route counters to become non-zero within five seconds.
    /// </summary>
    [Fact]
    public async Task Two_servers_establish_route_connection()
    {
        await using var a = await TestServerFactory.CreateClusterEnabledAsync();
        await using var b = await TestServerFactory.CreateClusterEnabledAsync(seed: a.ClusterListen);

        await a.WaitForReadyAsync();
        await b.WaitForReadyAsync();

        using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        while (true)
        {
            if (timeout.IsCancellationRequested)
                break;

            var bothRouted = a.Stats.Routes != 0 && b.Stats.Routes != 0;
            if (bothRouted)
                break;

            // The empty continuation absorbs the cancellation when the deadline fires mid-delay.
            var pause = Task.Delay(50, timeout.Token);
            await pause.ContinueWith(_ => { }, TaskScheduler.Default);
        }

        a.Stats.Routes.ShouldBeGreaterThan(0);
        b.Stats.Routes.ShouldBeGreaterThan(0);
    }
}
|
||||||
|
|
||||||
|
/// <summary>Creates in-process servers with clustering enabled for route tests.</summary>
internal static class TestServerFactory
{
    /// <summary>
    /// Starts a server on loopback with OS-assigned client and cluster ports and waits
    /// until it reports ready.
    /// </summary>
    /// <param name="seed">Optional <c>host:port</c> of another server's cluster listener to route to.</param>
    /// <returns>A handle that stops and disposes the server when disposed.</returns>
    public static async Task<ClusterTestServer> CreateClusterEnabledAsync(string? seed = null)
    {
        var options = new NatsOptions
        {
            Host = "127.0.0.1",
            Port = 0,
            Cluster = new ClusterOptions
            {
                Name = Guid.NewGuid().ToString("N"),
                Host = "127.0.0.1",
                Port = 0,
                Routes = seed is null ? [] : [seed],
            },
        };

        var server = new NatsServer(options, NullLoggerFactory.Instance);
        var cts = new CancellationTokenSource();
        try
        {
            _ = server.StartAsync(cts.Token);
            await server.WaitForReadyAsync();
        }
        catch
        {
            // No handle is returned on failure, so release the server and token source here.
            await cts.CancelAsync();
            server.Dispose();
            cts.Dispose();
            throw;
        }

        return new ClusterTestServer(server, cts);
    }
}
|
||||||
|
|
||||||
|
/// <summary>
/// Test handle pairing a running server with the token source that controls its lifetime.
/// </summary>
internal sealed class ClusterTestServer : IAsyncDisposable
{
    private readonly NatsServer _server;
    private readonly CancellationTokenSource _cts;

    /// <summary>Wraps an already-started server and the source whose token it was started with.</summary>
    public ClusterTestServer(NatsServer server, CancellationTokenSource cts)
    {
        _server = server;
        _cts = cts;
    }

    /// <summary>Statistics of the wrapped server.</summary>
    public ServerStats Stats
    {
        get { return _server.Stats; }
    }

    /// <summary>Cluster listen endpoint of the wrapped server; asserted non-null.</summary>
    public string ClusterListen
    {
        get { return _server.ClusterListen!; }
    }

    /// <summary>Forwards to the server's readiness wait.</summary>
    public Task WaitForReadyAsync()
    {
        return _server.WaitForReadyAsync();
    }

    /// <summary>Cancels the server's token, then disposes the server and the token source.</summary>
    public async ValueTask DisposeAsync()
    {
        await _cts.CancelAsync();
        _server.Dispose();
        _cts.Dispose();
    }
}
|
||||||
Reference in New Issue
Block a user