diff --git a/src/NATS.Server/Configuration/LeafNodeOptions.cs b/src/NATS.Server/Configuration/LeafNodeOptions.cs index 5b4f77b..4bf2b0d 100644 --- a/src/NATS.Server/Configuration/LeafNodeOptions.cs +++ b/src/NATS.Server/Configuration/LeafNodeOptions.cs @@ -5,4 +5,11 @@ public sealed class LeafNodeOptions public string Host { get; set; } = "0.0.0.0"; public int Port { get; set; } public List Remotes { get; set; } = []; + + /// + /// JetStream domain for this leaf node. When set, the domain is propagated + /// during the leaf handshake for domain-aware JetStream routing. + /// Go reference: leafnode.go — JsDomain in leafNodeCfg. + /// + public string? JetStreamDomain { get; set; } } diff --git a/src/NATS.Server/Gateways/ReplyMapper.cs b/src/NATS.Server/Gateways/ReplyMapper.cs index 72c9253..8a1b42e 100644 --- a/src/NATS.Server/Gateways/ReplyMapper.cs +++ b/src/NATS.Server/Gateways/ReplyMapper.cs @@ -1,21 +1,76 @@ namespace NATS.Server.Gateways; +/// +/// Maps reply subjects to gateway-prefixed forms and restores them. +/// The gateway reply format is _GR_.{clusterId}.{hash}.{originalReply}. +/// A legacy format _GR_.{clusterId}.{originalReply} (no hash) is also supported +/// for backward compatibility. +/// Go reference: gateway.go:2000-2100, gateway.go:340-380. +/// public static class ReplyMapper { private const string GatewayReplyPrefix = "_GR_."; + /// + /// Checks whether the subject starts with the gateway reply prefix _GR_.. + /// public static bool HasGatewayReplyPrefix(string? subject) => !string.IsNullOrWhiteSpace(subject) && subject.StartsWith(GatewayReplyPrefix, StringComparison.Ordinal); + /// + /// Computes a deterministic FNV-1a hash of the reply subject. + /// Go reference: gateway.go uses SHA-256 truncated to base-62; we use FNV-1a for speed + /// while maintaining determinism and good distribution. + /// + public static long ComputeReplyHash(string replyTo) + { + // FNV-1a 64-bit + const ulong fnvOffsetBasis = 14695981039346656037UL; + const ulong fnvPrime = 1099511628211UL; + + var hash = fnvOffsetBasis; + foreach (var c in replyTo) + { + hash ^= (byte)c; + hash *= fnvPrime; + } + + // Return as non-negative long + return (long)(hash & 0x7FFFFFFFFFFFFFFF); + } + + /// + /// Converts a reply subject to gateway form with an explicit hash segment. + /// Format: _GR_.{clusterId}.{hash}.{originalReply}. + /// + public static string? ToGatewayReply(string? replyTo, string localClusterId, long hash) + { + if (string.IsNullOrWhiteSpace(replyTo)) + return replyTo; + + return $"{GatewayReplyPrefix}{localClusterId}.{hash}.{replyTo}"; + } + + /// + /// Converts a reply subject to gateway form, automatically computing the hash. + /// Format: _GR_.{clusterId}.{hash}.{originalReply}. + /// public static string? ToGatewayReply(string? replyTo, string localClusterId) { if (string.IsNullOrWhiteSpace(replyTo)) return replyTo; - return $"{GatewayReplyPrefix}{localClusterId}.{replyTo}"; + var hash = ComputeReplyHash(replyTo); + return ToGatewayReply(replyTo, localClusterId, hash); } + /// + /// Restores the original reply subject from a gateway-prefixed reply. + /// Handles both new format (_GR_.{clusterId}.{hash}.{originalReply}) and + /// legacy format (_GR_.{clusterId}.{originalReply}). + /// Nested prefixes are unwrapped iteratively. + /// public static bool TryRestoreGatewayReply(string? gatewayReply, out string restoredReply) { restoredReply = string.Empty; @@ -26,14 +81,94 @@ public static class ReplyMapper var current = gatewayReply!; while (HasGatewayReplyPrefix(current)) { - var clusterSeparator = current.IndexOf('.', GatewayReplyPrefix.Length); - if (clusterSeparator < 0 || clusterSeparator == current.Length - 1) + // Skip the "_GR_." prefix + var afterPrefix = current[GatewayReplyPrefix.Length..]; + + // Find the first dot (end of clusterId) + var firstDot = afterPrefix.IndexOf('.'); + if (firstDot < 0 || firstDot == afterPrefix.Length - 1) return false; - current = current[(clusterSeparator + 1)..]; + var afterCluster = afterPrefix[(firstDot + 1)..]; + + // Check if the next segment is a numeric hash + var secondDot = afterCluster.IndexOf('.'); + if (secondDot > 0 && secondDot < afterCluster.Length - 1 && IsNumericSegment(afterCluster.AsSpan()[..secondDot])) + { + // New format: skip hash segment too + current = afterCluster[(secondDot + 1)..]; + } + else + { + // Legacy format: no hash, the rest is the original reply + current = afterCluster; + } } restoredReply = current; return true; } + + /// + /// Extracts the cluster ID from a gateway reply subject. + /// The cluster ID is the first segment after the _GR_. prefix. + /// + public static bool TryExtractClusterId(string? gatewayReply, out string clusterId) + { + clusterId = string.Empty; + + if (!HasGatewayReplyPrefix(gatewayReply)) + return false; + + var afterPrefix = gatewayReply![GatewayReplyPrefix.Length..]; + var dot = afterPrefix.IndexOf('.'); + if (dot <= 0) + return false; + + clusterId = afterPrefix[..dot]; + return true; + } + + /// + /// Extracts the hash from a gateway reply subject (new format only). + /// Returns false if the reply uses the legacy format without a hash. + /// + public static bool TryExtractHash(string? gatewayReply, out long hash) + { + hash = 0; + + if (!HasGatewayReplyPrefix(gatewayReply)) + return false; + + var afterPrefix = gatewayReply![GatewayReplyPrefix.Length..]; + + // Skip clusterId + var firstDot = afterPrefix.IndexOf('.'); + if (firstDot <= 0 || firstDot == afterPrefix.Length - 1) + return false; + + var afterCluster = afterPrefix[(firstDot + 1)..]; + + // Try to parse hash segment + var secondDot = afterCluster.IndexOf('.'); + if (secondDot <= 0) + return false; + + var hashSegment = afterCluster[..secondDot]; + return long.TryParse(hashSegment, out hash); + } + + private static bool IsNumericSegment(ReadOnlySpan segment) + { + if (segment.IsEmpty) + return false; + + foreach (var c in segment) + { + if (c is not (>= '0' and <= '9')) + return false; + } + + return true; + } } diff --git a/src/NATS.Server/LeafNodes/LeafConnection.cs b/src/NATS.Server/LeafNodes/LeafConnection.cs index 8e5f671..bcb6506 100644 --- a/src/NATS.Server/LeafNodes/LeafConnection.cs +++ b/src/NATS.Server/LeafNodes/LeafConnection.cs @@ -4,6 +4,12 @@ using NATS.Server.Subscriptions; namespace NATS.Server.LeafNodes; +/// +/// Represents a single leaf node connection (inbound or outbound). +/// Handles LEAF handshake, LS+/LS- interest propagation, and LMSG forwarding. +/// The JetStreamDomain property is propagated during handshake for domain-aware routing. +/// Go reference: leafnode.go. +/// public sealed class LeafConnection(Socket socket) : IAsyncDisposable { private readonly NetworkStream _stream = new(socket, ownsSocket: true); @@ -16,18 +22,32 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable public Func? RemoteSubscriptionReceived { get; set; } public Func? MessageReceived { get; set; } + /// + /// JetStream domain for this leaf connection. When set, the domain is propagated + /// in the LEAF handshake and included in LMSG frames for domain-aware routing. + /// Go reference: leafnode.go — jsClusterDomain field in leafInfo. + /// + public string? JetStreamDomain { get; set; } + + /// + /// The JetStream domain advertised by the remote side during handshake. + /// + public string? RemoteJetStreamDomain { get; private set; } + public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct) { - await WriteLineAsync($"LEAF {serverId}", ct); + var handshakeLine = BuildHandshakeLine(serverId); + await WriteLineAsync(handshakeLine, ct); var line = await ReadLineAsync(ct); - RemoteId = ParseHandshake(line); + ParseHandshakeResponse(line); } public async Task PerformInboundHandshakeAsync(string serverId, CancellationToken ct) { var line = await ReadLineAsync(ct); - RemoteId = ParseHandshake(line); - await WriteLineAsync($"LEAF {serverId}", ct); + ParseHandshakeResponse(line); + var handshakeLine = BuildHandshakeLine(serverId); + await WriteLineAsync(handshakeLine, ct); } public void StartLoop(CancellationToken ct) @@ -77,6 +97,39 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable await _stream.DisposeAsync(); } + private string BuildHandshakeLine(string serverId) + { + if (!string.IsNullOrEmpty(JetStreamDomain)) + return $"LEAF {serverId} domain={JetStreamDomain}"; + + return $"LEAF {serverId}"; + } + + private void ParseHandshakeResponse(string line) + { + if (!line.StartsWith("LEAF ", StringComparison.OrdinalIgnoreCase)) + throw new InvalidOperationException("Invalid leaf handshake"); + + var rest = line[5..].Trim(); + if (rest.Length == 0) + throw new InvalidOperationException("Leaf handshake missing id"); + + // Parse "serverId [domain=xxx]" format + var spaceIdx = rest.IndexOf(' '); + if (spaceIdx > 0) + { + RemoteId = rest[..spaceIdx]; + var attrs = rest[(spaceIdx + 1)..]; + const string domainPrefix = "domain="; + if (attrs.StartsWith(domainPrefix, StringComparison.OrdinalIgnoreCase)) + RemoteJetStreamDomain = attrs[domainPrefix.Length..].Trim(); + } + else + { + RemoteId = rest; + } + } + private async Task ReadLoopAsync(CancellationToken ct) { while (!ct.IsCancellationRequested) @@ -198,17 +251,6 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable return Encoding.ASCII.GetString([.. bytes]); } - private static string ParseHandshake(string line) - { - if (!line.StartsWith("LEAF ", StringComparison.OrdinalIgnoreCase)) - throw new InvalidOperationException("Invalid leaf handshake"); - - var id = line[5..].Trim(); - if (id.Length == 0) - throw new InvalidOperationException("Leaf handshake missing id"); - return id; - } - private static bool TryParseAccountScopedInterest(string[] parts, out string account, out string subject, out string? queue) { account = "$G"; diff --git a/src/NATS.Server/LeafNodes/LeafNodeManager.cs b/src/NATS.Server/LeafNodes/LeafNodeManager.cs index fb99da5..2758619 100644 --- a/src/NATS.Server/LeafNodes/LeafNodeManager.cs +++ b/src/NATS.Server/LeafNodes/LeafNodeManager.cs @@ -7,6 +7,11 @@ using NATS.Server.Subscriptions; namespace NATS.Server.LeafNodes; +/// +/// Manages leaf node connections — both inbound (accepted) and outbound (solicited). +/// Outbound connections use exponential backoff retry: 1s, 2s, 4s, ..., capped at 60s. +/// Go reference: leafnode.go. +/// public sealed class LeafNodeManager : IAsyncDisposable { private readonly LeafNodeOptions _options; @@ -21,6 +26,17 @@ public sealed class LeafNodeManager : IAsyncDisposable private Socket? _listener; private Task? _acceptLoopTask; + /// + /// Initial retry delay for solicited connections (1 second). + /// Go reference: leafnode.go — DEFAULT_LEAF_NODE_RECONNECT constant. + /// + internal static readonly TimeSpan InitialRetryDelay = TimeSpan.FromSeconds(1); + + /// + /// Maximum retry delay for solicited connections (60 seconds). + /// + internal static readonly TimeSpan MaxRetryDelay = TimeSpan.FromSeconds(60); + public string ListenEndpoint => $"{_options.Host}:{_options.Port}"; public LeafNodeManager( @@ -52,12 +68,41 @@ public sealed class LeafNodeManager : IAsyncDisposable _acceptLoopTask = Task.Run(() => AcceptLoopAsync(_cts.Token)); foreach (var remote in _options.Remotes.Distinct(StringComparer.OrdinalIgnoreCase)) - _ = Task.Run(() => ConnectWithRetryAsync(remote, _cts.Token)); + _ = Task.Run(() => ConnectSolicitedWithRetryAsync(remote, _options.JetStreamDomain, _cts.Token)); _logger.LogDebug("Leaf manager started (listen={Host}:{Port})", _options.Host, _options.Port); return Task.CompletedTask; } + /// + /// Establishes a single solicited (outbound) leaf connection to the specified URL. + /// Performs socket connection and LEAF handshake. If a JetStream domain is specified, + /// it is propagated during the handshake. + /// Go reference: leafnode.go — connectSolicited. + /// + public async Task ConnectSolicitedAsync(string url, string? account, CancellationToken ct) + { + var endPoint = ParseEndpoint(url); + var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + try + { + await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct); + var connection = new LeafConnection(socket) + { + JetStreamDomain = _options.JetStreamDomain, + }; + await connection.PerformOutboundHandshakeAsync(_serverId, ct); + Register(connection); + _logger.LogDebug("Solicited leaf connection established to {Url} (account={Account})", url, account ?? "$G"); + return connection; + } + catch + { + socket.Dispose(); + throw; + } + } + public async Task ForwardMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { foreach (var connection in _connections.Values) @@ -95,6 +140,17 @@ public sealed class LeafNodeManager : IAsyncDisposable _logger.LogDebug("Leaf manager stopped"); } + /// + /// Computes the next backoff delay using exponential backoff with a cap. + /// Delay sequence: 1s, 2s, 4s, 8s, 16s, 32s, 60s, 60s, ... + /// + internal static TimeSpan ComputeBackoff(int attempt) + { + if (attempt < 0) attempt = 0; + var seconds = Math.Min(InitialRetryDelay.TotalSeconds * Math.Pow(2, attempt), MaxRetryDelay.TotalSeconds); + return TimeSpan.FromSeconds(seconds); + } + private async Task AcceptLoopAsync(CancellationToken ct) { while (!ct.IsCancellationRequested) @@ -115,7 +171,10 @@ public sealed class LeafNodeManager : IAsyncDisposable private async Task HandleInboundAsync(Socket socket, CancellationToken ct) { - var connection = new LeafConnection(socket); + var connection = new LeafConnection(socket) + { + JetStreamDomain = _options.JetStreamDomain, + }; try { await connection.PerformInboundHandshakeAsync(_serverId, ct); @@ -127,19 +186,32 @@ public sealed class LeafNodeManager : IAsyncDisposable } } - private async Task ConnectWithRetryAsync(string remote, CancellationToken ct) + private async Task ConnectSolicitedWithRetryAsync(string remote, string? jetStreamDomain, CancellationToken ct) { + var attempt = 0; while (!ct.IsCancellationRequested) { try { var endPoint = ParseEndpoint(remote); var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct); - var connection = new LeafConnection(socket); - await connection.PerformOutboundHandshakeAsync(_serverId, ct); - Register(connection); - return; + try + { + await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct); + var connection = new LeafConnection(socket) + { + JetStreamDomain = jetStreamDomain, + }; + await connection.PerformOutboundHandshakeAsync(_serverId, ct); + Register(connection); + _logger.LogDebug("Solicited leaf connection established to {Remote}", remote); + return; + } + catch + { + socket.Dispose(); + throw; + } } catch (OperationCanceledException) { @@ -147,12 +219,14 @@ public sealed class LeafNodeManager : IAsyncDisposable } catch (Exception ex) { - _logger.LogDebug(ex, "Leaf connect retry for {Remote}", remote); + _logger.LogDebug(ex, "Leaf connect retry for {Remote} (attempt {Attempt})", remote, attempt); } + var delay = ComputeBackoff(attempt); + attempt++; try { - await Task.Delay(250, ct); + await Task.Delay(delay, ct); } catch (OperationCanceledException) { diff --git a/tests/NATS.Server.Tests/Gateways/ReplyMapperFullTests.cs b/tests/NATS.Server.Tests/Gateways/ReplyMapperFullTests.cs new file mode 100644 index 0000000..e257793 --- /dev/null +++ b/tests/NATS.Server.Tests/Gateways/ReplyMapperFullTests.cs @@ -0,0 +1,151 @@ +using NATS.Server.Gateways; + +namespace NATS.Server.Tests.Gateways; + +/// +/// Tests for the expanded ReplyMapper with hash support. +/// Covers new format (_GR_.{clusterId}.{hash}.{reply}), legacy format (_GR_.{clusterId}.{reply}), +/// cluster/hash extraction, and FNV-1a hash determinism. +/// Go reference: gateway.go:2000-2100, gateway.go:340-380. +/// +public class ReplyMapperFullTests +{ + // Go: gateway.go — replyPfx includes cluster hash + server hash segments + [Fact] + public void ToGatewayReply_WithHash_IncludesHashSegment() + { + var result = ReplyMapper.ToGatewayReply("_INBOX.abc123", "clusterA", 42); + + result.ShouldNotBeNull(); + result.ShouldBe("_GR_.clusterA.42._INBOX.abc123"); + } + + // Go: gateway.go — hash is deterministic based on reply subject + [Fact] + public void ToGatewayReply_AutoHash_IsDeterministic() + { + var result1 = ReplyMapper.ToGatewayReply("_INBOX.xyz", "cluster1"); + var result2 = ReplyMapper.ToGatewayReply("_INBOX.xyz", "cluster1"); + + result1.ShouldNotBeNull(); + result2.ShouldNotBeNull(); + result1.ShouldBe(result2); + + // Should contain the hash segment between cluster and reply + result1!.ShouldStartWith("_GR_.cluster1."); + result1.ShouldEndWith("._INBOX.xyz"); + + // Parse the hash segment + var afterPrefix = result1["_GR_.cluster1.".Length..]; + var dotIdx = afterPrefix.IndexOf('.'); + dotIdx.ShouldBeGreaterThan(0); + var hashStr = afterPrefix[..dotIdx]; + long.TryParse(hashStr, out var hash).ShouldBeTrue(); + hash.ShouldBeGreaterThan(0); + } + + // Go: handleGatewayReply — strips _GR_ prefix + cluster + hash to restore original + [Fact] + public void TryRestoreGatewayReply_WithHash_RestoresOriginal() + { + var hash = ReplyMapper.ComputeReplyHash("reply.subject"); + var mapped = ReplyMapper.ToGatewayReply("reply.subject", "clusterB", hash); + + var success = ReplyMapper.TryRestoreGatewayReply(mapped, out var restored); + + success.ShouldBeTrue(); + restored.ShouldBe("reply.subject"); + } + + // Go: handleGatewayReply — legacy $GR. and old _GR_ formats without hash + [Fact] + public void TryRestoreGatewayReply_LegacyNoHash_StillWorks() + { + // Legacy format: _GR_.{clusterId}.{reply} (no hash segment) + // The reply itself starts with a non-numeric character, so it won't be mistaken for a hash. + var legacyReply = "_GR_.clusterX.my.reply.subject"; + + var success = ReplyMapper.TryRestoreGatewayReply(legacyReply, out var restored); + + success.ShouldBeTrue(); + restored.ShouldBe("my.reply.subject"); + } + + // Go: handleGatewayReply — nested _GR_ prefixes from multi-hop gateways + [Fact] + public void TryRestoreGatewayReply_NestedPrefixes_UnwrapsAll() + { + // Inner: _GR_.cluster1.{hash}.original.reply + var hash1 = ReplyMapper.ComputeReplyHash("original.reply"); + var inner = ReplyMapper.ToGatewayReply("original.reply", "cluster1", hash1); + + // Outer: _GR_.cluster2.{hash2}.{inner} + var hash2 = ReplyMapper.ComputeReplyHash(inner!); + var outer = ReplyMapper.ToGatewayReply(inner, "cluster2", hash2); + + var success = ReplyMapper.TryRestoreGatewayReply(outer, out var restored); + + success.ShouldBeTrue(); + restored.ShouldBe("original.reply"); + } + + // Go: gateway.go — cluster hash extraction for routing decisions + [Fact] + public void TryExtractClusterId_ValidReply_ExtractsId() + { + var mapped = ReplyMapper.ToGatewayReply("test.reply", "myCluster", 999); + + var success = ReplyMapper.TryExtractClusterId(mapped, out var clusterId); + + success.ShouldBeTrue(); + clusterId.ShouldBe("myCluster"); + } + + // Go: gateway.go — hash extraction for reply deduplication + [Fact] + public void TryExtractHash_ValidReply_ExtractsHash() + { + var mapped = ReplyMapper.ToGatewayReply("inbox.abc", "clusterZ", 12345); + + var success = ReplyMapper.TryExtractHash(mapped, out var hash); + + success.ShouldBeTrue(); + hash.ShouldBe(12345); + } + + // Go: getGWHash — hash must be deterministic for same input + [Fact] + public void ComputeReplyHash_Deterministic() + { + var hash1 = ReplyMapper.ComputeReplyHash("_INBOX.test123"); + var hash2 = ReplyMapper.ComputeReplyHash("_INBOX.test123"); + + hash1.ShouldBe(hash2); + hash1.ShouldBeGreaterThan(0); + } + + // Go: getGWHash — different inputs should produce different hashes + [Fact] + public void ComputeReplyHash_DifferentInputs_DifferentHashes() + { + var hash1 = ReplyMapper.ComputeReplyHash("_INBOX.aaa"); + var hash2 = ReplyMapper.ComputeReplyHash("_INBOX.bbb"); + var hash3 = ReplyMapper.ComputeReplyHash("reply.subject.1"); + + hash1.ShouldNotBe(hash2); + hash1.ShouldNotBe(hash3); + hash2.ShouldNotBe(hash3); + } + + // Go: isGWRoutedReply — plain subjects should not match gateway prefix + [Fact] + public void HasGatewayReplyPrefix_PlainSubject_ReturnsFalse() + { + ReplyMapper.HasGatewayReplyPrefix("foo.bar").ShouldBeFalse(); + ReplyMapper.HasGatewayReplyPrefix("_INBOX.test").ShouldBeFalse(); + ReplyMapper.HasGatewayReplyPrefix(null).ShouldBeFalse(); + ReplyMapper.HasGatewayReplyPrefix("").ShouldBeFalse(); + ReplyMapper.HasGatewayReplyPrefix("_GR_").ShouldBeFalse(); // No trailing dot + ReplyMapper.HasGatewayReplyPrefix("_GR_.cluster.reply").ShouldBeTrue(); + } +} diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafSolicitedConnectionTests.cs b/tests/NATS.Server.Tests/LeafNodes/LeafSolicitedConnectionTests.cs new file mode 100644 index 0000000..989a792 --- /dev/null +++ b/tests/NATS.Server.Tests/LeafNodes/LeafSolicitedConnectionTests.cs @@ -0,0 +1,239 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; +using NATS.Server.LeafNodes; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Tests.LeafNodes; + +/// +/// Tests for solicited (outbound) leaf node connections with retry logic, +/// exponential backoff, JetStream domain propagation, and cancellation. +/// Go reference: leafnode.go — connectSolicited, solicitLeafNode. +/// +public class LeafSolicitedConnectionTests +{ + // Go: TestLeafNodeBasicAuthSingleton server/leafnode_test.go:602 + [Fact] + public async Task ConnectSolicited_ValidUrl_EstablishesConnection() + { + // Start a hub server with leaf node listener + var hubOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + LeafNode = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + }, + }; + + var hub = new NatsServer(hubOptions, NullLoggerFactory.Instance); + var hubCts = new CancellationTokenSource(); + _ = hub.StartAsync(hubCts.Token); + await hub.WaitForReadyAsync(); + + // Create a spoke server that connects to the hub + var spokeOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + LeafNode = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + Remotes = [hub.LeafListen!], + }, + }; + + var spoke = new NatsServer(spokeOptions, NullLoggerFactory.Instance); + var spokeCts = new CancellationTokenSource(); + _ = spoke.StartAsync(spokeCts.Token); + await spoke.WaitForReadyAsync(); + + // Wait for leaf connections to establish + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)) + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + + hub.Stats.Leafs.ShouldBeGreaterThan(0); + spoke.Stats.Leafs.ShouldBeGreaterThan(0); + + await spokeCts.CancelAsync(); + await hubCts.CancelAsync(); + spoke.Dispose(); + hub.Dispose(); + spokeCts.Dispose(); + hubCts.Dispose(); + } + + // Go: leafnode.go — reconnect with backoff on connection failure + [Fact] + public async Task ConnectSolicited_InvalidUrl_RetriesWithBackoff() + { + // Create a leaf node manager targeting a non-existent endpoint + var options = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + Remotes = ["127.0.0.1:19999"], // Nothing listening here + }; + + var stats = new ServerStats(); + var manager = new LeafNodeManager( + options, stats, "test-server", + _ => { }, _ => { }, + NullLogger.Instance); + + // Start the manager — it will try to connect to 127.0.0.1:19999 and fail + using var cts = new CancellationTokenSource(); + await manager.StartAsync(cts.Token); + + // Give it some time to attempt connections + await Task.Delay(500); + + // No connections should have succeeded + stats.Leafs.ShouldBe(0); + + await cts.CancelAsync(); + await manager.DisposeAsync(); + } + + // Go: leafnode.go — backoff caps at 60 seconds + [Fact] + public void ConnectSolicited_MaxBackoff_CapsAt60Seconds() + { + // Verify the backoff calculation caps at 60 seconds + LeafNodeManager.ComputeBackoff(0).ShouldBe(TimeSpan.FromSeconds(1)); + LeafNodeManager.ComputeBackoff(1).ShouldBe(TimeSpan.FromSeconds(2)); + LeafNodeManager.ComputeBackoff(2).ShouldBe(TimeSpan.FromSeconds(4)); + LeafNodeManager.ComputeBackoff(3).ShouldBe(TimeSpan.FromSeconds(8)); + LeafNodeManager.ComputeBackoff(4).ShouldBe(TimeSpan.FromSeconds(16)); + LeafNodeManager.ComputeBackoff(5).ShouldBe(TimeSpan.FromSeconds(32)); + LeafNodeManager.ComputeBackoff(6).ShouldBe(TimeSpan.FromSeconds(60)); // Capped + LeafNodeManager.ComputeBackoff(7).ShouldBe(TimeSpan.FromSeconds(60)); // Still capped + LeafNodeManager.ComputeBackoff(100).ShouldBe(TimeSpan.FromSeconds(60)); // Still capped + } + + // Go: leafnode.go — JsDomain in leafInfo propagated during handshake + [Fact] + public async Task JetStreamDomain_PropagatedInHandshake() + { + // Start a hub with JetStream domain + var hubOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + LeafNode = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + JetStreamDomain = "hub-domain", + }, + }; + + var hub = new NatsServer(hubOptions, NullLoggerFactory.Instance); + var hubCts = new CancellationTokenSource(); + _ = hub.StartAsync(hubCts.Token); + await hub.WaitForReadyAsync(); + + // Create a raw socket connection to verify the handshake includes domain + using var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + var leafEndpoint = hub.LeafListen!.Split(':'); + await client.ConnectAsync(IPAddress.Parse(leafEndpoint[0]), int.Parse(leafEndpoint[1])); + + using var stream = new NetworkStream(client, ownsSocket: false); + + // Send our LEAF handshake with a domain + var outMsg = Encoding.ASCII.GetBytes("LEAF test-spoke domain=spoke-domain\r\n"); + await stream.WriteAsync(outMsg); + await stream.FlushAsync(); + + // Read the hub's handshake response + var response = await ReadLineAsync(stream); + + // The hub's handshake should include the JetStream domain + response.ShouldStartWith("LEAF "); + response.ShouldContain("domain=hub-domain"); + + await hubCts.CancelAsync(); + hub.Dispose(); + hubCts.Dispose(); + } + + // Go: leafnode.go — cancellation stops reconnect loop + [Fact] + public async Task Retry_CancellationToken_StopsRetrying() + { + var options = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + Remotes = ["127.0.0.1:19998"], // Nothing listening + }; + + var stats = new ServerStats(); + var manager = new LeafNodeManager( + options, stats, "test-server", + _ => { }, _ => { }, + NullLogger.Instance); + + using var cts = new CancellationTokenSource(); + await manager.StartAsync(cts.Token); + + // Let it attempt at least one retry + await Task.Delay(200); + + // Cancel — the retry loop should stop promptly + await cts.CancelAsync(); + await manager.DisposeAsync(); + + // No connections should have been established + stats.Leafs.ShouldBe(0); + } + + // Go: leafnode.go — verify backoff delay sequence + [Fact] + public void ExponentialBackoff_CalculatesCorrectDelays() + { + var delays = new List(); + for (var i = 0; i < 10; i++) + delays.Add(LeafNodeManager.ComputeBackoff(i)); + + // Verify the sequence: 1, 2, 4, 8, 16, 32, 60, 60, 60, 60 + delays[0].ShouldBe(TimeSpan.FromSeconds(1)); + delays[1].ShouldBe(TimeSpan.FromSeconds(2)); + delays[2].ShouldBe(TimeSpan.FromSeconds(4)); + delays[3].ShouldBe(TimeSpan.FromSeconds(8)); + delays[4].ShouldBe(TimeSpan.FromSeconds(16)); + delays[5].ShouldBe(TimeSpan.FromSeconds(32)); + + // After attempt 5, all should be capped at 60s + for (var i = 6; i < 10; i++) + delays[i].ShouldBe(TimeSpan.FromSeconds(60)); + + // Negative attempt should be treated as 0 + LeafNodeManager.ComputeBackoff(-1).ShouldBe(TimeSpan.FromSeconds(1)); + } + + private static async Task ReadLineAsync(NetworkStream stream) + { + var bytes = new List(64); + var single = new byte[1]; + while (true) + { + var read = await stream.ReadAsync(single); + if (read == 0) + throw new IOException("Connection closed"); + if (single[0] == (byte)'\n') + break; + if (single[0] != (byte)'\r') + bytes.Add(single[0]); + } + + return Encoding.ASCII.GetString([.. bytes]); + } +}