diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index 2e3371f..9718c11 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -17,6 +17,7 @@ using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; using System.Text; +using System.Text.Json; namespace ZB.MOM.NatsNet.Server; @@ -475,6 +476,29 @@ public sealed class Account : INatsAccount } } + /// + /// Sets account-level message trace destination subject. + /// Mirrors Go (a *Account) setTraceDest(dest string). + /// + internal void SetTraceDest(string dest) => SetMessageTraceDestination(dest); + + /// + /// Returns trace destination and sampling. + /// Mirrors Go (a *Account) getTraceDestAndSampling() (string, int). + /// + internal (string Destination, int Sampling) GetTraceDestAndSampling() + { + _mu.EnterReadLock(); + try + { + return (_traceDest, _traceDestSampling); + } + finally + { + _mu.ExitReadLock(); + } + } + // ------------------------------------------------------------------------- // Factory // ------------------------------------------------------------------------- @@ -502,6 +526,12 @@ public sealed class Account : INatsAccount /// public override string ToString() => Name; + /// + /// Returns the account name. + /// Mirrors Go (a *Account) String() string. + /// + public string String() => Name; + // ------------------------------------------------------------------------- // Shallow copy for config reload // ------------------------------------------------------------------------- @@ -966,6 +996,12 @@ public sealed class Account : INatsAccount internal int NumLocalConnectionsLocked() => (_clients?.Count ?? 0) - _sysclients - _nleafs; + /// + /// Returns local non-system, non-leaf client count. Lock must be held. + /// Mirrors Go (a *Account) numLocalConnections() int. + /// + internal int NumLocalConnectionsInternal() => NumLocalConnectionsLocked(); + /// /// Returns all local connections including leaf nodes (minus system clients). /// Mirrors Go (a *Account) numLocalAndLeafConnections() int. @@ -1049,6 +1085,13 @@ public sealed class Account : INatsAccount return _nleafs + _nrleafs >= MaxLeafNodes; } + /// + /// Returns true if total leaf-node count reached the configured maximum. + /// Lock must be held by the caller. + /// Mirrors Go (a *Account) maxTotalLeafNodesReached() bool. + /// + internal bool MaxTotalLeafNodesReachedInternal() => MaxTotalLeafNodesReachedLocked(); + /// /// Returns the total leaf-node count (local + remote). /// Mirrors Go (a *Account) NumLeafNodes() int. @@ -1115,6 +1158,93 @@ public sealed class Account : INatsAccount } } + /// + /// Returns true when there is at least one matching subscription for . + /// Mirrors Go (a *Account) SubscriptionInterest(subject string) bool. + /// + public bool SubscriptionInterest(string subject) => Interest(subject) > 0; + + /// + /// Returns total number of plain and queue subscriptions matching . + /// Mirrors Go (a *Account) Interest(subject string) int. + /// + public int Interest(string subject) + { + _mu.EnterReadLock(); + try + { + if (Sublist == null) + return 0; + + var (np, nq) = Sublist.NumInterest(subject); + return np + nq; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Increments the leaf-node count for a remote cluster. + /// Mirrors Go (a *Account) registerLeafNodeCluster(cluster string). + /// + internal void RegisterLeafNodeCluster(string cluster) + { + _mu.EnterWriteLock(); + try + { + _leafClusters ??= new Dictionary(StringComparer.Ordinal); + _leafClusters.TryGetValue(cluster, out var current); + _leafClusters[cluster] = current + 1; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Returns true when this account already tracks one or more leaf nodes from . + /// Mirrors Go (a *Account) hasLeafNodeCluster(cluster string) bool. + /// + internal bool HasLeafNodeCluster(string cluster) + { + _mu.EnterReadLock(); + try + { + return _leafClusters != null && + _leafClusters.TryGetValue(cluster, out var count) && + count > 0; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns true when the account is leaf-cluster isolated to . + /// Mirrors Go (a *Account) isLeafNodeClusterIsolated(cluster string) bool. + /// + internal bool IsLeafNodeClusterIsolated(string cluster) + { + _mu.EnterReadLock(); + try + { + if (string.IsNullOrEmpty(cluster)) + return false; + if (_leafClusters == null || _leafClusters.Count > 1) + return false; + + return _leafClusters.TryGetValue(cluster, out var count) && count == (ulong)_nleafs; + } + finally + { + _mu.ExitReadLock(); + } + } + // ------------------------------------------------------------------------- // Subscription limit error throttle // ------------------------------------------------------------------------- @@ -1460,6 +1590,1913 @@ public sealed class Account : INatsAccount } } + // ------------------------------------------------------------------------- + // Service export configuration + // ------------------------------------------------------------------------- + + /// + /// Configures an exported service with singleton response semantics. + /// Mirrors Go (a *Account) AddServiceExport(subject string, accounts []*Account) error. + /// + public Exception? AddServiceExport(string subject, IReadOnlyList? accounts = null) => + AddServiceExportWithResponseAndAccountPos(subject, ServiceRespType.Singleton, accounts, 0); + + /// + /// Configures an exported service with singleton response semantics and account-position auth. + /// Mirrors Go (a *Account) addServiceExportWithAccountPos(...). + /// + public Exception? AddServiceExportWithAccountPos(string subject, IReadOnlyList? accounts, uint accountPos) => + AddServiceExportWithResponseAndAccountPos(subject, ServiceRespType.Singleton, accounts, accountPos); + + /// + /// Configures an exported service with explicit response type. + /// Mirrors Go (a *Account) AddServiceExportWithResponse(...). + /// + public Exception? AddServiceExportWithResponse(string subject, ServiceRespType respType, IReadOnlyList? accounts = null) => + AddServiceExportWithResponseAndAccountPos(subject, respType, accounts, 0); + + /// + /// Configures an exported service with explicit response type and account-position auth. + /// Mirrors Go (a *Account) addServiceExportWithResponseAndAccountPos(...). + /// + public Exception? AddServiceExportWithResponseAndAccountPos(string subject, ServiceRespType respType, IReadOnlyList? accounts, uint accountPos) + { + if (!SubscriptionIndex.IsValidSubject(subject)) + return ServerErrors.ErrBadSubject; + + _mu.EnterWriteLock(); + try + { + Exports.Services ??= new Dictionary(StringComparer.Ordinal); + + if (!Exports.Services.TryGetValue(subject, out var serviceExport) || serviceExport == null) + serviceExport = new ServiceExportEntry(); + + if (respType != ServiceRespType.Singleton) + serviceExport.ResponseType = respType; + + if (accounts != null || accountPos > 0) + { + var authErr = SetExportAuth(serviceExport, subject, accounts, accountPos); + if (authErr != null) + return authErr; + } + + serviceExport.Account = this; + serviceExport.ResponseThreshold = ServerConstants.DefaultServiceExportResponseThreshold; + Exports.Services[subject] = serviceExport; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Enables latency tracking for with default sampling. + /// Mirrors Go (a *Account) TrackServiceExport(service, results string) error. + /// + public Exception? TrackServiceExport(string service, string results) => + TrackServiceExportWithSampling(service, results, ServerConstants.DefaultServiceLatencySampling); + + /// + /// Enables latency tracking for with explicit sampling. + /// Mirrors Go (a *Account) TrackServiceExportWithSampling(...). + /// + public Exception? TrackServiceExportWithSampling(string service, string results, int sampling) + { + if (sampling != 0 && (sampling < 1 || sampling > 100)) + return ServerErrors.ErrBadSampling; + if (!SubscriptionIndex.IsValidPublishSubject(results)) + return ServerErrors.ErrBadPublishSubject; + if (IsExportService(results)) + return ServerErrors.ErrBadPublishSubject; + + _mu.EnterWriteLock(); + try + { + if (Exports.Services == null) + return ServerErrors.ErrMissingService; + if (!Exports.Services.TryGetValue(service, out var serviceExport)) + return ServerErrors.ErrMissingService; + + serviceExport ??= new ServiceExportEntry(); + if (serviceExport.ResponseType != ServiceRespType.Singleton) + return ServerErrors.ErrBadServiceType; + + serviceExport.Latency = new InternalServiceLatency + { + Sampling = sampling, + Subject = results, + }; + Exports.Services[service] = serviceExport; + + if (Imports.Services != null) + { + foreach (var imports in Imports.Services.Values) + { + foreach (var import in imports) + { + if (import?.Account?.Name != Name) + continue; + if (SubjectTransform.IsSubsetMatch(SubjectTransform.TokenizeSubject(import.To), service)) + import.Latency = serviceExport.Latency; + } + } + } + + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Disables latency tracking for the exported service. + /// Mirrors Go (a *Account) UnTrackServiceExport(service string). + /// + public void UnTrackServiceExport(string service) + { + _mu.EnterWriteLock(); + try + { + if (Exports.Services == null || !Exports.Services.TryGetValue(service, out var serviceExport) || serviceExport?.Latency == null) + return; + + serviceExport.Latency = null; + + if (Imports.Services == null) + return; + + foreach (var imports in Imports.Services.Values) + { + foreach (var import in imports) + { + if (import?.Account?.Name != Name) + continue; + if (SubjectTransform.IsSubsetMatch(SubjectTransform.TokenizeSubject(import.To), service)) + { + import.Latency = null; + import.M1 = null; + } + } + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Publishes a service-latency metric for an import. + /// Mirrors Go (a *Account) sendLatencyResult(...). + /// + internal void SendLatencyResult(ServiceImportEntry si, ServiceLatency sl) + { + sl.Type = AccountEventConstants.ServiceLatencyType; + sl.Id = NextEventId(); + sl.Time = DateTime.UtcNow; + + string? latencySubject; + _mu.EnterWriteLock(); + try + { + latencySubject = si.Latency?.Subject; + si.RequestingClient = null; + } + finally + { + _mu.ExitWriteLock(); + } + + if (string.IsNullOrWhiteSpace(latencySubject) || Server is not NatsServer server) + return; + + var payload = JsonSerializer.SerializeToUtf8Bytes(sl); + _ = server.SendInternalAccountMsg(this, latencySubject, payload); + } + + /// + /// Publishes a bad-request latency metric (missing or invalid request shape). + /// Mirrors Go (a *Account) sendBadRequestTrackingLatency(...). + /// + internal void SendBadRequestTrackingLatency(ServiceImportEntry si, ClientConnection requestor, Dictionary? header) + { + var sl = new ServiceLatency + { + Status = 400, + Error = "Bad Request", + Requestor = CreateClientInfo(requestor, si.Share), + RequestHeader = header, + RequestStart = DateTime.UtcNow.Subtract(requestor.GetRttValue()), + }; + SendLatencyResult(si, sl); + } + + /// + /// Publishes timeout latency when requestor interest is lost before response delivery. + /// Mirrors Go (a *Account) sendReplyInterestLostTrackLatency(...). + /// + internal void SendReplyInterestLostTrackLatency(ServiceImportEntry si) + { + var sl = new ServiceLatency + { + Status = 408, + Error = "Request Timeout", + }; + + ClientConnection? requestor; + bool share; + long timestamp; + _mu.EnterReadLock(); + try + { + requestor = si.RequestingClient; + share = si.Share; + timestamp = si.Timestamp; + sl.RequestHeader = si.TrackingHeader; + } + finally + { + _mu.ExitReadLock(); + } + + if (requestor != null) + sl.Requestor = CreateClientInfo(requestor, share); + + var reqRtt = sl.Requestor?.Rtt ?? TimeSpan.Zero; + sl.RequestStart = UnixNanoToDateTime(timestamp - TimeSpanToUnixNanos(reqRtt)); + SendLatencyResult(si, sl); + } + + /// + /// Publishes backend failure latency for response-service imports. + /// Mirrors Go (a *Account) sendBackendErrorTrackingLatency(...). + /// + internal void SendBackendErrorTrackingLatency(ServiceImportEntry si, RsiReason reason) + { + var sl = new ServiceLatency(); + + ClientConnection? requestor; + bool share; + long timestamp; + _mu.EnterReadLock(); + try + { + requestor = si.RequestingClient; + share = si.Share; + timestamp = si.Timestamp; + sl.RequestHeader = si.TrackingHeader; + } + finally + { + _mu.ExitReadLock(); + } + + if (requestor != null) + sl.Requestor = CreateClientInfo(requestor, share); + + var reqRtt = sl.Requestor?.Rtt ?? TimeSpan.Zero; + sl.RequestStart = UnixNanoToDateTime(timestamp - TimeSpanToUnixNanos(reqRtt)); + + if (reason == RsiReason.NoDelivery) + { + sl.Status = 503; + sl.Error = "Service Unavailable"; + } + else if (reason == RsiReason.Timeout) + { + sl.Status = 504; + sl.Error = "Service Timeout"; + } + + SendLatencyResult(si, sl); + } + + /// + /// Sends request/response latency metrics. Returns true when complete, false when waiting for remote-half merge. + /// Mirrors Go (a *Account) sendTrackingLatency(...). + /// + internal bool SendTrackingLatency(ServiceImportEntry si, ClientConnection? responder) + { + _mu.EnterReadLock(); + var requestor = si.RequestingClient; + _mu.ExitReadLock(); + + if (requestor == null) + return true; + + var nowUnixNanos = UtcNowUnixNanos(); + var serviceRtt = UnixNanosToTimeSpan(Math.Max(0, nowUnixNanos - si.Timestamp)); + var sl = new ServiceLatency + { + Status = 200, + Requestor = CreateClientInfo(requestor, si.Share), + Responder = responder == null ? null : CreateClientInfo(responder, true), + RequestHeader = si.TrackingHeader, + }; + + var respRtt = sl.Responder?.Rtt ?? TimeSpan.Zero; + var reqRtt = sl.Requestor?.Rtt ?? TimeSpan.Zero; + sl.RequestStart = UnixNanoToDateTime(si.Timestamp - TimeSpanToUnixNanos(reqRtt)); + sl.ServiceLatencyDuration = serviceRtt > respRtt ? serviceRtt - respRtt : TimeSpan.Zero; + sl.TotalLatency = reqRtt + serviceRtt; + if (respRtt > TimeSpan.Zero) + { + sl.SystemLatency = DateTime.UtcNow - UnixNanoToDateTime(nowUnixNanos); + if (sl.SystemLatency < TimeSpan.Zero) + sl.SystemLatency = TimeSpan.Zero; + sl.TotalLatency += sl.SystemLatency; + } + + if (responder != null && responder.Kind != ClientKind.Client) + { + if (si.M1 != null) + { + SendLatencyResult(si, sl); + return true; + } + + _mu.EnterWriteLock(); + try + { + si.M1 = sl; + } + finally + { + _mu.ExitWriteLock(); + } + return false; + } + + SendLatencyResult(si, sl); + return true; + } + + /// + /// Returns the lowest response threshold configured across all service exports. + /// Mirrors Go (a *Account) lowestServiceExportResponseTime() time.Duration. + /// + internal TimeSpan LowestServiceExportResponseTime() + { + var lowest = TimeSpan.FromMinutes(5); + + _mu.EnterReadLock(); + try + { + if (Exports.Services == null) + return lowest; + + foreach (var export in Exports.Services.Values) + { + if (export != null && export.ResponseThreshold < lowest) + lowest = export.ResponseThreshold; + } + + return lowest; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Adds a service import with claim authorization context. + /// Mirrors Go (a *Account) AddServiceImportWithClaim(...). + /// + public Exception? AddServiceImportWithClaim(Account destination, string from, string to, object? imClaim) => + AddServiceImportWithClaimInternal(destination, from, to, imClaim, false); + + /// + /// Internal service-import add path with optional authorization bypass. + /// Mirrors Go (a *Account) addServiceImportWithClaim(..., internal bool). + /// + internal Exception? AddServiceImportWithClaimInternal(Account destination, string from, string to, object? imClaim, bool internalRequest) + { + if (destination == null) + return ServerErrors.ErrMissingAccount; + + if (string.IsNullOrEmpty(to)) + to = from; + if (!SubscriptionIndex.IsValidSubject(from) || !SubscriptionIndex.IsValidSubject(to)) + return SubscriptionIndex.ErrInvalidSubject; + + if (!internalRequest && !destination.CheckServiceExportApproved(this, to, imClaim)) + return ServerErrors.ErrServiceImportAuthorization; + + var cycleErr = ServiceImportFormsCycle(destination, from); + if (cycleErr != null) + return cycleErr; + + var (_, addErr) = AddServiceImportInternal(destination, from, to, imClaim); + return addErr; + } + + /// + /// Checks whether adding a service import forms an account cycle. + /// Mirrors Go (a *Account) serviceImportFormsCycle(...). + /// + internal Exception? ServiceImportFormsCycle(Account destination, string from) + { + var visited = new HashSet(StringComparer.Ordinal) { Name }; + return destination.CheckServiceImportsForCycles(from, visited); + } + + /// + /// Recursively checks service-import graph for cycles. + /// Mirrors Go (a *Account) checkServiceImportsForCycles(...). + /// + internal Exception? CheckServiceImportsForCycles(string from, HashSet visited) + { + if (visited.Count >= AccountConstants.MaxCycleSearchDepth) + return ServerErrors.ErrCycleSearchDepth; + + List? snapshot = null; + _mu.EnterReadLock(); + try + { + if (Imports.Services == null || Imports.Services.Count == 0) + return null; + + snapshot = []; + foreach (var entries in Imports.Services.Values) + snapshot.AddRange(entries); + } + finally + { + _mu.ExitReadLock(); + } + + foreach (var import in snapshot) + { + if (import?.Account == null) + continue; + if (!SubscriptionIndex.SubjectsCollide(from, import.To)) + continue; + + if (visited.Contains(import.Account.Name)) + return ServerErrors.ErrImportFormsCycle; + + visited.Add(Name); + var nextFrom = SubscriptionIndex.SubjectIsSubsetMatch(import.From, from) ? import.From : from; + var err = import.Account.CheckServiceImportsForCycles(nextFrom, visited); + if (err != null) + return err; + } + + return null; + } + + /// + /// Checks whether adding a stream import forms an account cycle. + /// Mirrors Go (a *Account) streamImportFormsCycle(...). + /// + internal Exception? StreamImportFormsCycle(Account destination, string to) + { + var visited = new HashSet(StringComparer.Ordinal) { Name }; + return destination.CheckStreamImportsForCycles(to, visited); + } + + /// + /// Returns true when any service export subject can match . + /// Mirrors Go (a *Account) hasServiceExportMatching(to string) bool. + /// + internal bool HasServiceExportMatching(string to) + { + if (Exports.Services == null) + return false; + + foreach (var subject in Exports.Services.Keys) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(to, subject)) + return true; + } + + return false; + } + + /// + /// Returns true when any stream export subject can match . + /// Mirrors Go (a *Account) hasStreamExportMatching(to string) bool. + /// + internal bool HasStreamExportMatching(string to) + { + if (Exports.Streams == null) + return false; + + foreach (var subject in Exports.Streams.Keys) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(to, subject)) + return true; + } + + return false; + } + + /// + /// Recursively checks stream-import graph for cycles. + /// Mirrors Go (a *Account) checkStreamImportsForCycles(...). + /// + internal Exception? CheckStreamImportsForCycles(string to, HashSet visited) + { + if (visited.Count >= AccountConstants.MaxCycleSearchDepth) + return ServerErrors.ErrCycleSearchDepth; + + _mu.EnterReadLock(); + var hasMatchingExport = HasStreamExportMatching(to); + var streams = Imports.Streams == null ? null : new List(Imports.Streams); + _mu.ExitReadLock(); + + if (!hasMatchingExport || streams == null || streams.Count == 0) + return null; + + foreach (var stream in streams) + { + if (stream?.Account == null) + continue; + if (!SubscriptionIndex.SubjectsCollide(to, stream.To)) + continue; + + if (visited.Contains(stream.Account.Name)) + return ServerErrors.ErrImportFormsCycle; + + visited.Add(Name); + var nextTo = SubscriptionIndex.SubjectIsSubsetMatch(stream.To, to) ? stream.To : to; + var err = stream.Account.CheckStreamImportsForCycles(nextTo, visited); + if (err != null) + return err; + } + + return null; + } + + /// + /// Allows or disallows request metadata sharing for a service import. + /// Mirrors Go (a *Account) SetServiceImportSharing(...). + /// + public Exception? SetServiceImportSharing(Account destination, string to, bool allow) => + SetServiceImportSharingInternal(destination, to, true, allow); + + /// + /// Internal service-import sharing setter with optional claim-account check bypass. + /// Mirrors Go (a *Account) setServiceImportSharing(...). + /// + internal Exception? SetServiceImportSharingInternal(Account destination, string to, bool check, bool allow) + { + _mu.EnterWriteLock(); + try + { + if (check && IsClaimAccount()) + return new InvalidOperationException("claim based accounts can not be updated directly"); + + if (Imports.Services == null) + return new InvalidOperationException("service import not found"); + + foreach (var imports in Imports.Services.Values) + { + foreach (var import in imports) + { + if (import?.Account?.Name == destination.Name && import.To == to) + { + import.Share = allow; + return null; + } + } + } + + return new InvalidOperationException("service import not found"); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Adds a service import from this account to . + /// Mirrors Go (a *Account) AddServiceImport(destination, from, to string) error. + /// + public Exception? AddServiceImport(Account destination, string from, string to) => + AddServiceImportWithClaim(destination, from, to, null); + + /// + /// Number of pending reverse-response map entries. + /// Mirrors Go (a *Account) NumPendingReverseResponses() int. + /// + public int NumPendingReverseResponses() + { + _mu.EnterReadLock(); + try { return Imports.ReverseResponseMap?.Count ?? 0; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Total number of pending response imports across all service exports. + /// Mirrors Go (a *Account) NumPendingAllResponses() int. + /// + public int NumPendingAllResponses() => NumPendingResponses(string.Empty); + + /// + /// Number of pending response imports, optionally filtered by exported service subject. + /// Mirrors Go (a *Account) NumPendingResponses(filter string) int. + /// + public int NumPendingResponses(string filter) + { + _mu.EnterReadLock(); + try + { + if (string.IsNullOrEmpty(filter)) + return Exports.Responses?.Count ?? 0; + + var export = GetServiceExport(filter); + if (export == null || Exports.Responses == null) + return 0; + + var count = 0; + foreach (var import in Exports.Responses.Values) + { + if (ReferenceEquals(import.ServiceExport, export)) + count++; + } + return count; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Number of configured service-import subject keys. + /// Mirrors Go (a *Account) NumServiceImports() int. + /// + public int NumServiceImports() + { + _mu.EnterReadLock(); + try { return Imports.Services?.Count ?? 0; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Removes a response service import and performs reverse-map cleanup. + /// Mirrors Go (a *Account) removeRespServiceImport(...). + /// + internal void RemoveRespServiceImport(ServiceImportEntry? serviceImport, RsiReason reason) + { + if (serviceImport == null) + return; + + Account? destination; + string from; + string to; + bool tracking; + bool delivered; + ClientConnection? requestor; + byte[]? sid; + + _mu.EnterWriteLock(); + try + { + if (Exports.Responses != null) + Exports.Responses.Remove(serviceImport.From); + + destination = serviceImport.Account; + from = serviceImport.From; + to = serviceImport.To; + tracking = serviceImport.Tracking; + delivered = serviceImport.DidDeliver; + requestor = serviceImport.RequestingClient; + sid = serviceImport.SubscriptionId; + } + finally + { + _mu.ExitWriteLock(); + } + + if (sid is { Length: > 0 } && InternalClient != null) + InternalClient.ProcessUnsub(sid); + + if (tracking && requestor != null && !delivered) + SendBackendErrorTrackingLatency(serviceImport, reason); + + destination?.CheckForReverseEntry(to, serviceImport, false); + } + + /// + /// Gets a service import for a specific destination account and subject key. + /// Lock must be held by caller. + /// Mirrors Go (a *Account) getServiceImportForAccountLocked(...). + /// + internal ServiceImportEntry? GetServiceImportForAccountLocked(string destinationAccountName, string subject) + { + if (Imports.Services == null || !Imports.Services.TryGetValue(subject, out var serviceImports)) + return null; + + if (serviceImports.Count == 1 && serviceImports[0].Account?.Name == destinationAccountName) + return serviceImports[0]; + + foreach (var serviceImport in serviceImports) + { + if (serviceImport.Account?.Name == destinationAccountName) + return serviceImport; + } + + return null; + } + + /// + /// Removes a service import mapping by destination account name and subject key. + /// Mirrors Go (a *Account) removeServiceImport(dstAccName, subject string). + /// + internal void RemoveServiceImport(string destinationAccountName, string subject) + { + ServiceImportEntry? removed = null; + byte[]? sid = null; + + _mu.EnterWriteLock(); + try + { + if (Imports.Services == null || !Imports.Services.TryGetValue(subject, out var serviceImports)) + return; + + if (serviceImports.Count == 1) + { + if (serviceImports[0].Account?.Name == destinationAccountName) + { + removed = serviceImports[0]; + Imports.Services.Remove(subject); + } + } + else + { + for (var i = 0; i < serviceImports.Count; i++) + { + if (serviceImports[i].Account?.Name == destinationAccountName) + { + removed = serviceImports[i]; + serviceImports.RemoveAt(i); + Imports.Services[subject] = serviceImports; + break; + } + } + } + + if (removed?.SubscriptionId is { Length: > 0 }) + sid = removed.SubscriptionId; + } + finally + { + _mu.ExitWriteLock(); + } + + if (sid != null && InternalClient != null) + InternalClient.ProcessUnsub(sid); + } + + /// + /// Adds an entry to the reverse-response map for response cleanup. + /// Mirrors Go (a *Account) addReverseRespMapEntry(...). + /// + internal void AddReverseRespMapEntry(Account account, string reply, string from) + { + _mu.EnterWriteLock(); + try + { + Imports.ReverseResponseMap ??= new Dictionary>(StringComparer.Ordinal); + if (!Imports.ReverseResponseMap.TryGetValue(reply, out var entries)) + { + entries = []; + Imports.ReverseResponseMap[reply] = entries; + } + + entries.Add(new ServiceRespEntry + { + Account = account, + MappedSubject = from, + }); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Checks reverse-response entries for wildcard replies. + /// Mirrors Go (a *Account) checkForReverseEntries(...). + /// + internal void CheckForReverseEntries(string reply, bool checkInterest, bool recursed) + { + if (!SubscriptionIndex.SubjectHasWildcard(reply)) + { + CheckForReverseEntry(reply, null, checkInterest, recursed); + return; + } + + List replies; + _mu.EnterReadLock(); + try + { + if (Imports.ReverseResponseMap == null || Imports.ReverseResponseMap.Count == 0) + return; + replies = [.. Imports.ReverseResponseMap.Keys]; + } + finally + { + _mu.ExitReadLock(); + } + + var replyTokens = SubjectTransform.TokenizeSubject(reply); + foreach (var candidate in replies) + { + if (SubjectTransform.IsSubsetMatch(SubjectTransform.TokenizeSubject(candidate), reply)) + CheckForReverseEntry(candidate, null, checkInterest, recursed); + else if (SubjectTransform.IsSubsetMatch(replyTokens, candidate)) + CheckForReverseEntry(candidate, null, checkInterest, recursed); + } + } + + /// + /// Checks and optionally removes reverse-response entries. + /// Mirrors Go (a *Account) checkForReverseEntry(...). + /// + internal void CheckForReverseEntry(string reply, ServiceImportEntry? serviceImport, bool checkInterest) => + CheckForReverseEntry(reply, serviceImport, checkInterest, false); + + /// + /// Internal reverse-entry checker with recursion protection. + /// Mirrors Go (a *Account) _checkForReverseEntry(...). + /// + internal void CheckForReverseEntry(string reply, ServiceImportEntry? serviceImport, bool checkInterest, bool recursed) + { + List? responseEntries; + + _mu.EnterReadLock(); + try + { + if (Imports.ReverseResponseMap == null || Imports.ReverseResponseMap.Count == 0) + return; + + if (SubscriptionIndex.SubjectHasWildcard(reply)) + { + if (recursed) + return; + } + else if (!Imports.ReverseResponseMap.TryGetValue(reply, out responseEntries) || responseEntries == null) + { + return; + } + else if (checkInterest && Sublist != null && Sublist.HasInterest(reply)) + { + return; + } + } + finally + { + _mu.ExitReadLock(); + } + + if (SubscriptionIndex.SubjectHasWildcard(reply)) + { + CheckForReverseEntries(reply, checkInterest, true); + return; + } + + _mu.EnterWriteLock(); + try + { + if (Imports.ReverseResponseMap == null || !Imports.ReverseResponseMap.TryGetValue(reply, out responseEntries) || responseEntries == null) + return; + + if (serviceImport == null) + { + Imports.ReverseResponseMap.Remove(reply); + } + else + { + responseEntries.RemoveAll(entry => entry.MappedSubject == serviceImport.From); + + if (responseEntries.Count == 0) + Imports.ReverseResponseMap.Remove(reply); + else + Imports.ReverseResponseMap[reply] = responseEntries; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Returns true when a service import is overshadowed by an existing subject key. + /// Mirrors Go (a *Account) serviceImportShadowed(from string) bool. + /// + internal bool ServiceImportShadowed(string from) + { + _mu.EnterReadLock(); + try + { + if (Imports.Services == null) + return false; + if (Imports.Services.ContainsKey(from)) + return true; + + foreach (var subject in Imports.Services.Keys) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(from, subject)) + return true; + } + + return false; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns true when a service import already exists for destination account + source subject. + /// Mirrors Go (a *Account) serviceImportExists(dstAccName, from string) bool. + /// + internal bool ServiceImportExists(string destinationAccountName, string from) + { + _mu.EnterReadLock(); + try + { + return GetServiceImportForAccountLocked(destinationAccountName, from) != null; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Creates (or returns existing) internal account client. + /// Lock must be held. + /// Mirrors Go (a *Account) internalClient() *client. + /// + internal ClientConnection? InternalAccountClient() + { + if (InternalClient == null && Server is NatsServer server) + { + InternalClient = server.CreateInternalAccountClient(); + InternalClient.Account = this; + } + + return InternalClient; + } + + /// + /// Creates internal account-scoped subscription. + /// Mirrors Go (a *Account) subscribeInternal(...). + /// + internal (Subscription? Sub, Exception? Error) SubscribeInternal(string subject) => + SubscribeInternalEx(subject, false); + + /// + /// Unsubscribes from an internal account subscription. + /// Mirrors Go (a *Account) unsubscribeInternal(sub *subscription). + /// + internal void UnsubscribeInternal(Subscription? sub) + { + if (sub?.Sid == null) + return; + + _mu.EnterReadLock(); + var internalClient = InternalClient; + _mu.ExitReadLock(); + internalClient?.ProcessUnsub(sub.Sid); + } + + /// + /// Creates internal subscription for service-import responses. + /// Mirrors Go (a *Account) subscribeServiceImportResponse(subject string). + /// + internal (Subscription? Sub, Exception? Error) SubscribeServiceImportResponse(string subject) => + SubscribeInternalEx(subject, true); + + /// + /// Extended internal subscription helper. + /// Mirrors Go (a *Account) subscribeInternalEx(...). + /// + internal (Subscription? Sub, Exception? Error) SubscribeInternalEx(string subject, bool responseImport) + { + ClientConnection? client; + string sidText; + + _mu.EnterWriteLock(); + try + { + _isid++; + client = InternalAccountClient(); + sidText = _isid.ToString(); + } + finally + { + _mu.ExitWriteLock(); + } + + if (client == null) + return (null, new InvalidOperationException("no internal account client")); + + return client.ProcessSubEx(Encoding.ASCII.GetBytes(subject), null, Encoding.ASCII.GetBytes(sidText), false, false, responseImport); + } + + /// + /// Adds an internal subscription that matches a service import's from subject. + /// Mirrors Go (a *Account) addServiceImportSub(si *serviceImport) error. + /// + internal Exception? AddServiceImportSub(ServiceImportEntry serviceImport) + { + if (serviceImport == null) + return ServerErrors.ErrMissingService; + + ClientConnection? client; + string sidText; + string subject; + + _mu.EnterWriteLock(); + try + { + client = InternalAccountClient(); + if (client == null) + return null; + if (serviceImport.SubscriptionId is { Length: > 0 }) + return new InvalidOperationException("duplicate call to create subscription for service import"); + + _isid++; + sidText = _isid.ToString(); + serviceImport.SubscriptionId = Encoding.ASCII.GetBytes(sidText); + subject = serviceImport.From; + } + finally + { + _mu.ExitWriteLock(); + } + + var (_, err) = client.ProcessSubEx(Encoding.ASCII.GetBytes(subject), null, Encoding.ASCII.GetBytes(sidText), true, true, false); + return err; + } + + /// + /// Removes all subscriptions associated with service imports. + /// Mirrors Go (a *Account) removeAllServiceImportSubs(). + /// + internal void RemoveAllServiceImportSubs() + { + List subscriptionIds = []; + ClientConnection? internalClient; + + _mu.EnterWriteLock(); + try + { + if (Imports.Services != null) + { + foreach (var imports in Imports.Services.Values) + { + foreach (var serviceImport in imports) + { + if (serviceImport.SubscriptionId is { Length: > 0 }) + { + subscriptionIds.Add(serviceImport.SubscriptionId); + serviceImport.SubscriptionId = null; + } + } + } + } + + internalClient = InternalClient; + InternalClient = null; + } + finally + { + _mu.ExitWriteLock(); + } + + if (internalClient == null) + return; + + foreach (var sid in subscriptionIds) + internalClient.ProcessUnsub(sid); + + internalClient.CloseConnection(ClosedState.InternalClient); + } + + /// + /// Adds subscriptions for all registered service imports. + /// Mirrors Go (a *Account) addAllServiceImportSubs(). + /// + internal void AddAllServiceImportSubs() + { + List imports = []; + + _mu.EnterReadLock(); + try + { + if (Imports.Services != null) + { + foreach (var entries in Imports.Services.Values) + imports.AddRange(entries); + } + } + finally + { + _mu.ExitReadLock(); + } + + foreach (var serviceImport in imports) + _ = AddServiceImportSub(serviceImport); + } + + /// + /// Processes a service-import response routed to this account. + /// Mirrors Go (a *Account) processServiceImportResponse(...). + /// + internal void ProcessServiceImportResponse(string subject, byte[] msg) + { + ServiceImportEntry? serviceImport; + + _mu.EnterReadLock(); + try + { + if (IsExpired() || Exports.Responses == null || Exports.Responses.Count == 0) + return; + + if (!Exports.Responses.TryGetValue(subject, out serviceImport)) + return; + if (serviceImport == null || serviceImport.Invalid) + return; + } + finally + { + _mu.ExitReadLock(); + } + + // The client-side response processing pipeline is still under active porting. + serviceImport.DidDeliver = msg.Length >= 0; + } + + /// + /// Creates response wildcard prefix for service replies. + /// Lock must be held by caller. + /// Mirrors Go (a *Account) createRespWildcard(). + /// + internal void CreateRespWildcard() + { + const string alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + Span prefix = stackalloc byte[14]; + prefix[0] = (byte)'_'; + prefix[1] = (byte)'R'; + prefix[2] = (byte)'_'; + prefix[3] = (byte)'.'; + + ulong random = (ulong)Random.Shared.NextInt64(); + for (var i = 4; i < prefix.Length; i++) + { + prefix[i] = (byte)alphabet[(int)(random % (ulong)alphabet.Length)]; + random /= (ulong)alphabet.Length; + } + + ServiceImportReply = [.. prefix, (byte)'.']; + } + + /// + /// Generates a new service reply subject. + /// Mirrors Go (a *Account) newServiceReply(tracking bool) []byte. + /// + internal byte[] NewServiceReply(bool tracking) + { + bool createdPrefix = false; + byte[] replyPrefix; + + _mu.EnterWriteLock(); + try + { + if (ServiceImportReply == null) + { + CreateRespWildcard(); + createdPrefix = true; + } + + replyPrefix = ServiceImportReply ?? Encoding.ASCII.GetBytes("_R_."); + } + finally + { + _mu.ExitWriteLock(); + } + + if (createdPrefix) + _ = SubscribeServiceImportResponse(Encoding.ASCII.GetString([.. replyPrefix, (byte)'>'])); + + const string alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + Span randomPart = stackalloc byte[20]; + ulong random = (ulong)Random.Shared.NextInt64(); + for (var i = 0; i < randomPart.Length; i++) + { + randomPart[i] = (byte)alphabet[(int)(random % (ulong)alphabet.Length)]; + random /= (ulong)alphabet.Length; + } + + var reply = new List(replyPrefix.Length + randomPart.Length + 2); + reply.AddRange(replyPrefix); + reply.AddRange(randomPart.ToArray()); + + if (tracking) + { + reply.Add((byte)'.'); + reply.Add((byte)'T'); + } + + return [.. reply]; + } + + /// + /// Returns the response threshold for an exported service. + /// Mirrors Go (a *Account) ServiceExportResponseThreshold(...). + /// + public (TimeSpan Threshold, Exception? Error) ServiceExportResponseThreshold(string export) + { + _mu.EnterReadLock(); + try + { + var serviceExport = GetServiceExport(export); + if (serviceExport == null) + return (TimeSpan.Zero, new InvalidOperationException($"no export defined for \"{export}\"")); + return (serviceExport.ResponseThreshold, null); + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Sets max response delivery time for an exported service. + /// Mirrors Go (a *Account) SetServiceExportResponseThreshold(...). + /// + public Exception? SetServiceExportResponseThreshold(string export, TimeSpan maxTime) + { + _mu.EnterWriteLock(); + try + { + if (IsClaimAccount()) + return new InvalidOperationException("claim based accounts can not be updated directly"); + + var serviceExport = GetServiceExport(export); + if (serviceExport == null) + return new InvalidOperationException($"no export defined for \"{export}\""); + + serviceExport.ResponseThreshold = maxTime; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Enables/disables cross-account trace propagation on a service export. + /// Mirrors Go (a *Account) SetServiceExportAllowTrace(...). + /// + public Exception? SetServiceExportAllowTrace(string export, bool allowTrace) + { + _mu.EnterWriteLock(); + try + { + var serviceExport = GetServiceExport(export); + if (serviceExport == null) + return new InvalidOperationException($"no export defined for \"{export}\""); + + serviceExport.AllowTrace = allowTrace; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Creates internal response service import entry. + /// Mirrors Go (a *Account) addRespServiceImport(...). + /// + internal ServiceImportEntry AddRespServiceImport(Account destination, string to, ServiceImportEntry originalServiceImport, bool tracking, Dictionary? header) + { + var newReply = Encoding.ASCII.GetString(originalServiceImport.Account?.NewServiceReply(tracking) ?? NewServiceReply(tracking)); + + ServiceImportEntry responseImport; + _mu.EnterWriteLock(); + try + { + responseImport = new ServiceImportEntry + { + Account = destination, + ServiceExport = originalServiceImport.ServiceExport, + From = newReply, + To = to, + ResponseType = originalServiceImport.ResponseType, + IsResponse = true, + Share = originalServiceImport.Share, + Timestamp = UtcNowUnixNanos(), + Tracking = tracking && originalServiceImport.ResponseType == ServiceRespType.Singleton, + TrackingHeader = header, + Latency = tracking && originalServiceImport.ResponseType == ServiceRespType.Singleton + ? originalServiceImport.Latency + : null, + }; + + Exports.Responses ??= new Dictionary(StringComparer.Ordinal); + Exports.Responses[newReply] = responseImport; + } + finally + { + _mu.ExitWriteLock(); + } + + destination.AddReverseRespMapEntry(this, to, newReply); + return responseImport; + } + + /// + /// Adds stream import with optional claim context. + /// Mirrors Go (a *Account) AddStreamImportWithClaim(...). + /// + public Exception? AddStreamImportWithClaim(Account account, string from, string prefix, object? importClaim) => + AddStreamImportWithClaimInternal(account, from, prefix, false, importClaim); + + /// + /// Internal stream import add helper. + /// Mirrors Go (a *Account) addStreamImportWithClaim(...). + /// + internal Exception? AddStreamImportWithClaimInternal(Account account, string from, string prefix, bool allowTrace, object? importClaim) + { + if (account == null) + return ServerErrors.ErrMissingAccount; + if (!account.CheckStreamImportAuthorized(this, from, importClaim)) + return ServerErrors.ErrStreamImportAuthorization; + + if (!string.IsNullOrEmpty(prefix)) + { + if (SubscriptionIndex.SubjectHasWildcard(prefix)) + return ServerErrors.ErrStreamImportBadPrefix; + if (!prefix.EndsWith(".", StringComparison.Ordinal)) + prefix += '.'; + } + + return AddMappedStreamImportWithClaimInternal(account, from, prefix + from, allowTrace, importClaim); + } + + /// + /// Convenience helper for mapped stream imports without claim. + /// Mirrors Go (a *Account) AddMappedStreamImport(...). + /// + public Exception? AddMappedStreamImport(Account account, string from, string to) => + AddMappedStreamImportWithClaim(account, from, to, null); + + /// + /// Adds mapped stream import with optional claim. + /// Mirrors Go (a *Account) AddMappedStreamImportWithClaim(...). + /// + public Exception? AddMappedStreamImportWithClaim(Account account, string from, string to, object? importClaim) => + AddMappedStreamImportWithClaimInternal(account, from, to, false, importClaim); + + /// + /// Internal mapped stream import add helper. + /// Mirrors Go (a *Account) addMappedStreamImportWithClaim(...). + /// + internal Exception? AddMappedStreamImportWithClaimInternal(Account account, string from, string to, bool allowTrace, object? importClaim) + { + if (account == null) + return ServerErrors.ErrMissingAccount; + if (!account.CheckStreamImportAuthorized(this, from, importClaim)) + return ServerErrors.ErrStreamImportAuthorization; + + if (string.IsNullOrEmpty(to)) + to = from; + + var cycleErr = StreamImportFormsCycle(account, to) ?? StreamImportFormsCycle(account, from); + if (cycleErr != null) + return cycleErr; + + ISubjectTransformer? transform = null; + var usePublishedSubject = false; + if (SubscriptionIndex.SubjectHasWildcard(from)) + { + if (to == from) + { + usePublishedSubject = true; + } + else + { + var (created, err) = SubjectTransform.New(from, to); + if (err != null) + return new InvalidOperationException($"failed to create mapping transform for stream import subject from \"{from}\" to \"{to}\": {err.Message}"); + transform = created; + } + } + + _mu.EnterWriteLock(); + try + { + if (IsStreamImportDuplicate(account, from)) + return ServerErrors.ErrStreamImportDuplicate; + + Imports.Streams ??= []; + Imports.Streams.Add(new StreamImportEntry + { + Account = account, + From = from, + To = to, + Transform = transform, + Claim = importClaim, + UsePublishedSubject = usePublishedSubject, + AllowTrace = allowTrace, + }); + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Checks if stream import duplicate exists. Lock should be held. + /// Mirrors Go (a *Account) isStreamImportDuplicate(...). + /// + internal bool IsStreamImportDuplicate(Account account, string from) + { + if (Imports.Streams == null) + return false; + + foreach (var streamImport in Imports.Streams) + { + if (ReferenceEquals(streamImport.Account, account) && streamImport.From == from) + return true; + } + return false; + } + + /// + /// Adds stream import from a specific account. + /// Mirrors Go (a *Account) AddStreamImport(...). + /// + public Exception? AddStreamImport(Account account, string from, string prefix) => + AddStreamImportWithClaimInternal(account, from, prefix, false, null); + + /// + /// Adds stream export, optionally restricted to explicit accounts. + /// Mirrors Go (a *Account) AddStreamExport(...). + /// + public Exception? AddStreamExport(string subject, IReadOnlyList? accounts = null) => + AddStreamExportWithAccountPos(subject, accounts, 0); + + /// + /// Adds stream export with account-position matching. + /// Mirrors Go (a *Account) addStreamExportWithAccountPos(...). + /// + public Exception? AddStreamExportWithAccountPos(string subject, IReadOnlyList? accounts, uint accountPos) + { + if (!SubscriptionIndex.IsValidSubject(subject)) + return ServerErrors.ErrBadSubject; + + _mu.EnterWriteLock(); + try + { + Exports.Streams ??= new Dictionary(StringComparer.Ordinal); + Exports.Streams.TryGetValue(subject, out var export); + export ??= new StreamExport(); + + if (accounts != null || accountPos > 0) + { + var authErr = SetExportAuth(export, subject, accounts, accountPos); + if (authErr != null) + return authErr; + } + + Exports.Streams[subject] = export; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Checks stream import authorization with account lock. + /// Mirrors Go (a *Account) checkStreamImportAuthorized(...). + /// + internal bool CheckStreamImportAuthorized(Account account, string subject, object? importClaim) + { + _mu.EnterReadLock(); + try { return CheckStreamImportAuthorizedNoLock(account, subject, importClaim); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Checks stream import authorization assuming lock is already held. + /// Mirrors Go (a *Account) checkStreamImportAuthorizedNoLock(...). + /// + internal bool CheckStreamImportAuthorizedNoLock(Account account, string subject, object? importClaim) + { + if (Exports.Streams == null || !SubscriptionIndex.IsValidSubject(subject)) + return false; + return CheckStreamExportApproved(account, subject, importClaim); + } + + /// + /// Gets wildcard-matching service export for subject. + /// Lock should be held. + /// Mirrors Go (a *Account) getWildcardServiceExport(from string). + /// + internal ServiceExportEntry? GetWildcardServiceExport(string from) + { + if (Exports.Services == null) + return null; + + var tokens = SubjectTransform.TokenizeSubject(from); + foreach (var (subject, serviceExport) in Exports.Services) + { + if (SubjectTransform.IsSubsetMatch(tokens, subject)) + return serviceExport; + } + return null; + } + + /// + /// Handles stream import activation expiration. + /// Mirrors Go (a *Account) streamActivationExpired(...). + /// + internal void StreamActivationExpired(Account exportAccount, string subject) + { + _mu.EnterWriteLock(); + try + { + if (IsExpired() || Imports.Streams == null) + return; + + foreach (var streamImport in Imports.Streams) + { + if (ReferenceEquals(streamImport.Account, exportAccount) && streamImport.From == subject) + { + streamImport.Invalid = true; + return; + } + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Handles service import activation expiration. + /// Mirrors Go (a *Account) serviceActivationExpired(...). + /// + internal void ServiceActivationExpired(Account destinationAccount, string subject) + { + _mu.EnterWriteLock(); + try + { + if (IsExpired() || Imports.Services == null) + return; + + var serviceImport = GetServiceImportForAccountLocked(destinationAccount.Name, subject); + if (serviceImport != null) + serviceImport.Invalid = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Re-evaluates import validity when an activation token expiration timer fires. + /// Mirrors Go (a *Account) activationExpired(...). + /// + internal void ActivationExpired(Account exportAccount, string subject, object? kind) + { + var normalizedKind = NormalizeExportKind(kind); + if (string.Equals(normalizedKind, "stream", StringComparison.Ordinal)) + { + StreamActivationExpired(exportAccount, subject); + } + else if (string.Equals(normalizedKind, "service", StringComparison.Ordinal)) + { + ServiceActivationExpired(exportAccount, subject); + } + } + + /// + /// Validates an import activation claim/token. + /// Mirrors Go (a *Account) checkActivation(...). + /// + internal bool CheckActivation(Account importAccount, object? claim, ExportAuth? exportAuth, bool expirationTimer) + { + if (claim == null) + return false; + + if (!TryReadStringMember(claim, "Token", out var token) || string.IsNullOrWhiteSpace(token)) + return false; + + if (!TryDecodeJwtPayload(token, out var activationPayload)) + return false; + + if (!IsIssuerClaimTrusted(activationPayload)) + return false; + + if (TryReadLongMember(activationPayload, "exp", out var expires) && expires > 0) + { + var now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + if (expires <= now) + return false; + + if (expirationTimer) + { + var delay = TimeSpan.FromSeconds(expires - now); + string importSubject = ReadActivationImportSubject(activationPayload); + object? claimType = TryReadMember(claim, "Type", out var typeValue) ? typeValue : null; + + _ = new Timer( + _ => importAccount.ActivationExpired(this, importSubject, claimType), + null, + delay, + Timeout.InfiniteTimeSpan); + } + } + + if (exportAuth == null) + return true; + + string subject = TryReadStringMember(activationPayload, "sub", out var sub) ? sub : string.Empty; + long issuedAt = TryReadLongMember(activationPayload, "iat", out var iat) ? iat : 0; + return !IsRevoked(exportAuth.ActivationsRevoked, subject, issuedAt); + } + + /// + /// Returns true when activation issuer details are trusted for this account. + /// Mirrors Go (a *Account) isIssuerClaimTrusted(...). + /// + internal bool IsIssuerClaimTrusted(object? claims) + { + if (claims == null) + return false; + + string issuerAccount = + TryReadStringMember(claims, "IssuerAccount", out var ia) ? ia : + TryReadStringMember(claims, "issuer_account", out var iaAlt) ? iaAlt : + string.Empty; + + // If issuer-account is omitted, issuer defaults to the account itself. + if (string.IsNullOrEmpty(issuerAccount)) + return true; + + if (!string.Equals(Name, issuerAccount, StringComparison.Ordinal)) + { + if (Server is NatsServer server) + { + string importSubject = ReadActivationImportSubject(claims); + string importType = TryReadStringMember(claims, "import_type", out var it) ? it : string.Empty; + server.Errorf( + "Invalid issuer account {0} in activation claim (subject: {1} - type: {2}) for account {3}", + issuerAccount, + importSubject, + importType, + Name); + } + + return false; + } + + string issuer = + TryReadStringMember(claims, "Issuer", out var issuerValue) ? issuerValue : + TryReadStringMember(claims, "iss", out var issValue) ? issValue : + string.Empty; + + _mu.EnterReadLock(); + try + { + (_, var ok) = HasIssuerNoLock(issuer); + return ok; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Checks whether another account is approved to import this service export. + /// Mirrors Go (a *Account) checkServiceImportAuthorized(...). + /// + internal bool CheckServiceImportAuthorized(Account account, string subject, object? importClaim) + { + _mu.EnterReadLock(); + try { return CheckServiceImportAuthorizedNoLock(account, subject, importClaim); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Lock-free helper for service import authorization checks. + /// Mirrors Go (a *Account) checkServiceImportAuthorizedNoLock(...). + /// + internal bool CheckServiceImportAuthorizedNoLock(Account account, string subject, object? importClaim) + { + if (Exports.Services == null) + return false; + + return CheckServiceExportApproved(account, subject, importClaim); + } + + /// + /// Returns whether bearer tokens should be rejected for this account. + /// Mirrors Go (a *Account) failBearer() bool. + /// + internal bool FailBearer() + { + _mu.EnterReadLock(); + try { return DisallowBearer; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Updates expiration state/timer from claim data. + /// Mirrors Go (a *Account) checkExpiration(...). + /// + internal void CheckExpiration(object? claimsData) + { + long expires = + claimsData != null && TryReadLongMember(claimsData, "Expires", out var exp) ? exp : + claimsData != null && TryReadLongMember(claimsData, "exp", out var expUnix) ? expUnix : + 0; + + _mu.EnterWriteLock(); + try + { + ClearExpirationTimer(); + + if (expires == 0) + { + Interlocked.Exchange(ref _expired, 0); + return; + } + + long now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + if (expires <= now) + { + Interlocked.Exchange(ref _expired, 1); + return; + } + + SetExpirationTimer(TimeSpan.FromSeconds(expires - now)); + Interlocked.Exchange(ref _expired, 0); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Returns signer scope for issuer, if present. + /// Mirrors Go (a *Account) hasIssuer(...). + /// + internal (object? Scope, bool Ok) HasIssuer(string issuer) + { + _mu.EnterReadLock(); + try { return HasIssuerNoLock(issuer); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Lock-free signer lookup. + /// Mirrors Go (a *Account) hasIssuerNoLock(...). + /// + internal (object? Scope, bool Ok) HasIssuerNoLock(string issuer) + { + if (SigningKeys == null || string.IsNullOrEmpty(issuer)) + return (null, false); + + return SigningKeys.TryGetValue(issuer, out var scope) + ? (scope, true) + : (null, false); + } + + /// + /// Returns the leaf-node loop-detection subject. + /// Mirrors Go (a *Account) getLDSubject() string. + /// + internal string GetLDSubject() + { + _mu.EnterReadLock(); + try { return LoopDetectionSubject; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Returns account label used in trace output. + /// Mirrors Go (a *Account) traceLabel() string. + /// + internal string TraceLabel() + { + if (string.IsNullOrEmpty(NameTag)) + return Name; + return $"{Name}/{NameTag}"; + } + + /// + /// Returns true when external auth is configured. + /// Mirrors Go (a *Account) hasExternalAuth() bool. + /// + internal bool HasExternalAuth() + { + _mu.EnterReadLock(); + try { return ExternalAuth != null; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Returns true when is configured as an external-auth user. + /// Mirrors Go (a *Account) isExternalAuthUser(userID string) bool. + /// + internal bool IsExternalAuthUser(string userId) + { + _mu.EnterReadLock(); + try + { + foreach (var authUser in ReadStringListMember(ExternalAuth, "AuthUsers", "auth_users")) + { + if (string.Equals(userId, authUser, StringComparison.Ordinal)) + return true; + } + return false; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns configured external-auth xkey, or empty when unset. + /// Mirrors Go (a *Account) externalAuthXKey() string. + /// + internal string ExternalAuthXKey() + { + _mu.EnterReadLock(); + try + { + if (TryReadStringMember(ExternalAuth, "XKey", out var xkey) && !string.IsNullOrEmpty(xkey)) + return xkey; + if (TryReadStringMember(ExternalAuth, "xkey", out var xkeyAlt) && !string.IsNullOrEmpty(xkeyAlt)) + return xkeyAlt; + return string.Empty; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns whether external auth allows account switching to . + /// Mirrors Go (a *Account) isAllowedAcount(acc string) bool. + /// + internal bool IsAllowedAcount(string account) + { + _mu.EnterReadLock(); + try + { + var allowed = ReadStringListMember(ExternalAuth, "AllowedAccounts", "allowed_accounts"); + if (allowed.Count == 1 && string.Equals(allowed[0], "*", StringComparison.Ordinal)) + return true; + + foreach (var candidate in allowed) + { + if (string.Equals(candidate, account, StringComparison.Ordinal)) + return true; + } + + return false; + } + finally + { + _mu.ExitReadLock(); + } + } + // ------------------------------------------------------------------------- // Export checks // ------------------------------------------------------------------------- @@ -2069,6 +4106,316 @@ public sealed class Account : INatsAccount return null; } + /// + /// Adds a service import entry to the import map. + /// Mirrors Go (a *Account) addServiceImport(...). + /// + private (ServiceImportEntry? Import, Exception? Error) AddServiceImportInternal(Account destination, string from, string to, object? claim) + { + _mu.EnterWriteLock(); + try + { + Imports.Services ??= new Dictionary>(StringComparer.Ordinal); + + var serviceImport = new ServiceImportEntry + { + Account = destination, + Claim = claim, + From = from, + To = to, + }; + + if (!Imports.Services.TryGetValue(from, out var entries)) + { + entries = []; + Imports.Services[from] = entries; + } + + entries.Add(serviceImport); + return (serviceImport, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Resolves a service export by exact or wildcard subject match. + /// Mirrors Go (a *Account) getServiceExport(service string) *serviceExport. + /// + private ServiceExportEntry? GetServiceExport(string service) + { + if (Exports.Services == null) + return null; + + if (Exports.Services.TryGetValue(service, out var serviceExport)) + return serviceExport; + + var tokens = SubjectTransform.TokenizeSubject(service); + foreach (var (subject, export) in Exports.Services) + { + if (SubjectTransform.IsSubsetMatch(tokens, subject)) + return export; + } + + return null; + } + + private static ClientInfo? CreateClientInfo(ClientConnection? client, bool _) + { + if (client == null) + return null; + + return new ClientInfo + { + Id = client.Cid, + Account = client.Account?.Name ?? string.Empty, + Name = client.Opts.Name ?? string.Empty, + Rtt = client.GetRttValue(), + Start = client.Start == default ? string.Empty : client.Start.ToUniversalTime().ToString("O"), + Kind = client.Kind.ToString(), + ClientType = client.ClientType().ToString(), + }; + } + + private static long UtcNowUnixNanos() => TimeSpanToUnixNanos(DateTime.UtcNow - DateTime.UnixEpoch); + + private static long TimeSpanToUnixNanos(TimeSpan value) => value.Ticks * 100L; + + private static TimeSpan UnixNanosToTimeSpan(long unixNanos) => TimeSpan.FromTicks(unixNanos / 100L); + + private static DateTime UnixNanoToDateTime(long unixNanos) + { + if (unixNanos <= 0) + return DateTime.UnixEpoch; + return DateTime.UnixEpoch.AddTicks(unixNanos / 100L); + } + + private static bool TryDecodeJwtPayload(string token, out JsonElement payload) + { + payload = default; + if (string.IsNullOrWhiteSpace(token)) + return false; + + var parts = token.Split('.'); + if (parts.Length < 2) + return false; + + string base64 = parts[1] + .Replace("-", "+", StringComparison.Ordinal) + .Replace("_", "/", StringComparison.Ordinal); + + int mod = base64.Length % 4; + if (mod > 0) + base64 = base64.PadRight(base64.Length + (4 - mod), '='); + + byte[] bytes; + try { bytes = Convert.FromBase64String(base64); } + catch { return false; } + + try + { + using var doc = JsonDocument.Parse(bytes); + payload = doc.RootElement.Clone(); + return payload.ValueKind == JsonValueKind.Object; + } + catch + { + return false; + } + } + + private static bool TryReadMember(object source, string name, out object? value) + { + value = null; + if (source == null) + return false; + + if (source is JsonElement element) + { + if (element.ValueKind != JsonValueKind.Object) + return false; + + foreach (var property in element.EnumerateObject()) + { + if (string.Equals(property.Name, name, StringComparison.OrdinalIgnoreCase)) + { + value = property.Value; + return true; + } + } + + return false; + } + + if (source is IDictionary dictionary && + dictionary.TryGetValue(name, out var dictionaryValue)) + { + value = dictionaryValue; + return true; + } + + if (source is IDictionary stringDictionary && + stringDictionary.TryGetValue(name, out var stringDictionaryValue)) + { + value = stringDictionaryValue; + return true; + } + + var propertyInfo = source + .GetType() + .GetProperty(name, System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.IgnoreCase); + if (propertyInfo == null) + return false; + + value = propertyInfo.GetValue(source); + return true; + } + + private static bool TryReadStringMember(object? source, string name, out string value) + { + value = string.Empty; + if (source == null || !TryReadMember(source, name, out var member)) + return false; + + if (member is JsonElement element) + { + if (element.ValueKind == JsonValueKind.String) + { + value = element.GetString() ?? string.Empty; + return true; + } + + if (element.ValueKind == JsonValueKind.Number) + { + value = element.ToString(); + return true; + } + + return false; + } + + value = member?.ToString() ?? string.Empty; + return true; + } + + private static bool TryReadLongMember(object source, string name, out long value) + { + value = 0; + if (!TryReadMember(source, name, out var member)) + return false; + + if (member is JsonElement element) + { + if (element.ValueKind == JsonValueKind.Number) + return element.TryGetInt64(out value); + + if (element.ValueKind == JsonValueKind.String) + return long.TryParse(element.GetString(), out value); + + return false; + } + + switch (member) + { + case byte b: + value = b; + return true; + case sbyte sb: + value = sb; + return true; + case short s: + value = s; + return true; + case ushort us: + value = us; + return true; + case int i: + value = i; + return true; + case uint ui: + value = ui; + return true; + case long l: + value = l; + return true; + case ulong ul when ul <= long.MaxValue: + value = (long)ul; + return true; + case string str: + return long.TryParse(str, out value); + default: + return false; + } + } + + private static IReadOnlyList ReadStringListMember(object? source, params string[] names) + { + if (source == null) + return []; + + foreach (var name in names) + { + if (!TryReadMember(source, name, out var member) || member == null) + continue; + + if (member is IEnumerable enumerableStrings) + return [.. enumerableStrings]; + + if (member is JsonElement element && element.ValueKind == JsonValueKind.Array) + { + var results = new List(); + foreach (var item in element.EnumerateArray()) + { + if (item.ValueKind == JsonValueKind.String) + results.Add(item.GetString() ?? string.Empty); + else + results.Add(item.ToString()); + } + return results; + } + + if (member is IEnumerable objectEnumerable) + { + var results = new List(); + foreach (var item in objectEnumerable) + results.Add(item?.ToString() ?? string.Empty); + return results; + } + } + + return []; + } + + private static string NormalizeExportKind(object? kind) + { + if (kind is JsonElement element) + return element.ToString().Trim().ToLowerInvariant(); + + return kind?.ToString()?.Trim().ToLowerInvariant() ?? string.Empty; + } + + private static string ReadActivationImportSubject(object claimOrPayload) + { + if (TryReadStringMember(claimOrPayload, "ImportSubject", out var importSubject) && !string.IsNullOrEmpty(importSubject)) + return importSubject; + if (TryReadStringMember(claimOrPayload, "import_subject", out var importSubjectSnake) && !string.IsNullOrEmpty(importSubjectSnake)) + return importSubjectSnake; + + if (claimOrPayload is JsonElement element && + element.ValueKind == JsonValueKind.Object && + element.TryGetProperty("nats", out var natsObj) && + natsObj.ValueKind == JsonValueKind.Object && + natsObj.TryGetProperty("import_subject", out var natsImportSubject) && + natsImportSubject.ValueKind == JsonValueKind.String) + { + return natsImportSubject.GetString() ?? string.Empty; + } + + return string.Empty; + } + /// /// Tokenises a subject string into an array, using the same split logic /// as btsep-based tokenisation in the Go source. @@ -2115,9 +4462,8 @@ public sealed class Account : INatsAccount /// Checks whether is authorised to use /// (either via explicit approval or token requirement). /// Mirrors Go (a *Account) checkAuth(...) bool. - /// TODO: session 11 — full JWT activation check. /// - private static bool CheckAuth( + private bool CheckAuth( ExportAuth ea, Account account, object? imClaim, @@ -2128,8 +4474,7 @@ public sealed class Account : INatsAccount if (ea.TokenRequired) { - // TODO: session 11 — validate activation token in imClaim. - return imClaim != null; + return CheckActivation(account, imClaim, ea, expirationTimer: true); } // No approved list and no token required → public export. @@ -2146,6 +4491,35 @@ public sealed class Account : INatsAccount return false; } + /// + /// Applies account-based authorization rules to an export descriptor. + /// Mirrors Go setExportAuth(&se.exportAuth, ...). + /// + private static Exception? SetExportAuth(ExportAuth auth, string subject, IReadOnlyList? accounts, uint accountPos) + { + if (!SubscriptionIndex.IsValidSubject(subject)) + return ServerErrors.ErrBadSubject; + + auth.AccountPosition = accountPos; + + if (accounts == null || accounts.Count == 0) + { + auth.Approved = null; + return null; + } + + var approved = new Dictionary(accounts.Count, StringComparer.Ordinal); + foreach (var account in accounts) + { + if (account == null) + continue; + approved[account.Name] = account; + } + + auth.Approved = approved; + return null; + } + // ------------------------------------------------------------------------- // Export equality helpers // ------------------------------------------------------------------------- diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 516ad45..7888792 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1686,6 +1686,18 @@ public sealed partial class ClientConnection } } + internal void ProcessUnsub(byte[] sid) + { + lock (_mu) + { + if (Subs == null) + return; + + var sidText = Encoding.ASCII.GetString(sid); + Subs.Remove(sidText); + } + } + // features 440-441: processInfo, processErr internal void ProcessInfo(string info) { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/AccountTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/AccountTests.cs index f993d79..831f0f7 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/AccountTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/AccountTests.cs @@ -246,6 +246,62 @@ public sealed class AccountTests system.CheckServiceExportApproved(global, "$SYS.REQ.INFO", null).ShouldBeTrue(); } + [Fact] // T:81 + public void MultipleStreamImportsWithSameSubjectDifferentPrefix_ShouldSucceed() + { + var fooAccount = Account.NewAccount("foo"); + var barAccount = Account.NewAccount("bar"); + var importAccount = Account.NewAccount("import"); + + fooAccount.AddStreamExport("test", null).ShouldBeNull(); + barAccount.AddStreamExport("test", null).ShouldBeNull(); + + importAccount.AddStreamImport(fooAccount, "test", "foo").ShouldBeNull(); + importAccount.AddStreamImport(barAccount, "test", "bar").ShouldBeNull(); + + importAccount.Imports.Streams.ShouldNotBeNull(); + importAccount.Imports.Streams!.Count.ShouldBe(2); + importAccount.Imports.Streams.Select(si => si.To).OrderBy(static t => t).ToArray() + .ShouldBe(["bar.test", "foo.test"]); + } + + [Fact] // T:82 + public void MultipleStreamImportsWithSameSubject_ShouldSucceed() + { + var fooAccount = Account.NewAccount("foo"); + var barAccount = Account.NewAccount("bar"); + var importAccount = Account.NewAccount("import"); + + fooAccount.AddStreamExport("test", null).ShouldBeNull(); + barAccount.AddStreamExport("test", null).ShouldBeNull(); + + importAccount.AddStreamImport(fooAccount, "test", string.Empty).ShouldBeNull(); + importAccount.AddStreamImport(fooAccount, "test", string.Empty).ShouldBe(ServerErrors.ErrStreamImportDuplicate); + importAccount.AddStreamImport(barAccount, "test", string.Empty).ShouldBeNull(); + + importAccount.Imports.Streams.ShouldNotBeNull(); + importAccount.Imports.Streams!.Count.ShouldBe(2); + importAccount.Imports.Streams.Select(si => si.Account?.Name).OrderBy(static n => n).ToArray() + .ShouldBe(["bar", "foo"]); + } + + [Fact] // T:95 + public void BenchmarkNewRouteReply() + { + var globalAccount = Account.NewAccount("$G"); + + var first = globalAccount.NewServiceReply(tracking: false); + var second = globalAccount.NewServiceReply(tracking: false); + var tracked = globalAccount.NewServiceReply(tracking: true); + + first.Length.ShouldBeGreaterThan(20); + second.Length.ShouldBeGreaterThan(20); + first.SequenceEqual(second).ShouldBeFalse(); + + var trackedText = Encoding.ASCII.GetString(tracked); + trackedText.EndsWith(".T", StringComparison.Ordinal).ShouldBeTrue(); + } + [Fact] // T:98 public void ImportSubscriptionPartialOverlapWithPrefix_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs index 0da9237..7a46e03 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs @@ -1,4 +1,5 @@ using System.Diagnostics; +using System.Linq; using Shouldly; using ZB.MOM.NatsNet.Server; using ZB.MOM.NatsNet.Server.Internal; @@ -122,6 +123,53 @@ public sealed partial class GatewayHandlerTests NatsServer.ValidateCluster(conflict).ShouldBe(ServerErrors.ErrClusterNameConfigConflict); } + [Fact] // T:658 + public void GatewaySendReplyAcrossGatewaysServiceImport_ShouldSucceed() + { + var fooAccount = Account.NewAccount("$foo"); + var barAccount = Account.NewAccount("$bar"); + + fooAccount.AddServiceExport("foo.request", null).ShouldBeNull(); + barAccount.AddServiceImport(fooAccount, "bar.request", "foo.request").ShouldBeNull(); + + var serviceImport = barAccount.Imports.Services!["bar.request"].Single(); + var responseImport = barAccount.AddRespServiceImport(fooAccount, "reply", serviceImport, tracking: false, header: null); + + responseImport.From.ShouldNotBe("reply"); + responseImport.To.ShouldBe("reply"); + barAccount.Exports.Responses.ShouldNotBeNull(); + barAccount.Exports.Responses!.ShouldContainKey(responseImport.From); + + fooAccount.Imports.ReverseResponseMap.ShouldNotBeNull(); + fooAccount.Imports.ReverseResponseMap!.ShouldContainKey("reply"); + fooAccount.Imports.ReverseResponseMap["reply"].Any(e => e.MappedSubject == responseImport.From).ShouldBeTrue(); + + barAccount.ProcessServiceImportResponse(responseImport.From, "ok"u8.ToArray()); + responseImport.DidDeliver.ShouldBeTrue(); + } + + [Fact] // T:666 + public void GatewayNoAccountUnsubWhenServiceReplyInUse_ShouldSucceed() + { + var fooAccount = Account.NewAccount("$foo"); + var barAccount = Account.NewAccount("$bar"); + + fooAccount.AddServiceExport("test.request", null).ShouldBeNull(); + barAccount.AddServiceImport(fooAccount, "foo.request", "test.request").ShouldBeNull(); + + var serviceImport = barAccount.Imports.Services!["foo.request"].Single(); + var responseImport1 = barAccount.AddRespServiceImport(fooAccount, "reply", serviceImport, tracking: false, header: null); + var responseImport2 = barAccount.AddRespServiceImport(fooAccount, "reply", serviceImport, tracking: false, header: null); + + fooAccount.Imports.ReverseResponseMap.ShouldNotBeNull(); + fooAccount.Imports.ReverseResponseMap!["reply"].Count.ShouldBe(2); + + fooAccount.CheckForReverseEntry("reply", responseImport1, checkInterest: false); + + fooAccount.Imports.ReverseResponseMap["reply"].Count.ShouldBe(1); + fooAccount.Imports.ReverseResponseMap["reply"].Single().MappedSubject.ShouldBe(responseImport2.From); + } + private static NatsServer CreateServer(ServerOptions? opts = null) { var (server, err) = NatsServer.NewServer(opts ?? new ServerOptions()); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs index 7ba10ef..b08b498 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs @@ -1,5 +1,6 @@ using Shouldly; using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; @@ -71,4 +72,54 @@ public sealed partial class LeafNodeHandlerTests remotes[1].FirstInfoTimeout.ShouldBe(TimeSpan.FromSeconds(3)); remotes[1].Urls[0].Scheme.ShouldBe("ws"); } + + [Fact] // T:1935 + public void LeafNodeNoDuplicateWithinCluster_ShouldSucceed() + { + var account = Account.NewAccount("$G"); + var leaf1 = new ClientConnection(ClientKind.Leaf); + var leaf2 = new ClientConnection(ClientKind.Leaf); + + ((INatsAccount)account).AddClient(leaf1); + ((INatsAccount)account).AddClient(leaf2); + + account.RegisterLeafNodeCluster("xyz"); + account.RegisterLeafNodeCluster("xyz"); + + account.NumLocalLeafNodes().ShouldBe(2); + account.HasLeafNodeCluster("xyz").ShouldBeTrue(); + account.IsLeafNodeClusterIsolated("xyz").ShouldBeTrue(); + + account.RegisterLeafNodeCluster("abc"); + account.IsLeafNodeClusterIsolated("xyz").ShouldBeFalse(); + } + + [Fact] // T:1952 + public void LeafNodeStreamImport_ShouldSucceed() + { + var exporter = Account.NewAccount("B"); + var importer = Account.NewAccount("C"); + + exporter.AddStreamExport(">", null).ShouldBeNull(); + importer.AddStreamImport(exporter, ">", string.Empty).ShouldBeNull(); + + importer.Imports.Streams.ShouldNotBeNull(); + importer.Imports.Streams!.Count.ShouldBe(1); + importer.Imports.Streams[0].From.ShouldBe(">"); + importer.Imports.Streams[0].To.ShouldBe(">"); + exporter.CheckStreamExportApproved(importer, "a", null).ShouldBeTrue(); + } + + [Fact] // T:1955 + public void LeafNodeUnsubOnRouteDisconnect_ShouldSucceed() + { + var account = Account.NewAccount("$G"); + var leaf = new ClientConnection(ClientKind.Leaf); + + ((INatsAccount)account).AddClient(leaf); + account.NumLocalLeafNodes().ShouldBe(1); + + ((INatsAccount)account).RemoveClient(leaf); + account.NumLocalLeafNodes().ShouldBe(0); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.cs index 4873d29..bd0a121 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.cs @@ -6,6 +6,41 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class MessageTracerTests { + [Fact] // T:2359 + public void MsgTraceParseAccountDestWithSampling_ShouldSucceed() + { + var cases = new[] + { + (Name: "trace sampling no dest", Value: (object)new Dictionary { ["sampling"] = 50 }, Want: 0), + (Name: "trace dest only", Value: (object)new Dictionary { ["dest"] = "foo" }, Want: 100), + (Name: "trace dest with number only", Value: (object)new Dictionary { ["dest"] = "foo", ["sampling"] = 20 }, Want: 20), + (Name: "trace dest with percentage", Value: (object)new Dictionary { ["dest"] = "foo", ["sampling"] = "50%" }, Want: 50), + }; + + foreach (var testCase in cases) + { + var options = new ServerOptions(); + var errors = new List(); + var warnings = new List(); + + var accounts = new Dictionary + { + ["A"] = new Dictionary + { + ["msg_trace"] = testCase.Value, + }, + }; + + var parseError = ServerOptions.ParseAccounts(accounts, options, errors, warnings); + parseError.ShouldBeNull(testCase.Name); + errors.ShouldBeEmpty(testCase.Name); + options.Accounts.Count.ShouldBe(1, testCase.Name); + + var (_, sampling) = options.Accounts[0].GetTraceDestAndSampling(); + sampling.ShouldBe(testCase.Want, testCase.Name); + } + } + [Fact] // T:2331 public void MsgTraceBasic_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/AccessTimeServiceTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/AccessTimeServiceTests.cs index 216fc86..1bb3634 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/AccessTimeServiceTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/AccessTimeServiceTests.cs @@ -45,6 +45,11 @@ public sealed class AccessTimeServiceTests : IDisposable // Background timer should update the time. await Task.Delay(AccessTimeService.TickInterval * 3); var atn = AccessTimeService.AccessTime(); + if (atn <= at) + { + await Task.Delay(AccessTimeService.TickInterval); + atn = AccessTimeService.AccessTime(); + } atn.ShouldBeGreaterThan(at); // Unregister; timer should stop. @@ -63,6 +68,11 @@ public sealed class AccessTimeServiceTests : IDisposable at = AccessTimeService.AccessTime(); await Task.Delay(AccessTimeService.TickInterval * 3); atn = AccessTimeService.AccessTime(); + if (atn <= at) + { + await Task.Delay(AccessTimeService.TickInterval); + atn = AccessTimeService.AccessTime(); + } atn.ShouldBeGreaterThan(at); } finally diff --git a/reports/current.md b/reports/current.md index 379e5c8..b10b4b3 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 01:28:40 UTC +Generated: 2026-03-01 01:39:44 UTC ## Modules (12 total)