feat(networking): expand gateway reply mapper and add leaf solicited connections (D4+D5)

D4: Add hash segment support to ReplyMapper (_GR_.{cluster}.{hash}.{reply}),
FNV-1a ComputeReplyHash, TryExtractClusterId/Hash, legacy format compat.
D5: Add ConnectSolicitedAsync with exponential backoff (1s-60s cap),
JetStreamDomain propagation in LEAF handshake, LeafNodeOptions.JetStreamDomain.
This commit is contained in:
Joseph Doherty
2026-02-24 15:22:24 -05:00
parent 7116988d03
commit efd053ba60
6 changed files with 677 additions and 29 deletions

View File

@@ -5,4 +5,11 @@ public sealed class LeafNodeOptions
public string Host { get; set; } = "0.0.0.0";
public int Port { get; set; }
public List<string> Remotes { get; set; } = [];
/// <summary>
/// JetStream domain for this leaf node. When set, the domain is propagated
/// during the leaf handshake for domain-aware JetStream routing.
/// Go reference: leafnode.go — JsDomain in leafNodeCfg.
/// </summary>
public string? JetStreamDomain { get; set; }
}

View File

@@ -1,21 +1,76 @@
namespace NATS.Server.Gateways;
/// <summary>
/// Maps reply subjects to gateway-prefixed forms and restores them.
/// The gateway reply format is <c>_GR_.{clusterId}.{hash}.{originalReply}</c>.
/// A legacy format <c>_GR_.{clusterId}.{originalReply}</c> (no hash) is also supported
/// for backward compatibility.
/// Go reference: gateway.go:2000-2100, gateway.go:340-380.
/// </summary>
public static class ReplyMapper
{
private const string GatewayReplyPrefix = "_GR_.";
/// <summary>
/// Checks whether the subject starts with the gateway reply prefix <c>_GR_.</c>.
/// </summary>
public static bool HasGatewayReplyPrefix(string? subject)
=> !string.IsNullOrWhiteSpace(subject)
&& subject.StartsWith(GatewayReplyPrefix, StringComparison.Ordinal);
/// <summary>
/// Computes a deterministic FNV-1a hash of the reply subject.
/// Go reference: gateway.go uses SHA-256 truncated to base-62; we use FNV-1a for speed
/// while maintaining determinism and good distribution.
/// </summary>
public static long ComputeReplyHash(string replyTo)
{
// FNV-1a 64-bit
const ulong fnvOffsetBasis = 14695981039346656037UL;
const ulong fnvPrime = 1099511628211UL;
var hash = fnvOffsetBasis;
foreach (var c in replyTo)
{
hash ^= (byte)c;
hash *= fnvPrime;
}
// Return as non-negative long
return (long)(hash & 0x7FFFFFFFFFFFFFFF);
}
/// <summary>
/// Converts a reply subject to gateway form with an explicit hash segment.
/// Format: <c>_GR_.{clusterId}.{hash}.{originalReply}</c>.
/// </summary>
public static string? ToGatewayReply(string? replyTo, string localClusterId, long hash)
{
if (string.IsNullOrWhiteSpace(replyTo))
return replyTo;
return $"{GatewayReplyPrefix}{localClusterId}.{hash}.{replyTo}";
}
/// <summary>
/// Converts a reply subject to gateway form, automatically computing the hash.
/// Format: <c>_GR_.{clusterId}.{hash}.{originalReply}</c>.
/// </summary>
public static string? ToGatewayReply(string? replyTo, string localClusterId)
{
if (string.IsNullOrWhiteSpace(replyTo))
return replyTo;
return $"{GatewayReplyPrefix}{localClusterId}.{replyTo}";
var hash = ComputeReplyHash(replyTo);
return ToGatewayReply(replyTo, localClusterId, hash);
}
/// <summary>
/// Restores the original reply subject from a gateway-prefixed reply.
/// Handles both new format (<c>_GR_.{clusterId}.{hash}.{originalReply}</c>) and
/// legacy format (<c>_GR_.{clusterId}.{originalReply}</c>).
/// Nested prefixes are unwrapped iteratively.
/// </summary>
public static bool TryRestoreGatewayReply(string? gatewayReply, out string restoredReply)
{
restoredReply = string.Empty;
@@ -26,14 +81,94 @@ public static class ReplyMapper
var current = gatewayReply!;
while (HasGatewayReplyPrefix(current))
{
var clusterSeparator = current.IndexOf('.', GatewayReplyPrefix.Length);
if (clusterSeparator < 0 || clusterSeparator == current.Length - 1)
// Skip the "_GR_." prefix
var afterPrefix = current[GatewayReplyPrefix.Length..];
// Find the first dot (end of clusterId)
var firstDot = afterPrefix.IndexOf('.');
if (firstDot < 0 || firstDot == afterPrefix.Length - 1)
return false;
current = current[(clusterSeparator + 1)..];
var afterCluster = afterPrefix[(firstDot + 1)..];
// Check if the next segment is a numeric hash
var secondDot = afterCluster.IndexOf('.');
if (secondDot > 0 && secondDot < afterCluster.Length - 1 && IsNumericSegment(afterCluster.AsSpan()[..secondDot]))
{
// New format: skip hash segment too
current = afterCluster[(secondDot + 1)..];
}
else
{
// Legacy format: no hash, the rest is the original reply
current = afterCluster;
}
}
restoredReply = current;
return true;
}
/// <summary>
/// Extracts the cluster ID from a gateway reply subject.
/// The cluster ID is the first segment after the <c>_GR_.</c> prefix.
/// </summary>
public static bool TryExtractClusterId(string? gatewayReply, out string clusterId)
{
clusterId = string.Empty;
if (!HasGatewayReplyPrefix(gatewayReply))
return false;
var afterPrefix = gatewayReply![GatewayReplyPrefix.Length..];
var dot = afterPrefix.IndexOf('.');
if (dot <= 0)
return false;
clusterId = afterPrefix[..dot];
return true;
}
/// <summary>
/// Extracts the hash from a gateway reply subject (new format only).
/// Returns false if the reply uses the legacy format without a hash.
/// </summary>
public static bool TryExtractHash(string? gatewayReply, out long hash)
{
hash = 0;
if (!HasGatewayReplyPrefix(gatewayReply))
return false;
var afterPrefix = gatewayReply![GatewayReplyPrefix.Length..];
// Skip clusterId
var firstDot = afterPrefix.IndexOf('.');
if (firstDot <= 0 || firstDot == afterPrefix.Length - 1)
return false;
var afterCluster = afterPrefix[(firstDot + 1)..];
// Try to parse hash segment
var secondDot = afterCluster.IndexOf('.');
if (secondDot <= 0)
return false;
var hashSegment = afterCluster[..secondDot];
return long.TryParse(hashSegment, out hash);
}
private static bool IsNumericSegment(ReadOnlySpan<char> segment)
{
if (segment.IsEmpty)
return false;
foreach (var c in segment)
{
if (c is not (>= '0' and <= '9'))
return false;
}
return true;
}
}

View File

@@ -4,6 +4,12 @@ using NATS.Server.Subscriptions;
namespace NATS.Server.LeafNodes;
/// <summary>
/// Represents a single leaf node connection (inbound or outbound).
/// Handles LEAF handshake, LS+/LS- interest propagation, and LMSG forwarding.
/// The JetStreamDomain property is propagated during handshake for domain-aware routing.
/// Go reference: leafnode.go.
/// </summary>
public sealed class LeafConnection(Socket socket) : IAsyncDisposable
{
private readonly NetworkStream _stream = new(socket, ownsSocket: true);
@@ -16,18 +22,32 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable
public Func<RemoteSubscription, Task>? RemoteSubscriptionReceived { get; set; }
public Func<LeafMessage, Task>? MessageReceived { get; set; }
/// <summary>
/// JetStream domain for this leaf connection. When set, the domain is propagated
/// in the LEAF handshake and included in LMSG frames for domain-aware routing.
/// Go reference: leafnode.go — jsClusterDomain field in leafInfo.
/// </summary>
public string? JetStreamDomain { get; set; }
/// <summary>
/// The JetStream domain advertised by the remote side during handshake.
/// </summary>
public string? RemoteJetStreamDomain { get; private set; }
public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct)
{
await WriteLineAsync($"LEAF {serverId}", ct);
var handshakeLine = BuildHandshakeLine(serverId);
await WriteLineAsync(handshakeLine, ct);
var line = await ReadLineAsync(ct);
RemoteId = ParseHandshake(line);
ParseHandshakeResponse(line);
}
public async Task PerformInboundHandshakeAsync(string serverId, CancellationToken ct)
{
var line = await ReadLineAsync(ct);
RemoteId = ParseHandshake(line);
await WriteLineAsync($"LEAF {serverId}", ct);
ParseHandshakeResponse(line);
var handshakeLine = BuildHandshakeLine(serverId);
await WriteLineAsync(handshakeLine, ct);
}
public void StartLoop(CancellationToken ct)
@@ -77,6 +97,39 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable
await _stream.DisposeAsync();
}
private string BuildHandshakeLine(string serverId)
{
if (!string.IsNullOrEmpty(JetStreamDomain))
return $"LEAF {serverId} domain={JetStreamDomain}";
return $"LEAF {serverId}";
}
private void ParseHandshakeResponse(string line)
{
if (!line.StartsWith("LEAF ", StringComparison.OrdinalIgnoreCase))
throw new InvalidOperationException("Invalid leaf handshake");
var rest = line[5..].Trim();
if (rest.Length == 0)
throw new InvalidOperationException("Leaf handshake missing id");
// Parse "serverId [domain=xxx]" format
var spaceIdx = rest.IndexOf(' ');
if (spaceIdx > 0)
{
RemoteId = rest[..spaceIdx];
var attrs = rest[(spaceIdx + 1)..];
const string domainPrefix = "domain=";
if (attrs.StartsWith(domainPrefix, StringComparison.OrdinalIgnoreCase))
RemoteJetStreamDomain = attrs[domainPrefix.Length..].Trim();
}
else
{
RemoteId = rest;
}
}
private async Task ReadLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
@@ -198,17 +251,6 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable
return Encoding.ASCII.GetString([.. bytes]);
}
private static string ParseHandshake(string line)
{
if (!line.StartsWith("LEAF ", StringComparison.OrdinalIgnoreCase))
throw new InvalidOperationException("Invalid leaf handshake");
var id = line[5..].Trim();
if (id.Length == 0)
throw new InvalidOperationException("Leaf handshake missing id");
return id;
}
private static bool TryParseAccountScopedInterest(string[] parts, out string account, out string subject, out string? queue)
{
account = "$G";

View File

@@ -7,6 +7,11 @@ using NATS.Server.Subscriptions;
namespace NATS.Server.LeafNodes;
/// <summary>
/// Manages leaf node connections — both inbound (accepted) and outbound (solicited).
/// Outbound connections use exponential backoff retry: 1s, 2s, 4s, ..., capped at 60s.
/// Go reference: leafnode.go.
/// </summary>
public sealed class LeafNodeManager : IAsyncDisposable
{
private readonly LeafNodeOptions _options;
@@ -21,6 +26,17 @@ public sealed class LeafNodeManager : IAsyncDisposable
private Socket? _listener;
private Task? _acceptLoopTask;
/// <summary>
/// Initial retry delay for solicited connections (1 second).
/// Go reference: leafnode.go — DEFAULT_LEAF_NODE_RECONNECT constant.
/// </summary>
internal static readonly TimeSpan InitialRetryDelay = TimeSpan.FromSeconds(1);
/// <summary>
/// Maximum retry delay for solicited connections (60 seconds).
/// </summary>
internal static readonly TimeSpan MaxRetryDelay = TimeSpan.FromSeconds(60);
public string ListenEndpoint => $"{_options.Host}:{_options.Port}";
public LeafNodeManager(
@@ -52,12 +68,41 @@ public sealed class LeafNodeManager : IAsyncDisposable
_acceptLoopTask = Task.Run(() => AcceptLoopAsync(_cts.Token));
foreach (var remote in _options.Remotes.Distinct(StringComparer.OrdinalIgnoreCase))
_ = Task.Run(() => ConnectWithRetryAsync(remote, _cts.Token));
_ = Task.Run(() => ConnectSolicitedWithRetryAsync(remote, _options.JetStreamDomain, _cts.Token));
_logger.LogDebug("Leaf manager started (listen={Host}:{Port})", _options.Host, _options.Port);
return Task.CompletedTask;
}
/// <summary>
/// Establishes a single solicited (outbound) leaf connection to the specified URL.
/// Performs socket connection and LEAF handshake. If a JetStream domain is specified,
/// it is propagated during the handshake.
/// Go reference: leafnode.go — connectSolicited.
/// </summary>
public async Task<LeafConnection> ConnectSolicitedAsync(string url, string? account, CancellationToken ct)
{
var endPoint = ParseEndpoint(url);
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
try
{
await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);
var connection = new LeafConnection(socket)
{
JetStreamDomain = _options.JetStreamDomain,
};
await connection.PerformOutboundHandshakeAsync(_serverId, ct);
Register(connection);
_logger.LogDebug("Solicited leaf connection established to {Url} (account={Account})", url, account ?? "$G");
return connection;
}
catch
{
socket.Dispose();
throw;
}
}
public async Task ForwardMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
foreach (var connection in _connections.Values)
@@ -95,6 +140,17 @@ public sealed class LeafNodeManager : IAsyncDisposable
_logger.LogDebug("Leaf manager stopped");
}
/// <summary>
/// Computes the next backoff delay using exponential backoff with a cap.
/// Delay sequence: 1s, 2s, 4s, 8s, 16s, 32s, 60s, 60s, ...
/// </summary>
internal static TimeSpan ComputeBackoff(int attempt)
{
if (attempt < 0) attempt = 0;
var seconds = Math.Min(InitialRetryDelay.TotalSeconds * Math.Pow(2, attempt), MaxRetryDelay.TotalSeconds);
return TimeSpan.FromSeconds(seconds);
}
private async Task AcceptLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
@@ -115,7 +171,10 @@ public sealed class LeafNodeManager : IAsyncDisposable
private async Task HandleInboundAsync(Socket socket, CancellationToken ct)
{
var connection = new LeafConnection(socket);
var connection = new LeafConnection(socket)
{
JetStreamDomain = _options.JetStreamDomain,
};
try
{
await connection.PerformInboundHandshakeAsync(_serverId, ct);
@@ -127,19 +186,32 @@ public sealed class LeafNodeManager : IAsyncDisposable
}
}
private async Task ConnectWithRetryAsync(string remote, CancellationToken ct)
private async Task ConnectSolicitedWithRetryAsync(string remote, string? jetStreamDomain, CancellationToken ct)
{
var attempt = 0;
while (!ct.IsCancellationRequested)
{
try
{
var endPoint = ParseEndpoint(remote);
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);
var connection = new LeafConnection(socket);
await connection.PerformOutboundHandshakeAsync(_serverId, ct);
Register(connection);
return;
try
{
await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);
var connection = new LeafConnection(socket)
{
JetStreamDomain = jetStreamDomain,
};
await connection.PerformOutboundHandshakeAsync(_serverId, ct);
Register(connection);
_logger.LogDebug("Solicited leaf connection established to {Remote}", remote);
return;
}
catch
{
socket.Dispose();
throw;
}
}
catch (OperationCanceledException)
{
@@ -147,12 +219,14 @@ public sealed class LeafNodeManager : IAsyncDisposable
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Leaf connect retry for {Remote}", remote);
_logger.LogDebug(ex, "Leaf connect retry for {Remote} (attempt {Attempt})", remote, attempt);
}
var delay = ComputeBackoff(attempt);
attempt++;
try
{
await Task.Delay(250, ct);
await Task.Delay(delay, ct);
}
catch (OperationCanceledException)
{