diff --git a/src/NATS.Server/Configuration/ClusterOptions.cs b/src/NATS.Server/Configuration/ClusterOptions.cs
index 4571522..0920d69 100644
--- a/src/NATS.Server/Configuration/ClusterOptions.cs
+++ b/src/NATS.Server/Configuration/ClusterOptions.cs
@@ -5,4 +5,5 @@ public sealed class ClusterOptions
     public string? Name { get; set; }
     public string Host { get; set; } = "0.0.0.0";
     public int Port { get; set; } = 6222;
+    public List<string> Routes { get; set; } = [];
 }
diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs
index 9a1f717..cfb03e4 100644
--- a/src/NATS.Server/NatsServer.cs
+++ b/src/NATS.Server/NatsServer.cs
@@ -11,6 +11,7 @@ using NATS.Server.Auth;
 using NATS.Server.Configuration;
 using NATS.Server.Monitoring;
 using NATS.Server.Protocol;
+using NATS.Server.Routes;
 using NATS.Server.Subscriptions;
 using NATS.Server.Tls;
 using NATS.Server.WebSocket;
@@ -39,6 +40,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
     private readonly SslServerAuthenticationOptions? _sslOptions;
     private readonly TlsRateLimiter? _tlsRateLimiter;
     private readonly SubjectTransform[] _subjectTransforms;
+    private readonly RouteManager? _routeManager;
     private Socket? _listener;
     private Socket? _wsListener;
     private readonly TaskCompletionSource _wsAcceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -75,6 +77,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
     public string ServerNKey { get; }
     public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0;
     public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0;
+    public string? ClusterListen => _routeManager?.ListenEndpoint;
     public Action? ReOpenLogFile { get; set; }
 
     public IEnumerable GetClients() => _clients.Values;
@@ -99,6 +102,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
         // Close listeners to stop accept loops
         _listener?.Close();
         _wsListener?.Close();
+        if (_routeManager != null)
+            await _routeManager.DisposeAsync();
 
         // Wait for accept loops to exit
         await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
@@ -287,6 +292,12 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
             AuthRequired = _authService.IsAuthRequired,
         };
 
+        if (options.Cluster != null)
+        {
+            _routeManager = new RouteManager(options.Cluster, _stats, _serverInfo.ServerId,
+                _loggerFactory.CreateLogger<RouteManager>());
+        }
+
         if (options.HasTls)
         {
             _sslOptions = TlsHelper.BuildServerAuthOptions(options);
@@ -414,6 +425,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
             _ = RunWebSocketAcceptLoopAsync(linked.Token);
         }
 
+        if (_routeManager != null)
+            await _routeManager.StartAsync(linked.Token);
+
         _listeningStarted.TrySetResult();
 
         var tmpDelay = AcceptMinSleep;
@@ -1069,6 +1083,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
         _tlsRateLimiter?.Dispose();
         _listener?.Dispose();
         _wsListener?.Dispose();
+        _routeManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
         foreach (var client in _clients.Values)
             client.Dispose();
         foreach (var account in _accounts.Values)
diff --git a/src/NATS.Server/Routes/RouteConnection.cs b/src/NATS.Server/Routes/RouteConnection.cs
new file mode 100644
index 0000000..95595ae
--- /dev/null
+++ b/src/NATS.Server/Routes/RouteConnection.cs
@@ -0,0 +1,124 @@
+using System.Net.Sockets;
+using System.Text;
+
+namespace NATS.Server.Routes;
+
+/// <summary>
+/// A single TCP link to a peer server. Owns the socket through its <see cref="NetworkStream"/> and
+/// performs the line-based <c>ROUTE &lt;serverId&gt;</c> handshake in either direction.
+/// </summary>
+public sealed class RouteConnection(Socket socket) : IAsyncDisposable
+{
+    // The peer is unverified until the handshake completes, so cap how much it can make us buffer.
+    private const int MaxHandshakeLineLength = 4096;
+
+    private readonly NetworkStream _stream = new(socket, ownsSocket: true);
+
+    /// <summary>Server id announced by the peer; <c>null</c> until a handshake has completed.</summary>
+    public string? RemoteServerId { get; private set; }
+
+    /// <summary>
+    /// Remote address, captured once at construction so the value is stable and still readable after
+    /// the socket closes. Falls back to a random id when no remote endpoint can be obtained.
+    /// </summary>
+    public string RemoteEndpoint { get; } = DescribeRemoteEndpoint(socket);
+
+    /// <summary>Sends our <c>ROUTE</c> line, then reads and validates the peer's reply.</summary>
+    /// <exception cref="IOException">The peer closed the connection or sent an oversized line.</exception>
+    /// <exception cref="InvalidOperationException">The reply is not a valid handshake line.</exception>
+    public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct)
+    {
+        await WriteLineAsync($"ROUTE {serverId}", ct);
+        var line = await ReadLineAsync(ct);
+        RemoteServerId = ParseHandshake(line);
+    }
+
+    /// <summary>Reads and validates the peer's <c>ROUTE</c> line, then replies with ours.</summary>
+    /// <exception cref="IOException">The peer closed the connection or sent an oversized line.</exception>
+    /// <exception cref="InvalidOperationException">The received line is not a valid handshake line.</exception>
+    public async Task PerformInboundHandshakeAsync(string serverId, CancellationToken ct)
+    {
+        var line = await ReadLineAsync(ct);
+        RemoteServerId = ParseHandshake(line);
+        await WriteLineAsync($"ROUTE {serverId}", ct);
+    }
+
+    /// <summary>
+    /// Reads and discards incoming bytes until the peer closes the connection (a zero-length read)
+    /// or <paramref name="ct"/> is observed as cancelled.
+    /// </summary>
+    public async Task WaitUntilClosedAsync(CancellationToken ct)
+    {
+        var buffer = new byte[1024];
+        while (!ct.IsCancellationRequested)
+        {
+            var bytesRead = await _stream.ReadAsync(buffer, ct);
+            if (bytesRead == 0)
+                return;
+        }
+    }
+
+    /// <summary>Closes the stream, which also closes the socket it owns.</summary>
+    public async ValueTask DisposeAsync()
+    {
+        await _stream.DisposeAsync();
+    }
+
+    private static string DescribeRemoteEndpoint(Socket connected)
+    {
+        try
+        {
+            return connected.RemoteEndPoint?.ToString() ?? Guid.NewGuid().ToString("N");
+        }
+        catch (Exception ex) when (ex is SocketException or ObjectDisposedException)
+        {
+            // The peer may already be gone; the fallback id still gives the route a unique key.
+            return Guid.NewGuid().ToString("N");
+        }
+    }
+
+    private async Task WriteLineAsync(string line, CancellationToken ct)
+    {
+        var bytes = Encoding.ASCII.GetBytes($"{line}\r\n");
+        await _stream.WriteAsync(bytes, ct);
+        await _stream.FlushAsync(ct);
+    }
+
+    // Reads one byte at a time, so no bytes after the line terminator are consumed from the stream.
+    private async Task<string> ReadLineAsync(CancellationToken ct)
+    {
+        var bytes = new List<byte>(64);
+        var single = new byte[1];
+        while (true)
+        {
+            var read = await _stream.ReadAsync(single, ct);
+            if (read == 0)
+                throw new IOException("Route connection closed during handshake");
+
+            if (single[0] == (byte)'\n')
+                break;
+            if (single[0] == (byte)'\r')
+                continue;
+
+            if (bytes.Count >= MaxHandshakeLineLength)
+                throw new IOException("Route handshake line exceeds maximum length");
+
+            bytes.Add(single[0]);
+        }
+
+        return Encoding.ASCII.GetString([.. bytes]);
+    }
+
+    // Accepts "ROUTE <id>" (the keyword is matched case-insensitively) and returns the trimmed id.
+    private static string ParseHandshake(string line)
+    {
+        if (!line.StartsWith("ROUTE ", StringComparison.OrdinalIgnoreCase))
+            throw new InvalidOperationException("Invalid route handshake");
+
+        var id = line[6..].Trim();
+        if (id.Length == 0)
+            throw new InvalidOperationException("Route handshake missing server id");
+
+        return id;
+    }
+}
diff --git a/src/NATS.Server/Routes/RouteManager.cs b/src/NATS.Server/Routes/RouteManager.cs
new file mode 100644
index 0000000..2774fa2
--- /dev/null
+++ b/src/NATS.Server/Routes/RouteManager.cs
@@ -0,0 +1,273 @@
+using System.Collections.Concurrent;
+using System.Net;
+using System.Net.Sockets;
+using Microsoft.Extensions.Logging;
+using NATS.Server.Configuration;
+
+namespace NATS.Server.Routes;
+
+/// <summary>
+/// Accepts inbound cluster routes, dials the configured route seeds, and keeps
+/// <see cref="ServerStats.Routes"/> equal to the number of registered routes.
+/// </summary>
+public sealed class RouteManager : IAsyncDisposable
+{
+    // Bounds one connect-plus-handshake attempt so a silent peer cannot hold a socket open forever.
+    private static readonly TimeSpan HandshakeTimeout = TimeSpan.FromSeconds(5);
+    private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(250);
+
+    private readonly ClusterOptions _options;
+    private readonly ServerStats _stats;
+    private readonly string _serverId;
+    private readonly ILogger _logger;
+    private readonly ConcurrentDictionary<string, RouteConnection> _routes = new(StringComparer.Ordinal);
+
+    private CancellationTokenSource? _cts;
+    private Socket? _listener;
+    private Task? _acceptLoopTask;
+
+    /// <summary>Configured <c>host:port</c>; a configured port of 0 is replaced by the bound port in <see cref="StartAsync"/>.</summary>
+    public string ListenEndpoint => $"{_options.Host}:{_options.Port}";
+
+    public RouteManager(ClusterOptions options, ServerStats stats, string serverId, ILogger logger)
+    {
+        _options = options;
+        _stats = stats;
+        _serverId = serverId;
+        _logger = logger;
+    }
+
+    /// <summary>
+    /// Binds the cluster listener, starts the accept loop, and starts one background connector per
+    /// distinct entry in <see cref="ClusterOptions.Routes"/>. Returns once the listener is bound.
+    /// </summary>
+    /// <exception cref="InvalidOperationException">The manager is already started.</exception>
+    public Task StartAsync(CancellationToken ct)
+    {
+        if (_cts is not null)
+            throw new InvalidOperationException("Route manager is already started");
+
+        var address = IPAddress.Parse(_options.Host);
+        var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
+        var listener = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+        try
+        {
+            listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
+            listener.Bind(new IPEndPoint(address, _options.Port));
+            listener.Listen(128);
+        }
+        catch
+        {
+            listener.Dispose();
+            cts.Dispose();
+            throw;
+        }
+
+        if (_options.Port == 0)
+            _options.Port = ((IPEndPoint)listener.LocalEndPoint!).Port;
+
+        _listener = listener;
+        _cts = cts;
+
+        // Hand the token (not the _cts field) to background work: DisposeAsync clears the field.
+        var token = cts.Token;
+        _acceptLoopTask = Task.Run(() => AcceptLoopAsync(listener, token));
+        foreach (var route in _options.Routes.Distinct(StringComparer.OrdinalIgnoreCase))
+            _ = Task.Run(() => ConnectToRouteWithRetryAsync(route, token));
+
+        return Task.CompletedTask;
+    }
+
+    /// <summary>
+    /// Cancels background work, closes the listener, and disposes every registered route. Safe to
+    /// call more than once; only the first call after a start does any work.
+    /// </summary>
+    public async ValueTask DisposeAsync()
+    {
+        var cts = Interlocked.Exchange(ref _cts, null);
+        if (cts is null)
+            return;
+
+        await cts.CancelAsync();
+        _listener?.Dispose();
+
+        if (_acceptLoopTask != null)
+            await _acceptLoopTask.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
+
+        // Decrement once per route actually removed here; a watcher that removes its own route
+        // first does its own decrement, so the counter cannot go negative.
+        foreach (var (key, route) in _routes)
+        {
+            if (_routes.TryRemove(KeyValuePair.Create(key, route)))
+                Interlocked.Decrement(ref _stats.Routes);
+
+            await route.DisposeAsync();
+        }
+
+        cts.Dispose();
+    }
+
+    private async Task AcceptLoopAsync(Socket listener, CancellationToken ct)
+    {
+        while (!ct.IsCancellationRequested)
+        {
+            Socket socket;
+            try
+            {
+                socket = await listener.AcceptAsync(ct);
+            }
+            catch (OperationCanceledException)
+            {
+                break;
+            }
+            catch (ObjectDisposedException)
+            {
+                break;
+            }
+            catch (SocketException ex)
+            {
+                // One failed accept should not stop the server from accepting routes for good.
+                _logger.LogDebug(ex, "Route accept loop error");
+                await Task.Delay(RetryDelay, ct).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
+                continue;
+            }
+            catch (Exception ex)
+            {
+                _logger.LogDebug(ex, "Route accept loop error");
+                break;
+            }
+
+            // No token on Task.Run: a cancelled token would skip the handler and leak the socket.
+            _ = Task.Run(() => HandleInboundRouteAsync(socket, ct));
+        }
+    }
+
+    private async Task HandleInboundRouteAsync(Socket socket, CancellationToken ct)
+    {
+        var route = new RouteConnection(socket);
+        try
+        {
+            using var handshakeCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
+            handshakeCts.CancelAfter(HandshakeTimeout);
+            await route.PerformInboundHandshakeAsync(_serverId, handshakeCts.Token);
+            Register(route, ct);
+        }
+        catch (Exception ex)
+        {
+            _logger.LogDebug(ex, "Inbound route handshake failed");
+            await route.DisposeAsync();
+        }
+    }
+
+    private async Task ConnectToRouteWithRetryAsync(string route, CancellationToken ct)
+    {
+        IPEndPoint endPoint;
+        try
+        {
+            endPoint = ParseRouteEndpoint(route);
+        }
+        catch (FormatException ex)
+        {
+            // A malformed seed can never succeed, so report it once instead of retrying forever.
+            _logger.LogWarning(ex, "Ignoring invalid route seed {Route}", route);
+            return;
+        }
+
+        while (!ct.IsCancellationRequested)
+        {
+            try
+            {
+                await ConnectOnceAsync(endPoint, ct);
+                return;
+            }
+            catch (OperationCanceledException) when (ct.IsCancellationRequested)
+            {
+                return;
+            }
+            catch (Exception ex)
+            {
+                _logger.LogDebug(ex, "Failed to connect route seed {Route}", route);
+            }
+
+            await Task.Delay(RetryDelay, ct).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
+        }
+    }
+
+    // Makes one connect-and-handshake attempt; on any failure the socket is closed before rethrowing.
+    private async Task ConnectOnceAsync(IPEndPoint endPoint, CancellationToken ct)
+    {
+        using var attemptCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
+        attemptCts.CancelAfter(HandshakeTimeout);
+
+        var socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+        RouteConnection connection;
+        try
+        {
+            await socket.ConnectAsync(endPoint, attemptCts.Token);
+            connection = new RouteConnection(socket);
+        }
+        catch
+        {
+            socket.Dispose();
+            throw;
+        }
+
+        try
+        {
+            await connection.PerformOutboundHandshakeAsync(_serverId, attemptCts.Token);
+        }
+        catch
+        {
+            await connection.DisposeAsync();
+            throw;
+        }
+
+        Register(connection, ct);
+    }
+
+    private void Register(RouteConnection route, CancellationToken ct)
+    {
+        var key = $"{route.RemoteServerId}:{route.RemoteEndpoint}";
+        if (ct.IsCancellationRequested || !_routes.TryAdd(key, route))
+        {
+            _ = route.DisposeAsync();
+            return;
+        }
+
+        Interlocked.Increment(ref _stats.Routes);
+        _ = Task.Run(() => WatchRouteAsync(key, route, ct));
+    }
+
+    private async Task WatchRouteAsync(string key, RouteConnection route, CancellationToken ct)
+    {
+        try
+        {
+            await route.WaitUntilClosedAsync(ct);
+        }
+        catch (OperationCanceledException)
+        {
+            // Shutdown path.
+        }
+        catch (Exception ex)
+        {
+            _logger.LogDebug(ex, "Route {RouteKey} closed with error", key);
+        }
+        finally
+        {
+            // Remove only this exact registration so a newer route under the same key is untouched.
+            if (_routes.TryRemove(KeyValuePair.Create(key, route)))
+                Interlocked.Decrement(ref _stats.Routes);
+
+            await route.DisposeAsync();
+        }
+    }
+
+    // Parses "address:port" (IPv4, or bracketed IPv6) and requires an explicit non-zero port.
+    private static IPEndPoint ParseRouteEndpoint(string route)
+    {
+        if (!IPEndPoint.TryParse(route.Trim(), out var endPoint) || endPoint.Port == 0)
+            throw new FormatException($"Invalid route endpoint: '{route}'");
+
+        return endPoint;
+    }
+}
diff --git a/src/NATS.Server/ServerStats.cs b/src/NATS.Server/ServerStats.cs
index 21ebd7d..41d6690 100644
--- a/src/NATS.Server/ServerStats.cs
+++ b/src/NATS.Server/ServerStats.cs
@@ -11,6 +11,7 @@ public sealed class ServerStats
     public long TotalConnections;
     public long SlowConsumers;
     public long StaleConnections;
+    public long Routes;
     public long Stalls;
     public long SlowConsumerClients;
     public long SlowConsumerRoutes;
diff --git a/tests/NATS.Server.Tests/RouteHandshakeTests.cs b/tests/NATS.Server.Tests/RouteHandshakeTests.cs
new file mode 100644
index 0000000..586dbfa
--- /dev/null
+++ b/tests/NATS.Server.Tests/RouteHandshakeTests.cs
@@ -0,0 +1,67 @@
+using Microsoft.Extensions.Logging.Abstractions;
+using NATS.Server.Configuration;
+
+namespace NATS.Server.Tests;
+
+public class RouteHandshakeTests
+{
+    [Fact]
+    public async Task Two_servers_establish_route_connection()
+    {
+        await using var a = await TestServerFactory.CreateClusterEnabledAsync();
+        await using var b = await TestServerFactory.CreateClusterEnabledAsync(seed: a.ClusterListen);
+
+        await a.WaitForReadyAsync();
+        await b.WaitForReadyAsync();
+
+        using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+        while (!timeout.IsCancellationRequested && (a.Stats.Routes == 0 || b.Stats.Routes == 0))
+        {
+            await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
+        }
+
+        a.Stats.Routes.ShouldBeGreaterThan(0);
+        b.Stats.Routes.ShouldBeGreaterThan(0);
+    }
+}
+
+internal static class TestServerFactory
+{
+    public static async Task<ClusterTestServer> CreateClusterEnabledAsync(string? seed = null)
+    {
+        var options = new NatsOptions
+        {
+            Host = "127.0.0.1",
+            Port = 0,
+            Cluster = new ClusterOptions
+            {
+                Name = Guid.NewGuid().ToString("N"),
+                Host = "127.0.0.1",
+                Port = 0,
+                Routes = seed is null ? [] : [seed],
+            },
+        };
+
+        var server = new NatsServer(options, NullLoggerFactory.Instance);
+        var cts = new CancellationTokenSource();
+        _ = server.StartAsync(cts.Token);
+        await server.WaitForReadyAsync();
+
+        return new ClusterTestServer(server, cts);
+    }
+}
+
+internal sealed class ClusterTestServer(NatsServer server, CancellationTokenSource cts) : IAsyncDisposable
+{
+    public ServerStats Stats => server.Stats;
+    public string ClusterListen => server.ClusterListen!;
+
+    public Task WaitForReadyAsync() => server.WaitForReadyAsync();
+
+    public async ValueTask DisposeAsync()
+    {
+        await cts.CancelAsync();
+        server.Dispose();
+        cts.Dispose();
+    }
+}