diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index a6ef4db..c12203e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -2768,6 +2768,420 @@ public sealed class Account : INatsAccount ServiceImportReply = [.. prefix, (byte)'.']; } + /// + /// Generates a new service reply subject. + /// Mirrors Go (a *Account) newServiceReply(tracking bool) []byte. + /// + internal byte[] NewServiceReply(bool tracking) + { + bool createdPrefix = false; + byte[] replyPrefix; + + _mu.EnterWriteLock(); + try + { + if (ServiceImportReply == null) + { + CreateRespWildcard(); + createdPrefix = true; + } + + replyPrefix = ServiceImportReply ?? Encoding.ASCII.GetBytes("_R_."); + } + finally + { + _mu.ExitWriteLock(); + } + + if (createdPrefix) + _ = SubscribeServiceImportResponse(Encoding.ASCII.GetString([.. replyPrefix, (byte)'>'])); + + const string alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + Span randomPart = stackalloc byte[20]; + ulong random = (ulong)Random.Shared.NextInt64(); + for (var i = 0; i < randomPart.Length; i++) + { + randomPart[i] = (byte)alphabet[(int)(random % (ulong)alphabet.Length)]; + random /= (ulong)alphabet.Length; + } + + var reply = new List(replyPrefix.Length + randomPart.Length + 2); + reply.AddRange(replyPrefix); + reply.AddRange(randomPart.ToArray()); + + if (tracking) + { + reply.Add((byte)'.'); + reply.Add((byte)'T'); + } + + return [.. reply]; + } + + /// + /// Returns the response threshold for an exported service. + /// Mirrors Go (a *Account) ServiceExportResponseThreshold(...). + /// + public (TimeSpan Threshold, Exception? Error) ServiceExportResponseThreshold(string export) + { + _mu.EnterReadLock(); + try + { + var serviceExport = GetServiceExport(export); + if (serviceExport == null) + return (TimeSpan.Zero, new InvalidOperationException($"no export defined for \"{export}\"")); + return (serviceExport.ResponseThreshold, null); + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Sets max response delivery time for an exported service. + /// Mirrors Go (a *Account) SetServiceExportResponseThreshold(...). + /// + public Exception? SetServiceExportResponseThreshold(string export, TimeSpan maxTime) + { + _mu.EnterWriteLock(); + try + { + if (IsClaimAccount()) + return new InvalidOperationException("claim based accounts can not be updated directly"); + + var serviceExport = GetServiceExport(export); + if (serviceExport == null) + return new InvalidOperationException($"no export defined for \"{export}\""); + + serviceExport.ResponseThreshold = maxTime; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Enables/disables cross-account trace propagation on a service export. + /// Mirrors Go (a *Account) SetServiceExportAllowTrace(...). + /// + public Exception? SetServiceExportAllowTrace(string export, bool allowTrace) + { + _mu.EnterWriteLock(); + try + { + var serviceExport = GetServiceExport(export); + if (serviceExport == null) + return new InvalidOperationException($"no export defined for \"{export}\""); + + serviceExport.AllowTrace = allowTrace; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Creates internal response service import entry. + /// Mirrors Go (a *Account) addRespServiceImport(...). + /// + internal ServiceImportEntry AddRespServiceImport(Account destination, string to, ServiceImportEntry originalServiceImport, bool tracking, Dictionary? header) + { + var newReply = Encoding.ASCII.GetString(originalServiceImport.Account?.NewServiceReply(tracking) ?? NewServiceReply(tracking)); + + ServiceImportEntry responseImport; + _mu.EnterWriteLock(); + try + { + responseImport = new ServiceImportEntry + { + Account = destination, + ServiceExport = originalServiceImport.ServiceExport, + From = newReply, + To = to, + ResponseType = originalServiceImport.ResponseType, + IsResponse = true, + Share = originalServiceImport.Share, + Timestamp = UtcNowUnixNanos(), + Tracking = tracking && originalServiceImport.ResponseType == ServiceRespType.Singleton, + TrackingHeader = header, + Latency = tracking && originalServiceImport.ResponseType == ServiceRespType.Singleton + ? originalServiceImport.Latency + : null, + }; + + Exports.Responses ??= new Dictionary(StringComparer.Ordinal); + Exports.Responses[newReply] = responseImport; + } + finally + { + _mu.ExitWriteLock(); + } + + destination.AddReverseRespMapEntry(this, to, newReply); + return responseImport; + } + + /// + /// Adds stream import with optional claim context. + /// Mirrors Go (a *Account) AddStreamImportWithClaim(...). + /// + public Exception? AddStreamImportWithClaim(Account account, string from, string prefix, object? importClaim) => + AddStreamImportWithClaimInternal(account, from, prefix, false, importClaim); + + /// + /// Internal stream import add helper. + /// Mirrors Go (a *Account) addStreamImportWithClaim(...). + /// + internal Exception? AddStreamImportWithClaimInternal(Account account, string from, string prefix, bool allowTrace, object? importClaim) + { + if (account == null) + return ServerErrors.ErrMissingAccount; + if (!account.CheckStreamImportAuthorized(this, from, importClaim)) + return ServerErrors.ErrStreamImportAuthorization; + + if (!string.IsNullOrEmpty(prefix)) + { + if (SubscriptionIndex.SubjectHasWildcard(prefix)) + return ServerErrors.ErrStreamImportBadPrefix; + if (!prefix.EndsWith(".", StringComparison.Ordinal)) + prefix += '.'; + } + + return AddMappedStreamImportWithClaimInternal(account, from, prefix + from, allowTrace, importClaim); + } + + /// + /// Convenience helper for mapped stream imports without claim. + /// Mirrors Go (a *Account) AddMappedStreamImport(...). + /// + public Exception? AddMappedStreamImport(Account account, string from, string to) => + AddMappedStreamImportWithClaim(account, from, to, null); + + /// + /// Adds mapped stream import with optional claim. + /// Mirrors Go (a *Account) AddMappedStreamImportWithClaim(...). + /// + public Exception? AddMappedStreamImportWithClaim(Account account, string from, string to, object? importClaim) => + AddMappedStreamImportWithClaimInternal(account, from, to, false, importClaim); + + /// + /// Internal mapped stream import add helper. + /// Mirrors Go (a *Account) addMappedStreamImportWithClaim(...). + /// + internal Exception? AddMappedStreamImportWithClaimInternal(Account account, string from, string to, bool allowTrace, object? importClaim) + { + if (account == null) + return ServerErrors.ErrMissingAccount; + if (!account.CheckStreamImportAuthorized(this, from, importClaim)) + return ServerErrors.ErrStreamImportAuthorization; + + if (string.IsNullOrEmpty(to)) + to = from; + + var cycleErr = StreamImportFormsCycle(account, to) ?? StreamImportFormsCycle(account, from); + if (cycleErr != null) + return cycleErr; + + ISubjectTransformer? transform = null; + var usePublishedSubject = false; + if (SubscriptionIndex.SubjectHasWildcard(from)) + { + if (to == from) + { + usePublishedSubject = true; + } + else + { + var (created, err) = SubjectTransform.New(from, to); + if (err != null) + return new InvalidOperationException($"failed to create mapping transform for stream import subject from \"{from}\" to \"{to}\": {err.Message}"); + transform = created; + } + } + + _mu.EnterWriteLock(); + try + { + if (IsStreamImportDuplicate(account, from)) + return ServerErrors.ErrStreamImportDuplicate; + + Imports.Streams ??= []; + Imports.Streams.Add(new StreamImportEntry + { + Account = account, + From = from, + To = to, + Transform = transform, + Claim = importClaim, + UsePublishedSubject = usePublishedSubject, + AllowTrace = allowTrace, + }); + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Checks if stream import duplicate exists. Lock should be held. + /// Mirrors Go (a *Account) isStreamImportDuplicate(...). + /// + internal bool IsStreamImportDuplicate(Account account, string from) + { + if (Imports.Streams == null) + return false; + + foreach (var streamImport in Imports.Streams) + { + if (ReferenceEquals(streamImport.Account, account) && streamImport.From == from) + return true; + } + return false; + } + + /// + /// Adds stream import from a specific account. + /// Mirrors Go (a *Account) AddStreamImport(...). + /// + public Exception? AddStreamImport(Account account, string from, string prefix) => + AddStreamImportWithClaimInternal(account, from, prefix, false, null); + + /// + /// Adds stream export, optionally restricted to explicit accounts. + /// Mirrors Go (a *Account) AddStreamExport(...). + /// + public Exception? AddStreamExport(string subject, IReadOnlyList? accounts = null) => + AddStreamExportWithAccountPos(subject, accounts, 0); + + /// + /// Adds stream export with account-position matching. + /// Mirrors Go (a *Account) addStreamExportWithAccountPos(...). + /// + public Exception? AddStreamExportWithAccountPos(string subject, IReadOnlyList? accounts, uint accountPos) + { + if (!SubscriptionIndex.IsValidSubject(subject)) + return ServerErrors.ErrBadSubject; + + _mu.EnterWriteLock(); + try + { + Exports.Streams ??= new Dictionary(StringComparer.Ordinal); + Exports.Streams.TryGetValue(subject, out var export); + export ??= new StreamExport(); + + if (accounts != null || accountPos > 0) + { + var authErr = SetExportAuth(export, subject, accounts, accountPos); + if (authErr != null) + return authErr; + } + + Exports.Streams[subject] = export; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Checks stream import authorization with account lock. + /// Mirrors Go (a *Account) checkStreamImportAuthorized(...). + /// + internal bool CheckStreamImportAuthorized(Account account, string subject, object? importClaim) + { + _mu.EnterReadLock(); + try { return CheckStreamImportAuthorizedNoLock(account, subject, importClaim); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Checks stream import authorization assuming lock is already held. + /// Mirrors Go (a *Account) checkStreamImportAuthorizedNoLock(...). + /// + internal bool CheckStreamImportAuthorizedNoLock(Account account, string subject, object? importClaim) + { + if (Exports.Streams == null || !SubscriptionIndex.IsValidSubject(subject)) + return false; + return CheckStreamExportApproved(account, subject, importClaim); + } + + /// + /// Gets wildcard-matching service export for subject. + /// Lock should be held. + /// Mirrors Go (a *Account) getWildcardServiceExport(from string). + /// + internal ServiceExportEntry? GetWildcardServiceExport(string from) + { + if (Exports.Services == null) + return null; + + var tokens = SubjectTransform.TokenizeSubject(from); + foreach (var (subject, serviceExport) in Exports.Services) + { + if (SubjectTransform.IsSubsetMatch(tokens, subject)) + return serviceExport; + } + return null; + } + + /// + /// Handles stream import activation expiration. + /// Mirrors Go (a *Account) streamActivationExpired(...). + /// + internal void StreamActivationExpired(Account exportAccount, string subject) + { + _mu.EnterWriteLock(); + try + { + if (IsExpired() || Imports.Streams == null) + return; + + foreach (var streamImport in Imports.Streams) + { + if (ReferenceEquals(streamImport.Account, exportAccount) && streamImport.From == subject) + { + streamImport.Invalid = true; + return; + } + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Handles service import activation expiration. + /// Mirrors Go (a *Account) serviceActivationExpired(...). + /// + internal void ServiceActivationExpired(Account destinationAccount, string subject) + { + _mu.EnterWriteLock(); + try + { + if (IsExpired() || Imports.Services == null) + return; + + var serviceImport = GetServiceImportForAccountLocked(destinationAccount.Name, subject); + if (serviceImport != null) + serviceImport.Invalid = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + // ------------------------------------------------------------------------- // Export checks // ------------------------------------------------------------------------- diff --git a/porting.db b/porting.db index b8b7dd8..b519e66 100644 Binary files a/porting.db and b/porting.db differ