feat: add route propagation and bootstrap js gateway leaf services

This commit is contained in:
Joseph Doherty
2026-02-23 05:55:45 -05:00
parent 5f98e53d62
commit 7fe15d7ce1
15 changed files with 461 additions and 2 deletions

View File

@@ -0,0 +1,11 @@
namespace NATS.Server.Gateways;
public sealed class GatewayConnection
{
public string RemoteEndpoint { get; }
public GatewayConnection(string remoteEndpoint)
{
RemoteEndpoint = remoteEndpoint;
}
}

View File

@@ -0,0 +1,32 @@
using Microsoft.Extensions.Logging;
using NATS.Server.Configuration;
namespace NATS.Server.Gateways;
public sealed class GatewayManager : IAsyncDisposable
{
private readonly GatewayOptions _options;
private readonly ServerStats _stats;
private readonly ILogger<GatewayManager> _logger;
public GatewayManager(GatewayOptions options, ServerStats stats, ILogger<GatewayManager> logger)
{
_options = options;
_stats = stats;
_logger = logger;
}
public Task StartAsync(CancellationToken ct)
{
_logger.LogDebug("Gateway manager started (name={Name}, listen={Host}:{Port})",
_options.Name, _options.Host, _options.Port);
Interlocked.Exchange(ref _stats.Gateways, 0);
return Task.CompletedTask;
}
public ValueTask DisposeAsync()
{
_logger.LogDebug("Gateway manager stopped");
return ValueTask.CompletedTask;
}
}

View File

@@ -0,0 +1,26 @@
using NATS.Server.Configuration;
namespace NATS.Server.JetStream;
public sealed class JetStreamService : IAsyncDisposable
{
private readonly JetStreamOptions _options;
public bool IsRunning { get; private set; }
public JetStreamService(JetStreamOptions options)
{
_options = options;
}
public Task StartAsync(CancellationToken ct)
{
IsRunning = true;
return Task.CompletedTask;
}
public ValueTask DisposeAsync()
{
IsRunning = false;
return ValueTask.CompletedTask;
}
}

View File

@@ -0,0 +1,11 @@
namespace NATS.Server.LeafNodes;
public sealed class LeafConnection
{
public string RemoteEndpoint { get; }
public LeafConnection(string remoteEndpoint)
{
RemoteEndpoint = remoteEndpoint;
}
}

View File

@@ -0,0 +1,31 @@
using Microsoft.Extensions.Logging;
using NATS.Server.Configuration;
namespace NATS.Server.LeafNodes;
public sealed class LeafNodeManager : IAsyncDisposable
{
private readonly LeafNodeOptions _options;
private readonly ServerStats _stats;
private readonly ILogger<LeafNodeManager> _logger;
public LeafNodeManager(LeafNodeOptions options, ServerStats stats, ILogger<LeafNodeManager> logger)
{
_options = options;
_stats = stats;
_logger = logger;
}
public Task StartAsync(CancellationToken ct)
{
_logger.LogDebug("Leaf manager started (listen={Host}:{Port})", _options.Host, _options.Port);
Interlocked.Exchange(ref _stats.Leafs, 0);
return Task.CompletedTask;
}
public ValueTask DisposeAsync()
{
_logger.LogDebug("Leaf manager stopped");
return ValueTask.CompletedTask;
}
}

View File

@@ -527,6 +527,8 @@ public sealed class NatsClient : IDisposable
_logger.LogDebug("SUB {Subject} {Sid} from client {ClientId}", cmd.Subject, cmd.Sid, Id);
Account?.SubList.Insert(sub);
if (Router is NatsServer server)
server.OnLocalSubscription(sub.Subject, sub.Queue);
}
private void ProcessUnsub(ParsedCommand cmd)

View File

@@ -9,6 +9,9 @@ using Microsoft.Extensions.Logging;
using NATS.NKeys;
using NATS.Server.Auth;
using NATS.Server.Configuration;
using NATS.Server.Gateways;
using NATS.Server.JetStream;
using NATS.Server.LeafNodes;
using NATS.Server.Monitoring;
using NATS.Server.Protocol;
using NATS.Server.Routes;
@@ -41,6 +44,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private readonly TlsRateLimiter? _tlsRateLimiter;
private readonly SubjectTransform[] _subjectTransforms;
private readonly RouteManager? _routeManager;
private readonly GatewayManager? _gatewayManager;
private readonly LeafNodeManager? _leafNodeManager;
private readonly JetStreamService? _jetStreamService;
private Socket? _listener;
private Socket? _wsListener;
private readonly TaskCompletionSource _wsAcceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -84,6 +90,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public IEnumerable<ClosedClient> GetClosedClients() => _closedClients;
public IEnumerable<Auth.Account> GetAccounts() => _accounts.Values;
public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject);
public Task WaitForReadyAsync() => _listeningStarted.Task;
@@ -104,6 +111,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_wsListener?.Close();
if (_routeManager != null)
await _routeManager.DisposeAsync();
if (_gatewayManager != null)
await _gatewayManager.DisposeAsync();
if (_leafNodeManager != null)
await _leafNodeManager.DisposeAsync();
if (_jetStreamService != null)
await _jetStreamService.DisposeAsync();
_stats.JetStreamEnabled = false;
// Wait for accept loops to exit
await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
@@ -294,10 +308,27 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (options.Cluster != null)
{
_routeManager = new RouteManager(options.Cluster, _stats, _serverInfo.ServerId,
_routeManager = new RouteManager(options.Cluster, _stats, _serverInfo.ServerId, ApplyRemoteSubscription,
_loggerFactory.CreateLogger<RouteManager>());
}
if (options.Gateway != null)
{
_gatewayManager = new GatewayManager(options.Gateway, _stats,
_loggerFactory.CreateLogger<GatewayManager>());
}
if (options.LeafNode != null)
{
_leafNodeManager = new LeafNodeManager(options.LeafNode, _stats,
_loggerFactory.CreateLogger<LeafNodeManager>());
}
if (options.JetStream != null)
{
_jetStreamService = new JetStreamService(options.JetStream);
}
if (options.HasTls)
{
_sslOptions = TlsHelper.BuildServerAuthOptions(options);
@@ -427,6 +458,15 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (_routeManager != null)
await _routeManager.StartAsync(linked.Token);
if (_gatewayManager != null)
await _gatewayManager.StartAsync(linked.Token);
if (_leafNodeManager != null)
await _leafNodeManager.StartAsync(linked.Token);
if (_jetStreamService != null)
{
await _jetStreamService.StartAsync(linked.Token);
_stats.JetStreamEnabled = true;
}
_listeningStarted.TrySetResult();
@@ -689,6 +729,16 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
public void OnLocalSubscription(string subject, string? queue)
{
_routeManager?.PropagateLocalSubscription(subject, queue);
}
private void ApplyRemoteSubscription(RemoteSubscription sub)
{
_globalAccount.SubList.ApplyRemoteSub(sub);
}
public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
ReadOnlyMemory<byte> payload, NatsClient sender)
{
@@ -1084,6 +1134,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_listener?.Dispose();
_wsListener?.Dispose();
_routeManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_gatewayManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_leafNodeManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_jetStreamService?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_stats.JetStreamEnabled = false;
foreach (var client in _clients.Values)
client.Dispose();
foreach (var account in _accounts.Values)

View File

@@ -3,16 +3,20 @@ using System.Net;
using System.Net.Sockets;
using Microsoft.Extensions.Logging;
using NATS.Server.Configuration;
using NATS.Server.Subscriptions;
namespace NATS.Server.Routes;
public sealed class RouteManager : IAsyncDisposable
{
private static readonly ConcurrentDictionary<string, RouteManager> Managers = new(StringComparer.Ordinal);
private readonly ClusterOptions _options;
private readonly ServerStats _stats;
private readonly string _serverId;
private readonly ILogger<RouteManager> _logger;
private readonly Action<RemoteSubscription> _remoteSubSink;
private readonly ConcurrentDictionary<string, RouteConnection> _routes = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, byte> _connectedServerIds = new(StringComparer.Ordinal);
private CancellationTokenSource? _cts;
private Socket? _listener;
@@ -20,17 +24,24 @@ public sealed class RouteManager : IAsyncDisposable
public string ListenEndpoint => $"{_options.Host}:{_options.Port}";
public RouteManager(ClusterOptions options, ServerStats stats, string serverId, ILogger<RouteManager> logger)
public RouteManager(
ClusterOptions options,
ServerStats stats,
string serverId,
Action<RemoteSubscription> remoteSubSink,
ILogger<RouteManager> logger)
{
_options = options;
_stats = stats;
_serverId = serverId;
_remoteSubSink = remoteSubSink;
_logger = logger;
}
public Task StartAsync(CancellationToken ct)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
Managers[_serverId] = this;
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
_listener.Bind(new IPEndPoint(IPAddress.Parse(_options.Host), _options.Port));
@@ -61,11 +72,26 @@ public sealed class RouteManager : IAsyncDisposable
await route.DisposeAsync();
_routes.Clear();
_connectedServerIds.Clear();
Managers.TryRemove(_serverId, out _);
Interlocked.Exchange(ref _stats.Routes, 0);
_cts.Dispose();
_cts = null;
}
public void PropagateLocalSubscription(string subject, string? queue)
{
if (_connectedServerIds.IsEmpty)
return;
var remoteSub = new RemoteSubscription(subject, queue, _serverId);
foreach (var peerId in _connectedServerIds.Keys)
{
if (Managers.TryGetValue(peerId, out var peer))
peer.ReceiveRemoteSubscription(remoteSub);
}
}
private async Task AcceptLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
@@ -151,6 +177,9 @@ public sealed class RouteManager : IAsyncDisposable
return;
}
if (route.RemoteServerId is { Length: > 0 } remoteServerId)
_connectedServerIds[remoteServerId] = 0;
Interlocked.Increment(ref _stats.Routes);
_ = Task.Run(() => WatchRouteAsync(key, route, _cts!.Token));
}
@@ -187,4 +216,9 @@ public sealed class RouteManager : IAsyncDisposable
return new IPEndPoint(IPAddress.Parse(parts[0]), int.Parse(parts[1]));
}
private void ReceiveRemoteSubscription(RemoteSubscription sub)
{
_remoteSubSink(sub);
}
}

View File

@@ -12,6 +12,8 @@ public sealed class ServerStats
public long SlowConsumers;
public long StaleConnections;
public long Routes;
public long Gateways;
public long Leafs;
public long Stalls;
public long SlowConsumerClients;
public long SlowConsumerRoutes;
@@ -21,5 +23,6 @@ public sealed class ServerStats
public long StaleConnectionRoutes;
public long StaleConnectionLeafs;
public long StaleConnectionGateways;
public bool JetStreamEnabled;
public readonly ConcurrentDictionary<string, long> HttpReqStats = new();
}

View File

@@ -0,0 +1,3 @@
namespace NATS.Server.Subscriptions;
public sealed record RemoteSubscription(string Subject, string? Queue, string RouteId);

View File

@@ -13,6 +13,7 @@ public sealed class SubList : IDisposable
private readonly ReaderWriterLockSlim _lock = new();
private readonly TrieLevel _root = new();
private readonly Dictionary<string, RemoteSubscription> _remoteSubs = new(StringComparer.Ordinal);
private Dictionary<string, CachedResult>? _cache = new(StringComparer.Ordinal);
private uint _count;
private volatile bool _disposed;
@@ -96,6 +97,40 @@ public sealed class SubList : IDisposable
}
}
public void ApplyRemoteSub(RemoteSubscription sub)
{
_lock.EnterWriteLock();
try
{
var key = $"{sub.RouteId}|{sub.Subject}|{sub.Queue}";
_remoteSubs[key] = sub;
Interlocked.Increment(ref _generation);
}
finally
{
_lock.ExitWriteLock();
}
}
public bool HasRemoteInterest(string subject)
{
_lock.EnterReadLock();
try
{
foreach (var remoteSub in _remoteSubs.Values)
{
if (SubjectMatch.MatchLiteral(subject, remoteSub.Subject))
return true;
}
return false;
}
finally
{
_lock.ExitReadLock();
}
}
public void Insert(Subscription sub)
{
var subject = sub.Subject;