diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index ec6e7d0..adc3a6d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -1305,9 +1305,10 @@ public sealed partial class Account : INatsAccount /// /// Invoked when the expiration timer fires: marks expired and collects clients. + /// Also callable externally (e.g., from removeCb) to forcibly expire an account. /// Mirrors Go (a *Account) expiredTimeout(). /// - private void ExpiredTimeout() + internal void ExpiredTimeout() { Interlocked.Exchange(ref _expired, 1); @@ -4643,4 +4644,45 @@ public sealed partial class Account : INatsAccount return true; } + + // ========================================================================= + // Locking helpers used by ServiceExportEntry.CheckExpiredResponses + // These expose just enough of the internal _mu to allow the extension method + // to perform read/write operations without exposing the lock directly. + // ========================================================================= + + /// Enters the account read lock. + internal void ReadLock() => _mu.EnterReadLock(); + + /// Exits the account read lock. + internal void ReadUnlock() => _mu.ExitReadLock(); + + /// Enters the account write lock. + internal void WriteLock() => _mu.EnterWriteLock(); + + /// Exits the account write lock. + internal void WriteUnlock() => _mu.ExitWriteLock(); + + /// + /// Returns the number of pending response entries that belong to + /// . + /// Account read lock must be held by the caller. + /// + internal int GetResponseCount(ServiceExportEntry se) + { + if (Exports.Responses == null) return 0; + int count = 0; + foreach (var si in Exports.Responses.Values) + { + if (si.ServiceExport == se) count++; + } + return count; + } + + /// + /// Returns a snapshot of all pending response entries. + /// Account read lock must be held by the caller. + /// + internal IEnumerable> GetResponseEntries() + => Exports.Responses ?? Enumerable.Empty>(); } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/AccountTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/AccountTypes.cs index fdb9259..a3fc128 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/AccountTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/AccountTypes.cs @@ -13,6 +13,7 @@ // // Adapted from server/accounts.go in the NATS server Go source. +using System.Globalization; using System.Text.Json.Serialization; using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; @@ -735,3 +736,383 @@ public static class AccountEventConstants /// public const string ServiceLatencyType = "io.nats.server.metric.v1.service_latency"; } + +// ============================================================================ +// ServiceImportEntry extension methods +// Mirrors Go methods on *serviceImport (accounts.go). +// ============================================================================ + +/// +/// Extension methods for . +/// +internal static class ServiceImportEntryExtensions +{ + /// + /// Returns true when this import entry represents a pending response service import + /// (as opposed to an originating request import). + /// Mirrors Go (si *serviceImport) isRespServiceImport() bool. + /// + public static bool IsRespServiceImport(this ServiceImportEntry? si) + => si != null && si.IsResponse; +} + +// ============================================================================ +// ServiceExportEntry extension methods — response threshold timer +// Mirrors Go methods on *serviceExport (accounts.go ~lines 2446-2507). +// ============================================================================ + +/// +/// Extension methods for providing response +/// threshold timer management and expired-response cleanup. +/// +internal static class ServiceExportEntryExtensions +{ + /// + /// Sets the response threshold timer on if not already set. + /// The timer fires after being set, + /// calling . + /// Account lock must be held on entry. + /// Mirrors Go (se *serviceExport) setResponseThresholdTimer(). + /// + public static void SetResponseThresholdTimer(this ServiceExportEntry se) + { + if (se.ResponseTimer != null) + return; // already set + + se.ResponseTimer = new Timer( + _ => se.CheckExpiredResponses(), + null, + se.ResponseThreshold, + Timeout.InfiniteTimeSpan); + } + + /// + /// Stops and clears the response threshold timer. + /// Returns true when the timer was stopped before it fired (or was already null). + /// Account lock must be held on entry. + /// Mirrors Go (se *serviceExport) clearResponseThresholdTimer() bool. + /// + public static bool ClearResponseThresholdTimer(this ServiceExportEntry se) + { + if (se.ResponseTimer == null) + return true; + + se.ResponseTimer.Dispose(); + se.ResponseTimer = null; + return true; + } + + /// + /// Iterates the owning account's pending response map, removes any entries + /// belonging to whose timestamp is older than + /// , then resets or clears + /// the timer depending on whether responses remain. + /// Mirrors Go (se *serviceExport) checkExpiredResponses(). + /// + public static void CheckExpiredResponses(this ServiceExportEntry se) + { + var acc = se.Account; + if (acc == null) + { + se.ClearResponseThresholdTimer(); + return; + } + + var expired = new List(); + var minTs = DateTime.UtcNow.Ticks - se.ResponseThreshold.Ticks; + + int totalResponses; + // Read lock to collect expired entries. + acc.ReadLock(); + try + { + totalResponses = acc.GetResponseCount(se); + foreach (var (_, si) in acc.GetResponseEntries()) + { + if (si.ServiceExport == se && si.Timestamp <= minTs) + expired.Add(si); + } + } + finally + { + acc.ReadUnlock(); + } + + foreach (var si in expired) + acc.RemoveRespServiceImport(si, RsiReason.Timeout); + + totalResponses -= expired.Count; + + // Reset or clear the timer under a write lock. + acc.WriteLock(); + try + { + if (totalResponses > 0 && se.ResponseTimer != null) + { + se.ResponseTimer.Change(se.ResponseThreshold, Timeout.InfiniteTimeSpan); + } + else + { + se.ClearResponseThresholdTimer(); + } + } + finally + { + acc.WriteUnlock(); + } + } +} + +// ============================================================================ +// ServiceLatency extensions — merge and sanitize +// Mirrors Go functions in accounts.go ~lines 1354-1377. +// ============================================================================ + +/// +/// Extension and static helper methods for . +/// +public static class ServiceLatencyExtensions +{ + /// + /// Merges responder-side latency measurement into the + /// requestor-side measurement , computing system overhead. + /// Mirrors Go (m1 *ServiceLatency) merge(m2 *ServiceLatency). + /// + public static void Merge(this ServiceLatency m1, ServiceLatency m2) + { + var rtt = m2.Responder?.Rtt ?? TimeSpan.Zero; + m1.SystemLatency = m1.ServiceLatencyDuration - (m2.ServiceLatencyDuration + rtt); + m1.ServiceLatencyDuration = m2.ServiceLatencyDuration; + m1.Responder = m2.Responder; + m1.Sanitize(); + } + + /// + /// Clamps any negative latency durations to zero. + /// Mirrors Go sanitizeLatencyMetric(sl *ServiceLatency). + /// + public static void Sanitize(this ServiceLatency sl) + { + if (sl.ServiceLatencyDuration < TimeSpan.Zero) + sl.ServiceLatencyDuration = TimeSpan.Zero; + if (sl.SystemLatency < TimeSpan.Zero) + sl.SystemLatency = TimeSpan.Zero; + } + + /// + /// Clamps any negative latency durations on to zero. + /// Static helper that mirrors Go sanitizeLatencyMetric as a standalone function. + /// + public static void SanitizeLatencyMetric(ServiceLatency sl) => sl.Sanitize(); +} + +// ============================================================================ +// Tracing header builders +// Mirrors Go functions in accounts.go ~lines 2228-2276. +// ============================================================================ + +/// +/// Well-known distributed-tracing header keys used for sampling propagation. +/// Mirrors Go package-level vars trcUber, trcCtx, trcB3, ... in accounts.go. +/// +internal static class TracingHeaderKeys +{ + // Uber/Jaeger + public const string UberTraceId = "Uber-Trace-Id"; + // W3C TraceContext + public const string TraceParent = "Traceparent"; + public const string TraceState = "Tracestate"; + // Zipkin B3 single-header + public const string B3Single = "B3"; + // Zipkin B3 multi-header + public const string B3Sampled = "X-B3-Sampled"; + public const string B3TraceId = "X-B3-TraceId"; + public const string B3ParentSpanId = "X-B3-ParentSpanId"; + public const string B3SpanId = "X-B3-SpanId"; + // Uber context baggage prefix + public const string UberCtxPrefix = "Uberctx-"; +} + +/// +/// Factory helpers for distributed-tracing header propagation dictionaries. +/// Mirrors Go newB3Header, newUberHeader, newTraceCtxHeader +/// and shouldSample in accounts.go. +/// +internal static class TracingHeaders +{ + /// + /// Copies the Zipkin B3 multi-header keys from into a new header map. + /// Mirrors Go newB3Header(h http.Header) http.Header. + /// + public static Dictionary NewB3Header(Dictionary source) + { + var result = new Dictionary(StringComparer.OrdinalIgnoreCase); + TryCopy(source, result, TracingHeaderKeys.B3Sampled); + TryCopy(source, result, TracingHeaderKeys.B3TraceId); + TryCopy(source, result, TracingHeaderKeys.B3ParentSpanId); + TryCopy(source, result, TracingHeaderKeys.B3SpanId); + return result; + } + + /// + /// Builds a new Uber/Jaeger header map with plus any + /// Uberctx-* baggage headers copied from . + /// Mirrors Go newUberHeader(h http.Header, tId []string) http.Header. + /// + public static Dictionary NewUberHeader( + Dictionary source, + string[] traceId) + { + var result = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + [TracingHeaderKeys.UberTraceId] = traceId, + }; + + foreach (var (k, v) in source) + { + if (k.StartsWith(TracingHeaderKeys.UberCtxPrefix, StringComparison.OrdinalIgnoreCase)) + result[k] = v; + } + + return result; + } + + /// + /// Builds a new W3C TraceContext header map with plus + /// optional Tracestate copied from . + /// Mirrors Go newTraceCtxHeader(h http.Header, tId []string) http.Header. + /// + public static Dictionary NewTraceCtxHeader( + Dictionary source, + string[] traceId) + { + var result = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + [TracingHeaderKeys.TraceParent] = traceId, + }; + + TryCopy(source, result, TracingHeaderKeys.TraceState); + return result; + } + + /// + /// Determines whether a service-latency sample should be taken for a given + /// and . + /// Returns the sampling decision and an optional propagation header map. + /// Mirrors Go shouldSample(l *serviceLatency, c *client) (bool, http.Header). + /// + public static (bool Sample, Dictionary? Header) ShouldSample( + InternalServiceLatency? latencyConfig, + Dictionary? headers) + { + if (latencyConfig == null) + return (false, null); + + if (latencyConfig.Sampling < 0) + return (false, null); + + if (latencyConfig.Sampling >= 100) + return (true, null); + + if (latencyConfig.Sampling > 0 && Random.Shared.Next(100) < latencyConfig.Sampling) + return (true, null); + + if (headers == null || headers.Count == 0) + return (false, null); + + // Uber/Jaeger: check flag bit in last token of trace-id. + if (headers.TryGetValue(TracingHeaderKeys.UberTraceId, out var uberIds) && uberIds.Length > 0) + { + var tk = uberIds[0].Split(':'); + if (tk.Length == 4 && tk[3].Length is > 0 and <= 2) + { + var src = tk[3].Length == 1 ? $"0{tk[3]}" : tk[3]; + if (byte.TryParse(src, NumberStyles.HexNumber, null, out var flag) && (flag & 1) == 1) + return (true, NewUberHeader(headers, uberIds)); + } + return (false, null); + } + + // Zipkin B3 multi-header — sampled flag. + if (headers.TryGetValue(TracingHeaderKeys.B3Sampled, out var sampled) && sampled.Length > 0) + { + if (sampled[0] == "1") return (true, NewB3Header(headers)); + if (sampled[0] == "0") return (false, null); + } + + // Zipkin B3 multi-header — presence of trace-id means recipient decides. + if (headers.ContainsKey(TracingHeaderKeys.B3TraceId)) + return (true, NewB3Header(headers)); + + // Zipkin B3 single-header. + if (headers.TryGetValue(TracingHeaderKeys.B3Single, out var b3) && b3.Length > 0) + { + var tk = b3[0].Split('-'); + if (tk.Length > 2 && tk[2] == "0") return (false, null); + if (tk.Length == 1 && tk[0] == "0") return (false, null); + return (true, new Dictionary(StringComparer.OrdinalIgnoreCase) + { + [TracingHeaderKeys.B3Single] = b3, + }); + } + + // W3C TraceContext. + if (headers.TryGetValue(TracingHeaderKeys.TraceParent, out var tpIds) && tpIds.Length > 0) + { + var tk = tpIds[0].Split('-'); + bool sample = false; + if (tk.Length == 4 && tk[3].Length == 2) + { + if (byte.TryParse(tk[3], NumberStyles.HexNumber, null, out var flags)) + sample = (flags & 0x1) == 0x1; + } + if (sample) + return (true, NewTraceCtxHeader(headers, tpIds)); + return (false, null); + } + + return (false, null); + } + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + private static void TryCopy( + Dictionary src, + Dictionary dst, + string key) + { + if (src.TryGetValue(key, out var v)) + dst[key] = v; + } +} + +// ============================================================================ +// UpdateAllClientsServiceExportResponseTime +// Mirrors Go updateAllClientsServiceExportResponseTime (accounts.go ~line 1527). +// ============================================================================ + +/// +/// Static helper that updates the response-time tracking threshold on all clients +/// registered for a service export. +/// Mirrors Go updateAllClientsServiceExportResponseTime(clients []*client, lrt time.Duration). +/// +internal static class ServiceExportClientHelpers +{ + /// + /// Iterates and, for each one whose RrTracking.Lrt + /// differs from , updates the threshold + /// and resets the pending-timer. + /// Mirrors Go updateAllClientsServiceExportResponseTime. + /// + public static void UpdateAllClientsServiceExportResponseTime( + IEnumerable clients, + TimeSpan lowestResponseThreshold) + { + foreach (var c in clients) + { + c.UpdateRrTrackingThreshold(lowestResponseThreshold); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 321524d..3232812 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1807,6 +1807,25 @@ public sealed partial class ClientConnection internal bool IsWebSocket() => Ws != null; internal bool IsHubLeafNode() => Kind == ClientKind.Leaf && Leaf?.IsSpoke != true; internal string RemoteCluster() => Leaf?.RemoteCluster ?? string.Empty; + + /// + /// Updates the response-round-trip tracking threshold on this client when it + /// differs from , and resets the + /// pending-timer to the new value. + /// Mirrors Go logic in updateAllClientsServiceExportResponseTime for a + /// single client (accounts.go ~line 1527). + /// + internal void UpdateRrTrackingThreshold(TimeSpan lowestResponseThreshold) + { + lock (_mu) + { + if (_rrTracking == null || _rrTracking.Lrt == lowestResponseThreshold) + return; + + _rrTracking.Lrt = lowestResponseThreshold; + _rrTracking.Ptmr?.Change(lowestResponseThreshold, Timeout.InfiniteTimeSpan); + } + } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.AccountResolvers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.AccountResolvers.cs new file mode 100644 index 0000000..9ad187e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.AccountResolvers.cs @@ -0,0 +1,630 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/accounts.go in the NATS server Go source. +// Batch 43: account resolver and service latency tracking methods. + +using ZB.MOM.NatsNet.Server.Auth; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + // ========================================================================= + // SetAccountResolver (Group E) + // Mirrors Go (s *Server) SetAccountResolver(ar AccountResolver) + // accounts.go ~line 3267. + // ========================================================================= + + /// + /// Assigns the account resolver used to fetch/store account JWTs. + /// Mirrors Go Server.SetAccountResolver. + /// + public void SetAccountResolver(IAccountResolver ar) + { + _mu.EnterWriteLock(); + try + { + _accResolver = ar; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Returns the currently configured account resolver. + /// Mirrors Go Server.AccountResolver. + /// + public IAccountResolver? GetAccountResolver() + { + _mu.EnterReadLock(); + try + { + return _accResolver; + } + finally + { + _mu.ExitReadLock(); + } + } + + // ========================================================================= + // UpdateAccountClaims (Group E) + // Mirrors Go (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) + // accounts.go ~line 3290. + // ========================================================================= + + /// + /// Updates an existing account with new JWT claims. + /// This replaces any exports or imports previously defined. + /// Mirrors Go Server.UpdateAccountClaims. + /// Lock must NOT be held on entry. + /// + public void UpdateAccountClaims(Account a, AccountClaims ac) + { + UpdateAccountClaimsWithRefresh(a, ac, refreshImportingAccounts: true); + } + + // ========================================================================= + // updateAccountClaimsWithRefresh (Group E) + // Mirrors Go (s *Server) updateAccountClaimsWithRefresh(...) + // accounts.go ~line 3374. + // ========================================================================= + + /// + /// Updates an existing account with new JWT claims, optionally also refreshing + /// incomplete importing accounts that depend on this one. + /// Mirrors Go Server.updateAccountClaimsWithRefresh. + /// Lock must NOT be held on entry. + /// + public void UpdateAccountClaimsWithRefresh(Account a, AccountClaims ac, bool refreshImportingAccounts) + { + if (a == null) return; + if (ac == null) return; + + Debugf("Updating account claims: {0}/{1}", a.Name, ac.Subject); + + // Update name tag and tags from claims. + a.WriteLock(); + try + { + a.NameTag = ac.Name ?? string.Empty; + // Tags are stubbed — full JWT integration handles this. + + // Reset exports. + a.Exports = new ExportMap(); + + // Move current imports aside so we can clear them. + a.Imports.Streams = null; + a.Imports.Services = null; + } + finally + { + a.WriteUnlock(); + } + + // Update signing keys. + a.WriteLock(); + try + { + a.SigningKeys = null; + if (!_strictSigningKeyUsage.Contains(a.Issuer)) + { + a.SigningKeys = new Dictionary(StringComparer.Ordinal) + { + [a.Name] = null, + }; + } + } + finally + { + a.WriteUnlock(); + } + + // Apply limits from claims (stub — full JWT limits in session 11). + // For now just update the JWT timestamp. + a.Updated = DateTime.UtcNow; + a.ClaimJwt = ac.Subject; // placeholder — full decode in session 11. + } + + // ========================================================================= + // buildPermissionsFromJwt (Group E) + // Mirrors Go buildPermissionsFromJwt(uc *jwt.Permissions) *Permissions + // accounts.go ~line 3979. + // ========================================================================= + + /// + /// Builds a object from JWT NatsPermissions claims. + /// Returns null when the input is null or has no constraints. + /// Mirrors Go buildPermissionsFromJwt. + /// + public static Permissions? BuildPermissionsFromJwt(JwtPermissions? uc) + { + if (uc == null) return null; + + Permissions? p = null; + + if (uc.Publish.Allow?.Count > 0 || uc.Publish.Deny?.Count > 0) + { + p ??= new Permissions(); + p.Publish = new SubjectPermission + { + Allow = uc.Publish.Allow, + Deny = uc.Publish.Deny, + }; + } + + if (uc.Subscribe.Allow?.Count > 0 || uc.Subscribe.Deny?.Count > 0) + { + p ??= new Permissions(); + p.Subscribe = new SubjectPermission + { + Allow = uc.Subscribe.Allow, + Deny = uc.Subscribe.Deny, + }; + } + + if (uc.Response != null) + { + p ??= new Permissions(); + p.Response = new ResponsePermission + { + MaxMsgs = uc.Response.MaxMsgs, + Expires = uc.Response.Expires, + }; + ValidateResponsePermissions(p); + } + + return p; + } + + // ========================================================================= + // buildInternalNkeyUser (Group E) + // Mirrors Go buildInternalNkeyUser(uc *jwt.UserClaims, acts map[string]struct{}, acc *Account) + // accounts.go ~line 4012. + // ========================================================================= + + /// + /// Builds an internal from JWT user claims for use with + /// system-account communication. + /// Mirrors Go buildInternalNkeyUser. + /// + public static NkeyUser BuildInternalNkeyUser( + JwtUserClaims uc, + HashSet? allowedConnectionTypes, + Account acc) + { + var nu = new NkeyUser + { + Nkey = uc.Subject, + Account = acc, + AllowedConnectionTypes = allowedConnectionTypes, + Issued = uc.IssuedAt, + }; + + if (!string.IsNullOrEmpty(uc.IssuerAccount)) + nu.SigningKey = uc.Issuer; + + var p = BuildPermissionsFromJwt(uc.Permissions); + if (p == null && acc.DefaultPerms != null) + p = acc.DefaultPerms.Clone(); + + nu.Permissions = p; + return nu; + } + + // ========================================================================= + // fetchAccount (Group E) + // Mirrors Go fetchAccount(res AccountResolver, name string) (string, error) + // accounts.go ~line 4027. + // ========================================================================= + + /// + /// Fetches account JWT from after validating that + /// looks like a valid account public key. + /// Mirrors Go fetchAccount. + /// + public static async Task<(string Jwt, Exception? Error)> FetchAccountFromResolverAsync( + IAccountResolver resolver, + string name, + CancellationToken ct = default) + { + // In Go this validates via nkeys.IsValidPublicAccountKey. + // We perform a basic non-empty check until nkeys is fully integrated. + if (string.IsNullOrWhiteSpace(name)) + return (string.Empty, new InvalidOperationException("will only fetch valid account keys")); + + try + { + var jwt = await resolver.FetchAsync(name, ct).ConfigureAwait(false); + return (jwt, null); + } + catch (Exception ex) + { + return (string.Empty, ex); + } + } + + // ========================================================================= + // authAccounts (Group E) + // Mirrors Go authAccounts(tokenReq bool) []*Account + // accounts.go ~line 3259. + // ========================================================================= + + /// + /// Returns a sentinel non-null list when is true, + /// indicating that import of this service requires an auth token. + /// Mirrors Go authAccounts(tokenReq bool) []*Account. + /// + public static IReadOnlyList? AuthAccounts(bool tokenRequired) + => tokenRequired ? Array.Empty() : null; + + // ========================================================================= + // respondToUpdate (Group E) + // Mirrors Go respondToUpdate(s *Server, respSubj string, acc string, message string, err error) + // accounts.go ~line 4177. + // ========================================================================= + + /// + /// Logs the outcome of an account-update operation and, when a reply subject is + /// provided, sends a JSON response to that subject. + /// Mirrors Go respondToUpdate. + /// + public void RespondToUpdate(string replySubject, string acc, string message, Exception? err) + { + if (err == null) + { + if (string.IsNullOrEmpty(acc)) + Debugf("{0}", message); + else + Debugf("{0} - {1}", message, acc); + } + else + { + if (string.IsNullOrEmpty(acc)) + Errorf("{0} - {1}", message, err.Message); + else + Errorf("{0} - {1} - {2}", message, acc, err.Message); + } + + if (string.IsNullOrEmpty(replySubject)) + return; + + var response = err == null + ? (object)new ClaimUpdateResponse + { + Data = new ClaimUpdateStatus + { + Account = acc, + Code = 200, + Message = message, + }, + } + : new ClaimUpdateResponse + { + Error = new ClaimUpdateError + { + Account = acc, + Code = 500, + Description = $"{message} - {err.Message}", + }, + }; + + SendInternalMsgLocked(replySubject, response); + } + + // ========================================================================= + // handleListRequest (Group E) + // Mirrors Go handleListRequest(store *DirJWTStore, s *Server, reply string) + // accounts.go ~line 4216. + // ========================================================================= + + /// + /// Responds to an account-list API request by returning all known account IDs + /// from . + /// Mirrors Go handleListRequest. + /// + public void HandleListRequest(DirJwtStore store, string reply) + { + if (string.IsNullOrEmpty(reply)) return; + + var accIds = new List(capacity: 1024); + + try + { + store.PackWalk(1, partialPackMsg => + { + var tk = partialPackMsg.Split('|'); + if (tk.Length == 2) + accIds.Add(tk[0]); + }); + + Debugf("list request responded with {0} account ids", accIds.Count); + var response = new { data = accIds }; + SendInternalMsgLocked(reply, response); + } + catch (Exception ex) + { + Errorf("list request error: {0}", ex.Message); + } + } + + // ========================================================================= + // handleDeleteRequest (Group E) + // Mirrors Go handleDeleteRequest(store *DirJWTStore, s *Server, msg []byte, reply string) + // accounts.go ~line 4236. + // ========================================================================= + + /// + /// Handles an account-delete API request by decoding the signed JWT payload, + /// validating the issuer, and deleting the listed accounts. + /// Mirrors Go handleDeleteRequest. + /// + public void HandleDeleteRequest(DirJwtStore store, ReadOnlySpan msg, string reply) + { + // Full JWT decode requires nkeys integration (session 11). + // Stub: return an error response indicating not yet supported. + RespondToUpdate( + reply, + string.Empty, + "delete accounts request", + new NotSupportedException("delete request handling requires full JWT integration")); + } + + // ========================================================================= + // getOperatorKeys (Group E) + // Mirrors Go getOperatorKeys(s *Server) (string, map[string]struct{}, bool, error) + // accounts.go ~line 4290. + // ========================================================================= + + /// + /// Returns the primary operator public key, a set of all signing keys, and + /// whether strict signing-key usage is enforced. + /// Mirrors Go getOperatorKeys. + /// + public (string Operator, HashSet Keys, bool Strict, Exception? Error) GetOperatorKeys() + { + var opts = GetOpts(); + if (opts.TrustedOperators == null || opts.TrustedOperators.Count == 0) + return (string.Empty, new HashSet(), false, new InvalidOperationException("no operator key found")); + + // TrustedOperators is stored as List (stub until full JWT integration). + // Return a basic stub result with the trusted keys we do have. + var keys = new HashSet(StringComparer.Ordinal); + foreach (var k in _trustedKeys ?? []) + keys.Add(k); + + if (keys.Count == 0) + return (string.Empty, keys, false, new InvalidOperationException("no operator key found")); + + var op = keys.Count > 0 ? keys.First() : string.Empty; + var strict = _strictSigningKeyUsage.Count > 0; + + return (op, keys, strict, null); + } + + // ========================================================================= + // claimValidate (Group E) + // Mirrors Go claimValidate(claim *jwt.AccountClaims) error + // accounts.go ~line 4310. + // ========================================================================= + + /// + /// Validates for blocking validation errors. + /// Full JWT validation requires nkeys integration; this stub validates + /// that basic required fields are present. + /// Mirrors Go claimValidate. + /// + public static Exception? ClaimValidate(AccountClaims claim) + { + if (claim == null) + return new InvalidOperationException("nil account claim"); + if (string.IsNullOrEmpty(claim.Subject)) + return new InvalidOperationException("account claim has no subject"); + if (string.IsNullOrEmpty(claim.Issuer)) + return new InvalidOperationException("account claim has no issuer"); + return null; + } + + // ========================================================================= + // removeCb (Group E) + // Mirrors Go removeCb(s *Server, pubKey string) + // accounts.go ~line 4319. + // ========================================================================= + + /// + /// Disables an account that has been removed from the directory resolver by + /// locking out new clients, expiring the account, and disabling JetStream. + /// Mirrors Go removeCb. + /// + public void RemoveCb(string pubKey) + { + if (!_accounts.TryGetValue(pubKey, out var a)) return; + + Debugf("Disable account {0} due to remove", pubKey); + + a.WriteLock(); + try + { + a.MaxSubscriptions = 0; + a.MaxPayload = 0; + a.MaxConnections = 0; + a.MaxLeafNodes = 0; + a.Updated = DateTime.UtcNow; + } + finally + { + a.WriteUnlock(); + } + + // Trigger account expiration to disconnect existing clients. + a.ExpiredTimeout(); + } + + // ========================================================================= + // Server.fetch (Group F) + // Mirrors Go (s *Server) fetch(res AccountResolver, name string, timeout time.Duration) + // accounts.go ~line 4599. + // ========================================================================= + + /// + /// Performs a server-level cluster-wide account lookup using the system messaging + /// infrastructure. + /// + /// The full implementation sends a $SYS.REQ.ACCOUNT.{name}.CLAIMS.UPDATE + /// request and waits for peer responses; that path requires the system messaging + /// subscription infrastructure from session 12. This stub falls back to a direct + /// resolver fetch. + /// + /// Mirrors Go Server.fetch. + /// + public (string Jwt, Exception? Error) ServerFetch( + IAccountResolver resolver, + string name, + TimeSpan timeout) + { + if (resolver == null) + return (string.Empty, ServerErrors.ErrNoAccountResolver); + + try + { + using var cts = new CancellationTokenSource(timeout); + var jwt = resolver.FetchAsync(name, cts.Token).GetAwaiter().GetResult(); + return (jwt, null); + } + catch (OperationCanceledException) + { + return (string.Empty, new TimeoutException($"fetch timed out for account {name}")); + } + catch (Exception ex) + { + return (string.Empty, ex); + } + } + + // ========================================================================= + // Stub private helpers used by the methods above + // ========================================================================= + + /// + /// Sends an internal message on with the given payload. + /// Stub — full wiring deferred to session 12 (events.go). + /// + internal void SendInternalMsgLocked(string subject, object payload) + { + // TODO (session 12): implement system-account send loop. + Debugf("sendInternalMsgLocked on {0} (stub)", subject); + } + + /// + /// Validates and clamps a to legal ranges. + /// Mirrors Go validateResponsePermissions in auth.go. + /// + private static void ValidateResponsePermissions(Permissions p) + { + if (p?.Response == null) return; + if (p.Response.MaxMsgs < 0) p.Response.MaxMsgs = 0; + if (p.Response.Expires < TimeSpan.Zero) p.Response.Expires = TimeSpan.Zero; + } +} + +// ============================================================================ +// JWT stub types used by BuildPermissionsFromJwt / BuildInternalNkeyUser +// Full implementation deferred to session 11 JWT integration. +// ============================================================================ + +/// +/// Stub for a JWT publish/subscribe permission pair. +/// Mirrors Go jwt.Permission. +/// +public sealed class JwtSubjectPermission +{ + public List? Allow { get; set; } + public List? Deny { get; set; } +} + +/// +/// Stub for JWT response permissions. +/// Mirrors Go jwt.RespPermission. +/// +public sealed class JwtResponsePermission +{ + public int MaxMsgs { get; set; } + public TimeSpan Expires { get; set; } +} + +/// +/// Stub for JWT combined permissions (pub, sub, response). +/// Mirrors Go jwt.Permissions. +/// +public sealed class JwtPermissions +{ + public JwtSubjectPermission Publish { get; set; } = new(); + public JwtSubjectPermission Subscribe { get; set; } = new(); + public JwtResponsePermission? Response { get; set; } +} + +/// +/// Stub for JWT user claims. +/// Mirrors Go jwt.UserClaims. +/// +public sealed class JwtUserClaims +{ + public string Subject { get; set; } = string.Empty; + public string Issuer { get; set; } = string.Empty; + public string IssuerAccount { get; set; } = string.Empty; + public long IssuedAt { get; set; } + public JwtPermissions? Permissions { get; set; } +} + +// AccountClaims.Name is defined directly on the type in NatsServerTypes.cs. + +// ============================================================================ +// API response types for account resolver operations +// Mirrors Go ServerAPIClaimUpdateResponse, ClaimUpdateStatus, ClaimUpdateError +// accounts.go ~line 4159-4175. +// ============================================================================ + +/// +/// Response envelope for account JWT claim update API calls. +/// Mirrors Go ServerAPIClaimUpdateResponse. +/// +public sealed class ClaimUpdateResponse +{ + public ClaimUpdateStatus? Data { get; set; } + public ClaimUpdateError? Error { get; set; } +} + +/// +/// Success payload for account JWT claim update. +/// Mirrors Go ClaimUpdateStatus. +/// +public sealed class ClaimUpdateStatus +{ + public string Account { get; set; } = string.Empty; + public int Code { get; set; } + public string Message { get; set; } = string.Empty; +} + +/// +/// Error payload for account JWT claim update. +/// Mirrors Go ClaimUpdateError. +/// +public sealed class ClaimUpdateError +{ + public string Account { get; set; } = string.Empty; + public int Code { get; set; } + public string Description { get; set; } = string.Empty; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs index 9bc5d10..f59d247 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs @@ -360,6 +360,12 @@ public sealed class AccountClaims /// Operator or signing-key that issued this JWT. public string Issuer { get; set; } = string.Empty; + /// + /// Human-readable name tag for the account (JWT claim field name). + /// Mirrors Go jwt.AccountClaims.Name. + /// + public string? Name { get; set; } + /// /// Minimal stub decoder — returns null until session 11 provides full JWT parsing. /// In Go: jwt.DecodeAccountClaims(claimJWT).