From 549370328004cc0a9c39d7b93b06f70ca302920c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 20:15:59 -0500 Subject: [PATCH] feat(batch19): implement service reply and stream import/export methods --- .../ZB.MOM.NatsNet.Server/Accounts/Account.cs | 414 ++++++++++++++++++ porting.db | Bin 6684672 -> 6688768 bytes 2 files changed, 414 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index a6ef4db..c12203e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -2768,6 +2768,420 @@ public sealed class Account : INatsAccount ServiceImportReply = [.. prefix, (byte)'.']; } + /// + /// Generates a new service reply subject. + /// Mirrors Go (a *Account) newServiceReply(tracking bool) []byte. + /// + internal byte[] NewServiceReply(bool tracking) + { + bool createdPrefix = false; + byte[] replyPrefix; + + _mu.EnterWriteLock(); + try + { + if (ServiceImportReply == null) + { + CreateRespWildcard(); + createdPrefix = true; + } + + replyPrefix = ServiceImportReply ?? Encoding.ASCII.GetBytes("_R_."); + } + finally + { + _mu.ExitWriteLock(); + } + + if (createdPrefix) + _ = SubscribeServiceImportResponse(Encoding.ASCII.GetString([.. replyPrefix, (byte)'>'])); + + const string alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + Span randomPart = stackalloc byte[20]; + ulong random = (ulong)Random.Shared.NextInt64(); + for (var i = 0; i < randomPart.Length; i++) + { + randomPart[i] = (byte)alphabet[(int)(random % (ulong)alphabet.Length)]; + random /= (ulong)alphabet.Length; + } + + var reply = new List(replyPrefix.Length + randomPart.Length + 2); + reply.AddRange(replyPrefix); + reply.AddRange(randomPart.ToArray()); + + if (tracking) + { + reply.Add((byte)'.'); + reply.Add((byte)'T'); + } + + return [.. reply]; + } + + /// + /// Returns the response threshold for an exported service. + /// Mirrors Go (a *Account) ServiceExportResponseThreshold(...). + /// + public (TimeSpan Threshold, Exception? Error) ServiceExportResponseThreshold(string export) + { + _mu.EnterReadLock(); + try + { + var serviceExport = GetServiceExport(export); + if (serviceExport == null) + return (TimeSpan.Zero, new InvalidOperationException($"no export defined for \"{export}\"")); + return (serviceExport.ResponseThreshold, null); + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Sets max response delivery time for an exported service. + /// Mirrors Go (a *Account) SetServiceExportResponseThreshold(...). + /// + public Exception? SetServiceExportResponseThreshold(string export, TimeSpan maxTime) + { + _mu.EnterWriteLock(); + try + { + if (IsClaimAccount()) + return new InvalidOperationException("claim based accounts can not be updated directly"); + + var serviceExport = GetServiceExport(export); + if (serviceExport == null) + return new InvalidOperationException($"no export defined for \"{export}\""); + + serviceExport.ResponseThreshold = maxTime; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Enables/disables cross-account trace propagation on a service export. + /// Mirrors Go (a *Account) SetServiceExportAllowTrace(...). + /// + public Exception? SetServiceExportAllowTrace(string export, bool allowTrace) + { + _mu.EnterWriteLock(); + try + { + var serviceExport = GetServiceExport(export); + if (serviceExport == null) + return new InvalidOperationException($"no export defined for \"{export}\""); + + serviceExport.AllowTrace = allowTrace; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Creates internal response service import entry. + /// Mirrors Go (a *Account) addRespServiceImport(...). + /// + internal ServiceImportEntry AddRespServiceImport(Account destination, string to, ServiceImportEntry originalServiceImport, bool tracking, Dictionary? header) + { + var newReply = Encoding.ASCII.GetString(originalServiceImport.Account?.NewServiceReply(tracking) ?? NewServiceReply(tracking)); + + ServiceImportEntry responseImport; + _mu.EnterWriteLock(); + try + { + responseImport = new ServiceImportEntry + { + Account = destination, + ServiceExport = originalServiceImport.ServiceExport, + From = newReply, + To = to, + ResponseType = originalServiceImport.ResponseType, + IsResponse = true, + Share = originalServiceImport.Share, + Timestamp = UtcNowUnixNanos(), + Tracking = tracking && originalServiceImport.ResponseType == ServiceRespType.Singleton, + TrackingHeader = header, + Latency = tracking && originalServiceImport.ResponseType == ServiceRespType.Singleton + ? originalServiceImport.Latency + : null, + }; + + Exports.Responses ??= new Dictionary(StringComparer.Ordinal); + Exports.Responses[newReply] = responseImport; + } + finally + { + _mu.ExitWriteLock(); + } + + destination.AddReverseRespMapEntry(this, to, newReply); + return responseImport; + } + + /// + /// Adds stream import with optional claim context. + /// Mirrors Go (a *Account) AddStreamImportWithClaim(...). + /// + public Exception? AddStreamImportWithClaim(Account account, string from, string prefix, object? importClaim) => + AddStreamImportWithClaimInternal(account, from, prefix, false, importClaim); + + /// + /// Internal stream import add helper. + /// Mirrors Go (a *Account) addStreamImportWithClaim(...). + /// + internal Exception? AddStreamImportWithClaimInternal(Account account, string from, string prefix, bool allowTrace, object? importClaim) + { + if (account == null) + return ServerErrors.ErrMissingAccount; + if (!account.CheckStreamImportAuthorized(this, from, importClaim)) + return ServerErrors.ErrStreamImportAuthorization; + + if (!string.IsNullOrEmpty(prefix)) + { + if (SubscriptionIndex.SubjectHasWildcard(prefix)) + return ServerErrors.ErrStreamImportBadPrefix; + if (!prefix.EndsWith(".", StringComparison.Ordinal)) + prefix += '.'; + } + + return AddMappedStreamImportWithClaimInternal(account, from, prefix + from, allowTrace, importClaim); + } + + /// + /// Convenience helper for mapped stream imports without claim. + /// Mirrors Go (a *Account) AddMappedStreamImport(...). + /// + public Exception? AddMappedStreamImport(Account account, string from, string to) => + AddMappedStreamImportWithClaim(account, from, to, null); + + /// + /// Adds mapped stream import with optional claim. + /// Mirrors Go (a *Account) AddMappedStreamImportWithClaim(...). + /// + public Exception? AddMappedStreamImportWithClaim(Account account, string from, string to, object? importClaim) => + AddMappedStreamImportWithClaimInternal(account, from, to, false, importClaim); + + /// + /// Internal mapped stream import add helper. + /// Mirrors Go (a *Account) addMappedStreamImportWithClaim(...). + /// + internal Exception? AddMappedStreamImportWithClaimInternal(Account account, string from, string to, bool allowTrace, object? importClaim) + { + if (account == null) + return ServerErrors.ErrMissingAccount; + if (!account.CheckStreamImportAuthorized(this, from, importClaim)) + return ServerErrors.ErrStreamImportAuthorization; + + if (string.IsNullOrEmpty(to)) + to = from; + + var cycleErr = StreamImportFormsCycle(account, to) ?? StreamImportFormsCycle(account, from); + if (cycleErr != null) + return cycleErr; + + ISubjectTransformer? transform = null; + var usePublishedSubject = false; + if (SubscriptionIndex.SubjectHasWildcard(from)) + { + if (to == from) + { + usePublishedSubject = true; + } + else + { + var (created, err) = SubjectTransform.New(from, to); + if (err != null) + return new InvalidOperationException($"failed to create mapping transform for stream import subject from \"{from}\" to \"{to}\": {err.Message}"); + transform = created; + } + } + + _mu.EnterWriteLock(); + try + { + if (IsStreamImportDuplicate(account, from)) + return ServerErrors.ErrStreamImportDuplicate; + + Imports.Streams ??= []; + Imports.Streams.Add(new StreamImportEntry + { + Account = account, + From = from, + To = to, + Transform = transform, + Claim = importClaim, + UsePublishedSubject = usePublishedSubject, + AllowTrace = allowTrace, + }); + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Checks if stream import duplicate exists. Lock should be held. + /// Mirrors Go (a *Account) isStreamImportDuplicate(...). + /// + internal bool IsStreamImportDuplicate(Account account, string from) + { + if (Imports.Streams == null) + return false; + + foreach (var streamImport in Imports.Streams) + { + if (ReferenceEquals(streamImport.Account, account) && streamImport.From == from) + return true; + } + return false; + } + + /// + /// Adds stream import from a specific account. + /// Mirrors Go (a *Account) AddStreamImport(...). + /// + public Exception? AddStreamImport(Account account, string from, string prefix) => + AddStreamImportWithClaimInternal(account, from, prefix, false, null); + + /// + /// Adds stream export, optionally restricted to explicit accounts. + /// Mirrors Go (a *Account) AddStreamExport(...). + /// + public Exception? AddStreamExport(string subject, IReadOnlyList? accounts = null) => + AddStreamExportWithAccountPos(subject, accounts, 0); + + /// + /// Adds stream export with account-position matching. + /// Mirrors Go (a *Account) addStreamExportWithAccountPos(...). + /// + public Exception? AddStreamExportWithAccountPos(string subject, IReadOnlyList? accounts, uint accountPos) + { + if (!SubscriptionIndex.IsValidSubject(subject)) + return ServerErrors.ErrBadSubject; + + _mu.EnterWriteLock(); + try + { + Exports.Streams ??= new Dictionary(StringComparer.Ordinal); + Exports.Streams.TryGetValue(subject, out var export); + export ??= new StreamExport(); + + if (accounts != null || accountPos > 0) + { + var authErr = SetExportAuth(export, subject, accounts, accountPos); + if (authErr != null) + return authErr; + } + + Exports.Streams[subject] = export; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Checks stream import authorization with account lock. + /// Mirrors Go (a *Account) checkStreamImportAuthorized(...). + /// + internal bool CheckStreamImportAuthorized(Account account, string subject, object? importClaim) + { + _mu.EnterReadLock(); + try { return CheckStreamImportAuthorizedNoLock(account, subject, importClaim); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Checks stream import authorization assuming lock is already held. + /// Mirrors Go (a *Account) checkStreamImportAuthorizedNoLock(...). + /// + internal bool CheckStreamImportAuthorizedNoLock(Account account, string subject, object? importClaim) + { + if (Exports.Streams == null || !SubscriptionIndex.IsValidSubject(subject)) + return false; + return CheckStreamExportApproved(account, subject, importClaim); + } + + /// + /// Gets wildcard-matching service export for subject. + /// Lock should be held. + /// Mirrors Go (a *Account) getWildcardServiceExport(from string). + /// + internal ServiceExportEntry? GetWildcardServiceExport(string from) + { + if (Exports.Services == null) + return null; + + var tokens = SubjectTransform.TokenizeSubject(from); + foreach (var (subject, serviceExport) in Exports.Services) + { + if (SubjectTransform.IsSubsetMatch(tokens, subject)) + return serviceExport; + } + return null; + } + + /// + /// Handles stream import activation expiration. + /// Mirrors Go (a *Account) streamActivationExpired(...). + /// + internal void StreamActivationExpired(Account exportAccount, string subject) + { + _mu.EnterWriteLock(); + try + { + if (IsExpired() || Imports.Streams == null) + return; + + foreach (var streamImport in Imports.Streams) + { + if (ReferenceEquals(streamImport.Account, exportAccount) && streamImport.From == subject) + { + streamImport.Invalid = true; + return; + } + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Handles service import activation expiration. + /// Mirrors Go (a *Account) serviceActivationExpired(...). + /// + internal void ServiceActivationExpired(Account destinationAccount, string subject) + { + _mu.EnterWriteLock(); + try + { + if (IsExpired() || Imports.Services == null) + return; + + var serviceImport = GetServiceImportForAccountLocked(destinationAccount.Name, subject); + if (serviceImport != null) + serviceImport.Invalid = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + // ------------------------------------------------------------------------- // Export checks // ------------------------------------------------------------------------- diff --git a/porting.db b/porting.db index b8b7dd8af34c1826986988f973757d3fe1ccbe0c..b519e6617c5988917a52b8ff0332b00143fbe7a6 100644 GIT binary patch delta 5072 zcmb_fYitzP6`q-$-FfWX-Q)Gz*j|slh8Tl`?R^-(Z~_Km2sRinBw!x#njJt@3@*0U z4RP=z(M1hKvYV325((@mYMN9jn6#`^g%qh#RVP(i)K;lb)Y?_mMs3-mZ=O?QYlT~cz zH9@@04D$ho9G+ndTymZ)^WWnqgr5oT3fG7@!)_4yZ}KnjG5$F5k8ov)D>E$17b2U= z9XZ+E^R(8dv3VxXn|S9B>_%>AKAdF=gUNH7JeSEsCQmVWPLn5_JW2NsrA8;3f`Z9& zm^{|x`AlAc$;&5q{=~k=JAcDIop|*V_Bd;9NVbXWNv!-cd!4ORwlZ~5?WlI@lon+r z>XpxXBJ~RAJ%M^I;`o%^$SZF$<%!l$*%LR9vTo%kOtiPJzl(KIt3$UCwRY;3Lak=q za#E{Nw`6LCbW5VvD%}#PRis-2wY<9Jpq8v#Y{LI0_kyiMvgL644?g$5l8FV>G7|Ht zWhCZN%SiOjXJpUwJoCJ)c^)MG_ZH_^E?!Yun7J*PP=3KRvYgUFrfTx`Brf0P-jW?O z&>3$#yX#|)!bZVU*eM(o0)Tu79G>m1+4SX~Ib>I77UF%mn@(L|G=zRF%0QZJL=i|U}xBX^IXJklbm zKwh><#!`6o-9fkKhgl5UJ{0tA;@BLr|83--Bg!QF6}k)7$Z9j;zHsEvRZz&Zq`qM; zD3f!a6{fFk;v-o-m)r%nkeW*_C~8^S@W`~H-Z5&>a8-KbbEIBLO$*3RU23r&b?XBi z6-V9lfV7Nck?|NwC zhB6GRFs#NA%r1}p&G+l3^f>PG1OtG$IyUb9fn2>k6?Hd z!+H!GFf?J^?$x^Re06-KJuT2IHnGitkuJYxc!pMN%dh3}h}7 zi*MIe+H%_O&&WsqF|D<&uKv=n0u7<+SiIHBCs{8=DhYg`EiYd%v9)>bZ9BNl($s_A{OW-Pwd%FljaAHuQt7Av2Oe|?cY(41y#PSyj* zMEQ?d`LNiSHe31Rs~4jWUK^^h$(rL@Ok<;!Pi|K(KJO-Ljvoe|p8Mk}98l^&@j6Yp#J3+-2$v(B31SZquURz9oqy$hVdaKe92 zo8i~YeOh_=ON{%l>m64C8c=?%gq`E^-{tMncO^+YCcGyscMRKqYj5N)CFmP|f?q~2 zy@)Ply7aQi~L9T$@n$%RPQ&YPnI!{@3vPjUgzkD=d<6c<0P{; zy4DzPc&NVG*lwBX?6c~~_vgcf>IvNJ%FR^eNvn!vY_|Y8lKlD!)zvRe>z=TRiN>W9 zs;ga^uI;r-2q}r`wM%l}?6FEXQWBx1kSLC~>k_8ZN&ia>(&KsX5|>()+S;WMD~j*7 x&c~O03W7^?Menjo6r?1Q$0f51tth_JDnWv~qx3?%+?D!@4~C3=+?dJB_J19R~@R6q(S#sAJg%lh!ko-_B} zbMKyen-A<&nw$43?Lo(GNz$S_!?W4$&!Yu-?29c2^UQ;JEc%&KJs>r!5t$;rtRP^_ zS3gj79G5n2yfy%~-+}ArB6ClQN!^8A+KZDRXm0@05&(b?BGN?1XAmux;j{ zD{Ku585NHlZUEWP;qI6@SJ`1UDXE?-DbtfOXCfo*8Y@y0k-6|1TX$@gbl>-{%CMq4h=y6wZA5M>>USw=vK3v!x5-xYCn9_8RYdmMKM>h#uOPD5UN*y@$XgXk ze_syG;*$m|-SXXfQ0Z?(cBPw$>`FHf*_Hl6WLN4l2R@axY`xPcmlC&c8mHu$j8dkj zSD2mUz9a6o^7Aq%pn?N5aDomlNX+q=<>!skU39%}xaer(Ug}xTCs6wdg?B^^Eu%~E zRy*=|p_(OMX1!9EG1mR1>xzEYc~ZOOxItNAw%yF#$cr7+Rl`4|z&w|rnp!@UdRsNG zvZ&QVkxVg)PSx^-bbWzSbd)H5S`g=v#1>v^i}UZTMb`IowcJPTb$q0?T7RR}l_Ie@ zYw6gwUO-!pc?I=d z#5eyD?xcbSo@;O1*ud}8;?mUrjcU~N6?f5(jXcYqcWDDZLfOY1c5k{HA0B+5k)Nj1 z{rI=hxv1X%79UGB_uSsLbxj-+zdrCo3Ire(hCvz(hY^qtBViN-&2>$oT`i-vk0h_u zBso^7jozF5U!%u;*43k5lA82XXQ8rLjxdqv`*LbllP+U3G;}Rd!@lD#$fSb184ds|;-C{eV zL&ceby15umdkpMsJ*|_5IA?s3sM^xVGnicD*+OKfDFilFl0eCjD|5V7II)5jE7v902ARccpN4{9z-A? zCc_kX0;a-~Fb$@|Q!oRbhMDjT%!1ia0EJKl&q6WGfw?db=EDMb4oYAlltLLi4~w82 z7Q+j$1YU#+SPIMFC0GtG!z-`?UWL~n3a>*Ytb{64^T!4*_%%;h@VHP@9L)d#