diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index 3fe30ed..a6ef4db 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -2231,6 +2231,543 @@ public sealed class Account : INatsAccount } } + /// + /// Number of configured service-import subject keys. + /// Mirrors Go (a *Account) NumServiceImports() int. + /// + public int NumServiceImports() + { + _mu.EnterReadLock(); + try { return Imports.Services?.Count ?? 0; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Removes a response service import and performs reverse-map cleanup. + /// Mirrors Go (a *Account) removeRespServiceImport(...). + /// + internal void RemoveRespServiceImport(ServiceImportEntry? serviceImport, RsiReason reason) + { + if (serviceImport == null) + return; + + Account? destination; + string from; + string to; + bool tracking; + bool delivered; + ClientConnection? requestor; + byte[]? sid; + + _mu.EnterWriteLock(); + try + { + if (Exports.Responses != null) + Exports.Responses.Remove(serviceImport.From); + + destination = serviceImport.Account; + from = serviceImport.From; + to = serviceImport.To; + tracking = serviceImport.Tracking; + delivered = serviceImport.DidDeliver; + requestor = serviceImport.RequestingClient; + sid = serviceImport.SubscriptionId; + } + finally + { + _mu.ExitWriteLock(); + } + + if (sid is { Length: > 0 } && InternalClient != null) + InternalClient.ProcessUnsub(sid); + + if (tracking && requestor != null && !delivered) + SendBackendErrorTrackingLatency(serviceImport, reason); + + destination?.CheckForReverseEntry(to, serviceImport, false); + } + + /// + /// Gets a service import for a specific destination account and subject key. + /// Lock must be held by caller. + /// Mirrors Go (a *Account) getServiceImportForAccountLocked(...). + /// + internal ServiceImportEntry? GetServiceImportForAccountLocked(string destinationAccountName, string subject) + { + if (Imports.Services == null || !Imports.Services.TryGetValue(subject, out var serviceImports)) + return null; + + if (serviceImports.Count == 1 && serviceImports[0].Account?.Name == destinationAccountName) + return serviceImports[0]; + + foreach (var serviceImport in serviceImports) + { + if (serviceImport.Account?.Name == destinationAccountName) + return serviceImport; + } + + return null; + } + + /// + /// Removes a service import mapping by destination account name and subject key. + /// Mirrors Go (a *Account) removeServiceImport(dstAccName, subject string). + /// + internal void RemoveServiceImport(string destinationAccountName, string subject) + { + ServiceImportEntry? removed = null; + byte[]? sid = null; + + _mu.EnterWriteLock(); + try + { + if (Imports.Services == null || !Imports.Services.TryGetValue(subject, out var serviceImports)) + return; + + if (serviceImports.Count == 1) + { + if (serviceImports[0].Account?.Name == destinationAccountName) + { + removed = serviceImports[0]; + Imports.Services.Remove(subject); + } + } + else + { + for (var i = 0; i < serviceImports.Count; i++) + { + if (serviceImports[i].Account?.Name == destinationAccountName) + { + removed = serviceImports[i]; + serviceImports.RemoveAt(i); + Imports.Services[subject] = serviceImports; + break; + } + } + } + + if (removed?.SubscriptionId is { Length: > 0 }) + sid = removed.SubscriptionId; + } + finally + { + _mu.ExitWriteLock(); + } + + if (sid != null && InternalClient != null) + InternalClient.ProcessUnsub(sid); + } + + /// + /// Adds an entry to the reverse-response map for response cleanup. + /// Mirrors Go (a *Account) addReverseRespMapEntry(...). + /// + internal void AddReverseRespMapEntry(Account account, string reply, string from) + { + _mu.EnterWriteLock(); + try + { + Imports.ReverseResponseMap ??= new Dictionary>(StringComparer.Ordinal); + if (!Imports.ReverseResponseMap.TryGetValue(reply, out var entries)) + { + entries = []; + Imports.ReverseResponseMap[reply] = entries; + } + + entries.Add(new ServiceRespEntry + { + Account = account, + MappedSubject = from, + }); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Checks reverse-response entries for wildcard replies. + /// Mirrors Go (a *Account) checkForReverseEntries(...). + /// + internal void CheckForReverseEntries(string reply, bool checkInterest, bool recursed) + { + if (!SubscriptionIndex.SubjectHasWildcard(reply)) + { + CheckForReverseEntry(reply, null, checkInterest, recursed); + return; + } + + List replies; + _mu.EnterReadLock(); + try + { + if (Imports.ReverseResponseMap == null || Imports.ReverseResponseMap.Count == 0) + return; + replies = [.. Imports.ReverseResponseMap.Keys]; + } + finally + { + _mu.ExitReadLock(); + } + + var replyTokens = SubjectTransform.TokenizeSubject(reply); + foreach (var candidate in replies) + { + if (SubjectTransform.IsSubsetMatch(SubjectTransform.TokenizeSubject(candidate), reply)) + CheckForReverseEntry(candidate, null, checkInterest, recursed); + else if (SubjectTransform.IsSubsetMatch(replyTokens, candidate)) + CheckForReverseEntry(candidate, null, checkInterest, recursed); + } + } + + /// + /// Checks and optionally removes reverse-response entries. + /// Mirrors Go (a *Account) checkForReverseEntry(...). + /// + internal void CheckForReverseEntry(string reply, ServiceImportEntry? serviceImport, bool checkInterest) => + CheckForReverseEntry(reply, serviceImport, checkInterest, false); + + /// + /// Internal reverse-entry checker with recursion protection. + /// Mirrors Go (a *Account) _checkForReverseEntry(...). + /// + internal void CheckForReverseEntry(string reply, ServiceImportEntry? serviceImport, bool checkInterest, bool recursed) + { + List? responseEntries; + + _mu.EnterReadLock(); + try + { + if (Imports.ReverseResponseMap == null || Imports.ReverseResponseMap.Count == 0) + return; + + if (SubscriptionIndex.SubjectHasWildcard(reply)) + { + if (recursed) + return; + } + else if (!Imports.ReverseResponseMap.TryGetValue(reply, out responseEntries) || responseEntries == null) + { + return; + } + else if (checkInterest && Sublist != null && Sublist.HasInterest(reply)) + { + return; + } + } + finally + { + _mu.ExitReadLock(); + } + + if (SubscriptionIndex.SubjectHasWildcard(reply)) + { + CheckForReverseEntries(reply, checkInterest, true); + return; + } + + _mu.EnterWriteLock(); + try + { + if (Imports.ReverseResponseMap == null || !Imports.ReverseResponseMap.TryGetValue(reply, out responseEntries) || responseEntries == null) + return; + + if (serviceImport == null) + { + Imports.ReverseResponseMap.Remove(reply); + } + else + { + responseEntries.RemoveAll(entry => entry.MappedSubject == serviceImport.From); + + if (responseEntries.Count == 0) + Imports.ReverseResponseMap.Remove(reply); + else + Imports.ReverseResponseMap[reply] = responseEntries; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Returns true when a service import is overshadowed by an existing subject key. + /// Mirrors Go (a *Account) serviceImportShadowed(from string) bool. + /// + internal bool ServiceImportShadowed(string from) + { + _mu.EnterReadLock(); + try + { + if (Imports.Services == null) + return false; + if (Imports.Services.ContainsKey(from)) + return true; + + foreach (var subject in Imports.Services.Keys) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(from, subject)) + return true; + } + + return false; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns true when a service import already exists for destination account + source subject. + /// Mirrors Go (a *Account) serviceImportExists(dstAccName, from string) bool. + /// + internal bool ServiceImportExists(string destinationAccountName, string from) + { + _mu.EnterReadLock(); + try + { + return GetServiceImportForAccountLocked(destinationAccountName, from) != null; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Creates (or returns existing) internal account client. + /// Lock must be held. + /// Mirrors Go (a *Account) internalClient() *client. + /// + internal ClientConnection? InternalAccountClient() + { + if (InternalClient == null && Server is NatsServer server) + { + InternalClient = server.CreateInternalAccountClient(); + InternalClient.Account = this; + } + + return InternalClient; + } + + /// + /// Creates internal account-scoped subscription. + /// Mirrors Go (a *Account) subscribeInternal(...). + /// + internal (Subscription? Sub, Exception? Error) SubscribeInternal(string subject) => + SubscribeInternalEx(subject, false); + + /// + /// Unsubscribes from an internal account subscription. + /// Mirrors Go (a *Account) unsubscribeInternal(sub *subscription). + /// + internal void UnsubscribeInternal(Subscription? sub) + { + if (sub?.Sid == null) + return; + + _mu.EnterReadLock(); + var internalClient = InternalClient; + _mu.ExitReadLock(); + internalClient?.ProcessUnsub(sub.Sid); + } + + /// + /// Creates internal subscription for service-import responses. + /// Mirrors Go (a *Account) subscribeServiceImportResponse(subject string). + /// + internal (Subscription? Sub, Exception? Error) SubscribeServiceImportResponse(string subject) => + SubscribeInternalEx(subject, true); + + /// + /// Extended internal subscription helper. + /// Mirrors Go (a *Account) subscribeInternalEx(...). + /// + internal (Subscription? Sub, Exception? Error) SubscribeInternalEx(string subject, bool responseImport) + { + ClientConnection? client; + string sidText; + + _mu.EnterWriteLock(); + try + { + _isid++; + client = InternalAccountClient(); + sidText = _isid.ToString(); + } + finally + { + _mu.ExitWriteLock(); + } + + if (client == null) + return (null, new InvalidOperationException("no internal account client")); + + return client.ProcessSubEx(Encoding.ASCII.GetBytes(subject), null, Encoding.ASCII.GetBytes(sidText), false, false, responseImport); + } + + /// + /// Adds an internal subscription that matches a service import's from subject. + /// Mirrors Go (a *Account) addServiceImportSub(si *serviceImport) error. + /// + internal Exception? AddServiceImportSub(ServiceImportEntry serviceImport) + { + if (serviceImport == null) + return ServerErrors.ErrMissingService; + + ClientConnection? client; + string sidText; + string subject; + + _mu.EnterWriteLock(); + try + { + client = InternalAccountClient(); + if (client == null) + return null; + if (serviceImport.SubscriptionId is { Length: > 0 }) + return new InvalidOperationException("duplicate call to create subscription for service import"); + + _isid++; + sidText = _isid.ToString(); + serviceImport.SubscriptionId = Encoding.ASCII.GetBytes(sidText); + subject = serviceImport.From; + } + finally + { + _mu.ExitWriteLock(); + } + + var (_, err) = client.ProcessSubEx(Encoding.ASCII.GetBytes(subject), null, Encoding.ASCII.GetBytes(sidText), true, true, false); + return err; + } + + /// + /// Removes all subscriptions associated with service imports. + /// Mirrors Go (a *Account) removeAllServiceImportSubs(). + /// + internal void RemoveAllServiceImportSubs() + { + List subscriptionIds = []; + ClientConnection? internalClient; + + _mu.EnterWriteLock(); + try + { + if (Imports.Services != null) + { + foreach (var imports in Imports.Services.Values) + { + foreach (var serviceImport in imports) + { + if (serviceImport.SubscriptionId is { Length: > 0 }) + { + subscriptionIds.Add(serviceImport.SubscriptionId); + serviceImport.SubscriptionId = null; + } + } + } + } + + internalClient = InternalClient; + InternalClient = null; + } + finally + { + _mu.ExitWriteLock(); + } + + if (internalClient == null) + return; + + foreach (var sid in subscriptionIds) + internalClient.ProcessUnsub(sid); + + internalClient.CloseConnection(ClosedState.InternalClient); + } + + /// + /// Adds subscriptions for all registered service imports. + /// Mirrors Go (a *Account) addAllServiceImportSubs(). + /// + internal void AddAllServiceImportSubs() + { + List imports = []; + + _mu.EnterReadLock(); + try + { + if (Imports.Services != null) + { + foreach (var entries in Imports.Services.Values) + imports.AddRange(entries); + } + } + finally + { + _mu.ExitReadLock(); + } + + foreach (var serviceImport in imports) + _ = AddServiceImportSub(serviceImport); + } + + /// + /// Processes a service-import response routed to this account. + /// Mirrors Go (a *Account) processServiceImportResponse(...). + /// + internal void ProcessServiceImportResponse(string subject, byte[] msg) + { + ServiceImportEntry? serviceImport; + + _mu.EnterReadLock(); + try + { + if (IsExpired() || Exports.Responses == null || Exports.Responses.Count == 0) + return; + + if (!Exports.Responses.TryGetValue(subject, out serviceImport)) + return; + if (serviceImport == null || serviceImport.Invalid) + return; + } + finally + { + _mu.ExitReadLock(); + } + + // The client-side response processing pipeline is still under active porting. + serviceImport.DidDeliver = msg.Length >= 0; + } + + /// + /// Creates response wildcard prefix for service replies. + /// Lock must be held by caller. + /// Mirrors Go (a *Account) createRespWildcard(). + /// + internal void CreateRespWildcard() + { + const string alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + Span prefix = stackalloc byte[14]; + prefix[0] = (byte)'_'; + prefix[1] = (byte)'R'; + prefix[2] = (byte)'_'; + prefix[3] = (byte)'.'; + + ulong random = (ulong)Random.Shared.NextInt64(); + for (var i = 4; i < prefix.Length; i++) + { + prefix[i] = (byte)alphabet[(int)(random % (ulong)alphabet.Length)]; + random /= (ulong)alphabet.Length; + } + + ServiceImportReply = [.. prefix, (byte)'.']; + } + // ------------------------------------------------------------------------- // Export checks // ------------------------------------------------------------------------- diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 60bc7d8..b0dea47 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1682,6 +1682,18 @@ public sealed partial class ClientConnection } } + internal void ProcessUnsub(byte[] sid) + { + lock (_mu) + { + if (Subs == null) + return; + + var sidText = Encoding.ASCII.GetString(sid); + Subs.Remove(sidText); + } + } + // features 440-441: processInfo, processErr internal void ProcessInfo(string info) { diff --git a/porting.db b/porting.db index 6ec2d78..b8b7dd8 100644 Binary files a/porting.db and b/porting.db differ