feat: port sessions 14-16 — Routes, Leaf Nodes & Gateways

Session 14 (57 features, IDs 2895-2951):
- RouteTypes: RouteType enum, Route, RouteInfo, ConnectInfo, ASubs, GossipMode

Session 15 (71 features, IDs 1979-2049):
- LeafNodeTypes: Leaf, LeafNodeCfg (replaces stub), LeafConnectInfo

Session 16 (91 features, IDs 1263-1353):
- GatewayTypes: GatewayInterestMode, SrvGateway (replaces stub), GatewayCfg,
  Gateway, OutSide, InSide, SitAlly, GwReplyMap, GwReplyMapping
This commit is contained in:
Joseph Doherty
2026-02-26 15:50:51 -05:00
parent ce45dff994
commit 77403e3d31
7 changed files with 816 additions and 11 deletions

View File

@@ -0,0 +1,384 @@
// Copyright 2018-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/gateway.go in the NATS server Go source.
using System.Threading;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
// ============================================================================
// Session 16: Gateways
// ============================================================================
/// <summary>
/// Represents the interest mode for a given account on a gateway connection.
/// Mirrors Go <c>GatewayInterestMode</c> byte iota in gateway.go.
/// Do not change values — they are part of the wire-level gossip protocol.
/// </summary>
public enum GatewayInterestMode : byte
{
/// <summary>
/// Default mode: the cluster sends to a gateway unless told there is no
/// interest (applies to plain subscribers only).
/// </summary>
Optimistic = 0,
/// <summary>
/// Transitioning: the gateway has been sending too many no-interest signals
/// and is switching to <see cref="InterestOnly"/> mode for this account.
/// </summary>
Transitioning = 1,
/// <summary>
/// Interest-only mode: the cluster has sent all its subscription interest;
/// the gateway only forwards messages when explicit interest is known.
/// </summary>
InterestOnly = 2,
/// <summary>
/// Internal sentinel used after a cache flush; not part of the public wire enum.
/// </summary>
CacheFlushed = 3,
}
/// <summary>
/// Server-level gateway state kept on the <see cref="NatsServer"/> instance.
/// Replaces the stub that was in <c>NatsServerTypes.cs</c>.
/// Mirrors Go <c>srvGateway</c> struct in gateway.go.
/// </summary>
internal sealed class SrvGateway
{
/// <summary>
/// Total number of queue subs across all remote gateways.
/// Accessed via <c>Interlocked</c> — must be 64-bit aligned.
/// </summary>
public long TotalQSubs;
private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.SupportsRecursion);
/// <summary>
/// True if both a gateway name and port are configured (immutable after init).
/// </summary>
public bool Enabled { get; set; }
/// <summary>Name of this server's gateway cluster.</summary>
public string Name { get; set; } = string.Empty;
/// <summary>Outbound gateway connections keyed by remote gateway name.</summary>
public Dictionary<string, ClientConnection> Out { get; set; } = new();
/// <summary>
/// Outbound gateway connections in RTT order, used for message routing.
/// </summary>
public List<ClientConnection> Outo { get; set; } = [];
/// <summary>Inbound gateway connections keyed by connection ID.</summary>
public Dictionary<ulong, ClientConnection> In { get; set; } = new();
/// <summary>Per-remote-gateway configuration, keyed by gateway name.</summary>
public Dictionary<string, GatewayCfg> Remotes { get; set; } = new();
/// <summary>Reference-counted set of all gateway URLs in the cluster.</summary>
public RefCountedUrlSet Urls { get; set; } = new();
/// <summary>This server's own gateway URL (after random-port resolution).</summary>
public string Url { get; set; } = string.Empty;
/// <summary>Gateway INFO protocol object.</summary>
public ServerInfo? Info { get; set; }
/// <summary>Pre-marshalled INFO JSON bytes.</summary>
public byte[]? InfoJson { get; set; }
/// <summary>When true, reject connections from gateways not in <see cref="Remotes"/>.</summary>
public bool RejectUnknown { get; set; }
/// <summary>
/// Reply prefix bytes: <c>"$GNR.&lt;reserved&gt;.&lt;clusterHash&gt;.&lt;serverHash&gt;."</c>
/// </summary>
public byte[] ReplyPfx { get; set; } = [];
// Backward-compatibility reply prefix and hash (old "$GR." scheme)
public byte[] OldReplyPfx { get; set; } = [];
public byte[] OldHash { get; set; } = [];
// -------------------------------------------------------------------------
// pasi — per-account subject interest tally (protected by its own mutex)
// -------------------------------------------------------------------------
/// <summary>
/// Per-account subject-interest tally.
/// Outer key = account name; inner key = subject (or "subject queue" pair);
/// value = tally struct.
/// Mirrors Go's anonymous <c>pasi</c> embedded struct in <c>srvGateway</c>.
/// </summary>
private readonly Lock _pasiLock = new();
public Dictionary<string, Dictionary<string, SitAlly>> Pasi { get; set; } = new();
public Lock PasiLock => _pasiLock;
// -------------------------------------------------------------------------
// Recent subscription tracking (thread-safe map)
// -------------------------------------------------------------------------
/// <summary>
/// Recent subscriptions for a given account (subject → expiry ticks).
/// Mirrors Go's <c>rsubs sync.Map</c>.
/// </summary>
public System.Collections.Concurrent.ConcurrentDictionary<string, long> RSubs { get; set; } = new();
// -------------------------------------------------------------------------
// Other server-level gateway fields
// -------------------------------------------------------------------------
/// <summary>DNS resolver used before dialling gateway connections.</summary>
public INetResolver? Resolver { get; set; }
/// <summary>Max buffer size for sending queue-sub protocol (used in tests).</summary>
public int SqbSz { get; set; }
/// <summary>How long to look for a subscription match for a reply message.</summary>
public TimeSpan RecSubExp { get; set; }
/// <summary>Server ID hash (6 bytes) for routing mapped replies.</summary>
public byte[] SIdHash { get; set; } = [];
/// <summary>
/// Map from a route server's hashed ID (6 bytes) to the route client.
/// Mirrors Go's <c>routesIDByHash sync.Map</c>.
/// </summary>
public System.Collections.Concurrent.ConcurrentDictionary<string, ClientConnection> RoutesIdByHash { get; set; } = new();
/// <summary>
/// Gateway URLs from this server's own entry in the Gateways config block,
/// used for monitoring reports.
/// </summary>
public List<string> OwnCfgUrls { get; set; } = [];
// -------------------------------------------------------------------------
// Lock helpers
// -------------------------------------------------------------------------
public void AcquireReadLock() => _lock.EnterReadLock();
public void ReleaseReadLock() => _lock.ExitReadLock();
public void AcquireWriteLock() => _lock.EnterWriteLock();
public void ReleaseWriteLock() => _lock.ExitWriteLock();
}
/// <summary>
/// Subject-interest tally entry. Indicates whether the key in the map is a
/// queue subscription and how many matching subscriptions exist.
/// Mirrors Go <c>sitally</c> struct in gateway.go.
/// </summary>
internal sealed class SitAlly
{
/// <summary>Number of subscriptions directly matching the subject/queue key.</summary>
public int N { get; set; }
/// <summary>True if this entry represents a queue subscription.</summary>
public bool Q { get; set; }
}
/// <summary>
/// Runtime configuration for a single remote gateway.
/// Wraps <see cref="RemoteGatewayOpts"/> with connection-attempt state and a lock.
/// Mirrors Go <c>gatewayCfg</c> struct in gateway.go.
/// </summary>
internal sealed class GatewayCfg
{
private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.SupportsRecursion);
/// <summary>The raw remote-gateway options this cfg was built from.</summary>
public RemoteGatewayOpts? RemoteOpts { get; set; }
/// <summary>6-byte cluster hash used for reply routing.</summary>
public byte[] Hash { get; set; } = [];
/// <summary>4-byte old-style hash for backward compatibility.</summary>
public byte[] OldHash { get; set; } = [];
/// <summary>Map of URL string → parsed URL for this remote gateway.</summary>
public Dictionary<string, Uri> Urls { get; set; } = new();
/// <summary>Number of connection attempts made so far.</summary>
public int ConnAttempts { get; set; }
/// <summary>TLS server name override for SNI.</summary>
public string TlsName { get; set; } = string.Empty;
/// <summary>True if this remote was discovered via gossip (not configured).</summary>
public bool Implicit { get; set; }
/// <summary>When true, monitoring should refresh the URL list on next varz inspection.</summary>
public bool VarzUpdateUrls { get; set; }
// Forwarded properties from RemoteGatewayOpts
public string Name { get => RemoteOpts?.Name ?? string.Empty; }
// -------------------------------------------------------------------------
// Lock helpers
// -------------------------------------------------------------------------
public void AcquireReadLock() => _lock.EnterReadLock();
public void ReleaseReadLock() => _lock.ExitReadLock();
public void AcquireWriteLock() => _lock.EnterWriteLock();
public void ReleaseWriteLock() => _lock.ExitWriteLock();
}
/// <summary>
/// Per-connection gateway state embedded in <see cref="ClientConnection"/>
/// when the connection kind is <c>Gateway</c>.
/// Mirrors Go <c>gateway</c> struct in gateway.go.
/// </summary>
internal sealed class Gateway
{
/// <summary>Name of the remote gateway cluster.</summary>
public string Name { get; set; } = string.Empty;
/// <summary>Configuration block for the remote gateway.</summary>
public GatewayCfg? Cfg { get; set; }
/// <summary>URL used for CONNECT after receiving the remote INFO (outbound only).</summary>
public Uri? ConnectUrl { get; set; }
/// <summary>
/// Per-account subject interest (outbound connection).
/// Maps account name → <see cref="OutSide"/> for that account.
/// Uses a thread-safe map because it is read from multiple goroutines.
/// </summary>
public System.Collections.Concurrent.ConcurrentDictionary<string, OutSide>? OutSim { get; set; }
/// <summary>
/// Per-account no-interest subjects or interest-only mode (inbound connection).
/// </summary>
public Dictionary<string, InSide>? InSim { get; set; }
/// <summary>True if this is an outbound gateway connection.</summary>
public bool Outbound { get; set; }
/// <summary>
/// Set in the read loop without locking to record that the inbound side
/// sent its CONNECT protocol.
/// </summary>
public bool Connected { get; set; }
/// <summary>
/// True if the remote server only understands the old <c>$GR.</c> prefix,
/// not the newer <c>$GNR.</c> scheme.
/// </summary>
public bool UseOldPrefix { get; set; }
/// <summary>
/// When true the inbound side switches accounts to interest-only mode
/// immediately, so the outbound side can disregard optimistic mode.
/// </summary>
public bool InterestOnlyMode { get; set; }
/// <summary>Name of the remote server on this gateway connection.</summary>
public string RemoteName { get; set; } = string.Empty;
}
/// <summary>
/// Outbound subject-interest entry for a single account on a gateway connection.
/// Mirrors Go <c>outsie</c> struct in gateway.go.
/// </summary>
internal sealed class OutSide
{
private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.SupportsRecursion);
/// <summary>Current interest mode for this account on the outbound gateway.</summary>
public GatewayInterestMode Mode { get; set; }
/// <summary>
/// Set of subjects for which the remote has signalled no-interest.
/// Null when the remote has sent all its subscriptions (interest-only mode).
/// </summary>
public HashSet<string>? Ni { get; set; }
/// <summary>
/// Subscription index: contains queue subs in optimistic mode,
/// or all subs when <see cref="Mode"/> has been switched.
/// </summary>
public SubscriptionIndex? Sl { get; set; }
/// <summary>Number of queue subscriptions tracked in <see cref="Sl"/>.</summary>
public int Qsubs { get; set; }
// -------------------------------------------------------------------------
// Lock helpers
// -------------------------------------------------------------------------
public void AcquireReadLock() => _lock.EnterReadLock();
public void ReleaseReadLock() => _lock.ExitReadLock();
public void AcquireWriteLock() => _lock.EnterWriteLock();
public void ReleaseWriteLock() => _lock.ExitWriteLock();
}
/// <summary>
/// Inbound subject-interest entry for a single account on a gateway connection.
/// Tracks subjects for which an RS- was sent to the remote, and the current mode.
/// Mirrors Go <c>insie</c> struct in gateway.go.
/// </summary>
internal sealed class InSide
{
/// <summary>
/// Subjects for which RS- was sent to the remote (null when in interest-only mode).
/// </summary>
public HashSet<string>? Ni { get; set; }
/// <summary>Current interest mode for this account on the inbound gateway.</summary>
public GatewayInterestMode Mode { get; set; }
}
/// <summary>
/// A single gateway reply-mapping entry: the mapped subject and its expiry.
/// Mirrors Go <c>gwReplyMap</c> struct in gateway.go.
/// </summary>
internal sealed class GwReplyMap
{
/// <summary>The mapped (routed) subject string.</summary>
public string Ms { get; set; } = string.Empty;
/// <summary>Expiry expressed as <see cref="DateTime.Ticks"/> (UTC).</summary>
public long Exp { get; set; }
}
/// <summary>
/// Gateway reply routing table and a fast-path check flag.
/// Mirrors Go <c>gwReplyMapping</c> struct in gateway.go.
/// </summary>
internal sealed class GwReplyMapping
{
/// <summary>
/// Non-zero when the mapping table should be consulted while processing
/// inbound messages. Accessed via <c>Interlocked</c> — must be 32-bit aligned.
/// </summary>
public int Check;
/// <summary>Active reply-subject → GwReplyMap entries.</summary>
public Dictionary<string, GwReplyMap> Mapping { get; set; } = new();
/// <summary>
/// Returns the routed subject for <paramref name="subject"/> if a mapping
/// exists, otherwise returns the original subject and <c>false</c>.
/// Caller must hold any required lock before invoking.
/// </summary>
public (byte[] Subject, bool Found) Get(byte[] subject)
{
// TODO: session 16 — implement mapping lookup
return (subject, false);
}
}