Improve source XML docs and refresh profiling artifacts

This captures the iterative CommentChecker cleanup plus updated snapshot/report outputs used to validate and benchmark the latest JetStream and transport work.
This commit is contained in:
Joseph Doherty
2026-03-14 03:13:17 -04:00
parent 56c773dc71
commit ba0d65317a
76 changed files with 3058 additions and 29987 deletions

View File

@@ -15,10 +15,15 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
private readonly ConcurrentDictionary<string, HashSet<string>> _queueSubscriptions = new(StringComparer.Ordinal);
private Task? _loopTask;
/// <summary>Remote gateway server id learned during handshake.</summary>
public string? RemoteId { get; private set; }
/// <summary>Indicates whether this is an outbound (solicited) gateway connection.</summary>
public bool IsOutbound { get; internal set; }
/// <summary>Remote endpoint string for diagnostics and monitoring.</summary>
public string RemoteEndpoint => socket.RemoteEndPoint?.ToString() ?? Guid.NewGuid().ToString("N");
/// <summary>Callback invoked when remote A+/A- interest updates are received.</summary>
public Func<RemoteSubscription, Task>? RemoteSubscriptionReceived { get; set; }
/// <summary>Callback invoked when remote GMSG payloads are received.</summary>
public Func<GatewayMessage, Task>? MessageReceived { get; set; }
/// <summary>
@@ -31,6 +36,8 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
/// Adds a subject to the account-specific subscription set for this gateway connection.
/// Go: gateway.go — per-account subscription routing state on outbound connections.
/// </summary>
/// <param name="account">Account name for the subscription.</param>
/// <param name="subject">Subject to track.</param>
public void AddAccountSubscription(string account, string subject)
{
var subs = _accountSubscriptions.GetOrAdd(account, _ => new HashSet<string>(StringComparer.Ordinal));
@@ -40,6 +47,8 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
/// <summary>
/// Removes a subject from the account-specific subscription set for this gateway connection.
/// </summary>
/// <param name="account">Account name for the subscription.</param>
/// <param name="subject">Subject to untrack.</param>
public void RemoveAccountSubscription(string account, string subject)
{
if (_accountSubscriptions.TryGetValue(account, out var subs))
@@ -49,6 +58,8 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
/// <summary>
/// Returns a snapshot of all subjects tracked for the given account on this connection.
/// </summary>
/// <param name="account">Account name to query.</param>
/// <returns>Snapshot of tracked subjects.</returns>
public IReadOnlySet<string> GetAccountSubscriptions(string account)
{
if (_accountSubscriptions.TryGetValue(account, out var subs))
@@ -59,6 +70,8 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
/// <summary>
/// Returns the number of subjects tracked for the given account. Returns 0 for unknown accounts.
/// </summary>
/// <param name="account">Account name to query.</param>
/// <returns>Number of tracked subjects for the account.</returns>
public int AccountSubscriptionCount(string account)
{
if (_accountSubscriptions.TryGetValue(account, out var subs))
@@ -70,6 +83,8 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
/// Registers a queue group subscription for propagation to this gateway.
/// Go reference: gateway.go — sendQueueSubsToGateway.
/// </summary>
/// <param name="subject">Subject for the queue subscription.</param>
/// <param name="queueGroup">Queue group name.</param>
public void AddQueueSubscription(string subject, string queueGroup)
{
var groups = _queueSubscriptions.GetOrAdd(subject, _ => new HashSet<string>(StringComparer.Ordinal));
@@ -80,6 +95,8 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
/// Removes a queue group subscription from this gateway connection's tracking state.
/// Go reference: gateway.go — sendQueueSubsToGateway (removal path).
/// </summary>
/// <param name="subject">Subject for the queue subscription.</param>
/// <param name="queueGroup">Queue group name.</param>
public void RemoveQueueSubscription(string subject, string queueGroup)
{
if (_queueSubscriptions.TryGetValue(subject, out var groups))
@@ -89,6 +106,8 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
/// <summary>
/// Returns a snapshot of all queue group names registered for the given subject.
/// </summary>
/// <param name="subject">Subject to query.</param>
/// <returns>Snapshot of queue group names.</returns>
public IReadOnlySet<string> GetQueueGroups(string subject)
{
if (_queueSubscriptions.TryGetValue(subject, out var groups))
@@ -104,6 +123,9 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
/// <summary>
/// Returns true if the given subject/queueGroup pair is currently registered on this gateway connection.
/// </summary>
/// <param name="subject">Subject to query.</param>
/// <param name="queueGroup">Queue group name to query.</param>
/// <returns><see langword="true"/> when the pair is registered.</returns>
public bool HasQueueSubscription(string subject, string queueGroup)
{
if (!_queueSubscriptions.TryGetValue(subject, out var groups))
@@ -111,6 +133,11 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
lock (groups) return groups.Contains(queueGroup);
}
/// <summary>
/// Performs outbound gateway handshake by sending local id and reading remote id.
/// </summary>
/// <param name="serverId">Local server id.</param>
/// <param name="ct">Cancellation token for I/O operations.</param>
public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct)
{
await WriteLineAsync($"GATEWAY {serverId}", ct);
@@ -118,6 +145,11 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
RemoteId = ParseHandshake(line);
}
/// <summary>
/// Performs inbound gateway handshake by reading remote id and sending local id.
/// </summary>
/// <param name="serverId">Local server id.</param>
/// <param name="ct">Cancellation token for I/O operations.</param>
public async Task PerformInboundHandshakeAsync(string serverId, CancellationToken ct)
{
var line = await ReadLineAsync(ct);
@@ -125,6 +157,10 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
await WriteLineAsync($"GATEWAY {serverId}", ct);
}
/// <summary>
/// Starts the background frame read loop for this connection.
/// </summary>
/// <param name="ct">Cancellation token controlling loop lifetime.</param>
public void StartLoop(CancellationToken ct)
{
if (_loopTask != null)
@@ -134,15 +170,42 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
_loopTask = Task.Run(() => ReadLoopAsync(linked.Token), linked.Token);
}
/// <summary>
/// Waits for the gateway read loop to exit.
/// </summary>
/// <param name="ct">Cancellation token for wait operation.</param>
/// <returns>A task that completes when loop exits.</returns>
public Task WaitUntilClosedAsync(CancellationToken ct)
=> _loopTask?.WaitAsync(ct) ?? Task.CompletedTask;
/// <summary>
/// Sends an A+ protocol line to advertise interest.
/// </summary>
/// <param name="account">Account for the interest update.</param>
/// <param name="subject">Subject being added.</param>
/// <param name="queue">Optional queue group.</param>
/// <param name="ct">Cancellation token for I/O operations.</param>
public Task SendAPlusAsync(string account, string subject, string? queue, CancellationToken ct)
=> WriteLineAsync(queue is { Length: > 0 } ? $"A+ {account} {subject} {queue}" : $"A+ {account} {subject}", ct);
/// <summary>
/// Sends an A- protocol line to remove advertised interest.
/// </summary>
/// <param name="account">Account for the interest update.</param>
/// <param name="subject">Subject being removed.</param>
/// <param name="queue">Optional queue group.</param>
/// <param name="ct">Cancellation token for I/O operations.</param>
public Task SendAMinusAsync(string account, string subject, string? queue, CancellationToken ct)
=> WriteLineAsync(queue is { Length: > 0 } ? $"A- {account} {subject} {queue}" : $"A- {account} {subject}", ct);
/// <summary>
/// Sends a GMSG payload to the remote gateway when interest permits forwarding.
/// </summary>
/// <param name="account">Account associated with the message.</param>
/// <param name="subject">Subject being forwarded.</param>
/// <param name="replyTo">Optional reply subject.</param>
/// <param name="payload">Payload bytes.</param>
/// <param name="ct">Cancellation token for I/O operations.</param>
public async Task SendMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
// Go: gateway.go:2900 (shouldForwardMsg) — check interest tracker before sending
@@ -166,6 +229,9 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
}
}
/// <summary>
/// Disposes this gateway connection and stops background processing.
/// </summary>
public async ValueTask DisposeAsync()
{
await _closedCts.CancelAsync();

View File

@@ -20,6 +20,8 @@ public static class ReplyMapper
/// Checks whether the subject starts with either gateway reply prefix:
/// <c>_GR_.</c> (current) or <c>$GR.</c> (legacy).
/// </summary>
/// <param name="subject">Subject to inspect.</param>
/// <returns><see langword="true"/> when the subject is gateway-routed.</returns>
public static bool HasGatewayReplyPrefix(string? subject)
=> IsGatewayRoutedSubject(subject, out _);
@@ -28,6 +30,9 @@ public static class ReplyMapper
/// old prefix (<c>$GR.</c>) was used.
/// Go reference: isGWRoutedSubjectAndIsOldPrefix.
/// </summary>
/// <param name="subject">Subject to inspect.</param>
/// <param name="isOldPrefix">Set to <see langword="true"/> when the legacy prefix is used.</param>
/// <returns><see langword="true"/> when the subject is gateway-routed.</returns>
public static bool IsGatewayRoutedSubject(string? subject, out bool isOldPrefix)
{
isOldPrefix = false;
@@ -51,6 +56,8 @@ public static class ReplyMapper
/// Go reference: gateway.go uses SHA-256 truncated to base-62; we use FNV-1a for speed
/// while maintaining determinism and good distribution.
/// </summary>
/// <param name="replyTo">Reply subject to hash.</param>
/// <returns>Non-negative deterministic hash value.</returns>
public static long ComputeReplyHash(string replyTo)
{
// FNV-1a 64-bit
@@ -72,6 +79,8 @@ public static class ReplyMapper
/// Computes the short (6-char) gateway hash used in modern gateway reply routing.
/// Go reference: getGWHash.
/// </summary>
/// <param name="gatewayName">Gateway name to hash.</param>
/// <returns>Lowercase 6-character hash token.</returns>
public static string ComputeGatewayHash(string gatewayName)
{
var digest = System.Security.Cryptography.SHA256.HashData(System.Text.Encoding.UTF8.GetBytes(gatewayName));
@@ -82,6 +91,8 @@ public static class ReplyMapper
/// Computes the short (4-char) legacy gateway hash used with old prefixes.
/// Go reference: getOldHash.
/// </summary>
/// <param name="gatewayName">Gateway name to hash.</param>
/// <returns>Lowercase 4-character hash token.</returns>
public static string ComputeOldGatewayHash(string gatewayName)
{
var digest = System.Security.Cryptography.SHA256.HashData(System.Text.Encoding.UTF8.GetBytes(gatewayName));
@@ -92,6 +103,10 @@ public static class ReplyMapper
/// Converts a reply subject to gateway form with an explicit hash segment.
/// Format: <c>_GR_.{clusterId}.{hash}.{originalReply}</c>.
/// </summary>
/// <param name="replyTo">Original reply subject.</param>
/// <param name="localClusterId">Local cluster identifier to embed.</param>
/// <param name="hash">Precomputed reply hash.</param>
/// <returns>Gateway-form reply subject, or original when null/empty.</returns>
public static string? ToGatewayReply(string? replyTo, string localClusterId, long hash)
{
if (string.IsNullOrWhiteSpace(replyTo))
@@ -104,6 +119,9 @@ public static class ReplyMapper
/// Converts a reply subject to gateway form, automatically computing the hash.
/// Format: <c>_GR_.{clusterId}.{hash}.{originalReply}</c>.
/// </summary>
/// <param name="replyTo">Original reply subject.</param>
/// <param name="localClusterId">Local cluster identifier to embed.</param>
/// <returns>Gateway-form reply subject, or original when null/empty.</returns>
public static string? ToGatewayReply(string? replyTo, string localClusterId)
{
if (string.IsNullOrWhiteSpace(replyTo))
@@ -119,6 +137,9 @@ public static class ReplyMapper
/// legacy format (<c>_GR_.{clusterId}.{originalReply}</c>).
/// Nested prefixes are unwrapped iteratively.
/// </summary>
/// <param name="gatewayReply">Gateway-form reply subject.</param>
/// <param name="restoredReply">Receives restored original reply subject on success.</param>
/// <returns><see langword="true"/> when restoration succeeds.</returns>
public static bool TryRestoreGatewayReply(string? gatewayReply, out string restoredReply)
{
restoredReply = string.Empty;
@@ -161,6 +182,9 @@ public static class ReplyMapper
/// Extracts the cluster ID from a gateway reply subject.
/// The cluster ID is the first segment after the <c>_GR_.</c> prefix.
/// </summary>
/// <param name="gatewayReply">Gateway-form reply subject.</param>
/// <param name="clusterId">Receives extracted cluster identifier on success.</param>
/// <returns><see langword="true"/> when extraction succeeds.</returns>
public static bool TryExtractClusterId(string? gatewayReply, out string clusterId)
{
clusterId = string.Empty;
@@ -181,6 +205,9 @@ public static class ReplyMapper
/// Extracts the hash from a gateway reply subject (new format only).
/// Returns false if the reply uses the legacy format without a hash.
/// </summary>
/// <param name="gatewayReply">Gateway-form reply subject.</param>
/// <param name="hash">Receives extracted hash on success.</param>
/// <returns><see langword="true"/> when extraction succeeds.</returns>
public static bool TryExtractHash(string? gatewayReply, out long hash)
{
hash = 0;
@@ -236,6 +263,11 @@ public sealed class ReplyMapCache
private long _hits;
private long _misses;
/// <summary>
/// Creates an LRU reply mapping cache with TTL expiration.
/// </summary>
/// <param name="capacity">Maximum number of entries to retain.</param>
/// <param name="ttlMs">Time-to-live for entries in milliseconds.</param>
public ReplyMapCache(int capacity = 4096, int ttlMs = 60_000)
{
_capacity = capacity;
@@ -243,10 +275,19 @@ public sealed class ReplyMapCache
_map = new Dictionary<string, LinkedListNode<CacheEntry>>(capacity, StringComparer.Ordinal);
}
/// <summary>Total cache hits since creation.</summary>
public long Hits => Interlocked.Read(ref _hits);
/// <summary>Total cache misses since creation.</summary>
public long Misses => Interlocked.Read(ref _misses);
/// <summary>Current number of entries in the cache.</summary>
public int Count { get { lock (_lock) return _map.Count; } }
/// <summary>
/// Attempts to get a cached mapping value.
/// </summary>
/// <param name="key">Cache lookup key.</param>
/// <param name="value">Resolved cached value when found and not expired.</param>
/// <returns><see langword="true"/> when an unexpired value exists.</returns>
public bool TryGet(string key, out string? value)
{
lock (_lock)
@@ -276,6 +317,11 @@ public sealed class ReplyMapCache
return false;
}
/// <summary>
/// Inserts or updates a cached mapping value.
/// </summary>
/// <param name="key">Cache key.</param>
/// <param name="value">Cache value.</param>
public void Set(string key, string value)
{
lock (_lock)
@@ -302,6 +348,9 @@ public sealed class ReplyMapCache
}
}
/// <summary>
/// Clears all cached mappings.
/// </summary>
public void Clear()
{
lock (_lock)