Merge branch 'worktree-agent-a5ce7551'

This commit is contained in:
Joseph Doherty
2026-03-01 08:43:24 -05:00
5 changed files with 1079 additions and 1 deletions

View File

@@ -1305,9 +1305,10 @@ public sealed partial class Account : INatsAccount
/// <summary>
/// Invoked when the expiration timer fires: marks expired and collects clients.
/// Also callable externally (e.g., from <c>removeCb</c>) to forcibly expire an account.
/// Mirrors Go <c>(a *Account) expiredTimeout()</c>.
/// </summary>
private void ExpiredTimeout()
internal void ExpiredTimeout()
{
Interlocked.Exchange(ref _expired, 1);
@@ -4643,4 +4644,45 @@ public sealed partial class Account : INatsAccount
return true;
}
// =========================================================================
// Locking helpers used by ServiceExportEntry.CheckExpiredResponses
// These expose just enough of the internal _mu to allow the extension method
// to perform read/write operations without exposing the lock directly.
// =========================================================================
/// <summary>Enters the account read lock.</summary>
internal void ReadLock() => _mu.EnterReadLock();
/// <summary>Exits the account read lock.</summary>
internal void ReadUnlock() => _mu.ExitReadLock();
/// <summary>Enters the account write lock.</summary>
internal void WriteLock() => _mu.EnterWriteLock();
/// <summary>Exits the account write lock.</summary>
internal void WriteUnlock() => _mu.ExitWriteLock();
/// <summary>
/// Returns the number of pending response entries that belong to
/// <paramref name="se"/>.
/// Account read lock must be held by the caller.
/// </summary>
internal int GetResponseCount(ServiceExportEntry se)
{
if (Exports.Responses == null) return 0;
int count = 0;
foreach (var si in Exports.Responses.Values)
{
if (si.ServiceExport == se) count++;
}
return count;
}
/// <summary>
/// Returns a snapshot of all pending response entries.
/// Account read lock must be held by the caller.
/// </summary>
internal IEnumerable<KeyValuePair<string, ServiceImportEntry>> GetResponseEntries()
=> Exports.Responses ?? Enumerable.Empty<KeyValuePair<string, ServiceImportEntry>>();
}

View File

@@ -13,6 +13,7 @@
//
// Adapted from server/accounts.go in the NATS server Go source.
using System.Globalization;
using System.Text.Json.Serialization;
using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
@@ -735,3 +736,383 @@ public static class AccountEventConstants
/// </summary>
public const string ServiceLatencyType = "io.nats.server.metric.v1.service_latency";
}
// ============================================================================
// ServiceImportEntry extension methods
// Mirrors Go methods on *serviceImport (accounts.go).
// ============================================================================
/// <summary>
/// Extension methods for <see cref="ServiceImportEntry"/>.
/// </summary>
internal static class ServiceImportEntryExtensions
{
/// <summary>
/// Returns true when this import entry represents a pending response service import
/// (as opposed to an originating request import).
/// Mirrors Go <c>(si *serviceImport) isRespServiceImport() bool</c>.
/// </summary>
public static bool IsRespServiceImport(this ServiceImportEntry? si)
=> si != null && si.IsResponse;
}
// ============================================================================
// ServiceExportEntry extension methods — response threshold timer
// Mirrors Go methods on *serviceExport (accounts.go ~lines 2446-2507).
// ============================================================================
/// <summary>
/// Extension methods for <see cref="ServiceExportEntry"/> providing response
/// threshold timer management and expired-response cleanup.
/// </summary>
internal static class ServiceExportEntryExtensions
{
/// <summary>
/// Sets the response threshold timer on <paramref name="se"/> if not already set.
/// The timer fires <see cref="ServiceExportEntry.ResponseThreshold"/> after being set,
/// calling <see cref="CheckExpiredResponses"/>.
/// Account lock must be held on entry.
/// Mirrors Go <c>(se *serviceExport) setResponseThresholdTimer()</c>.
/// </summary>
public static void SetResponseThresholdTimer(this ServiceExportEntry se)
{
if (se.ResponseTimer != null)
return; // already set
se.ResponseTimer = new Timer(
_ => se.CheckExpiredResponses(),
null,
se.ResponseThreshold,
Timeout.InfiniteTimeSpan);
}
/// <summary>
/// Stops and clears the response threshold timer.
/// Returns true when the timer was stopped before it fired (or was already null).
/// Account lock must be held on entry.
/// Mirrors Go <c>(se *serviceExport) clearResponseThresholdTimer() bool</c>.
/// </summary>
public static bool ClearResponseThresholdTimer(this ServiceExportEntry se)
{
if (se.ResponseTimer == null)
return true;
se.ResponseTimer.Dispose();
se.ResponseTimer = null;
return true;
}
/// <summary>
/// Iterates the owning account's pending response map, removes any entries
/// belonging to <paramref name="se"/> whose timestamp is older than
/// <see cref="ServiceExportEntry.ResponseThreshold"/>, then resets or clears
/// the timer depending on whether responses remain.
/// Mirrors Go <c>(se *serviceExport) checkExpiredResponses()</c>.
/// </summary>
public static void CheckExpiredResponses(this ServiceExportEntry se)
{
var acc = se.Account;
if (acc == null)
{
se.ClearResponseThresholdTimer();
return;
}
var expired = new List<ServiceImportEntry>();
var minTs = DateTime.UtcNow.Ticks - se.ResponseThreshold.Ticks;
int totalResponses;
// Read lock to collect expired entries.
acc.ReadLock();
try
{
totalResponses = acc.GetResponseCount(se);
foreach (var (_, si) in acc.GetResponseEntries())
{
if (si.ServiceExport == se && si.Timestamp <= minTs)
expired.Add(si);
}
}
finally
{
acc.ReadUnlock();
}
foreach (var si in expired)
acc.RemoveRespServiceImport(si, RsiReason.Timeout);
totalResponses -= expired.Count;
// Reset or clear the timer under a write lock.
acc.WriteLock();
try
{
if (totalResponses > 0 && se.ResponseTimer != null)
{
se.ResponseTimer.Change(se.ResponseThreshold, Timeout.InfiniteTimeSpan);
}
else
{
se.ClearResponseThresholdTimer();
}
}
finally
{
acc.WriteUnlock();
}
}
}
// ============================================================================
// ServiceLatency extensions — merge and sanitize
// Mirrors Go functions in accounts.go ~lines 1354-1377.
// ============================================================================
/// <summary>
/// Extension and static helper methods for <see cref="ServiceLatency"/>.
/// </summary>
public static class ServiceLatencyExtensions
{
/// <summary>
/// Merges responder-side latency measurement <paramref name="m2"/> into the
/// requestor-side measurement <paramref name="m1"/>, computing system overhead.
/// Mirrors Go <c>(m1 *ServiceLatency) merge(m2 *ServiceLatency)</c>.
/// </summary>
public static void Merge(this ServiceLatency m1, ServiceLatency m2)
{
var rtt = m2.Responder?.Rtt ?? TimeSpan.Zero;
m1.SystemLatency = m1.ServiceLatencyDuration - (m2.ServiceLatencyDuration + rtt);
m1.ServiceLatencyDuration = m2.ServiceLatencyDuration;
m1.Responder = m2.Responder;
m1.Sanitize();
}
/// <summary>
/// Clamps any negative latency durations to zero.
/// Mirrors Go <c>sanitizeLatencyMetric(sl *ServiceLatency)</c>.
/// </summary>
public static void Sanitize(this ServiceLatency sl)
{
if (sl.ServiceLatencyDuration < TimeSpan.Zero)
sl.ServiceLatencyDuration = TimeSpan.Zero;
if (sl.SystemLatency < TimeSpan.Zero)
sl.SystemLatency = TimeSpan.Zero;
}
/// <summary>
/// Clamps any negative latency durations on <paramref name="sl"/> to zero.
/// Static helper that mirrors Go <c>sanitizeLatencyMetric</c> as a standalone function.
/// </summary>
public static void SanitizeLatencyMetric(ServiceLatency sl) => sl.Sanitize();
}
// ============================================================================
// Tracing header builders
// Mirrors Go functions in accounts.go ~lines 2228-2276.
// ============================================================================
/// <summary>
/// Well-known distributed-tracing header keys used for sampling propagation.
/// Mirrors Go package-level vars <c>trcUber, trcCtx, trcB3, ...</c> in accounts.go.
/// </summary>
internal static class TracingHeaderKeys
{
// Uber/Jaeger
public const string UberTraceId = "Uber-Trace-Id";
// W3C TraceContext
public const string TraceParent = "Traceparent";
public const string TraceState = "Tracestate";
// Zipkin B3 single-header
public const string B3Single = "B3";
// Zipkin B3 multi-header
public const string B3Sampled = "X-B3-Sampled";
public const string B3TraceId = "X-B3-TraceId";
public const string B3ParentSpanId = "X-B3-ParentSpanId";
public const string B3SpanId = "X-B3-SpanId";
// Uber context baggage prefix
public const string UberCtxPrefix = "Uberctx-";
}
/// <summary>
/// Factory helpers for distributed-tracing header propagation dictionaries.
/// Mirrors Go <c>newB3Header</c>, <c>newUberHeader</c>, <c>newTraceCtxHeader</c>
/// and <c>shouldSample</c> in accounts.go.
/// </summary>
internal static class TracingHeaders
{
/// <summary>
/// Copies the Zipkin B3 multi-header keys from <paramref name="source"/> into a new header map.
/// Mirrors Go <c>newB3Header(h http.Header) http.Header</c>.
/// </summary>
public static Dictionary<string, string[]> NewB3Header(Dictionary<string, string[]> source)
{
var result = new Dictionary<string, string[]>(StringComparer.OrdinalIgnoreCase);
TryCopy(source, result, TracingHeaderKeys.B3Sampled);
TryCopy(source, result, TracingHeaderKeys.B3TraceId);
TryCopy(source, result, TracingHeaderKeys.B3ParentSpanId);
TryCopy(source, result, TracingHeaderKeys.B3SpanId);
return result;
}
/// <summary>
/// Builds a new Uber/Jaeger header map with <paramref name="traceId"/> plus any
/// <c>Uberctx-*</c> baggage headers copied from <paramref name="source"/>.
/// Mirrors Go <c>newUberHeader(h http.Header, tId []string) http.Header</c>.
/// </summary>
public static Dictionary<string, string[]> NewUberHeader(
Dictionary<string, string[]> source,
string[] traceId)
{
var result = new Dictionary<string, string[]>(StringComparer.OrdinalIgnoreCase)
{
[TracingHeaderKeys.UberTraceId] = traceId,
};
foreach (var (k, v) in source)
{
if (k.StartsWith(TracingHeaderKeys.UberCtxPrefix, StringComparison.OrdinalIgnoreCase))
result[k] = v;
}
return result;
}
/// <summary>
/// Builds a new W3C TraceContext header map with <paramref name="traceId"/> plus
/// optional <c>Tracestate</c> copied from <paramref name="source"/>.
/// Mirrors Go <c>newTraceCtxHeader(h http.Header, tId []string) http.Header</c>.
/// </summary>
public static Dictionary<string, string[]> NewTraceCtxHeader(
Dictionary<string, string[]> source,
string[] traceId)
{
var result = new Dictionary<string, string[]>(StringComparer.OrdinalIgnoreCase)
{
[TracingHeaderKeys.TraceParent] = traceId,
};
TryCopy(source, result, TracingHeaderKeys.TraceState);
return result;
}
/// <summary>
/// Determines whether a service-latency sample should be taken for a given
/// <paramref name="latencyConfig"/> and <paramref name="headers"/>.
/// Returns the sampling decision and an optional propagation header map.
/// Mirrors Go <c>shouldSample(l *serviceLatency, c *client) (bool, http.Header)</c>.
/// </summary>
public static (bool Sample, Dictionary<string, string[]>? Header) ShouldSample(
InternalServiceLatency? latencyConfig,
Dictionary<string, string[]>? headers)
{
if (latencyConfig == null)
return (false, null);
if (latencyConfig.Sampling < 0)
return (false, null);
if (latencyConfig.Sampling >= 100)
return (true, null);
if (latencyConfig.Sampling > 0 && Random.Shared.Next(100) < latencyConfig.Sampling)
return (true, null);
if (headers == null || headers.Count == 0)
return (false, null);
// Uber/Jaeger: check flag bit in last token of trace-id.
if (headers.TryGetValue(TracingHeaderKeys.UberTraceId, out var uberIds) && uberIds.Length > 0)
{
var tk = uberIds[0].Split(':');
if (tk.Length == 4 && tk[3].Length is > 0 and <= 2)
{
var src = tk[3].Length == 1 ? $"0{tk[3]}" : tk[3];
if (byte.TryParse(src, NumberStyles.HexNumber, null, out var flag) && (flag & 1) == 1)
return (true, NewUberHeader(headers, uberIds));
}
return (false, null);
}
// Zipkin B3 multi-header — sampled flag.
if (headers.TryGetValue(TracingHeaderKeys.B3Sampled, out var sampled) && sampled.Length > 0)
{
if (sampled[0] == "1") return (true, NewB3Header(headers));
if (sampled[0] == "0") return (false, null);
}
// Zipkin B3 multi-header — presence of trace-id means recipient decides.
if (headers.ContainsKey(TracingHeaderKeys.B3TraceId))
return (true, NewB3Header(headers));
// Zipkin B3 single-header.
if (headers.TryGetValue(TracingHeaderKeys.B3Single, out var b3) && b3.Length > 0)
{
var tk = b3[0].Split('-');
if (tk.Length > 2 && tk[2] == "0") return (false, null);
if (tk.Length == 1 && tk[0] == "0") return (false, null);
return (true, new Dictionary<string, string[]>(StringComparer.OrdinalIgnoreCase)
{
[TracingHeaderKeys.B3Single] = b3,
});
}
// W3C TraceContext.
if (headers.TryGetValue(TracingHeaderKeys.TraceParent, out var tpIds) && tpIds.Length > 0)
{
var tk = tpIds[0].Split('-');
bool sample = false;
if (tk.Length == 4 && tk[3].Length == 2)
{
if (byte.TryParse(tk[3], NumberStyles.HexNumber, null, out var flags))
sample = (flags & 0x1) == 0x1;
}
if (sample)
return (true, NewTraceCtxHeader(headers, tpIds));
return (false, null);
}
return (false, null);
}
// -------------------------------------------------------------------------
// Private helpers
// -------------------------------------------------------------------------
private static void TryCopy(
Dictionary<string, string[]> src,
Dictionary<string, string[]> dst,
string key)
{
if (src.TryGetValue(key, out var v))
dst[key] = v;
}
}
// ============================================================================
// UpdateAllClientsServiceExportResponseTime
// Mirrors Go updateAllClientsServiceExportResponseTime (accounts.go ~line 1527).
// ============================================================================
/// <summary>
/// Static helper that updates the response-time tracking threshold on all clients
/// registered for a service export.
/// Mirrors Go <c>updateAllClientsServiceExportResponseTime(clients []*client, lrt time.Duration)</c>.
/// </summary>
internal static class ServiceExportClientHelpers
{
/// <summary>
/// Iterates <paramref name="clients"/> and, for each one whose <c>RrTracking.Lrt</c>
/// differs from <paramref name="lowestResponseThreshold"/>, updates the threshold
/// and resets the pending-timer.
/// Mirrors Go <c>updateAllClientsServiceExportResponseTime</c>.
/// </summary>
public static void UpdateAllClientsServiceExportResponseTime(
IEnumerable<ClientConnection> clients,
TimeSpan lowestResponseThreshold)
{
foreach (var c in clients)
{
c.UpdateRrTrackingThreshold(lowestResponseThreshold);
}
}
}

View File

@@ -1807,6 +1807,25 @@ public sealed partial class ClientConnection
internal bool IsWebSocket() => Ws != null;
internal bool IsHubLeafNode() => Kind == ClientKind.Leaf && Leaf?.IsSpoke != true;
internal string RemoteCluster() => Leaf?.RemoteCluster ?? string.Empty;
/// <summary>
/// Updates the response-round-trip tracking threshold on this client when it
/// differs from <paramref name="lowestResponseThreshold"/>, and resets the
/// pending-timer to the new value.
/// Mirrors Go logic in <c>updateAllClientsServiceExportResponseTime</c> for a
/// single client (accounts.go ~line 1527).
/// </summary>
internal void UpdateRrTrackingThreshold(TimeSpan lowestResponseThreshold)
{
lock (_mu)
{
if (_rrTracking == null || _rrTracking.Lrt == lowestResponseThreshold)
return;
_rrTracking.Lrt = lowestResponseThreshold;
_rrTracking.Ptmr?.Change(lowestResponseThreshold, Timeout.InfiniteTimeSpan);
}
}
}
// ============================================================================

View File

@@ -0,0 +1,630 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/accounts.go in the NATS server Go source.
// Batch 43: account resolver and service latency tracking methods.
using ZB.MOM.NatsNet.Server.Auth;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
// =========================================================================
// SetAccountResolver (Group E)
// Mirrors Go (s *Server) SetAccountResolver(ar AccountResolver)
// accounts.go ~line 3267.
// =========================================================================
/// <summary>
/// Assigns the account resolver used to fetch/store account JWTs.
/// Mirrors Go <c>Server.SetAccountResolver</c>.
/// </summary>
public void SetAccountResolver(IAccountResolver ar)
{
_mu.EnterWriteLock();
try
{
_accResolver = ar;
}
finally
{
_mu.ExitWriteLock();
}
}
/// <summary>
/// Returns the currently configured account resolver.
/// Mirrors Go <c>Server.AccountResolver</c>.
/// </summary>
public IAccountResolver? GetAccountResolver()
{
_mu.EnterReadLock();
try
{
return _accResolver;
}
finally
{
_mu.ExitReadLock();
}
}
// =========================================================================
// UpdateAccountClaims (Group E)
// Mirrors Go (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims)
// accounts.go ~line 3290.
// =========================================================================
/// <summary>
/// Updates an existing account with new JWT claims.
/// This replaces any exports or imports previously defined.
/// Mirrors Go <c>Server.UpdateAccountClaims</c>.
/// Lock must NOT be held on entry.
/// </summary>
public void UpdateAccountClaims(Account a, AccountClaims ac)
{
UpdateAccountClaimsWithRefresh(a, ac, refreshImportingAccounts: true);
}
// =========================================================================
// updateAccountClaimsWithRefresh (Group E)
// Mirrors Go (s *Server) updateAccountClaimsWithRefresh(...)
// accounts.go ~line 3374.
// =========================================================================
/// <summary>
/// Updates an existing account with new JWT claims, optionally also refreshing
/// incomplete importing accounts that depend on this one.
/// Mirrors Go <c>Server.updateAccountClaimsWithRefresh</c>.
/// Lock must NOT be held on entry.
/// </summary>
public void UpdateAccountClaimsWithRefresh(Account a, AccountClaims ac, bool refreshImportingAccounts)
{
if (a == null) return;
if (ac == null) return;
Debugf("Updating account claims: {0}/{1}", a.Name, ac.Subject);
// Update name tag and tags from claims.
a.WriteLock();
try
{
a.NameTag = ac.Name ?? string.Empty;
// Tags are stubbed — full JWT integration handles this.
// Reset exports.
a.Exports = new ExportMap();
// Move current imports aside so we can clear them.
a.Imports.Streams = null;
a.Imports.Services = null;
}
finally
{
a.WriteUnlock();
}
// Update signing keys.
a.WriteLock();
try
{
a.SigningKeys = null;
if (!_strictSigningKeyUsage.Contains(a.Issuer))
{
a.SigningKeys = new Dictionary<string, object?>(StringComparer.Ordinal)
{
[a.Name] = null,
};
}
}
finally
{
a.WriteUnlock();
}
// Apply limits from claims (stub — full JWT limits in session 11).
// For now just update the JWT timestamp.
a.Updated = DateTime.UtcNow;
a.ClaimJwt = ac.Subject; // placeholder — full decode in session 11.
}
// =========================================================================
// buildPermissionsFromJwt (Group E)
// Mirrors Go buildPermissionsFromJwt(uc *jwt.Permissions) *Permissions
// accounts.go ~line 3979.
// =========================================================================
/// <summary>
/// Builds a <see cref="Permissions"/> object from JWT NatsPermissions claims.
/// Returns null when the input is null or has no constraints.
/// Mirrors Go <c>buildPermissionsFromJwt</c>.
/// </summary>
public static Permissions? BuildPermissionsFromJwt(JwtPermissions? uc)
{
if (uc == null) return null;
Permissions? p = null;
if (uc.Publish.Allow?.Count > 0 || uc.Publish.Deny?.Count > 0)
{
p ??= new Permissions();
p.Publish = new SubjectPermission
{
Allow = uc.Publish.Allow,
Deny = uc.Publish.Deny,
};
}
if (uc.Subscribe.Allow?.Count > 0 || uc.Subscribe.Deny?.Count > 0)
{
p ??= new Permissions();
p.Subscribe = new SubjectPermission
{
Allow = uc.Subscribe.Allow,
Deny = uc.Subscribe.Deny,
};
}
if (uc.Response != null)
{
p ??= new Permissions();
p.Response = new ResponsePermission
{
MaxMsgs = uc.Response.MaxMsgs,
Expires = uc.Response.Expires,
};
ValidateResponsePermissions(p);
}
return p;
}
// =========================================================================
// buildInternalNkeyUser (Group E)
// Mirrors Go buildInternalNkeyUser(uc *jwt.UserClaims, acts map[string]struct{}, acc *Account)
// accounts.go ~line 4012.
// =========================================================================
/// <summary>
/// Builds an internal <see cref="NkeyUser"/> from JWT user claims for use with
/// system-account communication.
/// Mirrors Go <c>buildInternalNkeyUser</c>.
/// </summary>
public static NkeyUser BuildInternalNkeyUser(
JwtUserClaims uc,
HashSet<string>? allowedConnectionTypes,
Account acc)
{
var nu = new NkeyUser
{
Nkey = uc.Subject,
Account = acc,
AllowedConnectionTypes = allowedConnectionTypes,
Issued = uc.IssuedAt,
};
if (!string.IsNullOrEmpty(uc.IssuerAccount))
nu.SigningKey = uc.Issuer;
var p = BuildPermissionsFromJwt(uc.Permissions);
if (p == null && acc.DefaultPerms != null)
p = acc.DefaultPerms.Clone();
nu.Permissions = p;
return nu;
}
// =========================================================================
// fetchAccount (Group E)
// Mirrors Go fetchAccount(res AccountResolver, name string) (string, error)
// accounts.go ~line 4027.
// =========================================================================
/// <summary>
/// Fetches account JWT from <paramref name="resolver"/> after validating that
/// <paramref name="name"/> looks like a valid account public key.
/// Mirrors Go <c>fetchAccount</c>.
/// </summary>
public static async Task<(string Jwt, Exception? Error)> FetchAccountFromResolverAsync(
IAccountResolver resolver,
string name,
CancellationToken ct = default)
{
// In Go this validates via nkeys.IsValidPublicAccountKey.
// We perform a basic non-empty check until nkeys is fully integrated.
if (string.IsNullOrWhiteSpace(name))
return (string.Empty, new InvalidOperationException("will only fetch valid account keys"));
try
{
var jwt = await resolver.FetchAsync(name, ct).ConfigureAwait(false);
return (jwt, null);
}
catch (Exception ex)
{
return (string.Empty, ex);
}
}
// =========================================================================
// authAccounts (Group E)
// Mirrors Go authAccounts(tokenReq bool) []*Account
// accounts.go ~line 3259.
// =========================================================================
/// <summary>
/// Returns a sentinel non-null list when <paramref name="tokenRequired"/> is true,
/// indicating that import of this service requires an auth token.
/// Mirrors Go <c>authAccounts(tokenReq bool) []*Account</c>.
/// </summary>
public static IReadOnlyList<Account>? AuthAccounts(bool tokenRequired)
=> tokenRequired ? Array.Empty<Account>() : null;
// =========================================================================
// respondToUpdate (Group E)
// Mirrors Go respondToUpdate(s *Server, respSubj string, acc string, message string, err error)
// accounts.go ~line 4177.
// =========================================================================
/// <summary>
/// Logs the outcome of an account-update operation and, when a reply subject is
/// provided, sends a JSON response to that subject.
/// Mirrors Go <c>respondToUpdate</c>.
/// </summary>
public void RespondToUpdate(string replySubject, string acc, string message, Exception? err)
{
if (err == null)
{
if (string.IsNullOrEmpty(acc))
Debugf("{0}", message);
else
Debugf("{0} - {1}", message, acc);
}
else
{
if (string.IsNullOrEmpty(acc))
Errorf("{0} - {1}", message, err.Message);
else
Errorf("{0} - {1} - {2}", message, acc, err.Message);
}
if (string.IsNullOrEmpty(replySubject))
return;
var response = err == null
? (object)new ClaimUpdateResponse
{
Data = new ClaimUpdateStatus
{
Account = acc,
Code = 200,
Message = message,
},
}
: new ClaimUpdateResponse
{
Error = new ClaimUpdateError
{
Account = acc,
Code = 500,
Description = $"{message} - {err.Message}",
},
};
SendInternalMsgLocked(replySubject, response);
}
// =========================================================================
// handleListRequest (Group E)
// Mirrors Go handleListRequest(store *DirJWTStore, s *Server, reply string)
// accounts.go ~line 4216.
// =========================================================================
/// <summary>
/// Responds to an account-list API request by returning all known account IDs
/// from <paramref name="store"/>.
/// Mirrors Go <c>handleListRequest</c>.
/// </summary>
public void HandleListRequest(DirJwtStore store, string reply)
{
if (string.IsNullOrEmpty(reply)) return;
var accIds = new List<string>(capacity: 1024);
try
{
store.PackWalk(1, partialPackMsg =>
{
var tk = partialPackMsg.Split('|');
if (tk.Length == 2)
accIds.Add(tk[0]);
});
Debugf("list request responded with {0} account ids", accIds.Count);
var response = new { data = accIds };
SendInternalMsgLocked(reply, response);
}
catch (Exception ex)
{
Errorf("list request error: {0}", ex.Message);
}
}
// =========================================================================
// handleDeleteRequest (Group E)
// Mirrors Go handleDeleteRequest(store *DirJWTStore, s *Server, msg []byte, reply string)
// accounts.go ~line 4236.
// =========================================================================
/// <summary>
/// Handles an account-delete API request by decoding the signed JWT payload,
/// validating the issuer, and deleting the listed accounts.
/// Mirrors Go <c>handleDeleteRequest</c>.
/// </summary>
public void HandleDeleteRequest(DirJwtStore store, ReadOnlySpan<byte> msg, string reply)
{
// Full JWT decode requires nkeys integration (session 11).
// Stub: return an error response indicating not yet supported.
RespondToUpdate(
reply,
string.Empty,
"delete accounts request",
new NotSupportedException("delete request handling requires full JWT integration"));
}
// =========================================================================
// getOperatorKeys (Group E)
// Mirrors Go getOperatorKeys(s *Server) (string, map[string]struct{}, bool, error)
// accounts.go ~line 4290.
// =========================================================================
/// <summary>
/// Returns the primary operator public key, a set of all signing keys, and
/// whether strict signing-key usage is enforced.
/// Mirrors Go <c>getOperatorKeys</c>.
/// </summary>
public (string Operator, HashSet<string> Keys, bool Strict, Exception? Error) GetOperatorKeys()
{
var opts = GetOpts();
if (opts.TrustedOperators == null || opts.TrustedOperators.Count == 0)
return (string.Empty, new HashSet<string>(), false, new InvalidOperationException("no operator key found"));
// TrustedOperators is stored as List<object> (stub until full JWT integration).
// Return a basic stub result with the trusted keys we do have.
var keys = new HashSet<string>(StringComparer.Ordinal);
foreach (var k in _trustedKeys ?? [])
keys.Add(k);
if (keys.Count == 0)
return (string.Empty, keys, false, new InvalidOperationException("no operator key found"));
var op = keys.Count > 0 ? keys.First() : string.Empty;
var strict = _strictSigningKeyUsage.Count > 0;
return (op, keys, strict, null);
}
// =========================================================================
// claimValidate (Group E)
// Mirrors Go claimValidate(claim *jwt.AccountClaims) error
// accounts.go ~line 4310.
// =========================================================================
/// <summary>
/// Validates <paramref name="claim"/> for blocking validation errors.
/// Full JWT validation requires nkeys integration; this stub validates
/// that basic required fields are present.
/// Mirrors Go <c>claimValidate</c>.
/// </summary>
public static Exception? ClaimValidate(AccountClaims claim)
{
if (claim == null)
return new InvalidOperationException("nil account claim");
if (string.IsNullOrEmpty(claim.Subject))
return new InvalidOperationException("account claim has no subject");
if (string.IsNullOrEmpty(claim.Issuer))
return new InvalidOperationException("account claim has no issuer");
return null;
}
// =========================================================================
// removeCb (Group E)
// Mirrors Go removeCb(s *Server, pubKey string)
// accounts.go ~line 4319.
// =========================================================================
/// <summary>
/// Disables an account that has been removed from the directory resolver by
/// locking out new clients, expiring the account, and disabling JetStream.
/// Mirrors Go <c>removeCb</c>.
/// </summary>
public void RemoveCb(string pubKey)
{
if (!_accounts.TryGetValue(pubKey, out var a)) return;
Debugf("Disable account {0} due to remove", pubKey);
a.WriteLock();
try
{
a.MaxSubscriptions = 0;
a.MaxPayload = 0;
a.MaxConnections = 0;
a.MaxLeafNodes = 0;
a.Updated = DateTime.UtcNow;
}
finally
{
a.WriteUnlock();
}
// Trigger account expiration to disconnect existing clients.
a.ExpiredTimeout();
}
// =========================================================================
// Server.fetch (Group F)
// Mirrors Go (s *Server) fetch(res AccountResolver, name string, timeout time.Duration)
// accounts.go ~line 4599.
// =========================================================================
/// <summary>
/// Performs a server-level cluster-wide account lookup using the system messaging
/// infrastructure.
/// <para>
/// The full implementation sends a <c>$SYS.REQ.ACCOUNT.{name}.CLAIMS.UPDATE</c>
/// request and waits for peer responses; that path requires the system messaging
/// subscription infrastructure from session 12. This stub falls back to a direct
/// resolver fetch.
/// </para>
/// Mirrors Go <c>Server.fetch</c>.
/// </summary>
public (string Jwt, Exception? Error) ServerFetch(
IAccountResolver resolver,
string name,
TimeSpan timeout)
{
if (resolver == null)
return (string.Empty, ServerErrors.ErrNoAccountResolver);
try
{
using var cts = new CancellationTokenSource(timeout);
var jwt = resolver.FetchAsync(name, cts.Token).GetAwaiter().GetResult();
return (jwt, null);
}
catch (OperationCanceledException)
{
return (string.Empty, new TimeoutException($"fetch timed out for account {name}"));
}
catch (Exception ex)
{
return (string.Empty, ex);
}
}
// =========================================================================
// Stub private helpers used by the methods above
// =========================================================================
/// <summary>
/// Sends an internal message on <paramref name="subject"/> with the given payload.
/// Stub — full wiring deferred to session 12 (events.go).
/// </summary>
internal void SendInternalMsgLocked(string subject, object payload)
{
// TODO (session 12): implement system-account send loop.
Debugf("sendInternalMsgLocked on {0} (stub)", subject);
}
/// <summary>
/// Validates and clamps a <see cref="ResponsePermission"/> to legal ranges.
/// Mirrors Go <c>validateResponsePermissions</c> in auth.go.
/// </summary>
private static void ValidateResponsePermissions(Permissions p)
{
if (p?.Response == null) return;
if (p.Response.MaxMsgs < 0) p.Response.MaxMsgs = 0;
if (p.Response.Expires < TimeSpan.Zero) p.Response.Expires = TimeSpan.Zero;
}
}
// ============================================================================
// JWT stub types used by BuildPermissionsFromJwt / BuildInternalNkeyUser
// Full implementation deferred to session 11 JWT integration.
// ============================================================================
/// <summary>
/// Stub for a JWT publish/subscribe permission pair.
/// Mirrors Go <c>jwt.Permission</c>.
/// </summary>
public sealed class JwtSubjectPermission
{
public List<string>? Allow { get; set; }
public List<string>? Deny { get; set; }
}
/// <summary>
/// Stub for JWT response permissions.
/// Mirrors Go <c>jwt.RespPermission</c>.
/// </summary>
public sealed class JwtResponsePermission
{
public int MaxMsgs { get; set; }
public TimeSpan Expires { get; set; }
}
/// <summary>
/// Stub for JWT combined permissions (pub, sub, response).
/// Mirrors Go <c>jwt.Permissions</c>.
/// </summary>
public sealed class JwtPermissions
{
public JwtSubjectPermission Publish { get; set; } = new();
public JwtSubjectPermission Subscribe { get; set; } = new();
public JwtResponsePermission? Response { get; set; }
}
/// <summary>
/// Stub for JWT user claims.
/// Mirrors Go <c>jwt.UserClaims</c>.
/// </summary>
public sealed class JwtUserClaims
{
public string Subject { get; set; } = string.Empty;
public string Issuer { get; set; } = string.Empty;
public string IssuerAccount { get; set; } = string.Empty;
public long IssuedAt { get; set; }
public JwtPermissions? Permissions { get; set; }
}
// AccountClaims.Name is defined directly on the type in NatsServerTypes.cs.
// ============================================================================
// API response types for account resolver operations
// Mirrors Go ServerAPIClaimUpdateResponse, ClaimUpdateStatus, ClaimUpdateError
// accounts.go ~line 4159-4175.
// ============================================================================
/// <summary>
/// Response envelope for account JWT claim update API calls.
/// Mirrors Go <c>ServerAPIClaimUpdateResponse</c>.
/// </summary>
public sealed class ClaimUpdateResponse
{
public ClaimUpdateStatus? Data { get; set; }
public ClaimUpdateError? Error { get; set; }
}
/// <summary>
/// Success payload for account JWT claim update.
/// Mirrors Go <c>ClaimUpdateStatus</c>.
/// </summary>
public sealed class ClaimUpdateStatus
{
public string Account { get; set; } = string.Empty;
public int Code { get; set; }
public string Message { get; set; } = string.Empty;
}
/// <summary>
/// Error payload for account JWT claim update.
/// Mirrors Go <c>ClaimUpdateError</c>.
/// </summary>
public sealed class ClaimUpdateError
{
public string Account { get; set; } = string.Empty;
public int Code { get; set; }
public string Description { get; set; } = string.Empty;
}

View File

@@ -360,6 +360,12 @@ public sealed class AccountClaims
/// <summary>Operator or signing-key that issued this JWT.</summary>
public string Issuer { get; set; } = string.Empty;
/// <summary>
/// Human-readable name tag for the account (JWT claim field <c>name</c>).
/// Mirrors Go <c>jwt.AccountClaims.Name</c>.
/// </summary>
public string? Name { get; set; }
/// <summary>
/// Minimal stub decoder — returns null until session 11 provides full JWT parsing.
/// In Go: <c>jwt.DecodeAccountClaims(claimJWT)</c>.