D4: Add hash segment support to ReplyMapper (_GR_.{cluster}.{hash}.{reply}),
FNV-1a ComputeReplyHash, TryExtractClusterId/Hash, legacy format compat.
D5: Add ConnectSolicitedAsync with exponential backoff (1s-60s cap),
JetStreamDomain propagation in LEAF handshake, LeafNodeOptions.JetStreamDomain.
175 lines
5.7 KiB
C#
175 lines
5.7 KiB
C#
namespace NATS.Server.Gateways;
|
|
|
|
/// <summary>
|
|
/// Maps reply subjects to gateway-prefixed forms and restores them.
|
|
/// The gateway reply format is <c>_GR_.{clusterId}.{hash}.{originalReply}</c>.
|
|
/// A legacy format <c>_GR_.{clusterId}.{originalReply}</c> (no hash) is also supported
|
|
/// for backward compatibility.
|
|
/// Go reference: gateway.go:2000-2100, gateway.go:340-380.
|
|
/// </summary>
|
|
public static class ReplyMapper
|
|
{
|
|
private const string GatewayReplyPrefix = "_GR_.";
|
|
|
|
/// <summary>
|
|
/// Checks whether the subject starts with the gateway reply prefix <c>_GR_.</c>.
|
|
/// </summary>
|
|
public static bool HasGatewayReplyPrefix(string? subject)
|
|
=> !string.IsNullOrWhiteSpace(subject)
|
|
&& subject.StartsWith(GatewayReplyPrefix, StringComparison.Ordinal);
|
|
|
|
/// <summary>
|
|
/// Computes a deterministic FNV-1a hash of the reply subject.
|
|
/// Go reference: gateway.go uses SHA-256 truncated to base-62; we use FNV-1a for speed
|
|
/// while maintaining determinism and good distribution.
|
|
/// </summary>
|
|
public static long ComputeReplyHash(string replyTo)
|
|
{
|
|
// FNV-1a 64-bit
|
|
const ulong fnvOffsetBasis = 14695981039346656037UL;
|
|
const ulong fnvPrime = 1099511628211UL;
|
|
|
|
var hash = fnvOffsetBasis;
|
|
foreach (var c in replyTo)
|
|
{
|
|
hash ^= (byte)c;
|
|
hash *= fnvPrime;
|
|
}
|
|
|
|
// Return as non-negative long
|
|
return (long)(hash & 0x7FFFFFFFFFFFFFFF);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Converts a reply subject to gateway form with an explicit hash segment.
|
|
/// Format: <c>_GR_.{clusterId}.{hash}.{originalReply}</c>.
|
|
/// </summary>
|
|
public static string? ToGatewayReply(string? replyTo, string localClusterId, long hash)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(replyTo))
|
|
return replyTo;
|
|
|
|
return $"{GatewayReplyPrefix}{localClusterId}.{hash}.{replyTo}";
|
|
}
|
|
|
|
/// <summary>
|
|
/// Converts a reply subject to gateway form, automatically computing the hash.
|
|
/// Format: <c>_GR_.{clusterId}.{hash}.{originalReply}</c>.
|
|
/// </summary>
|
|
public static string? ToGatewayReply(string? replyTo, string localClusterId)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(replyTo))
|
|
return replyTo;
|
|
|
|
var hash = ComputeReplyHash(replyTo);
|
|
return ToGatewayReply(replyTo, localClusterId, hash);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Restores the original reply subject from a gateway-prefixed reply.
|
|
/// Handles both new format (<c>_GR_.{clusterId}.{hash}.{originalReply}</c>) and
|
|
/// legacy format (<c>_GR_.{clusterId}.{originalReply}</c>).
|
|
/// Nested prefixes are unwrapped iteratively.
|
|
/// </summary>
|
|
public static bool TryRestoreGatewayReply(string? gatewayReply, out string restoredReply)
|
|
{
|
|
restoredReply = string.Empty;
|
|
|
|
if (!HasGatewayReplyPrefix(gatewayReply))
|
|
return false;
|
|
|
|
var current = gatewayReply!;
|
|
while (HasGatewayReplyPrefix(current))
|
|
{
|
|
// Skip the "_GR_." prefix
|
|
var afterPrefix = current[GatewayReplyPrefix.Length..];
|
|
|
|
// Find the first dot (end of clusterId)
|
|
var firstDot = afterPrefix.IndexOf('.');
|
|
if (firstDot < 0 || firstDot == afterPrefix.Length - 1)
|
|
return false;
|
|
|
|
var afterCluster = afterPrefix[(firstDot + 1)..];
|
|
|
|
// Check if the next segment is a numeric hash
|
|
var secondDot = afterCluster.IndexOf('.');
|
|
if (secondDot > 0 && secondDot < afterCluster.Length - 1 && IsNumericSegment(afterCluster.AsSpan()[..secondDot]))
|
|
{
|
|
// New format: skip hash segment too
|
|
current = afterCluster[(secondDot + 1)..];
|
|
}
|
|
else
|
|
{
|
|
// Legacy format: no hash, the rest is the original reply
|
|
current = afterCluster;
|
|
}
|
|
}
|
|
|
|
restoredReply = current;
|
|
return true;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Extracts the cluster ID from a gateway reply subject.
|
|
/// The cluster ID is the first segment after the <c>_GR_.</c> prefix.
|
|
/// </summary>
|
|
public static bool TryExtractClusterId(string? gatewayReply, out string clusterId)
|
|
{
|
|
clusterId = string.Empty;
|
|
|
|
if (!HasGatewayReplyPrefix(gatewayReply))
|
|
return false;
|
|
|
|
var afterPrefix = gatewayReply![GatewayReplyPrefix.Length..];
|
|
var dot = afterPrefix.IndexOf('.');
|
|
if (dot <= 0)
|
|
return false;
|
|
|
|
clusterId = afterPrefix[..dot];
|
|
return true;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Extracts the hash from a gateway reply subject (new format only).
|
|
/// Returns false if the reply uses the legacy format without a hash.
|
|
/// </summary>
|
|
public static bool TryExtractHash(string? gatewayReply, out long hash)
|
|
{
|
|
hash = 0;
|
|
|
|
if (!HasGatewayReplyPrefix(gatewayReply))
|
|
return false;
|
|
|
|
var afterPrefix = gatewayReply![GatewayReplyPrefix.Length..];
|
|
|
|
// Skip clusterId
|
|
var firstDot = afterPrefix.IndexOf('.');
|
|
if (firstDot <= 0 || firstDot == afterPrefix.Length - 1)
|
|
return false;
|
|
|
|
var afterCluster = afterPrefix[(firstDot + 1)..];
|
|
|
|
// Try to parse hash segment
|
|
var secondDot = afterCluster.IndexOf('.');
|
|
if (secondDot <= 0)
|
|
return false;
|
|
|
|
var hashSegment = afterCluster[..secondDot];
|
|
return long.TryParse(hashSegment, out hash);
|
|
}
|
|
|
|
private static bool IsNumericSegment(ReadOnlySpan<char> segment)
|
|
{
|
|
if (segment.IsEmpty)
|
|
return false;
|
|
|
|
foreach (var c in segment)
|
|
{
|
|
if (c is not (>= '0' and <= '9'))
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
}
|