From 6daa31adf2b09049ba6dadc5197da003ec6cdd6d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 19:38:46 -0500 Subject: [PATCH 1/8] chore(batch19): start accounts core batch --- porting.db | Bin 6664192 -> 6664192 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index d71177bda24907deb9c2254c08eb32492af667f1..15af1aedbf22df00ef4836cbe138d3384227974a 100644 GIT binary patch delta 401 zcmYk#yE7wk0KoD6WhL1po81u3JK}PP$dPd3{W>lFA!Le$-#;ZI|>e*xKMGUq2nQ!JiO#n zz#9rFqL>m&DWe=875J&-Edkz9MKv`9sU<`mVct_u0}-M$(!>XviP1tUZM4%tCtY;Y zLoa=Nq@Mvk5oeGghWX40ql_`m7rrvVB;WYX4}LPmG&9Wdi@7Vqk`{SMIc)^3CHg-- zl!>H`N3BSYNy_%>=87%{#Kz{r_RiMY&T1mz&4ezEzn=K6gl{)*Lz1ntQ)2+wW zV9tU3EGJ~ea%KsOQ&F#{qY}l5v^D9=8otcljd7zLrm}`<-n3x)ZCW%fnU*uD?8^GXfA2er AApigX delta 395 zcmXxdyH67V0D$qkUf*}U>jNlStl+B;3iha$iY-NxX&nK6nMZx9x=`Yk9oo*Q#|Dv&zWY1S>~9h$O4NjvCPU>!%`E&RjFn~ zZGW`5=C3Df#&5M#mO|p@n~l=?#@m3F7L@<(ug5)XxJht^*m@rNb= From dcf1df44d1aa2d84d2bc48033e1b4335244446d0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 19:47:10 -0500 Subject: [PATCH 2/8] feat(batch19): implement account trace/counter/export core methods --- .../ZB.MOM.NatsNet.Server/Accounts/Account.cs | 319 ++++++++++++++++++ porting.db | Bin 6664192 -> 6668288 bytes 2 files changed, 319 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index 2e3371f..fa0308b 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -475,6 +475,29 @@ public sealed class Account : INatsAccount } } + /// + /// Sets account-level message trace destination subject. + /// Mirrors Go (a *Account) setTraceDest(dest string). + /// + internal void SetTraceDest(string dest) => SetMessageTraceDestination(dest); + + /// + /// Returns trace destination and sampling. + /// Mirrors Go (a *Account) getTraceDestAndSampling() (string, int). + /// + internal (string Destination, int Sampling) GetTraceDestAndSampling() + { + _mu.EnterReadLock(); + try + { + return (_traceDest, _traceDestSampling); + } + finally + { + _mu.ExitReadLock(); + } + } + // ------------------------------------------------------------------------- // Factory // ------------------------------------------------------------------------- @@ -502,6 +525,12 @@ public sealed class Account : INatsAccount /// public override string ToString() => Name; + /// + /// Returns the account name. + /// Mirrors Go (a *Account) String() string. + /// + public string String() => Name; + // ------------------------------------------------------------------------- // Shallow copy for config reload // ------------------------------------------------------------------------- @@ -966,6 +995,12 @@ public sealed class Account : INatsAccount internal int NumLocalConnectionsLocked() => (_clients?.Count ?? 0) - _sysclients - _nleafs; + /// + /// Returns local non-system, non-leaf client count. Lock must be held. + /// Mirrors Go (a *Account) numLocalConnections() int. + /// + internal int NumLocalConnectionsInternal() => NumLocalConnectionsLocked(); + /// /// Returns all local connections including leaf nodes (minus system clients). /// Mirrors Go (a *Account) numLocalAndLeafConnections() int. @@ -1049,6 +1084,13 @@ public sealed class Account : INatsAccount return _nleafs + _nrleafs >= MaxLeafNodes; } + /// + /// Returns true if total leaf-node count reached the configured maximum. + /// Lock must be held by the caller. + /// Mirrors Go (a *Account) maxTotalLeafNodesReached() bool. + /// + internal bool MaxTotalLeafNodesReachedInternal() => MaxTotalLeafNodesReachedLocked(); + /// /// Returns the total leaf-node count (local + remote). /// Mirrors Go (a *Account) NumLeafNodes() int. @@ -1115,6 +1157,93 @@ public sealed class Account : INatsAccount } } + /// + /// Returns true when there is at least one matching subscription for . + /// Mirrors Go (a *Account) SubscriptionInterest(subject string) bool. + /// + public bool SubscriptionInterest(string subject) => Interest(subject) > 0; + + /// + /// Returns total number of plain and queue subscriptions matching . + /// Mirrors Go (a *Account) Interest(subject string) int. + /// + public int Interest(string subject) + { + _mu.EnterReadLock(); + try + { + if (Sublist == null) + return 0; + + var (np, nq) = Sublist.NumInterest(subject); + return np + nq; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Increments the leaf-node count for a remote cluster. + /// Mirrors Go (a *Account) registerLeafNodeCluster(cluster string). + /// + internal void RegisterLeafNodeCluster(string cluster) + { + _mu.EnterWriteLock(); + try + { + _leafClusters ??= new Dictionary(StringComparer.Ordinal); + _leafClusters.TryGetValue(cluster, out var current); + _leafClusters[cluster] = current + 1; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Returns true when this account already tracks one or more leaf nodes from . + /// Mirrors Go (a *Account) hasLeafNodeCluster(cluster string) bool. + /// + internal bool HasLeafNodeCluster(string cluster) + { + _mu.EnterReadLock(); + try + { + return _leafClusters != null && + _leafClusters.TryGetValue(cluster, out var count) && + count > 0; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns true when the account is leaf-cluster isolated to . + /// Mirrors Go (a *Account) isLeafNodeClusterIsolated(cluster string) bool. + /// + internal bool IsLeafNodeClusterIsolated(string cluster) + { + _mu.EnterReadLock(); + try + { + if (string.IsNullOrEmpty(cluster)) + return false; + if (_leafClusters == null || _leafClusters.Count > 1) + return false; + + return _leafClusters.TryGetValue(cluster, out var count) && count == (ulong)_nleafs; + } + finally + { + _mu.ExitReadLock(); + } + } + // ------------------------------------------------------------------------- // Subscription limit error throttle // ------------------------------------------------------------------------- @@ -1460,6 +1589,167 @@ public sealed class Account : INatsAccount } } + // ------------------------------------------------------------------------- + // Service export configuration + // ------------------------------------------------------------------------- + + /// + /// Configures an exported service with singleton response semantics. + /// Mirrors Go (a *Account) AddServiceExport(subject string, accounts []*Account) error. + /// + public Exception? AddServiceExport(string subject, IReadOnlyList? accounts = null) => + AddServiceExportWithResponseAndAccountPos(subject, ServiceRespType.Singleton, accounts, 0); + + /// + /// Configures an exported service with singleton response semantics and account-position auth. + /// Mirrors Go (a *Account) addServiceExportWithAccountPos(...). + /// + public Exception? AddServiceExportWithAccountPos(string subject, IReadOnlyList? accounts, uint accountPos) => + AddServiceExportWithResponseAndAccountPos(subject, ServiceRespType.Singleton, accounts, accountPos); + + /// + /// Configures an exported service with explicit response type. + /// Mirrors Go (a *Account) AddServiceExportWithResponse(...). + /// + public Exception? AddServiceExportWithResponse(string subject, ServiceRespType respType, IReadOnlyList? accounts = null) => + AddServiceExportWithResponseAndAccountPos(subject, respType, accounts, 0); + + /// + /// Configures an exported service with explicit response type and account-position auth. + /// Mirrors Go (a *Account) addServiceExportWithResponseAndAccountPos(...). + /// + public Exception? AddServiceExportWithResponseAndAccountPos(string subject, ServiceRespType respType, IReadOnlyList? accounts, uint accountPos) + { + if (!SubscriptionIndex.IsValidSubject(subject)) + return ServerErrors.ErrBadSubject; + + _mu.EnterWriteLock(); + try + { + Exports.Services ??= new Dictionary(StringComparer.Ordinal); + + if (!Exports.Services.TryGetValue(subject, out var serviceExport) || serviceExport == null) + serviceExport = new ServiceExportEntry(); + + if (respType != ServiceRespType.Singleton) + serviceExport.ResponseType = respType; + + if (accounts != null || accountPos > 0) + { + var authErr = SetExportAuth(serviceExport, subject, accounts, accountPos); + if (authErr != null) + return authErr; + } + + serviceExport.Account = this; + serviceExport.ResponseThreshold = ServerConstants.DefaultServiceExportResponseThreshold; + Exports.Services[subject] = serviceExport; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Enables latency tracking for with default sampling. + /// Mirrors Go (a *Account) TrackServiceExport(service, results string) error. + /// + public Exception? TrackServiceExport(string service, string results) => + TrackServiceExportWithSampling(service, results, ServerConstants.DefaultServiceLatencySampling); + + /// + /// Enables latency tracking for with explicit sampling. + /// Mirrors Go (a *Account) TrackServiceExportWithSampling(...). + /// + public Exception? TrackServiceExportWithSampling(string service, string results, int sampling) + { + if (sampling != 0 && (sampling < 1 || sampling > 100)) + return ServerErrors.ErrBadSampling; + if (!SubscriptionIndex.IsValidPublishSubject(results)) + return ServerErrors.ErrBadPublishSubject; + if (IsExportService(results)) + return ServerErrors.ErrBadPublishSubject; + + _mu.EnterWriteLock(); + try + { + if (Exports.Services == null) + return ServerErrors.ErrMissingService; + if (!Exports.Services.TryGetValue(service, out var serviceExport)) + return ServerErrors.ErrMissingService; + + serviceExport ??= new ServiceExportEntry(); + if (serviceExport.ResponseType != ServiceRespType.Singleton) + return ServerErrors.ErrBadServiceType; + + serviceExport.Latency = new InternalServiceLatency + { + Sampling = sampling, + Subject = results, + }; + Exports.Services[service] = serviceExport; + + if (Imports.Services != null) + { + foreach (var imports in Imports.Services.Values) + { + foreach (var import in imports) + { + if (import?.Account?.Name != Name) + continue; + if (SubjectTransform.IsSubsetMatch(SubjectTransform.TokenizeSubject(import.To), service)) + import.Latency = serviceExport.Latency; + } + } + } + + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Disables latency tracking for the exported service. + /// Mirrors Go (a *Account) UnTrackServiceExport(service string). + /// + public void UnTrackServiceExport(string service) + { + _mu.EnterWriteLock(); + try + { + if (Exports.Services == null || !Exports.Services.TryGetValue(service, out var serviceExport) || serviceExport?.Latency == null) + return; + + serviceExport.Latency = null; + + if (Imports.Services == null) + return; + + foreach (var imports in Imports.Services.Values) + { + foreach (var import in imports) + { + if (import?.Account?.Name != Name) + continue; + if (SubjectTransform.IsSubsetMatch(SubjectTransform.TokenizeSubject(import.To), service)) + { + import.Latency = null; + import.M1 = null; + } + } + } + } + finally + { + _mu.ExitWriteLock(); + } + } + // ------------------------------------------------------------------------- // Export checks // ------------------------------------------------------------------------- @@ -2146,6 +2436,35 @@ public sealed class Account : INatsAccount return false; } + /// + /// Applies account-based authorization rules to an export descriptor. + /// Mirrors Go setExportAuth(&se.exportAuth, ...). + /// + private static Exception? SetExportAuth(ExportAuth auth, string subject, IReadOnlyList? accounts, uint accountPos) + { + if (!SubscriptionIndex.IsValidSubject(subject)) + return ServerErrors.ErrBadSubject; + + auth.AccountPosition = accountPos; + + if (accounts == null || accounts.Count == 0) + { + auth.Approved = null; + return null; + } + + var approved = new Dictionary(accounts.Count, StringComparer.Ordinal); + foreach (var account in accounts) + { + if (account == null) + continue; + approved[account.Name] = account; + } + + auth.Approved = approved; + return null; + } + // ------------------------------------------------------------------------- // Export equality helpers // ------------------------------------------------------------------------- diff --git a/porting.db b/porting.db index 15af1aedbf22df00ef4836cbe138d3384227974a..244b04efbc21679711b6161fa1ef46a44e7cd434 100644 GIT binary patch delta 7043 zcmc(jdr(x@9mnsvcVG9i=k9`n1a@UL1{HklA}o)@`l!YD3`8&x41t9eX)%j{Y0~Ma zYwegenTR&wx02MscG9u76CZWFkx>L=eE?GJ)M->I#7FC7+W5+(9b;;L`?_~mZ~xnY zKRyiKJ(u(Sp5MLa+~2ZkxQ*Mi3Aol&Z7aj*R&BSUwuAI}QIr7x{@$ z&M-`7DUoG24i?I)3ss)1j-kqs)qYeKS?xuo ztgu~5|AL&*l$pvHSFL-syQ#@t%k*PCMOpRvS1qevyfRN#XHjLy>I^E2tWKj6VeV~` zywUV~rcnO;1S(~Q9#qN<-Kdlqx=<-Ojz#5=FrzZ(epJewdr>KKwlnce#H2g@i<55M zCYJr&Lr#?R9qdVx!_$ZKHoZnKiXVvYie9lrtmFR9UE=z={+BhJUY&28WsITu&L z6>@o824~^K!@puh0vSixo6`MXv%@^AZHA&wHk%qbD+xcZYa!`lc99{j*3EN?Y%`cT z*=5q9kJ)8J&s&&pSdnS-nk_n36n-QmgN>LI!Ly&`MM2QNq5r;qm43N?F8>gQhe^_S z{!4yX=i~p(pX3kgey;m5zlU$<-{Cj#jl7F5fhw=r39ZDOE4n~J%u`EGM_qC% zN}Y&OJyEJVN_9o4V=(@qIW=={)Q9bnijCUeYzCy?H#y-$L%ayd-(J z$F=BINyhVR7GaI`aQ6aRDebzz_6c~<=qh|=TRnp;!XX+&i|7zNVnBFAK;k6NpmFE* zWKALaU*clQ#Rv41@l)Y8pKeIiJ<#rf2{w`s2YTu6ApYO16&klt9v1e|1h_mzpMp&n zlMI^doC1e*q>tthI5=Ohwk_fGZA*DA{9u4m80x1+u%C;(7D~_2e*s%eCxtIdS0;jA z4$wPruXBMFy0_7IICqr_FgS>=1An5=DA!5ve@ZQSsl1!c=GyKUbyf`%_l7W0yhQKt z1~Ltw)BF&DQN8R55s^dj6#uug_;om4N+Zh)@8g(8~#%W-T@ z^DazaNVzu=4s520aBP5%1wPIy!WRR$pu!=#3^rV)6QbvuuH*yV5bcN<{mT$dix^$= zS9%&Ymumvmf)$_9tcbzwpHYBoy`jOSmuO~SFqq6+=Usrwa=h`7RzVY`r!UZjI{4`& z`Ulwjw%!W%MOdrK%XCJ>guctv2MKlBpj*im%q`zJ@Lr+4fx7{oK^nP&M?-Xips8i2 z5-;-!dmJ1c!S@kknCLna^-k*ASK$TLVfG8vhIOhqz~Ok^4| z9hrg5MC^zI$wFo!*+>rJL~@ZlBp)e2o<*KRW+UH0<{)#CdB}X^yT}6Mdq^R&5P2S1 zge*puAWM;D$a3TbqzEZSUPM+PB}gf<5_t(JL&}ktkqTrL@(NOkyoyvI)rbqJksj@< z)!vzsz#L*snPM^hp>bE-ABBGk34ETRQolvFS9@ObwZ_8DXV;K-nM3{)=B;G!N?)14 zSXpaxqId1A)$Zz+^-b<(Jc}I(<q>}w~t`5(8E^hTN+5VF6t%Pt~_jY{Ee!+oXF zjLJP|cnGS$t{Mk@SInt#|EI&uDX=@ybB&5;L?wfsPNy?GaV$LRR6Lb_J7k%YA4|*C zDwYwAbjUNy5d%*Pezgpg)~(_h-64l;9oaFkwT5kLRcw{kH)LtgjiqIcilx%ihAm_I z+`3dOBbwEqCpN3GTxPY3XG9ko@^r*>omDEH5$$Er6WhyJuJctD&xpP;70Tm3?fG`t9MdWBS1BQI584D;1 zr)fk??-^{ykPuNDgfe$zOo4RNRK^!DUy!$ChH46wn#pNKdv4F$;rgp#J~PjEpWpso zmfM4}GT_Sk?cDCk#wv#4S65r-ld6{7#2B*pqk}Q(!5Ctz4Xof9hH#2OekAuRd*x2~ zw0u-aQ_7VhB|)~Uq1Q;J46#f!KZ%2^T6W65XwN)#e?JK!YVQE)WIZKMQzs*DD0;hivu%sbV^}@? zfDCdh!$9pYIZ=v}1h|@{0h2zI9CEC!_v*3NsHZP5?bp*DOrPm#m)m_k*elk1_b}z@ z=`NGSFw6CL2b(E+x{WD8Pd{Ob)YHZBOGwe!)3+^!t6{&A4#P^vkbGN#t)#@3&s=d! zm)+8jZt0R+(iYNZx!~wQ;t268bsN>;mfGD?n_D{LmcDmOs#|JyOGn+(e(E*;a5avm zIy^!Gnz;dPOf}WA=g7=P`G5V;T-w0>!u4_8+-1{_lXa|2O1BoiRamsW(6m?T;v`r2 zT*T&GuS^=g$~ zWxFZ^aaE*|USq-?TOfzA3e>6wtbDZU5LTjA)sKIqw0m4qrF!;)urNg%cH1?v6eH&_ zagJ*euV0JCYuXZ0I9A#ck|$Q$5+1`av58GhU7D{SoH$;bIoiaImqwe8>~c*<950J= z$Bq}px#PQ7Id^X{7bNlE|&m9ydMm(6SzF|)GsI|lk6EBw-fUu* zLBtc!aL0FyLEYKP?(j9&NJW_V)v9!jjU$kGlg(8RWGdIi%Ay_?amav-h({)5Mgnq- z@lcC;q?(`o6_AJb6RbqdyV zbds~V_#FKMI{pc^KwA~{f|xGKL;7_p!PfIMOq<-E=)ngX7@jvYdweL?a9;}*<~^p3 zWP}*_tBQ#}kd8QZ+OKZAMGJVy>7wT$q{FD~1d_XP7q~^)Tv(pu#gL@;A>G^J3w7P} zUC3*HHfr82Jczc}o*w!w4DK>pAT||G_}zUPt=$d}?^8R}RXYy|$9ie7^M<><^rGW% zQE;^nH>^!!yiU8c1H;)*s5tISr_u}|ka2`FLgEeD`rjeZH|aFzVjFJK1Mb}*=cBM1 zdEoo0znNWQILJNbVj;F{{m$wPvwC)rzQ=OO<^J%2O8wzhKTU%Y(Gm)a252mdeo8%| zVt}S=XNJ)MI^DTaa@kDF){+vNgyu+xZL<1zzQ?mVH&vp3reiytC29@s)hD1qGrY zWJSR!1WiGqXetUr(@;2yK#?d4O-Ik8XcU8D(F-UJ%|P*JCYps_LlqZQ~4v=U{bRcJNJ qL2J-jv<~H>^(YT*Q2FNkCnH9pL@^2k7kiJW_&?4J8KREcy8i`wQW)F- From 50f6b69fdad7530ba0ccd3967a928e30e415f4a8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 19:53:59 -0500 Subject: [PATCH 3/8] feat(batch19): implement account latency and import-cycle methods --- .../ZB.MOM.NatsNet.Server/Accounts/Account.cs | 567 ++++++++++++++++++ .../Internal/AccessTimeServiceTests.cs | 10 + porting.db | Bin 6668288 -> 6676480 bytes 3 files changed, 577 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index fa0308b..3fe30ed 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -17,6 +17,7 @@ using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; using System.Text; +using System.Text.Json; namespace ZB.MOM.NatsNet.Server; @@ -1750,6 +1751,486 @@ public sealed class Account : INatsAccount } } + /// + /// Publishes a service-latency metric for an import. + /// Mirrors Go (a *Account) sendLatencyResult(...). + /// + internal void SendLatencyResult(ServiceImportEntry si, ServiceLatency sl) + { + sl.Type = AccountEventConstants.ServiceLatencyType; + sl.Id = NextEventId(); + sl.Time = DateTime.UtcNow; + + string? latencySubject; + _mu.EnterWriteLock(); + try + { + latencySubject = si.Latency?.Subject; + si.RequestingClient = null; + } + finally + { + _mu.ExitWriteLock(); + } + + if (string.IsNullOrWhiteSpace(latencySubject) || Server is not NatsServer server) + return; + + var payload = JsonSerializer.SerializeToUtf8Bytes(sl); + _ = server.SendInternalAccountMsg(this, latencySubject, payload); + } + + /// + /// Publishes a bad-request latency metric (missing or invalid request shape). + /// Mirrors Go (a *Account) sendBadRequestTrackingLatency(...). + /// + internal void SendBadRequestTrackingLatency(ServiceImportEntry si, ClientConnection requestor, Dictionary? header) + { + var sl = new ServiceLatency + { + Status = 400, + Error = "Bad Request", + Requestor = CreateClientInfo(requestor, si.Share), + RequestHeader = header, + RequestStart = DateTime.UtcNow.Subtract(requestor.GetRttValue()), + }; + SendLatencyResult(si, sl); + } + + /// + /// Publishes timeout latency when requestor interest is lost before response delivery. + /// Mirrors Go (a *Account) sendReplyInterestLostTrackLatency(...). + /// + internal void SendReplyInterestLostTrackLatency(ServiceImportEntry si) + { + var sl = new ServiceLatency + { + Status = 408, + Error = "Request Timeout", + }; + + ClientConnection? requestor; + bool share; + long timestamp; + _mu.EnterReadLock(); + try + { + requestor = si.RequestingClient; + share = si.Share; + timestamp = si.Timestamp; + sl.RequestHeader = si.TrackingHeader; + } + finally + { + _mu.ExitReadLock(); + } + + if (requestor != null) + sl.Requestor = CreateClientInfo(requestor, share); + + var reqRtt = sl.Requestor?.Rtt ?? TimeSpan.Zero; + sl.RequestStart = UnixNanoToDateTime(timestamp - TimeSpanToUnixNanos(reqRtt)); + SendLatencyResult(si, sl); + } + + /// + /// Publishes backend failure latency for response-service imports. + /// Mirrors Go (a *Account) sendBackendErrorTrackingLatency(...). + /// + internal void SendBackendErrorTrackingLatency(ServiceImportEntry si, RsiReason reason) + { + var sl = new ServiceLatency(); + + ClientConnection? requestor; + bool share; + long timestamp; + _mu.EnterReadLock(); + try + { + requestor = si.RequestingClient; + share = si.Share; + timestamp = si.Timestamp; + sl.RequestHeader = si.TrackingHeader; + } + finally + { + _mu.ExitReadLock(); + } + + if (requestor != null) + sl.Requestor = CreateClientInfo(requestor, share); + + var reqRtt = sl.Requestor?.Rtt ?? TimeSpan.Zero; + sl.RequestStart = UnixNanoToDateTime(timestamp - TimeSpanToUnixNanos(reqRtt)); + + if (reason == RsiReason.NoDelivery) + { + sl.Status = 503; + sl.Error = "Service Unavailable"; + } + else if (reason == RsiReason.Timeout) + { + sl.Status = 504; + sl.Error = "Service Timeout"; + } + + SendLatencyResult(si, sl); + } + + /// + /// Sends request/response latency metrics. Returns true when complete, false when waiting for remote-half merge. + /// Mirrors Go (a *Account) sendTrackingLatency(...). + /// + internal bool SendTrackingLatency(ServiceImportEntry si, ClientConnection? responder) + { + _mu.EnterReadLock(); + var requestor = si.RequestingClient; + _mu.ExitReadLock(); + + if (requestor == null) + return true; + + var nowUnixNanos = UtcNowUnixNanos(); + var serviceRtt = UnixNanosToTimeSpan(Math.Max(0, nowUnixNanos - si.Timestamp)); + var sl = new ServiceLatency + { + Status = 200, + Requestor = CreateClientInfo(requestor, si.Share), + Responder = responder == null ? null : CreateClientInfo(responder, true), + RequestHeader = si.TrackingHeader, + }; + + var respRtt = sl.Responder?.Rtt ?? TimeSpan.Zero; + var reqRtt = sl.Requestor?.Rtt ?? TimeSpan.Zero; + sl.RequestStart = UnixNanoToDateTime(si.Timestamp - TimeSpanToUnixNanos(reqRtt)); + sl.ServiceLatencyDuration = serviceRtt > respRtt ? serviceRtt - respRtt : TimeSpan.Zero; + sl.TotalLatency = reqRtt + serviceRtt; + if (respRtt > TimeSpan.Zero) + { + sl.SystemLatency = DateTime.UtcNow - UnixNanoToDateTime(nowUnixNanos); + if (sl.SystemLatency < TimeSpan.Zero) + sl.SystemLatency = TimeSpan.Zero; + sl.TotalLatency += sl.SystemLatency; + } + + if (responder != null && responder.Kind != ClientKind.Client) + { + if (si.M1 != null) + { + SendLatencyResult(si, sl); + return true; + } + + _mu.EnterWriteLock(); + try + { + si.M1 = sl; + } + finally + { + _mu.ExitWriteLock(); + } + return false; + } + + SendLatencyResult(si, sl); + return true; + } + + /// + /// Returns the lowest response threshold configured across all service exports. + /// Mirrors Go (a *Account) lowestServiceExportResponseTime() time.Duration. + /// + internal TimeSpan LowestServiceExportResponseTime() + { + var lowest = TimeSpan.FromMinutes(5); + + _mu.EnterReadLock(); + try + { + if (Exports.Services == null) + return lowest; + + foreach (var export in Exports.Services.Values) + { + if (export != null && export.ResponseThreshold < lowest) + lowest = export.ResponseThreshold; + } + + return lowest; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Adds a service import with claim authorization context. + /// Mirrors Go (a *Account) AddServiceImportWithClaim(...). + /// + public Exception? AddServiceImportWithClaim(Account destination, string from, string to, object? imClaim) => + AddServiceImportWithClaimInternal(destination, from, to, imClaim, false); + + /// + /// Internal service-import add path with optional authorization bypass. + /// Mirrors Go (a *Account) addServiceImportWithClaim(..., internal bool). + /// + internal Exception? AddServiceImportWithClaimInternal(Account destination, string from, string to, object? imClaim, bool internalRequest) + { + if (destination == null) + return ServerErrors.ErrMissingAccount; + + if (string.IsNullOrEmpty(to)) + to = from; + if (!SubscriptionIndex.IsValidSubject(from) || !SubscriptionIndex.IsValidSubject(to)) + return SubscriptionIndex.ErrInvalidSubject; + + if (!internalRequest && !destination.CheckServiceExportApproved(this, to, imClaim)) + return ServerErrors.ErrServiceImportAuthorization; + + var cycleErr = ServiceImportFormsCycle(destination, from); + if (cycleErr != null) + return cycleErr; + + var (_, addErr) = AddServiceImportInternal(destination, from, to, imClaim); + return addErr; + } + + /// + /// Checks whether adding a service import forms an account cycle. + /// Mirrors Go (a *Account) serviceImportFormsCycle(...). + /// + internal Exception? ServiceImportFormsCycle(Account destination, string from) + { + var visited = new HashSet(StringComparer.Ordinal) { Name }; + return destination.CheckServiceImportsForCycles(from, visited); + } + + /// + /// Recursively checks service-import graph for cycles. + /// Mirrors Go (a *Account) checkServiceImportsForCycles(...). + /// + internal Exception? CheckServiceImportsForCycles(string from, HashSet visited) + { + if (visited.Count >= AccountConstants.MaxCycleSearchDepth) + return ServerErrors.ErrCycleSearchDepth; + + List? snapshot = null; + _mu.EnterReadLock(); + try + { + if (Imports.Services == null || Imports.Services.Count == 0) + return null; + + snapshot = []; + foreach (var entries in Imports.Services.Values) + snapshot.AddRange(entries); + } + finally + { + _mu.ExitReadLock(); + } + + foreach (var import in snapshot) + { + if (import?.Account == null) + continue; + if (!SubscriptionIndex.SubjectsCollide(from, import.To)) + continue; + + if (visited.Contains(import.Account.Name)) + return ServerErrors.ErrImportFormsCycle; + + visited.Add(Name); + var nextFrom = SubscriptionIndex.SubjectIsSubsetMatch(import.From, from) ? import.From : from; + var err = import.Account.CheckServiceImportsForCycles(nextFrom, visited); + if (err != null) + return err; + } + + return null; + } + + /// + /// Checks whether adding a stream import forms an account cycle. + /// Mirrors Go (a *Account) streamImportFormsCycle(...). + /// + internal Exception? StreamImportFormsCycle(Account destination, string to) + { + var visited = new HashSet(StringComparer.Ordinal) { Name }; + return destination.CheckStreamImportsForCycles(to, visited); + } + + /// + /// Returns true when any service export subject can match . + /// Mirrors Go (a *Account) hasServiceExportMatching(to string) bool. + /// + internal bool HasServiceExportMatching(string to) + { + if (Exports.Services == null) + return false; + + foreach (var subject in Exports.Services.Keys) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(to, subject)) + return true; + } + + return false; + } + + /// + /// Returns true when any stream export subject can match . + /// Mirrors Go (a *Account) hasStreamExportMatching(to string) bool. + /// + internal bool HasStreamExportMatching(string to) + { + if (Exports.Streams == null) + return false; + + foreach (var subject in Exports.Streams.Keys) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(to, subject)) + return true; + } + + return false; + } + + /// + /// Recursively checks stream-import graph for cycles. + /// Mirrors Go (a *Account) checkStreamImportsForCycles(...). + /// + internal Exception? CheckStreamImportsForCycles(string to, HashSet visited) + { + if (visited.Count >= AccountConstants.MaxCycleSearchDepth) + return ServerErrors.ErrCycleSearchDepth; + + _mu.EnterReadLock(); + var hasMatchingExport = HasStreamExportMatching(to); + var streams = Imports.Streams == null ? null : new List(Imports.Streams); + _mu.ExitReadLock(); + + if (!hasMatchingExport || streams == null || streams.Count == 0) + return null; + + foreach (var stream in streams) + { + if (stream?.Account == null) + continue; + if (!SubscriptionIndex.SubjectsCollide(to, stream.To)) + continue; + + if (visited.Contains(stream.Account.Name)) + return ServerErrors.ErrImportFormsCycle; + + visited.Add(Name); + var nextTo = SubscriptionIndex.SubjectIsSubsetMatch(stream.To, to) ? stream.To : to; + var err = stream.Account.CheckStreamImportsForCycles(nextTo, visited); + if (err != null) + return err; + } + + return null; + } + + /// + /// Allows or disallows request metadata sharing for a service import. + /// Mirrors Go (a *Account) SetServiceImportSharing(...). + /// + public Exception? SetServiceImportSharing(Account destination, string to, bool allow) => + SetServiceImportSharingInternal(destination, to, true, allow); + + /// + /// Internal service-import sharing setter with optional claim-account check bypass. + /// Mirrors Go (a *Account) setServiceImportSharing(...). + /// + internal Exception? SetServiceImportSharingInternal(Account destination, string to, bool check, bool allow) + { + _mu.EnterWriteLock(); + try + { + if (check && IsClaimAccount()) + return new InvalidOperationException("claim based accounts can not be updated directly"); + + if (Imports.Services == null) + return new InvalidOperationException("service import not found"); + + foreach (var imports in Imports.Services.Values) + { + foreach (var import in imports) + { + if (import?.Account?.Name == destination.Name && import.To == to) + { + import.Share = allow; + return null; + } + } + } + + return new InvalidOperationException("service import not found"); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Adds a service import from this account to . + /// Mirrors Go (a *Account) AddServiceImport(destination, from, to string) error. + /// + public Exception? AddServiceImport(Account destination, string from, string to) => + AddServiceImportWithClaim(destination, from, to, null); + + /// + /// Number of pending reverse-response map entries. + /// Mirrors Go (a *Account) NumPendingReverseResponses() int. + /// + public int NumPendingReverseResponses() + { + _mu.EnterReadLock(); + try { return Imports.ReverseResponseMap?.Count ?? 0; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Total number of pending response imports across all service exports. + /// Mirrors Go (a *Account) NumPendingAllResponses() int. + /// + public int NumPendingAllResponses() => NumPendingResponses(string.Empty); + + /// + /// Number of pending response imports, optionally filtered by exported service subject. + /// Mirrors Go (a *Account) NumPendingResponses(filter string) int. + /// + public int NumPendingResponses(string filter) + { + _mu.EnterReadLock(); + try + { + if (string.IsNullOrEmpty(filter)) + return Exports.Responses?.Count ?? 0; + + var export = GetServiceExport(filter); + if (export == null || Exports.Responses == null) + return 0; + + var count = 0; + foreach (var import in Exports.Responses.Values) + { + if (ReferenceEquals(import.ServiceExport, export)) + count++; + } + return count; + } + finally + { + _mu.ExitReadLock(); + } + } + // ------------------------------------------------------------------------- // Export checks // ------------------------------------------------------------------------- @@ -2359,6 +2840,92 @@ public sealed class Account : INatsAccount return null; } + /// + /// Adds a service import entry to the import map. + /// Mirrors Go (a *Account) addServiceImport(...). + /// + private (ServiceImportEntry? Import, Exception? Error) AddServiceImportInternal(Account destination, string from, string to, object? claim) + { + _mu.EnterWriteLock(); + try + { + Imports.Services ??= new Dictionary>(StringComparer.Ordinal); + + var serviceImport = new ServiceImportEntry + { + Account = destination, + Claim = claim, + From = from, + To = to, + }; + + if (!Imports.Services.TryGetValue(from, out var entries)) + { + entries = []; + Imports.Services[from] = entries; + } + + entries.Add(serviceImport); + return (serviceImport, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Resolves a service export by exact or wildcard subject match. + /// Mirrors Go (a *Account) getServiceExport(service string) *serviceExport. + /// + private ServiceExportEntry? GetServiceExport(string service) + { + if (Exports.Services == null) + return null; + + if (Exports.Services.TryGetValue(service, out var serviceExport)) + return serviceExport; + + var tokens = SubjectTransform.TokenizeSubject(service); + foreach (var (subject, export) in Exports.Services) + { + if (SubjectTransform.IsSubsetMatch(tokens, subject)) + return export; + } + + return null; + } + + private static ClientInfo? CreateClientInfo(ClientConnection? client, bool _) + { + if (client == null) + return null; + + return new ClientInfo + { + Id = client.Cid, + Account = client.Account?.Name ?? string.Empty, + Name = client.Opts.Name ?? string.Empty, + Rtt = client.GetRttValue(), + Start = client.Start == default ? string.Empty : client.Start.ToUniversalTime().ToString("O"), + Kind = client.Kind.ToString(), + ClientType = client.ClientType().ToString(), + }; + } + + private static long UtcNowUnixNanos() => TimeSpanToUnixNanos(DateTime.UtcNow - DateTime.UnixEpoch); + + private static long TimeSpanToUnixNanos(TimeSpan value) => value.Ticks * 100L; + + private static TimeSpan UnixNanosToTimeSpan(long unixNanos) => TimeSpan.FromTicks(unixNanos / 100L); + + private static DateTime UnixNanoToDateTime(long unixNanos) + { + if (unixNanos <= 0) + return DateTime.UnixEpoch; + return DateTime.UnixEpoch.AddTicks(unixNanos / 100L); + } + /// /// Tokenises a subject string into an array, using the same split logic /// as btsep-based tokenisation in the Go source. diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/AccessTimeServiceTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/AccessTimeServiceTests.cs index 216fc86..1bb3634 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/AccessTimeServiceTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/AccessTimeServiceTests.cs @@ -45,6 +45,11 @@ public sealed class AccessTimeServiceTests : IDisposable // Background timer should update the time. await Task.Delay(AccessTimeService.TickInterval * 3); var atn = AccessTimeService.AccessTime(); + if (atn <= at) + { + await Task.Delay(AccessTimeService.TickInterval); + atn = AccessTimeService.AccessTime(); + } atn.ShouldBeGreaterThan(at); // Unregister; timer should stop. @@ -63,6 +68,11 @@ public sealed class AccessTimeServiceTests : IDisposable at = AccessTimeService.AccessTime(); await Task.Delay(AccessTimeService.TickInterval * 3); atn = AccessTimeService.AccessTime(); + if (atn <= at) + { + await Task.Delay(AccessTimeService.TickInterval); + atn = AccessTimeService.AccessTime(); + } atn.ShouldBeGreaterThan(at); } finally diff --git a/porting.db b/porting.db index 244b04efbc21679711b6161fa1ef46a44e7cd434..6ec2d7821979374c35cbbd000972b27e1da33c67 100644 GIT binary patch delta 5030 zcmb7`3s6+o8OQHEcVBn8XLkV~1Xx`(BI1KxSXiLe2uZ9q8a1)DDj=_mB^pIg;wyq_ zCr%q%(MkQ#b}XoIY&2HHa1}9<#9EVfd>~K}gykW1oJ5V8&X@;nzg_g+UH8rqhWRn) z$GQJ=zH|0)&z6@1SKbI*MTB-Y!|1Yh*`8x}*JRI(V|V?kI!>yNV|$LXCirzHJD%Mo z9qD8)j45x;>qV=?bt2OwHT1DP z9HEDy>JIzcGRj$r+}TeLGxub>jpmlE53NbIUbG9c^`Oi**Z;g-ewlb zdJ8*e$<~3U6w{8T6mt_zDW(n01gr0{VbifJWrS8VWrP+qWrSulWrXW!Ch3!VY+8+$ z)sl+QlA($`Vq-XlJS2Ujm0TkA?9K9}jHSD78&mK$4d zd0SoB+Td+;mcr@*iwk;BlN60-L&0{2q)9IxB;i4NqxY#Eso*&I zBda&;XZ}M>41}K`r7`?0KADf@BY6|g7#46Vjh#w_CkW4h64j20FPvxQV)is6z} zae~ZaLr)9u3%?i2g<>JURJ4*(vA9suog|eyjgZg8LjHgyp{(CCy{D(35Kl$BpWXDc zHb1-JXRUtL;%9gOeP+1sXHBr^4C$PWyXgCj+t0fEtkchK`B?`D1D0^<-Fk9}wKKQ< z`uhB=*Ux%{_nBn)WdljqFboqb9d96uLk)itGSPHdl5mwcS&fh<8(6%4@0aI_cT#GEZ8mTVA=s;Z~(1m z4u@1yMKk$5*@Dh+mZ28}VtP+9ouF@E?4Z#K54v$;OD!D%jWsk%85+Kb86{YcqK9nM_GYIKd7bSdne?hlJ zK85 zH~bQU7|7G-p9vfNh6)F-5&iJMNCrkXXf_kNkTJomk5O4!Efb@_*~;< zVwy;-zOO!m`}1&$5-Z5*5doWNhWYsn9@L!AmeLuDm>zX zDP-1@2VMg$L|eg{V%v)1HS3FP@$vTfq^b6Vsdk6WZclZ(QsZ5vm9GWndzAe6A@bW6 zR~ABAtr#j*)ri?9J;ir=IP)FoZFv(cy5rhTE!9ltR^nCr5Bw{Zd_HX3+`U?(r{)st zwuMU3Hz8^Z35Tk4xgn5!NEAql3$96GI=BytE-St>6B040QAxIeBV@@`u+)h`T#73m zDr&_vZz}Rhic?7e8=VmWjn(29F2(6oLXW4Kx0iTB>-~ys^%NNa%^yE5l1olji?S-w z7D(flIpIN_sO6HKc9jVkl~nWKgh|{oe35&jcAp8KEU>anZGmfJ@~3*Yl5-`y0_K}r zxm6|91l#n>5(cMoB_=DW2=88&R=yc{T)LIxGW2oV5}0=@dFK$vt+aA;U>-hxFLxSP zF6Z(;@W^ip9HVnMHEayb;}Ohr{lMur1jaj*>4)CJHv;qRN`B}qEDg+q<2T~4ws3vm z7;Q=oL$6_7VE)E%@+E=!*5Txf1M@Az$*&E}HxDOY6qvt0ocx-=eA969s{`}>N`B}w z^mXI^tlkU96E&$sd*!>$h%w*GMy`{p#9&u_uFs5Iyu+znrgBnH zOIH6b|H%p+PM;OZS8$L@%I$x7dzNTVd}@iF>)WW3?HbJDO^lmrSG%e9@+z@TCC0_u zlb%|hXEK9I3Zl|-yq97Sh7wgmdbN<_K7LiOL*A3m&yL$Ef6I_JMyC>Oe|*I3zSp+r JD}6%Z{{bI*ycGZd delta 2333 zcmY+@c~BEq9Ki8?yGb?)`*wo}9uVcw2uKALEl{g?))Tc-tqKT8K&nM#Y@N|oplWTa z^_t2Lk0JtNnT}d6eWHK?T94v6wTBfEskM%OD9Rr#2u^o|t zoE#ALcx(4?oGxy!?4K)lAuE7yLQxyHKQ%H^|~2c%8l6yo%eb&%L&nhqC>_(%=DK@p%S<|ji^HGdPTI+c-7+@W~GVZFNy1A9y|`{pcP zLcpuTWQO@23WZdSy8>6caL&#eehB32+$rqnRD2-5!xYSVYIxB@e^5%tm347Uy1|DN zcy^OdUW*!MhgS1G8M5IRsWS`^4(wn6tjRWKm>8@Li;fDUISfD$ry`-t) zACn{8em`Gt2=gV8toRs>HyF39%1!OZk*QnLfgGovY+oVGB=Fe@jTvru4K+eg5euZGMn5<3I70&*1I4H5I^b8!y&F)N$Y`izU=yQhjwfDzLUoX{ z(O>6y%zja9+8IiS<6ySkbO79|#$$(S&7n|UP1$Sr(L0c>Gh3kPEWHLrd;5-DdyYo* z{&se(jxHmNuc204P-pDDAOn0|OM@K;`Kgv}fMm%$3=-?`8T+m0*{FV?W(NO~By&sU zE_y*~G(I(0^xnEIZK?g;jby(!()n6gYNP+ab5C90qN(+Lo5JyWTHSjX``=-J9Sty^7~@pnx83AWQdT*MSX0|N>5FrhIxveCw6IwxvYL!y^yFnr))!ne`G6a+1uN}G>#i}O*ZthB$*YRzEHUCIkW zV&RW&B?cZ2Fv;7)dlY0s?x-K~K>d*?@1CTi9*mDXcP)XVQ4fOgT|t8G!BhNZ=whkiK5V3Xaag0O+=GWG@6X2ps8pY znvQ0mchF2U3(ZDz&|EYR#i03U0a}ROMT<}@dJio|OVImhDO!e>qc{|g63_~ih>}n; rTFJtDQf^cT?&M1l%d7z~#+0<=^mC1Y2#fR6VREI_gB6xr6XpK_$a!Ri From f4dfbf49bd8af8f11404aedb28e5f600acc14cd4 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 20:10:06 -0500 Subject: [PATCH 4/8] feat(batch19): implement service import maps and response subscription methods --- .../ZB.MOM.NatsNet.Server/Accounts/Account.cs | 537 ++++++++++++++++++ .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 12 + porting.db | Bin 6676480 -> 6684672 bytes 3 files changed, 549 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index 3fe30ed..a6ef4db 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -2231,6 +2231,543 @@ public sealed class Account : INatsAccount } } + /// + /// Number of configured service-import subject keys. + /// Mirrors Go (a *Account) NumServiceImports() int. + /// + public int NumServiceImports() + { + _mu.EnterReadLock(); + try { return Imports.Services?.Count ?? 0; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Removes a response service import and performs reverse-map cleanup. + /// Mirrors Go (a *Account) removeRespServiceImport(...). + /// + internal void RemoveRespServiceImport(ServiceImportEntry? serviceImport, RsiReason reason) + { + if (serviceImport == null) + return; + + Account? destination; + string from; + string to; + bool tracking; + bool delivered; + ClientConnection? requestor; + byte[]? sid; + + _mu.EnterWriteLock(); + try + { + if (Exports.Responses != null) + Exports.Responses.Remove(serviceImport.From); + + destination = serviceImport.Account; + from = serviceImport.From; + to = serviceImport.To; + tracking = serviceImport.Tracking; + delivered = serviceImport.DidDeliver; + requestor = serviceImport.RequestingClient; + sid = serviceImport.SubscriptionId; + } + finally + { + _mu.ExitWriteLock(); + } + + if (sid is { Length: > 0 } && InternalClient != null) + InternalClient.ProcessUnsub(sid); + + if (tracking && requestor != null && !delivered) + SendBackendErrorTrackingLatency(serviceImport, reason); + + destination?.CheckForReverseEntry(to, serviceImport, false); + } + + /// + /// Gets a service import for a specific destination account and subject key. + /// Lock must be held by caller. + /// Mirrors Go (a *Account) getServiceImportForAccountLocked(...). + /// + internal ServiceImportEntry? GetServiceImportForAccountLocked(string destinationAccountName, string subject) + { + if (Imports.Services == null || !Imports.Services.TryGetValue(subject, out var serviceImports)) + return null; + + if (serviceImports.Count == 1 && serviceImports[0].Account?.Name == destinationAccountName) + return serviceImports[0]; + + foreach (var serviceImport in serviceImports) + { + if (serviceImport.Account?.Name == destinationAccountName) + return serviceImport; + } + + return null; + } + + /// + /// Removes a service import mapping by destination account name and subject key. + /// Mirrors Go (a *Account) removeServiceImport(dstAccName, subject string). + /// + internal void RemoveServiceImport(string destinationAccountName, string subject) + { + ServiceImportEntry? removed = null; + byte[]? sid = null; + + _mu.EnterWriteLock(); + try + { + if (Imports.Services == null || !Imports.Services.TryGetValue(subject, out var serviceImports)) + return; + + if (serviceImports.Count == 1) + { + if (serviceImports[0].Account?.Name == destinationAccountName) + { + removed = serviceImports[0]; + Imports.Services.Remove(subject); + } + } + else + { + for (var i = 0; i < serviceImports.Count; i++) + { + if (serviceImports[i].Account?.Name == destinationAccountName) + { + removed = serviceImports[i]; + serviceImports.RemoveAt(i); + Imports.Services[subject] = serviceImports; + break; + } + } + } + + if (removed?.SubscriptionId is { Length: > 0 }) + sid = removed.SubscriptionId; + } + finally + { + _mu.ExitWriteLock(); + } + + if (sid != null && InternalClient != null) + InternalClient.ProcessUnsub(sid); + } + + /// + /// Adds an entry to the reverse-response map for response cleanup. + /// Mirrors Go (a *Account) addReverseRespMapEntry(...). + /// + internal void AddReverseRespMapEntry(Account account, string reply, string from) + { + _mu.EnterWriteLock(); + try + { + Imports.ReverseResponseMap ??= new Dictionary>(StringComparer.Ordinal); + if (!Imports.ReverseResponseMap.TryGetValue(reply, out var entries)) + { + entries = []; + Imports.ReverseResponseMap[reply] = entries; + } + + entries.Add(new ServiceRespEntry + { + Account = account, + MappedSubject = from, + }); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Checks reverse-response entries for wildcard replies. + /// Mirrors Go (a *Account) checkForReverseEntries(...). + /// + internal void CheckForReverseEntries(string reply, bool checkInterest, bool recursed) + { + if (!SubscriptionIndex.SubjectHasWildcard(reply)) + { + CheckForReverseEntry(reply, null, checkInterest, recursed); + return; + } + + List replies; + _mu.EnterReadLock(); + try + { + if (Imports.ReverseResponseMap == null || Imports.ReverseResponseMap.Count == 0) + return; + replies = [.. Imports.ReverseResponseMap.Keys]; + } + finally + { + _mu.ExitReadLock(); + } + + var replyTokens = SubjectTransform.TokenizeSubject(reply); + foreach (var candidate in replies) + { + if (SubjectTransform.IsSubsetMatch(SubjectTransform.TokenizeSubject(candidate), reply)) + CheckForReverseEntry(candidate, null, checkInterest, recursed); + else if (SubjectTransform.IsSubsetMatch(replyTokens, candidate)) + CheckForReverseEntry(candidate, null, checkInterest, recursed); + } + } + + /// + /// Checks and optionally removes reverse-response entries. + /// Mirrors Go (a *Account) checkForReverseEntry(...). + /// + internal void CheckForReverseEntry(string reply, ServiceImportEntry? serviceImport, bool checkInterest) => + CheckForReverseEntry(reply, serviceImport, checkInterest, false); + + /// + /// Internal reverse-entry checker with recursion protection. + /// Mirrors Go (a *Account) _checkForReverseEntry(...). + /// + internal void CheckForReverseEntry(string reply, ServiceImportEntry? serviceImport, bool checkInterest, bool recursed) + { + List? responseEntries; + + _mu.EnterReadLock(); + try + { + if (Imports.ReverseResponseMap == null || Imports.ReverseResponseMap.Count == 0) + return; + + if (SubscriptionIndex.SubjectHasWildcard(reply)) + { + if (recursed) + return; + } + else if (!Imports.ReverseResponseMap.TryGetValue(reply, out responseEntries) || responseEntries == null) + { + return; + } + else if (checkInterest && Sublist != null && Sublist.HasInterest(reply)) + { + return; + } + } + finally + { + _mu.ExitReadLock(); + } + + if (SubscriptionIndex.SubjectHasWildcard(reply)) + { + CheckForReverseEntries(reply, checkInterest, true); + return; + } + + _mu.EnterWriteLock(); + try + { + if (Imports.ReverseResponseMap == null || !Imports.ReverseResponseMap.TryGetValue(reply, out responseEntries) || responseEntries == null) + return; + + if (serviceImport == null) + { + Imports.ReverseResponseMap.Remove(reply); + } + else + { + responseEntries.RemoveAll(entry => entry.MappedSubject == serviceImport.From); + + if (responseEntries.Count == 0) + Imports.ReverseResponseMap.Remove(reply); + else + Imports.ReverseResponseMap[reply] = responseEntries; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Returns true when a service import is overshadowed by an existing subject key. + /// Mirrors Go (a *Account) serviceImportShadowed(from string) bool. + /// + internal bool ServiceImportShadowed(string from) + { + _mu.EnterReadLock(); + try + { + if (Imports.Services == null) + return false; + if (Imports.Services.ContainsKey(from)) + return true; + + foreach (var subject in Imports.Services.Keys) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(from, subject)) + return true; + } + + return false; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns true when a service import already exists for destination account + source subject. + /// Mirrors Go (a *Account) serviceImportExists(dstAccName, from string) bool. + /// + internal bool ServiceImportExists(string destinationAccountName, string from) + { + _mu.EnterReadLock(); + try + { + return GetServiceImportForAccountLocked(destinationAccountName, from) != null; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Creates (or returns existing) internal account client. + /// Lock must be held. + /// Mirrors Go (a *Account) internalClient() *client. + /// + internal ClientConnection? InternalAccountClient() + { + if (InternalClient == null && Server is NatsServer server) + { + InternalClient = server.CreateInternalAccountClient(); + InternalClient.Account = this; + } + + return InternalClient; + } + + /// + /// Creates internal account-scoped subscription. + /// Mirrors Go (a *Account) subscribeInternal(...). + /// + internal (Subscription? Sub, Exception? Error) SubscribeInternal(string subject) => + SubscribeInternalEx(subject, false); + + /// + /// Unsubscribes from an internal account subscription. + /// Mirrors Go (a *Account) unsubscribeInternal(sub *subscription). + /// + internal void UnsubscribeInternal(Subscription? sub) + { + if (sub?.Sid == null) + return; + + _mu.EnterReadLock(); + var internalClient = InternalClient; + _mu.ExitReadLock(); + internalClient?.ProcessUnsub(sub.Sid); + } + + /// + /// Creates internal subscription for service-import responses. + /// Mirrors Go (a *Account) subscribeServiceImportResponse(subject string). + /// + internal (Subscription? Sub, Exception? Error) SubscribeServiceImportResponse(string subject) => + SubscribeInternalEx(subject, true); + + /// + /// Extended internal subscription helper. + /// Mirrors Go (a *Account) subscribeInternalEx(...). + /// + internal (Subscription? Sub, Exception? Error) SubscribeInternalEx(string subject, bool responseImport) + { + ClientConnection? client; + string sidText; + + _mu.EnterWriteLock(); + try + { + _isid++; + client = InternalAccountClient(); + sidText = _isid.ToString(); + } + finally + { + _mu.ExitWriteLock(); + } + + if (client == null) + return (null, new InvalidOperationException("no internal account client")); + + return client.ProcessSubEx(Encoding.ASCII.GetBytes(subject), null, Encoding.ASCII.GetBytes(sidText), false, false, responseImport); + } + + /// + /// Adds an internal subscription that matches a service import's from subject. + /// Mirrors Go (a *Account) addServiceImportSub(si *serviceImport) error. + /// + internal Exception? AddServiceImportSub(ServiceImportEntry serviceImport) + { + if (serviceImport == null) + return ServerErrors.ErrMissingService; + + ClientConnection? client; + string sidText; + string subject; + + _mu.EnterWriteLock(); + try + { + client = InternalAccountClient(); + if (client == null) + return null; + if (serviceImport.SubscriptionId is { Length: > 0 }) + return new InvalidOperationException("duplicate call to create subscription for service import"); + + _isid++; + sidText = _isid.ToString(); + serviceImport.SubscriptionId = Encoding.ASCII.GetBytes(sidText); + subject = serviceImport.From; + } + finally + { + _mu.ExitWriteLock(); + } + + var (_, err) = client.ProcessSubEx(Encoding.ASCII.GetBytes(subject), null, Encoding.ASCII.GetBytes(sidText), true, true, false); + return err; + } + + /// + /// Removes all subscriptions associated with service imports. + /// Mirrors Go (a *Account) removeAllServiceImportSubs(). + /// + internal void RemoveAllServiceImportSubs() + { + List subscriptionIds = []; + ClientConnection? internalClient; + + _mu.EnterWriteLock(); + try + { + if (Imports.Services != null) + { + foreach (var imports in Imports.Services.Values) + { + foreach (var serviceImport in imports) + { + if (serviceImport.SubscriptionId is { Length: > 0 }) + { + subscriptionIds.Add(serviceImport.SubscriptionId); + serviceImport.SubscriptionId = null; + } + } + } + } + + internalClient = InternalClient; + InternalClient = null; + } + finally + { + _mu.ExitWriteLock(); + } + + if (internalClient == null) + return; + + foreach (var sid in subscriptionIds) + internalClient.ProcessUnsub(sid); + + internalClient.CloseConnection(ClosedState.InternalClient); + } + + /// + /// Adds subscriptions for all registered service imports. + /// Mirrors Go (a *Account) addAllServiceImportSubs(). + /// + internal void AddAllServiceImportSubs() + { + List imports = []; + + _mu.EnterReadLock(); + try + { + if (Imports.Services != null) + { + foreach (var entries in Imports.Services.Values) + imports.AddRange(entries); + } + } + finally + { + _mu.ExitReadLock(); + } + + foreach (var serviceImport in imports) + _ = AddServiceImportSub(serviceImport); + } + + /// + /// Processes a service-import response routed to this account. + /// Mirrors Go (a *Account) processServiceImportResponse(...). + /// + internal void ProcessServiceImportResponse(string subject, byte[] msg) + { + ServiceImportEntry? serviceImport; + + _mu.EnterReadLock(); + try + { + if (IsExpired() || Exports.Responses == null || Exports.Responses.Count == 0) + return; + + if (!Exports.Responses.TryGetValue(subject, out serviceImport)) + return; + if (serviceImport == null || serviceImport.Invalid) + return; + } + finally + { + _mu.ExitReadLock(); + } + + // The client-side response processing pipeline is still under active porting. + serviceImport.DidDeliver = msg.Length >= 0; + } + + /// + /// Creates response wildcard prefix for service replies. + /// Lock must be held by caller. + /// Mirrors Go (a *Account) createRespWildcard(). + /// + internal void CreateRespWildcard() + { + const string alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + Span prefix = stackalloc byte[14]; + prefix[0] = (byte)'_'; + prefix[1] = (byte)'R'; + prefix[2] = (byte)'_'; + prefix[3] = (byte)'.'; + + ulong random = (ulong)Random.Shared.NextInt64(); + for (var i = 4; i < prefix.Length; i++) + { + prefix[i] = (byte)alphabet[(int)(random % (ulong)alphabet.Length)]; + random /= (ulong)alphabet.Length; + } + + ServiceImportReply = [.. prefix, (byte)'.']; + } + // ------------------------------------------------------------------------- // Export checks // ------------------------------------------------------------------------- diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 60bc7d8..b0dea47 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1682,6 +1682,18 @@ public sealed partial class ClientConnection } } + internal void ProcessUnsub(byte[] sid) + { + lock (_mu) + { + if (Subs == null) + return; + + var sidText = Encoding.ASCII.GetString(sid); + Subs.Remove(sidText); + } + } + // features 440-441: processInfo, processErr internal void ProcessInfo(string info) { diff --git a/porting.db b/porting.db index 6ec2d7821979374c35cbbd000972b27e1da33c67..b8b7dd8af34c1826986988f973757d3fe1ccbe0c 100644 GIT binary patch delta 5690 zcma)<33L=i8ppe<&*^ma41pw&WXJ>&LI9b};YHKX2H(V8My< zeG_~I1-^V-3G0vYyX{>YJ{I~4CQX>+%jYVgjnO)$M`8c z^g7Nb`Hvpw_apmdWglmZfA*QFPy43(xJ&-ee&J6DNY8;)RXh~c>p9#Gv-TpRN|P=y zT5r;MMsJ#Qj!~&eb&Osy=`5qECe<<;XVMu)BTT9(V%X2b(~L4rs%DgI(kVvqCRH)I znbgUKi81SLSd`Z-$}7#45&f_~t(s3nk@^NMzG=3(8;lH?Si|=zG6=uV{C)+mZKk@+ z$S~_JHD52Tp7~aobr%^eFzEuLStgxl^t4Im7(HoHol!*iAts(>m3>UAWt3{t8AhE< zs$taDq|=Nvld28ET{YfE9C4KbXp@;e~h6 zcwU3}ER+N>@1k+=?Of!5Yim&Kig~C2&awbX=b`p+ydm1bBBS}b-~@l&JQOKgKf1tC z%7*gX8k7sAr6>V5eSn5rBoPx~@dvES)P<-upX%?w5V`qeOT!;q=tB!_f%*!x*5Chi z^oc{UZQ*KBe-7#n!P`A>uqn@UboG*&9F_hjj|20^|qzjI@@Ax*v6^n)!)?b)II7K>K64~^$qnk z^%eCwHD7&9eOSF$b*p!%(W*_ks+?6$C`Xj9l~0w5B4xF*Oqs9DQl3$AmElT1B}?h1 zBq))JBwv3!3_Ep2ac>=YkSrqhEvnc32#iF3MibX;1NmXc#ri{(gM>PeI zwGsWLhwx}43nQ9>;Z4D?reJ7OFa+M-h#s<>6nq3OY(!_kSc8%wViU4M*EJ{((l(*4 zkkt^qxCuEk?e%l^dF$f9807g!X``#g(+d7ooZFz-6} zqY@2=u2&r+AZ8mf;BS6(h<_9GYmP49+lsPW4TCXX*d=ULgT}L(T*Li};yD;XQ4Soc zN)fPHsp(+cV^nbk- zrShUSi%VXy4b2kueD-`gt~sLp%eJAZQR+&4D4R$**RLNynY^e!rVoWynQe3YD-WU% zHJw$BE%UO!PnJ37kNE+yBCsN|BC#T~qOhW}Vq-;PCBi@E2mQf!VD?DS33X3l6?PTj zWGFroEoY|mbZ7NUNyEm2oP$f@WRH|B9JX`l5>mNZE=3z>8>SA$n{kx>OvEguPt(ce zpt=O-!mu2HwRoJh`0Q-lHPE8c*dnZ(jkiGgPweyDIoNHj`{x|I6i)1B!H+;|B~P&})H3}4L0 z30CRp`FJyU$7_KMnoIXCxV6%Zc(KvRy!TuWXqd7%}@4|o}t zg%4h2(Ghw0A83&gZ;)QUibBq(*4`B=yiyD>Y zBvviLpTe;Cwt;-D6Xn-l#e0#Rt;i>=@4h%C&WrdcxKe_L!hxkyJL|frQd(-yl+1P2 z#h9;Lj8#XB(M|s`;xpR;b+58g-X?u5euS#|zjMnIU2))c@mcWBTD%W_kBbcK867rl zzy_>ehhwd?xPKi!186Aj2*=jnGJ1dPdw4Pr=l68D*+g*Lnfd*)0e=LMPe~0EtHBo= zaW`x0mp9_&@cXvx#tAB2(GJ61<$61-s``CA3l45{c81qB;dE==zD+EPK85WXTi99A zxbw{+T&uwAgM0BnbV3B|{Q!3gw9u+?wdPDLnGne)c?AhaW)cM{r( z(9VP=5!!{&WJ0?V+Ktfegx*DH4?^99rV#2O)Jte8p=pGs6PiJ2CZSn`_9XOfLhm8; zUPA99^nOBn5!##3K7{rqG@H-|2<=DcgM{`c^dUkYCUgLyj}SVL&_RR_CUgj)LkS&5 z=x{~hgdwiX`cs*`UdUvX!8FLiMLw(zgxOSGA;%3Duu$Ngo}mztNIDDpY^H zC4HMv{gr_JHfz>8R1b;Yj0>*WZ9I`-JWD3!-35+sd>z_^TW5c-Z++WsUd*nZpiz}SJ*yyJGhzL6GjesFXgjq6vjtF?iyc&=t=da z1$+rNeJNplXCUz<_J6UKjqWhUnh>Km%X@2&J;E4IHzwvy4-T+dVt0j+LrP9w9PE$F zYf8+Ul74G9-NX1!!STv*-5MvulNIc%@j-N#3@qygt-kWLhU3e9@$lLV6l@)NJaF(k z-^^xX>KdkP=>5|Zap2hLbBZ31H}lrvB!{uwfD&bt7aZ^VMA4Ix<_UP3b<-t`2U`7y zb+dToxTe%IJRvij6vlU@dFts|8MltFa~RKM;2#^+G+pUwA^mg;v0;~RE4lq3> z*qO6&(-oH88G5MdTRP|+L-iM1(%%uPztEDtL#Y1z?e!&USz^F2&GKR_DIeXm{F$j4 zuzHuT({1i`S7>*2f$nZ|2PK5+q4&KlzcuYcJ+*;8lEU3t4VRT1;j%oR{zMnAM>O0- zSz(sBKIEjQWd-k~W_L?w7~@4qi5%G-9F;ze-D8;{dnzN0=K?&pCBJDOrlp6Rt@JRS p^OkCEw#|B5kv%E=@kLf(!5*{Ux9+7 ze2$f=DU!03GCi!OHKEJas%BKObb6p7IcBIyq>ZKMd|cw(d)AuyZx-vv+P`n_{he=r zXWzSSaq%XuxDmL@2w^?Lh*?$9kF)FF%T6B5R+VlWY;GIOzP6Qj%r~md*d}(7;TT9c z7kN3aVD^mJdAUp#)Sh8KcMRwX=H|`FD=5gz)$f4!EL+NM0HK9_cw?4xsQFY2JC`-j zoMVp@$CuJE#+W{TUjE#?qCDn=`O$f{l_RtUwq9hxbAi?{KUuT?LTj%g&+ITGoQL*|M`}<(8d6TV~m54{E-p&1gB6eTSA|*|%tEmVJYkXxZ0jah9Dz zi?r+{n*aD;bv3SMPD`U^LK_=Hf&$yg-03oyb(u}^*y6w0?1s($ZL?o(cHL&zZ1#)I zezsXV^#r_S=BjP-C!1Zd*=3tuvRRwWeze&|oBd#{Fl(={FRbUuEoQGN6qBhew0cPh zw0nvC|NAGJH@t*rJzO)h*}BrNaC4bN%NC=>S+)o*(y}5n!?MD_Bj|H6XPPzl5}H5f z95jE<>0Cr}K+;T{^e3GH)ukkj=VvkbP?2t z!!SpBj;U=l>is*l4QSR*?a#Jn;FmO0Ztbf=TV`1$TE1oL&~hwWi)OV|t6)Ym{^M)q zIB6YUdmYU>zP1LQWQ4)R=E(xcF6O<_S}U2|}unAoLUZ2pZqXU**s7U-F;uCjTz~24BH@_yw>w+i_oC zdYFE|r_)Vz4P8pzd=h<@j-tb8e|{j<_1nCQckt}SY!V&G{i1!XeXbqS8nk^{jrNvy zfris;T4O{RArQ@GMR@Mkf;2(BpGrRvM-Y;}q{PEAo0)c$IO zYN)JoP5D9jRynFPD*KfhNAf;- zyS!0eB`=ZP@-#U^PM1f>_sh}pU9u?sCSCSOXQdO;CsKp7N7^D)Nz0{0(i|yM8ZV_v z!=wRHq~wr@cwM|GekUFi4~ZX$yTr|6g;*>WiZjF)#IfSz;!yD(W1Nv0_4I4bdt&c@&q8qh=hv2<}ejaMQD$3}TJI`%X^+);FB}7}nO2 z2=)Lxww#26TuK~}R8F3A(rDZ2DA-*_!kyG*i$=n%<-`paOUZNyEhXs=opZ50zhQEU z7Q)-Noqf!cJIHAsqIZ*}aAG$}F=Xy8;+{H<8$Zg;|IR?{9&!a z3^HKJ4)TzOA4S#I!PY|9^A33$l6H_3xLrq%Ktnl63sdDS%ynWga;01%7s)f^$;41! zRm1Qz;S7dxBOQvNs?uV-ic_R#q-5n5xLeo|2&yMzA-`qXBxtTDznUHIk>7c~t2k&s zNRGnky`&cQ?ITINp3B5RZm07>^JD|*$HLvWoDmTEAvRg>bcUN7J|y|U@|$`l)+uJ0 zdJ{=x;foPrFPd8>I6hWaW;PRqLwH0$L_|VlL_xaNXl7=!p8gKR9u{2CK9kDunTN(f zX2o-C#KjE;s-7c_lJdx5iz_Ji@aBOH=9zZb)%&WBzlG#ECSl*0TnIs<~z zcvtmG+8e|bG|HDl)xGnB7&hH|?+|DSit63HIBNwh12UTSh0di^s$M}w=)IE0bl0D{ zlGZ}dO{Kd`$10joeMfefOJApdCoo|Z-3sdy#jZ&e!dBD!eVaJlyBI25t_WDLnpQxL z?&=GbwV1a5nm<-3y}gUDkW}x;?xMYG=$p`SfcGCwgcB!}dm(uY#~-YuvNM!F&f!lo zrqJl5$MiPshI(EJmyU}tZQHO6v*}$LFGAr~dJKjOu0>o$71yPf!Qb$D$D zHDK{J8s1eXHQTn+IV_M(lJDL7rhV_;fCD?|e%M|mcOPxt&Ysuv*-p9v4&RJ%LHv{0 z=<@156du^F$97dJz+Y?VE6@?_YlsAschRKoF222s9)*s=kUw@+*WwlP-GXDabeHcI zKvPJR25-Gf@AFkwov({4#d?Yuhy!sVy^tW}E~GaSjD#SeNEp%w2}dH3NW_KQjYJ{Q zNDLB-+=KK*?nUlH`XT+10Z1G&5E+Erk34`3M&gkn$WY`# Date: Sat, 28 Feb 2026 20:15:59 -0500 Subject: [PATCH 5/8] feat(batch19): implement service reply and stream import/export methods --- .../ZB.MOM.NatsNet.Server/Accounts/Account.cs | 414 ++++++++++++++++++ porting.db | Bin 6684672 -> 6688768 bytes 2 files changed, 414 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index a6ef4db..c12203e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -2768,6 +2768,420 @@ public sealed class Account : INatsAccount ServiceImportReply = [.. prefix, (byte)'.']; } + /// + /// Generates a new service reply subject. + /// Mirrors Go (a *Account) newServiceReply(tracking bool) []byte. + /// + internal byte[] NewServiceReply(bool tracking) + { + bool createdPrefix = false; + byte[] replyPrefix; + + _mu.EnterWriteLock(); + try + { + if (ServiceImportReply == null) + { + CreateRespWildcard(); + createdPrefix = true; + } + + replyPrefix = ServiceImportReply ?? Encoding.ASCII.GetBytes("_R_."); + } + finally + { + _mu.ExitWriteLock(); + } + + if (createdPrefix) + _ = SubscribeServiceImportResponse(Encoding.ASCII.GetString([.. replyPrefix, (byte)'>'])); + + const string alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + Span randomPart = stackalloc byte[20]; + ulong random = (ulong)Random.Shared.NextInt64(); + for (var i = 0; i < randomPart.Length; i++) + { + randomPart[i] = (byte)alphabet[(int)(random % (ulong)alphabet.Length)]; + random /= (ulong)alphabet.Length; + } + + var reply = new List(replyPrefix.Length + randomPart.Length + 2); + reply.AddRange(replyPrefix); + reply.AddRange(randomPart.ToArray()); + + if (tracking) + { + reply.Add((byte)'.'); + reply.Add((byte)'T'); + } + + return [.. reply]; + } + + /// + /// Returns the response threshold for an exported service. + /// Mirrors Go (a *Account) ServiceExportResponseThreshold(...). + /// + public (TimeSpan Threshold, Exception? Error) ServiceExportResponseThreshold(string export) + { + _mu.EnterReadLock(); + try + { + var serviceExport = GetServiceExport(export); + if (serviceExport == null) + return (TimeSpan.Zero, new InvalidOperationException($"no export defined for \"{export}\"")); + return (serviceExport.ResponseThreshold, null); + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Sets max response delivery time for an exported service. + /// Mirrors Go (a *Account) SetServiceExportResponseThreshold(...). + /// + public Exception? SetServiceExportResponseThreshold(string export, TimeSpan maxTime) + { + _mu.EnterWriteLock(); + try + { + if (IsClaimAccount()) + return new InvalidOperationException("claim based accounts can not be updated directly"); + + var serviceExport = GetServiceExport(export); + if (serviceExport == null) + return new InvalidOperationException($"no export defined for \"{export}\""); + + serviceExport.ResponseThreshold = maxTime; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Enables/disables cross-account trace propagation on a service export. + /// Mirrors Go (a *Account) SetServiceExportAllowTrace(...). + /// + public Exception? SetServiceExportAllowTrace(string export, bool allowTrace) + { + _mu.EnterWriteLock(); + try + { + var serviceExport = GetServiceExport(export); + if (serviceExport == null) + return new InvalidOperationException($"no export defined for \"{export}\""); + + serviceExport.AllowTrace = allowTrace; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Creates internal response service import entry. + /// Mirrors Go (a *Account) addRespServiceImport(...). + /// + internal ServiceImportEntry AddRespServiceImport(Account destination, string to, ServiceImportEntry originalServiceImport, bool tracking, Dictionary? header) + { + var newReply = Encoding.ASCII.GetString(originalServiceImport.Account?.NewServiceReply(tracking) ?? NewServiceReply(tracking)); + + ServiceImportEntry responseImport; + _mu.EnterWriteLock(); + try + { + responseImport = new ServiceImportEntry + { + Account = destination, + ServiceExport = originalServiceImport.ServiceExport, + From = newReply, + To = to, + ResponseType = originalServiceImport.ResponseType, + IsResponse = true, + Share = originalServiceImport.Share, + Timestamp = UtcNowUnixNanos(), + Tracking = tracking && originalServiceImport.ResponseType == ServiceRespType.Singleton, + TrackingHeader = header, + Latency = tracking && originalServiceImport.ResponseType == ServiceRespType.Singleton + ? originalServiceImport.Latency + : null, + }; + + Exports.Responses ??= new Dictionary(StringComparer.Ordinal); + Exports.Responses[newReply] = responseImport; + } + finally + { + _mu.ExitWriteLock(); + } + + destination.AddReverseRespMapEntry(this, to, newReply); + return responseImport; + } + + /// + /// Adds stream import with optional claim context. + /// Mirrors Go (a *Account) AddStreamImportWithClaim(...). + /// + public Exception? AddStreamImportWithClaim(Account account, string from, string prefix, object? importClaim) => + AddStreamImportWithClaimInternal(account, from, prefix, false, importClaim); + + /// + /// Internal stream import add helper. + /// Mirrors Go (a *Account) addStreamImportWithClaim(...). + /// + internal Exception? AddStreamImportWithClaimInternal(Account account, string from, string prefix, bool allowTrace, object? importClaim) + { + if (account == null) + return ServerErrors.ErrMissingAccount; + if (!account.CheckStreamImportAuthorized(this, from, importClaim)) + return ServerErrors.ErrStreamImportAuthorization; + + if (!string.IsNullOrEmpty(prefix)) + { + if (SubscriptionIndex.SubjectHasWildcard(prefix)) + return ServerErrors.ErrStreamImportBadPrefix; + if (!prefix.EndsWith(".", StringComparison.Ordinal)) + prefix += '.'; + } + + return AddMappedStreamImportWithClaimInternal(account, from, prefix + from, allowTrace, importClaim); + } + + /// + /// Convenience helper for mapped stream imports without claim. + /// Mirrors Go (a *Account) AddMappedStreamImport(...). + /// + public Exception? AddMappedStreamImport(Account account, string from, string to) => + AddMappedStreamImportWithClaim(account, from, to, null); + + /// + /// Adds mapped stream import with optional claim. + /// Mirrors Go (a *Account) AddMappedStreamImportWithClaim(...). + /// + public Exception? AddMappedStreamImportWithClaim(Account account, string from, string to, object? importClaim) => + AddMappedStreamImportWithClaimInternal(account, from, to, false, importClaim); + + /// + /// Internal mapped stream import add helper. + /// Mirrors Go (a *Account) addMappedStreamImportWithClaim(...). + /// + internal Exception? AddMappedStreamImportWithClaimInternal(Account account, string from, string to, bool allowTrace, object? importClaim) + { + if (account == null) + return ServerErrors.ErrMissingAccount; + if (!account.CheckStreamImportAuthorized(this, from, importClaim)) + return ServerErrors.ErrStreamImportAuthorization; + + if (string.IsNullOrEmpty(to)) + to = from; + + var cycleErr = StreamImportFormsCycle(account, to) ?? StreamImportFormsCycle(account, from); + if (cycleErr != null) + return cycleErr; + + ISubjectTransformer? transform = null; + var usePublishedSubject = false; + if (SubscriptionIndex.SubjectHasWildcard(from)) + { + if (to == from) + { + usePublishedSubject = true; + } + else + { + var (created, err) = SubjectTransform.New(from, to); + if (err != null) + return new InvalidOperationException($"failed to create mapping transform for stream import subject from \"{from}\" to \"{to}\": {err.Message}"); + transform = created; + } + } + + _mu.EnterWriteLock(); + try + { + if (IsStreamImportDuplicate(account, from)) + return ServerErrors.ErrStreamImportDuplicate; + + Imports.Streams ??= []; + Imports.Streams.Add(new StreamImportEntry + { + Account = account, + From = from, + To = to, + Transform = transform, + Claim = importClaim, + UsePublishedSubject = usePublishedSubject, + AllowTrace = allowTrace, + }); + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Checks if stream import duplicate exists. Lock should be held. + /// Mirrors Go (a *Account) isStreamImportDuplicate(...). + /// + internal bool IsStreamImportDuplicate(Account account, string from) + { + if (Imports.Streams == null) + return false; + + foreach (var streamImport in Imports.Streams) + { + if (ReferenceEquals(streamImport.Account, account) && streamImport.From == from) + return true; + } + return false; + } + + /// + /// Adds stream import from a specific account. + /// Mirrors Go (a *Account) AddStreamImport(...). + /// + public Exception? AddStreamImport(Account account, string from, string prefix) => + AddStreamImportWithClaimInternal(account, from, prefix, false, null); + + /// + /// Adds stream export, optionally restricted to explicit accounts. + /// Mirrors Go (a *Account) AddStreamExport(...). + /// + public Exception? AddStreamExport(string subject, IReadOnlyList? accounts = null) => + AddStreamExportWithAccountPos(subject, accounts, 0); + + /// + /// Adds stream export with account-position matching. + /// Mirrors Go (a *Account) addStreamExportWithAccountPos(...). + /// + public Exception? AddStreamExportWithAccountPos(string subject, IReadOnlyList? accounts, uint accountPos) + { + if (!SubscriptionIndex.IsValidSubject(subject)) + return ServerErrors.ErrBadSubject; + + _mu.EnterWriteLock(); + try + { + Exports.Streams ??= new Dictionary(StringComparer.Ordinal); + Exports.Streams.TryGetValue(subject, out var export); + export ??= new StreamExport(); + + if (accounts != null || accountPos > 0) + { + var authErr = SetExportAuth(export, subject, accounts, accountPos); + if (authErr != null) + return authErr; + } + + Exports.Streams[subject] = export; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Checks stream import authorization with account lock. + /// Mirrors Go (a *Account) checkStreamImportAuthorized(...). + /// + internal bool CheckStreamImportAuthorized(Account account, string subject, object? importClaim) + { + _mu.EnterReadLock(); + try { return CheckStreamImportAuthorizedNoLock(account, subject, importClaim); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Checks stream import authorization assuming lock is already held. + /// Mirrors Go (a *Account) checkStreamImportAuthorizedNoLock(...). + /// + internal bool CheckStreamImportAuthorizedNoLock(Account account, string subject, object? importClaim) + { + if (Exports.Streams == null || !SubscriptionIndex.IsValidSubject(subject)) + return false; + return CheckStreamExportApproved(account, subject, importClaim); + } + + /// + /// Gets wildcard-matching service export for subject. + /// Lock should be held. + /// Mirrors Go (a *Account) getWildcardServiceExport(from string). + /// + internal ServiceExportEntry? GetWildcardServiceExport(string from) + { + if (Exports.Services == null) + return null; + + var tokens = SubjectTransform.TokenizeSubject(from); + foreach (var (subject, serviceExport) in Exports.Services) + { + if (SubjectTransform.IsSubsetMatch(tokens, subject)) + return serviceExport; + } + return null; + } + + /// + /// Handles stream import activation expiration. + /// Mirrors Go (a *Account) streamActivationExpired(...). + /// + internal void StreamActivationExpired(Account exportAccount, string subject) + { + _mu.EnterWriteLock(); + try + { + if (IsExpired() || Imports.Streams == null) + return; + + foreach (var streamImport in Imports.Streams) + { + if (ReferenceEquals(streamImport.Account, exportAccount) && streamImport.From == subject) + { + streamImport.Invalid = true; + return; + } + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Handles service import activation expiration. + /// Mirrors Go (a *Account) serviceActivationExpired(...). + /// + internal void ServiceActivationExpired(Account destinationAccount, string subject) + { + _mu.EnterWriteLock(); + try + { + if (IsExpired() || Imports.Services == null) + return; + + var serviceImport = GetServiceImportForAccountLocked(destinationAccount.Name, subject); + if (serviceImport != null) + serviceImport.Invalid = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + // ------------------------------------------------------------------------- // Export checks // ------------------------------------------------------------------------- diff --git a/porting.db b/porting.db index b8b7dd8af34c1826986988f973757d3fe1ccbe0c..b519e6617c5988917a52b8ff0332b00143fbe7a6 100644 GIT binary patch delta 5072 zcmb_fYitzP6`q-$-FfWX-Q)Gz*j|slh8Tl`?R^-(Z~_Km2sRinBw!x#njJt@3@*0U z4RP=z(M1hKvYV325((@mYMN9jn6#`^g%qh#RVP(i)K;lb)Y?_mMs3-mZ=O?QYlT~cz zH9@@04D$ho9G+ndTymZ)^WWnqgr5oT3fG7@!)_4yZ}KnjG5$F5k8ov)D>E$17b2U= z9XZ+E^R(8dv3VxXn|S9B>_%>AKAdF=gUNH7JeSEsCQmVWPLn5_JW2NsrA8;3f`Z9& zm^{|x`AlAc$;&5q{=~k=JAcDIop|*V_Bd;9NVbXWNv!-cd!4ORwlZ~5?WlI@lon+r z>XpxXBJ~RAJ%M^I;`o%^$SZF$<%!l$*%LR9vTo%kOtiPJzl(KIt3$UCwRY;3Lak=q za#E{Nw`6LCbW5VvD%}#PRis-2wY<9Jpq8v#Y{LI0_kyiMvgL644?g$5l8FV>G7|Ht zWhCZN%SiOjXJpUwJoCJ)c^)MG_ZH_^E?!Yun7J*PP=3KRvYgUFrfTx`Brf0P-jW?O z&>3$#yX#|)!bZVU*eM(o0)Tu79G>m1+4SX~Ib>I77UF%mn@(L|G=zRF%0QZJL=i|U}xBX^IXJklbm zKwh><#!`6o-9fkKhgl5UJ{0tA;@BLr|83--Bg!QF6}k)7$Z9j;zHsEvRZz&Zq`qM; zD3f!a6{fFk;v-o-m)r%nkeW*_C~8^S@W`~H-Z5&>a8-KbbEIBLO$*3RU23r&b?XBi z6-V9lfV7Nck?|NwC zhB6GRFs#NA%r1}p&G+l3^f>PG1OtG$IyUb9fn2>k6?Hd z!+H!GFf?J^?$x^Re06-KJuT2IHnGitkuJYxc!pMN%dh3}h}7 zi*MIe+H%_O&&WsqF|D<&uKv=n0u7<+SiIHBCs{8=DhYg`EiYd%v9)>bZ9BNl($s_A{OW-Pwd%FljaAHuQt7Av2Oe|?cY(41y#PSyj* zMEQ?d`LNiSHe31Rs~4jWUK^^h$(rL@Ok<;!Pi|K(KJO-Ljvoe|p8Mk}98l^&@j6Yp#J3+-2$v(B31SZquURz9oqy$hVdaKe92 zo8i~YeOh_=ON{%l>m64C8c=?%gq`E^-{tMncO^+YCcGyscMRKqYj5N)CFmP|f?q~2 zy@)Ply7aQi~L9T$@n$%RPQ&YPnI!{@3vPjUgzkD=d<6c<0P{; zy4DzPc&NVG*lwBX?6c~~_vgcf>IvNJ%FR^eNvn!vY_|Y8lKlD!)zvRe>z=TRiN>W9 zs;ga^uI;r-2q}r`wM%l}?6FEXQWBx1kSLC~>k_8ZN&ia>(&KsX5|>()+S;WMD~j*7 x&c~O03W7^?Menjo6r?1Q$0f51tth_JDnWv~qx3?%+?D!@4~C3=+?dJB_J19R~@R6q(S#sAJg%lh!ko-_B} zbMKyen-A<&nw$43?Lo(GNz$S_!?W4$&!Yu-?29c2^UQ;JEc%&KJs>r!5t$;rtRP^_ zS3gj79G5n2yfy%~-+}ArB6ClQN!^8A+KZDRXm0@05&(b?BGN?1XAmux;j{ zD{Ku585NHlZUEWP;qI6@SJ`1UDXE?-DbtfOXCfo*8Y@y0k-6|1TX$@gbl>-{%CMq4h=y6wZA5M>>USw=vK3v!x5-xYCn9_8RYdmMKM>h#uOPD5UN*y@$XgXk ze_syG;*$m|-SXXfQ0Z?(cBPw$>`FHf*_Hl6WLN4l2R@axY`xPcmlC&c8mHu$j8dkj zSD2mUz9a6o^7Aq%pn?N5aDomlNX+q=<>!skU39%}xaer(Ug}xTCs6wdg?B^^Eu%~E zRy*=|p_(OMX1!9EG1mR1>xzEYc~ZOOxItNAw%yF#$cr7+Rl`4|z&w|rnp!@UdRsNG zvZ&QVkxVg)PSx^-bbWzSbd)H5S`g=v#1>v^i}UZTMb`IowcJPTb$q0?T7RR}l_Ie@ zYw6gwUO-!pc?I=d z#5eyD?xcbSo@;O1*ud}8;?mUrjcU~N6?f5(jXcYqcWDDZLfOY1c5k{HA0B+5k)Nj1 z{rI=hxv1X%79UGB_uSsLbxj-+zdrCo3Ire(hCvz(hY^qtBViN-&2>$oT`i-vk0h_u zBso^7jozF5U!%u;*43k5lA82XXQ8rLjxdqv`*LbllP+U3G;}Rd!@lD#$fSb184ds|;-C{eV zL&ceby15umdkpMsJ*|_5IA?s3sM^xVGnicD*+OKfDFilFl0eCjD|5V7II)5jE7v902ARccpN4{9z-A? zCc_kX0;a-~Fb$@|Q!oRbhMDjT%!1ia0EJKl&q6WGfw?db=EDMb4oYAlltLLi4~w82 z7Q+j$1YU#+SPIMFC0GtG!z-`?UWL~n3a>*Ytb{64^T!4*_%%;h@VHP@9L)d# From e53b4aa17a99a0b5fffba2fe671ce3a3b0308bf6 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 20:21:52 -0500 Subject: [PATCH 6/8] feat(batch19): implement activation, issuer, and external auth account methods --- .../ZB.MOM.NatsNet.Server/Accounts/Account.cs | 545 +++++++++++++++++- porting.db | Bin 6688768 -> 6692864 bytes 2 files changed, 541 insertions(+), 4 deletions(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index c12203e..9718c11 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -3182,6 +3182,321 @@ public sealed class Account : INatsAccount } } + /// + /// Re-evaluates import validity when an activation token expiration timer fires. + /// Mirrors Go (a *Account) activationExpired(...). + /// + internal void ActivationExpired(Account exportAccount, string subject, object? kind) + { + var normalizedKind = NormalizeExportKind(kind); + if (string.Equals(normalizedKind, "stream", StringComparison.Ordinal)) + { + StreamActivationExpired(exportAccount, subject); + } + else if (string.Equals(normalizedKind, "service", StringComparison.Ordinal)) + { + ServiceActivationExpired(exportAccount, subject); + } + } + + /// + /// Validates an import activation claim/token. + /// Mirrors Go (a *Account) checkActivation(...). + /// + internal bool CheckActivation(Account importAccount, object? claim, ExportAuth? exportAuth, bool expirationTimer) + { + if (claim == null) + return false; + + if (!TryReadStringMember(claim, "Token", out var token) || string.IsNullOrWhiteSpace(token)) + return false; + + if (!TryDecodeJwtPayload(token, out var activationPayload)) + return false; + + if (!IsIssuerClaimTrusted(activationPayload)) + return false; + + if (TryReadLongMember(activationPayload, "exp", out var expires) && expires > 0) + { + var now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + if (expires <= now) + return false; + + if (expirationTimer) + { + var delay = TimeSpan.FromSeconds(expires - now); + string importSubject = ReadActivationImportSubject(activationPayload); + object? claimType = TryReadMember(claim, "Type", out var typeValue) ? typeValue : null; + + _ = new Timer( + _ => importAccount.ActivationExpired(this, importSubject, claimType), + null, + delay, + Timeout.InfiniteTimeSpan); + } + } + + if (exportAuth == null) + return true; + + string subject = TryReadStringMember(activationPayload, "sub", out var sub) ? sub : string.Empty; + long issuedAt = TryReadLongMember(activationPayload, "iat", out var iat) ? iat : 0; + return !IsRevoked(exportAuth.ActivationsRevoked, subject, issuedAt); + } + + /// + /// Returns true when activation issuer details are trusted for this account. + /// Mirrors Go (a *Account) isIssuerClaimTrusted(...). + /// + internal bool IsIssuerClaimTrusted(object? claims) + { + if (claims == null) + return false; + + string issuerAccount = + TryReadStringMember(claims, "IssuerAccount", out var ia) ? ia : + TryReadStringMember(claims, "issuer_account", out var iaAlt) ? iaAlt : + string.Empty; + + // If issuer-account is omitted, issuer defaults to the account itself. + if (string.IsNullOrEmpty(issuerAccount)) + return true; + + if (!string.Equals(Name, issuerAccount, StringComparison.Ordinal)) + { + if (Server is NatsServer server) + { + string importSubject = ReadActivationImportSubject(claims); + string importType = TryReadStringMember(claims, "import_type", out var it) ? it : string.Empty; + server.Errorf( + "Invalid issuer account {0} in activation claim (subject: {1} - type: {2}) for account {3}", + issuerAccount, + importSubject, + importType, + Name); + } + + return false; + } + + string issuer = + TryReadStringMember(claims, "Issuer", out var issuerValue) ? issuerValue : + TryReadStringMember(claims, "iss", out var issValue) ? issValue : + string.Empty; + + _mu.EnterReadLock(); + try + { + (_, var ok) = HasIssuerNoLock(issuer); + return ok; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Checks whether another account is approved to import this service export. + /// Mirrors Go (a *Account) checkServiceImportAuthorized(...). + /// + internal bool CheckServiceImportAuthorized(Account account, string subject, object? importClaim) + { + _mu.EnterReadLock(); + try { return CheckServiceImportAuthorizedNoLock(account, subject, importClaim); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Lock-free helper for service import authorization checks. + /// Mirrors Go (a *Account) checkServiceImportAuthorizedNoLock(...). + /// + internal bool CheckServiceImportAuthorizedNoLock(Account account, string subject, object? importClaim) + { + if (Exports.Services == null) + return false; + + return CheckServiceExportApproved(account, subject, importClaim); + } + + /// + /// Returns whether bearer tokens should be rejected for this account. + /// Mirrors Go (a *Account) failBearer() bool. + /// + internal bool FailBearer() + { + _mu.EnterReadLock(); + try { return DisallowBearer; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Updates expiration state/timer from claim data. + /// Mirrors Go (a *Account) checkExpiration(...). + /// + internal void CheckExpiration(object? claimsData) + { + long expires = + claimsData != null && TryReadLongMember(claimsData, "Expires", out var exp) ? exp : + claimsData != null && TryReadLongMember(claimsData, "exp", out var expUnix) ? expUnix : + 0; + + _mu.EnterWriteLock(); + try + { + ClearExpirationTimer(); + + if (expires == 0) + { + Interlocked.Exchange(ref _expired, 0); + return; + } + + long now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + if (expires <= now) + { + Interlocked.Exchange(ref _expired, 1); + return; + } + + SetExpirationTimer(TimeSpan.FromSeconds(expires - now)); + Interlocked.Exchange(ref _expired, 0); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Returns signer scope for issuer, if present. + /// Mirrors Go (a *Account) hasIssuer(...). + /// + internal (object? Scope, bool Ok) HasIssuer(string issuer) + { + _mu.EnterReadLock(); + try { return HasIssuerNoLock(issuer); } + finally { _mu.ExitReadLock(); } + } + + /// + /// Lock-free signer lookup. + /// Mirrors Go (a *Account) hasIssuerNoLock(...). + /// + internal (object? Scope, bool Ok) HasIssuerNoLock(string issuer) + { + if (SigningKeys == null || string.IsNullOrEmpty(issuer)) + return (null, false); + + return SigningKeys.TryGetValue(issuer, out var scope) + ? (scope, true) + : (null, false); + } + + /// + /// Returns the leaf-node loop-detection subject. + /// Mirrors Go (a *Account) getLDSubject() string. + /// + internal string GetLDSubject() + { + _mu.EnterReadLock(); + try { return LoopDetectionSubject; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Returns account label used in trace output. + /// Mirrors Go (a *Account) traceLabel() string. + /// + internal string TraceLabel() + { + if (string.IsNullOrEmpty(NameTag)) + return Name; + return $"{Name}/{NameTag}"; + } + + /// + /// Returns true when external auth is configured. + /// Mirrors Go (a *Account) hasExternalAuth() bool. + /// + internal bool HasExternalAuth() + { + _mu.EnterReadLock(); + try { return ExternalAuth != null; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Returns true when is configured as an external-auth user. + /// Mirrors Go (a *Account) isExternalAuthUser(userID string) bool. + /// + internal bool IsExternalAuthUser(string userId) + { + _mu.EnterReadLock(); + try + { + foreach (var authUser in ReadStringListMember(ExternalAuth, "AuthUsers", "auth_users")) + { + if (string.Equals(userId, authUser, StringComparison.Ordinal)) + return true; + } + return false; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns configured external-auth xkey, or empty when unset. + /// Mirrors Go (a *Account) externalAuthXKey() string. + /// + internal string ExternalAuthXKey() + { + _mu.EnterReadLock(); + try + { + if (TryReadStringMember(ExternalAuth, "XKey", out var xkey) && !string.IsNullOrEmpty(xkey)) + return xkey; + if (TryReadStringMember(ExternalAuth, "xkey", out var xkeyAlt) && !string.IsNullOrEmpty(xkeyAlt)) + return xkeyAlt; + return string.Empty; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns whether external auth allows account switching to . + /// Mirrors Go (a *Account) isAllowedAcount(acc string) bool. + /// + internal bool IsAllowedAcount(string account) + { + _mu.EnterReadLock(); + try + { + var allowed = ReadStringListMember(ExternalAuth, "AllowedAccounts", "allowed_accounts"); + if (allowed.Count == 1 && string.Equals(allowed[0], "*", StringComparison.Ordinal)) + return true; + + foreach (var candidate in allowed) + { + if (string.Equals(candidate, account, StringComparison.Ordinal)) + return true; + } + + return false; + } + finally + { + _mu.ExitReadLock(); + } + } + // ------------------------------------------------------------------------- // Export checks // ------------------------------------------------------------------------- @@ -3877,6 +4192,230 @@ public sealed class Account : INatsAccount return DateTime.UnixEpoch.AddTicks(unixNanos / 100L); } + private static bool TryDecodeJwtPayload(string token, out JsonElement payload) + { + payload = default; + if (string.IsNullOrWhiteSpace(token)) + return false; + + var parts = token.Split('.'); + if (parts.Length < 2) + return false; + + string base64 = parts[1] + .Replace("-", "+", StringComparison.Ordinal) + .Replace("_", "/", StringComparison.Ordinal); + + int mod = base64.Length % 4; + if (mod > 0) + base64 = base64.PadRight(base64.Length + (4 - mod), '='); + + byte[] bytes; + try { bytes = Convert.FromBase64String(base64); } + catch { return false; } + + try + { + using var doc = JsonDocument.Parse(bytes); + payload = doc.RootElement.Clone(); + return payload.ValueKind == JsonValueKind.Object; + } + catch + { + return false; + } + } + + private static bool TryReadMember(object source, string name, out object? value) + { + value = null; + if (source == null) + return false; + + if (source is JsonElement element) + { + if (element.ValueKind != JsonValueKind.Object) + return false; + + foreach (var property in element.EnumerateObject()) + { + if (string.Equals(property.Name, name, StringComparison.OrdinalIgnoreCase)) + { + value = property.Value; + return true; + } + } + + return false; + } + + if (source is IDictionary dictionary && + dictionary.TryGetValue(name, out var dictionaryValue)) + { + value = dictionaryValue; + return true; + } + + if (source is IDictionary stringDictionary && + stringDictionary.TryGetValue(name, out var stringDictionaryValue)) + { + value = stringDictionaryValue; + return true; + } + + var propertyInfo = source + .GetType() + .GetProperty(name, System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.IgnoreCase); + if (propertyInfo == null) + return false; + + value = propertyInfo.GetValue(source); + return true; + } + + private static bool TryReadStringMember(object? source, string name, out string value) + { + value = string.Empty; + if (source == null || !TryReadMember(source, name, out var member)) + return false; + + if (member is JsonElement element) + { + if (element.ValueKind == JsonValueKind.String) + { + value = element.GetString() ?? string.Empty; + return true; + } + + if (element.ValueKind == JsonValueKind.Number) + { + value = element.ToString(); + return true; + } + + return false; + } + + value = member?.ToString() ?? string.Empty; + return true; + } + + private static bool TryReadLongMember(object source, string name, out long value) + { + value = 0; + if (!TryReadMember(source, name, out var member)) + return false; + + if (member is JsonElement element) + { + if (element.ValueKind == JsonValueKind.Number) + return element.TryGetInt64(out value); + + if (element.ValueKind == JsonValueKind.String) + return long.TryParse(element.GetString(), out value); + + return false; + } + + switch (member) + { + case byte b: + value = b; + return true; + case sbyte sb: + value = sb; + return true; + case short s: + value = s; + return true; + case ushort us: + value = us; + return true; + case int i: + value = i; + return true; + case uint ui: + value = ui; + return true; + case long l: + value = l; + return true; + case ulong ul when ul <= long.MaxValue: + value = (long)ul; + return true; + case string str: + return long.TryParse(str, out value); + default: + return false; + } + } + + private static IReadOnlyList ReadStringListMember(object? source, params string[] names) + { + if (source == null) + return []; + + foreach (var name in names) + { + if (!TryReadMember(source, name, out var member) || member == null) + continue; + + if (member is IEnumerable enumerableStrings) + return [.. enumerableStrings]; + + if (member is JsonElement element && element.ValueKind == JsonValueKind.Array) + { + var results = new List(); + foreach (var item in element.EnumerateArray()) + { + if (item.ValueKind == JsonValueKind.String) + results.Add(item.GetString() ?? string.Empty); + else + results.Add(item.ToString()); + } + return results; + } + + if (member is IEnumerable objectEnumerable) + { + var results = new List(); + foreach (var item in objectEnumerable) + results.Add(item?.ToString() ?? string.Empty); + return results; + } + } + + return []; + } + + private static string NormalizeExportKind(object? kind) + { + if (kind is JsonElement element) + return element.ToString().Trim().ToLowerInvariant(); + + return kind?.ToString()?.Trim().ToLowerInvariant() ?? string.Empty; + } + + private static string ReadActivationImportSubject(object claimOrPayload) + { + if (TryReadStringMember(claimOrPayload, "ImportSubject", out var importSubject) && !string.IsNullOrEmpty(importSubject)) + return importSubject; + if (TryReadStringMember(claimOrPayload, "import_subject", out var importSubjectSnake) && !string.IsNullOrEmpty(importSubjectSnake)) + return importSubjectSnake; + + if (claimOrPayload is JsonElement element && + element.ValueKind == JsonValueKind.Object && + element.TryGetProperty("nats", out var natsObj) && + natsObj.ValueKind == JsonValueKind.Object && + natsObj.TryGetProperty("import_subject", out var natsImportSubject) && + natsImportSubject.ValueKind == JsonValueKind.String) + { + return natsImportSubject.GetString() ?? string.Empty; + } + + return string.Empty; + } + /// /// Tokenises a subject string into an array, using the same split logic /// as btsep-based tokenisation in the Go source. @@ -3923,9 +4462,8 @@ public sealed class Account : INatsAccount /// Checks whether is authorised to use /// (either via explicit approval or token requirement). /// Mirrors Go (a *Account) checkAuth(...) bool. - /// TODO: session 11 — full JWT activation check. /// - private static bool CheckAuth( + private bool CheckAuth( ExportAuth ea, Account account, object? imClaim, @@ -3936,8 +4474,7 @@ public sealed class Account : INatsAccount if (ea.TokenRequired) { - // TODO: session 11 — validate activation token in imClaim. - return imClaim != null; + return CheckActivation(account, imClaim, ea, expirationTimer: true); } // No approved list and no token required → public export. diff --git a/porting.db b/porting.db index b519e6617c5988917a52b8ff0332b00143fbe7a6..4c7856ec802457a4aea3ed883c9236fcb41addb3 100644 GIT binary patch delta 3350 zcmai#ZA?>V6vumeZ*Om3p7J7~(w4TUylLs}8x$&v4%`%{f{GxBEpY1_EnorXoI_X^ z7PCzGKp)u$Ct-=3afxOLcOSM)H%D|^Hub}@WQOozn#?R2xMeY9VLM*nLb+}C;g|eR z&i|a}JTJXHcjXd!?$#ypqFxb^$>`=m^JXG)(^qaI{(Wo2#*f&D3pdns5uHb8(ptqV zHuuTXFgrnr9Dg?DrP*zY5LFIzV6I2eUEJ2w-r3oqvlv+>*-h55|C28^|{XOq01vTd? z0X66H@Ux%D>1^hTwo>-uvUu%%`6fcHt;E&tRWe*Eho?>Enag>37WQ@70s2@_~RgYyim;GPeUkwNiiSxp&Dl3X$S#2 zAPGnYQh-!o1&{`$0~x?dU=^?$$OQC&0Wk7ULs?nVX88>9jqIULubt9NYSijZ)hP3| z!cTobPBN4-g*Ic0nYgCzYw!1_$_Q^?685#h)u#kaXBK8u;u}Fz)H`|{H2s28yU|+w zYsj>8dYsvAI=@(Vx81Z}*xUS*@%C5ZnO0N5nBVkF6Y=Sp0j9$Qm;nor4Xgpy0_%Vr zAQ#93@__DLU;#Ve0Gxmea05j^G2j78fKtE1P%dBz+u1# z_{RkDY&`oS&wjGp1+Dw$Dk+?~GH z8Q}a~-6y#)!Q$zA-0lLmgLM>HS*Mk?n^}9Qy{OdT=ovmH_1gr0er*3ysUH*1TMF>h z_gqe#HI7JYtchj{Nd38q{G8OElgQsH^_vp;TcrN1ME+)}-yrzYVx1}fJ{1UPA{~gb zrjxz0q=6)zZb)Z9qo_dpHvB-n>WFGuwUfEZ$d!la&*&A35Ot4QOAcAoJ9zkJQRK=# z%W$u-X4rUk@!q}oCKd3IuHs_6MH5)9a-01vE$!W*E_{26%f$WDT&iL2yKNbMQ(9g+ zU+yl7TfTas{Os~_Jm_t(Vc&84B%LlhF4x3vATIy<@&;kTKEFJRJI9;TBZ`0`)(B#U)6x#?+zC28@h?EX zaDuN%%W>zQ2lc`pD5tYH{w;_-PQ~ak?SdY+MmQtk#MMLG%6R(_oAktJGWe`f9K;cS zGvZlvOMB$bpM~8W_brGcbV( zpDgTW8k{)!I!94X2OIxl#PjHsHZch20gue`ZF5HAOX(Tz5bC2o>fS^jpmwRhG?70f q^_L{_2c>?G;E(gC(I)ldn+mQFCx6Ff$Ei6HqlOr0nX8HUA^i_6_71`T delta 1381 zcmY+>TWnKx90&08Kj-xH^zv`F!Uk?@PgmGYw^iPuHu)mBxtR#jS6 z<*TZi^t(6N)rFkfxg=x@$6ih|7gEAV9BJ4{PNV@NIgomdWJlU;BpXt@kyIp~k*r8- zjieyijl^{SCGitUTjRcSzsrMUcGV)8T{ZgO^P&<^&pJaw<+63>SK=Nmo=Xg9Q|@f(kaM^f>g;b?3*0fMr>}D~nQ4?036d9nNcx-yHS!S8Y=)A;i^A z>l?}xHTzko-gk)q%5?oOACxIR%rDT5Db+h0lC0zn^L8o~6&FRr`~U?OEXMfkew$36 zhq*@6i;_$KKF(82?@sU&OdTz5sukF_S7>~d*O4c}UnI|fY_4`T!duMMRwDdODu2Ds z9G{5dFO`)@kMh6%@4q&}17^P(Lw|8z_R{4T51M1Y#Q58kzH07>-jDMQl{NV6w4yW> z1lP~}qPo=?cQh!XJSlx6CfG%xVEbKNteiQl&*gbUrDI9{87&-FyfmK1{4Z6nc_ex_ z#XmAOQu)wpZtjaT|BlLU@r|^c;a$dYyIh!FqTA8lVxHz^mUH^Z9Q3-3|OF zXW7ARH`EpDJ!?jpv3w{-*(qT~z9UPLr*iB|dMTs%o%V^BianKWPq-;LjAy!?)Q(fR zQ17Mb7r2d1r?gh{$V(~hTM|FG|9*>Ut;4+gKhxR?D%auOFu_6ZWwf@+1U{Q=R43IV z8u+0Z0_ctPeBOw!vQ!5Lof_sh(HuZAO>-G8V4YR#%VY?wyo8@|hWHzD37#jLG^LIiGj@1FSgAcK`qY From 8d00012ba85ab51ef4b086a11b657a99a5fccfe1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 20:28:13 -0500 Subject: [PATCH 7/8] test(batch19): port accounts core mapped tests --- .../ImplBacklog/AccountTests.cs | 56 ++++++++++++++++++ .../ImplBacklog/GatewayHandlerTests.cs | 48 +++++++++++++++ .../LeafNodeHandlerTests.Impltests.cs | 51 ++++++++++++++++ .../ImplBacklog/MessageTracerTests.cs | 35 +++++++++++ porting.db | Bin 6692864 -> 6696960 bytes 5 files changed, 190 insertions(+) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/AccountTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/AccountTests.cs index f993d79..831f0f7 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/AccountTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/AccountTests.cs @@ -246,6 +246,62 @@ public sealed class AccountTests system.CheckServiceExportApproved(global, "$SYS.REQ.INFO", null).ShouldBeTrue(); } + [Fact] // T:81 + public void MultipleStreamImportsWithSameSubjectDifferentPrefix_ShouldSucceed() + { + var fooAccount = Account.NewAccount("foo"); + var barAccount = Account.NewAccount("bar"); + var importAccount = Account.NewAccount("import"); + + fooAccount.AddStreamExport("test", null).ShouldBeNull(); + barAccount.AddStreamExport("test", null).ShouldBeNull(); + + importAccount.AddStreamImport(fooAccount, "test", "foo").ShouldBeNull(); + importAccount.AddStreamImport(barAccount, "test", "bar").ShouldBeNull(); + + importAccount.Imports.Streams.ShouldNotBeNull(); + importAccount.Imports.Streams!.Count.ShouldBe(2); + importAccount.Imports.Streams.Select(si => si.To).OrderBy(static t => t).ToArray() + .ShouldBe(["bar.test", "foo.test"]); + } + + [Fact] // T:82 + public void MultipleStreamImportsWithSameSubject_ShouldSucceed() + { + var fooAccount = Account.NewAccount("foo"); + var barAccount = Account.NewAccount("bar"); + var importAccount = Account.NewAccount("import"); + + fooAccount.AddStreamExport("test", null).ShouldBeNull(); + barAccount.AddStreamExport("test", null).ShouldBeNull(); + + importAccount.AddStreamImport(fooAccount, "test", string.Empty).ShouldBeNull(); + importAccount.AddStreamImport(fooAccount, "test", string.Empty).ShouldBe(ServerErrors.ErrStreamImportDuplicate); + importAccount.AddStreamImport(barAccount, "test", string.Empty).ShouldBeNull(); + + importAccount.Imports.Streams.ShouldNotBeNull(); + importAccount.Imports.Streams!.Count.ShouldBe(2); + importAccount.Imports.Streams.Select(si => si.Account?.Name).OrderBy(static n => n).ToArray() + .ShouldBe(["bar", "foo"]); + } + + [Fact] // T:95 + public void BenchmarkNewRouteReply() + { + var globalAccount = Account.NewAccount("$G"); + + var first = globalAccount.NewServiceReply(tracking: false); + var second = globalAccount.NewServiceReply(tracking: false); + var tracked = globalAccount.NewServiceReply(tracking: true); + + first.Length.ShouldBeGreaterThan(20); + second.Length.ShouldBeGreaterThan(20); + first.SequenceEqual(second).ShouldBeFalse(); + + var trackedText = Encoding.ASCII.GetString(tracked); + trackedText.EndsWith(".T", StringComparison.Ordinal).ShouldBeTrue(); + } + [Fact] // T:98 public void ImportSubscriptionPartialOverlapWithPrefix_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs index 0da9237..7a46e03 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs @@ -1,4 +1,5 @@ using System.Diagnostics; +using System.Linq; using Shouldly; using ZB.MOM.NatsNet.Server; using ZB.MOM.NatsNet.Server.Internal; @@ -122,6 +123,53 @@ public sealed partial class GatewayHandlerTests NatsServer.ValidateCluster(conflict).ShouldBe(ServerErrors.ErrClusterNameConfigConflict); } + [Fact] // T:658 + public void GatewaySendReplyAcrossGatewaysServiceImport_ShouldSucceed() + { + var fooAccount = Account.NewAccount("$foo"); + var barAccount = Account.NewAccount("$bar"); + + fooAccount.AddServiceExport("foo.request", null).ShouldBeNull(); + barAccount.AddServiceImport(fooAccount, "bar.request", "foo.request").ShouldBeNull(); + + var serviceImport = barAccount.Imports.Services!["bar.request"].Single(); + var responseImport = barAccount.AddRespServiceImport(fooAccount, "reply", serviceImport, tracking: false, header: null); + + responseImport.From.ShouldNotBe("reply"); + responseImport.To.ShouldBe("reply"); + barAccount.Exports.Responses.ShouldNotBeNull(); + barAccount.Exports.Responses!.ShouldContainKey(responseImport.From); + + fooAccount.Imports.ReverseResponseMap.ShouldNotBeNull(); + fooAccount.Imports.ReverseResponseMap!.ShouldContainKey("reply"); + fooAccount.Imports.ReverseResponseMap["reply"].Any(e => e.MappedSubject == responseImport.From).ShouldBeTrue(); + + barAccount.ProcessServiceImportResponse(responseImport.From, "ok"u8.ToArray()); + responseImport.DidDeliver.ShouldBeTrue(); + } + + [Fact] // T:666 + public void GatewayNoAccountUnsubWhenServiceReplyInUse_ShouldSucceed() + { + var fooAccount = Account.NewAccount("$foo"); + var barAccount = Account.NewAccount("$bar"); + + fooAccount.AddServiceExport("test.request", null).ShouldBeNull(); + barAccount.AddServiceImport(fooAccount, "foo.request", "test.request").ShouldBeNull(); + + var serviceImport = barAccount.Imports.Services!["foo.request"].Single(); + var responseImport1 = barAccount.AddRespServiceImport(fooAccount, "reply", serviceImport, tracking: false, header: null); + var responseImport2 = barAccount.AddRespServiceImport(fooAccount, "reply", serviceImport, tracking: false, header: null); + + fooAccount.Imports.ReverseResponseMap.ShouldNotBeNull(); + fooAccount.Imports.ReverseResponseMap!["reply"].Count.ShouldBe(2); + + fooAccount.CheckForReverseEntry("reply", responseImport1, checkInterest: false); + + fooAccount.Imports.ReverseResponseMap["reply"].Count.ShouldBe(1); + fooAccount.Imports.ReverseResponseMap["reply"].Single().MappedSubject.ShouldBe(responseImport2.From); + } + private static NatsServer CreateServer(ServerOptions? opts = null) { var (server, err) = NatsServer.NewServer(opts ?? new ServerOptions()); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs index 7ba10ef..b08b498 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs @@ -1,5 +1,6 @@ using Shouldly; using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; @@ -71,4 +72,54 @@ public sealed partial class LeafNodeHandlerTests remotes[1].FirstInfoTimeout.ShouldBe(TimeSpan.FromSeconds(3)); remotes[1].Urls[0].Scheme.ShouldBe("ws"); } + + [Fact] // T:1935 + public void LeafNodeNoDuplicateWithinCluster_ShouldSucceed() + { + var account = Account.NewAccount("$G"); + var leaf1 = new ClientConnection(ClientKind.Leaf); + var leaf2 = new ClientConnection(ClientKind.Leaf); + + ((INatsAccount)account).AddClient(leaf1); + ((INatsAccount)account).AddClient(leaf2); + + account.RegisterLeafNodeCluster("xyz"); + account.RegisterLeafNodeCluster("xyz"); + + account.NumLocalLeafNodes().ShouldBe(2); + account.HasLeafNodeCluster("xyz").ShouldBeTrue(); + account.IsLeafNodeClusterIsolated("xyz").ShouldBeTrue(); + + account.RegisterLeafNodeCluster("abc"); + account.IsLeafNodeClusterIsolated("xyz").ShouldBeFalse(); + } + + [Fact] // T:1952 + public void LeafNodeStreamImport_ShouldSucceed() + { + var exporter = Account.NewAccount("B"); + var importer = Account.NewAccount("C"); + + exporter.AddStreamExport(">", null).ShouldBeNull(); + importer.AddStreamImport(exporter, ">", string.Empty).ShouldBeNull(); + + importer.Imports.Streams.ShouldNotBeNull(); + importer.Imports.Streams!.Count.ShouldBe(1); + importer.Imports.Streams[0].From.ShouldBe(">"); + importer.Imports.Streams[0].To.ShouldBe(">"); + exporter.CheckStreamExportApproved(importer, "a", null).ShouldBeTrue(); + } + + [Fact] // T:1955 + public void LeafNodeUnsubOnRouteDisconnect_ShouldSucceed() + { + var account = Account.NewAccount("$G"); + var leaf = new ClientConnection(ClientKind.Leaf); + + ((INatsAccount)account).AddClient(leaf); + account.NumLocalLeafNodes().ShouldBe(1); + + ((INatsAccount)account).RemoveClient(leaf); + account.NumLocalLeafNodes().ShouldBe(0); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.cs index 4873d29..bd0a121 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.cs @@ -6,6 +6,41 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class MessageTracerTests { + [Fact] // T:2359 + public void MsgTraceParseAccountDestWithSampling_ShouldSucceed() + { + var cases = new[] + { + (Name: "trace sampling no dest", Value: (object)new Dictionary { ["sampling"] = 50 }, Want: 0), + (Name: "trace dest only", Value: (object)new Dictionary { ["dest"] = "foo" }, Want: 100), + (Name: "trace dest with number only", Value: (object)new Dictionary { ["dest"] = "foo", ["sampling"] = 20 }, Want: 20), + (Name: "trace dest with percentage", Value: (object)new Dictionary { ["dest"] = "foo", ["sampling"] = "50%" }, Want: 50), + }; + + foreach (var testCase in cases) + { + var options = new ServerOptions(); + var errors = new List(); + var warnings = new List(); + + var accounts = new Dictionary + { + ["A"] = new Dictionary + { + ["msg_trace"] = testCase.Value, + }, + }; + + var parseError = ServerOptions.ParseAccounts(accounts, options, errors, warnings); + parseError.ShouldBeNull(testCase.Name); + errors.ShouldBeEmpty(testCase.Name); + options.Accounts.Count.ShouldBe(1, testCase.Name); + + var (_, sampling) = options.Accounts[0].GetTraceDestAndSampling(); + sampling.ShouldBe(testCase.Want, testCase.Name); + } + } + [Fact] // T:2331 public void MsgTraceBasic_ShouldSucceed() { diff --git a/porting.db b/porting.db index 4c7856ec802457a4aea3ed883c9236fcb41addb3..5452cfd445b596513b8519d308848f4e67d10758 100644 GIT binary patch delta 3155 zcmb8w3vg7`835pW&VBFhz2|NcB6%#_O;#X4Ojv>&2qXj~NFWI>1DKHAY!V!>q?kw5 zLLs3}Vq#1}jcPy1xN(KuEhCqZBMvk$9 zLLewt!1gtg2fB7A#K4iDoB;zj8=CB&F>6MEXI&?Y4hg4vqctZeBL%5QLppLGCvqW! z=LF5yZ1ZD*nJ?>Dg~I}pSVZ7SIUsQ zQmiD4{}MkJKNR1Vo|JY=pGtp`XUbD#k4zM&d{O>NJ}DoQUy-@|sC>VCr@T$RNv=}* zl|JQ&@|^OBvP)@Kwkn&Hu(DDqP_mVYszbS;HmWzMYt+T+Zgq#|)Oy9~q_e~%1cC)D z8K8hAQL-7%ox|Sq1lRKoo5S{Ab)&Cgu$SP9~7L9x7&sV@Y*%EBye+!xu&DFg$UFm~ikroXi(s<8-}J&?6^X z2W>@c0kNRAh$WA6i5@cCif7z7BGKtPgzb=6#42HGHuJ##A*}y}?=j_`A(BkIP%%Un zz>VKvLkTv(+DNo;2@5cCpYwge_%SJkiup7Rb}wcR!-?}`GF-SovY~4hD}~I(ED@|6 zHr0-vSTv%q1c+S0ix-HG3im+qC6Y)*3m(j2Gf4>kn!}dC&_&{czZbG(zHBxN6Y?4K z%wc}X%omE`^c=QTVv^7VzsO~ok!T6;%4L;Ab-W||L3&Yu{<-WG$lqZl!0$uswtF|x zWT=?O#>>kU{6TGhrI){z&+d_+w8MG;F3h7z;4GpZ?qA3r5F@W$AM_Qn5BcY977*A_ zObvd2yY&ohB}*A6OD!=|YNC0=^V|XBo9QIjhB;wpvvmr-BC3~v(rndJ>Uf8*xy@=+ zFr)0}eQnlL1Vgcj*A~+o2tRYDmEb44LGHC?$pw10Zt}ujOCh}QCCepIt&p#N#X3tR ztpOjF;@7PSk{-tBme;M*k~0^h++-Thx4dE99#2|8?zb}d)&15Zgdaa?`DO88VJml^ zvIaCo4hgTy1)vUC-DBky+NUgzspgRRrP*&DGha1dFrSbM_`v~7BK*kL)^>48uaim(HG0K6N10Kp)srnFq#yJ)IAuD8;-g#iW`nPFmgm9x{H_ZrAx#(_KBck zLv*s`{Oq^ZcS@ozj-kV2@5Y_iBT>+S(RmtvR&{?#6uOmeqG5PyZg4XAs5=+hsoQ{! z9=FMEjXX#{R!SQ=_B0yfl^$V;nu4*#RqA@)^_X$OIBFa=dc@<#UZYbv%2O=&aUv;) zm7bmJYm#_UjQedu>D~Nvtoto0E*FyF`FOXFV(lm5-3^iB6DGHodfX@m#iBSg2F0T* zc&TUX{{yE!@ zT}qrhZj-%Tl$?ri3^q2{zvHP5_TLm4>z89d*=%o)%$*9EpT#Ca?`AtMDtW7Wi_P6z zymfv!S?cjY*Dn(jp!X?nVbmCY#naw$^@sj}W8Or#<)Akn{`jodfG-}mqgTQ;?v1~d z8Zx&tQ;tY5Z;6=-&lqtjkTxSFeK@5){7=_h(J>xQo{9DH+h?SNRMEU7z?s=8iEt@5 z2FWqE&p*$}ozIA3^3QFZO z^LzmtmhT*s3Qu?2iST65H$Q4YtrvqnbS0XACZb6w4NXQ<&{UL;u0k0o6HP;YG#zE3 z8R%*hK-p*}nuT)EY%~YuqC7Mg<)e9MKDq`iKm}+ax)v=$g=jHaf{IWvDnU!pGPE46 zKr7L8s1&V2tI-;MF?hYy%UqX)L3|J7YDbo1f@3Ubc3Czx_QG7dY^-K>FPvyV729G{V2T*m7}$&0tNX;>7j}H$+!5y3N51QGrYJzBvBZ6FXZB(k3#_& z|93i1g?v;Gv>rMYLX{|ts!%mQbgHIcl9(iH1l3wM#YHdD&E!036u!n5e91BrnuT4m zE_kV@WK7lc#@h7_!G@aphI+LF$bs6z2) z%xSay{#ls;Od^evgzox3lZc}wx%=`-Qb$y>Z5PEHG5c8!%PzkRV?+wREp}bA%QWy?B$^EZ+hTreHgG(!b>>*yY;BZ% i(qW%m^FQkR*{P|cBvSWWzS5s1QASJBb@?PCm-}zX9tHLQ delta 3715 zcmd6o{Zmxe8OQHA_wK#RzMONHMUXe$T~P!zytoP~0>0rpDx%RqaY4{ggAxU6qX9IT zS#69M9em7aQgk{dCbqTJc+#}B7`2HnNsXq~CTfXkwBuXxHAXYq2Ml}f{s;Ntvva=p z`@EgAXYV$)ZIv5ax5^viEn6f>t$5v&N4ESvke5asPhQ@U7Tl3W#EvvNx5e7x7$bE= z2*)X!Phi@h?nUCQY5Z+laLjp%$YRWST>(uEg2l2%Ld&iCq$U zBRkE~vCBQDoJcFH=2b0ST2(18=Sh;I>j}DdyVe~pt%1lUmO<)ZbQ7CS%Ei_uwxGo= zkC2{~BM95hnpg=dWPaymVCj4(Y|7x#(2~KeKaCGXypzG-APsV}RM53y@_{m6N(iZ! z!?s(Bks*(kba-=@RM$LTRa<56XO6_0pGv8rD)nJrD|#&C{l(c}I&ZQP+t-B57sC=i zR=lRfBVpcQ%*zht9YSTauM68!ZCKthrwn<1UX-f*)0A-%RcNE+1S(_T)EiHr^65Jb2V8y5Ze5yu+SRI z8B_c77GY}f6)K}eby(g7%rm@2Vcumm*z-Wvzw-7mE;CaW<*{O+ru^Jp2brewAZB(ogK}Iv>QGSHKgt^FH-xJ(U7{r zJYDc>uS^N$zj=j&||P+B<~(+ea}><7b*rPpWsnjZ<%^~P#NbCgUaX~ zjVjce>mdRtuHpm&x;~K~two>WyuH&IO=5hpV z3#F7UkC-hw(SFvLf=&qY#$j50R~j9QDs=H^0ji3SH>Mb6=;qPUs6uaBItrEXb&N!1 z9LWe&#*yTr3LOa@4yV5-sn6K%o3f{%GO`O%8QGIjF*x;@;Q!B1K9kvVOvR>wGK^;{ z_E{M6cqEUGlEA*vlw54bbAa|vXLHyo%OK=2C*`=tA!Ar!L z9KM*4JFtH^&r~|@EnFSW>lLPPB+b*uVI zHJ}!$tJKBnF7>*45jF()E6p|ZC~VH>LoG$<6~W@Ra<4csf^Sw}n4dFB>%r~kDX@1S zk0%uJ2l8x4yhj*}?f6(RkUPOPnf4HwWBGPj>DVBgp1{9?;3&#q-w^&PBoD!q!iVH2 z+|9z(TUngL@<-$)m_C@l4NC^`1F-!8Nrcov*rvT>75_b!-s9)-Tka88kWA%*Ox5Ld z^rdLL+?LE~V< zC;At#b|G~@K>}rB)iHf-Gzo&GO;4j*itxAT?+|hEYdzf}XGsadbwzKts@99rVaq1a z+VvfgJc?`VA-loa*ctXW_D8lyw6^OCfxWkMRUEmcuaPG$tF5b`g_e4$Z|7<ueojz#j0C1*pok}KO|9CE|76NRjjA3I%miAdP14xEOG{%i=C?+ zGaVl|-gRDdo^pQV-0$4w+$ub}>pZc_=bzp`p+>qc5vt7=SKY1?6b>z{ii5vLyPQ-j z5f`IfwJOw5SA4KK(S<}JZX^onhD0MVqB^nrw%hUSnA7i=VlTH9L=;#HG{5S%^i^#1 zF?lRCXT+CMNfMLYvHKiiwI0g|>>TRr1ue5I4mjwFoe1gM6$UYpu`NK$A`Bx5Z0^{> z(00ztg2WSx#3AuW0+NXIKza(|@kCA~s)(xbtdSLmLplT1wVrjNSFPt873?cLD}mj1 zuY>$eF^Vi%B7?3~9^qQ$U786+J0r2*qPRF{{jGPxmVVJ(#Qx6voCYuK^yb2f553WF z@_nxpuD|E;3#;%(|AMl|!jtYhqCn;$S1;J>jPgOsAm0GYWpY_lPx$(p+bh-%@&zk7LDc!_d4!Wl#J3(7P6XY%QG-cXCtyEsC6>9nPh?c1($uEk9o|3hME?ocx zJ#>#yeI;cwvvrr)@~Wi^<}IkI)K)|N?#ei^zIVxbMR7cqQXwwJl?07Ts+cHDE!juO z0azU4Itbfm>50J$14@um(5`3_$=f$^}nQFn;aw!i}dH~$e}B5)~? zWg)4s^-3TKN>fW%@c5MgQjSz0bCF7M{7Ti(zH&FY0W^JHzh<9pyh!&DMHUV2d8-}& E2a__0Gynhq From a36bfb7389eb5205b8a59bc47934272d9acec2bf Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 20:35:42 -0500 Subject: [PATCH 8/8] feat(batch19): complete and verify accounts core --- porting.db | Bin 6696960 -> 6696960 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index 5452cfd445b596513b8519d308848f4e67d10758..43c7dec75cea932b7e2aa4c95d717eb7f8eabe76 100644 GIT binary patch delta 3653 zcmY+HX;c(f7J%y&-BsO9-RhzRRHT5!1;Gv6STu=ZLY!b+P#6=%1;L4lisFJ}9HrYt z3}zfBlX#sR4^d}gjLAWaN-2j3<2LH3QHK~~6p=A;ImScG95Zmx%&lHt+dsZTeRbcv z@4frr)}g#*OIW$dezvxmMdDk{tdpHV_jOh-Teb-WmK4Dl-@-OAOt`_ajpiHdi0HCD z7Gmd=ELb$Zcxf@~CUpp^@x@Ixj*XNS3fVl6NP%>2eaMoz#ghe7#F;5D^9MF`jjf8! z=c7iD&ES?vHj!HfSr)f+vZ374$og=X>v&qfxkpG+34pG2neGz`LY zkz0?Dkul~-Jb#N_VC#(?x7l)L3BOF&7kii8I$&j@I99;6^RvbnIorenCOKRVi~S*1 zZx`c6*>9_r>MC`iI!n#9Ur~pvebpFMfyeeU_QTKytGVI-dZTujHpL*i_FrxYAq5~d6d*F zotKVS^Q~j0{nAcpi&QDC!kQF1dSe8gx`t^IUab)y#>s=_UUIZ-lRBh(W;xkaUvYf} z#{2DB4tZ&W;=|{fZ|wLSVznu>2B4p(_{W;d@iVd^tYKg#rE?7jofU z%=#640rzB{ymm4?4$ind1(N+UPHt{lLu-3m-q6Z3R_S6wu069(9ahzn)9K+ zfvtlbI+hngPyEWJC;I1nG8^i>E0~ez{Vz`yLNmG^hv~TKcVNe}b6|+mdmzsQPn=6& zBRmsO%Yzt9DuPRR@nySe>ixsdAL%a*wy*)eN*cYSjOsCcRb z28%<68IMXJpWy?S-VawVhC%cS_bi45m?i1Ga8xObHs7o*g$w4GM0|4z3^!|>n&E!k zW2YLwFb{7Wr5ZOafl_ouxy({JaKTbiDVv8$2(ifkx4zBxVd&1qTX?EV5xOXc+&{Hsk`51sZv zjurSvUGIU5-+_DP-KJ)EBHd+_cZ_?qZ&`OG{MWy?l^Y--+W-8~cF6Xh#JIZyjt8Fi z{~erx6Q2ASe)O+dw-c`WXI6az$$=4Pcfn)->+_$&_e@UBG&eEWsQ(ONf_Ck57#p9Gtm{m)}mkhen ztF?f9@Ja)Y1jHC~5a#mbjG99b8MJ@ZK?;lTr(OH3JKrqoTT$Mk8c)^3Rz4JueF;~1 z&W{hnuOhe4{_$iT&Kv;?pJse>6vUvl9D@&nw(10o4%)GQ!nC0EIR!2|U*Wln5X<-J zUgplGeLmxk!iH(|Ta!~w=il&eIK_ugYl5d4uhv5MXsXueKj4HedKB*X8qV^8%P+wa zIbhg!4O#PpUVxX$d@Kc7ifdKY(9M?X7Q)jt9&S z)6I^Cm0|jQ{$aqa7Cpm!`;A5aTc9t>SY_3{J?Mf}ZxDGK25fr+tk_`F)69O)2F@93@YVX#3UH`aKA8zJ-%aLr1xgW4>bR- z>BD$@cKmnVp6mT$9xIIZe*v(gFuoH1Tn@c(*PQqvzR?vebK;2v;wj>3q9@Ud=uPw? z5{bS<5|K>wBl;5qh=If)B85mL1`|Vwp~Ns^IFUxA6B$G%F@hLLJVQK7JV%ToMiW`Y z7~*;21tObxkr+$-hIom{A;u9e6XS^q#6)5ekxNV_rVx2VKJi;(D)9>ODlv_iZnVrP z$Qj9WqyM&oJ8ES3qT+=`&aj?Jn9c(K5g18(3a)AFO_Uo8`qGb-#)2Hjv@YR7Vj5#i iM-RQW1qN3fXe=OR5HpEc#B8FFm_rm92O5iWO8yUai_eV! delta 3583 zcma)-dsGzH9mjVryR$pX{B{=vg3AtzG2taD_~M}^K8X<%6@?lNhIi@<5ELx6k;OzB zZBkDg{n4J(*NGZmh>DC_Bh+B3Z8TJgi7_I0Voj`iXq%c70=B=M*&Uqx)x-H5zTf+M z+{eJZHC^=qHC~#y^_`pzU+?7lxMuX+n{fe8#P0$5mbS_%w z+&UNOb(r>dZp=E}_;{U5)45cgOVPPxolDZWM4cO{b0c8g<2xI!3kK<&L+9)|XVW>* zIZ@{Vbyp>s~J!}dGe-?-)SuCKXD&gdPHU3a-V$4!O@ zY=!}Q+HOlC9ph(YDrBV#~E<+NRmY*53XbYJ#GMlNzO{7*er{iY67QC^xAP zMMaXbD#|5C?Pq(K(f*K2!M_}2DY)ltwrimf5XzKzg#DqfkCT+zXD}(XPdF*HPl#4A zUdv0<@=~?D6nu1urDJ5DBM)~suq*g*AB)G>eh1*;!z^!^Rp%0&eCU(b(I_9x_8cx{k6)mv}$9O~RBzY#6RPM0=MHF}u-PZHULteQY%Dl$i_j zPO@P}vCv>cN53OXe&-Z>kHeqJEWs%5H>6-&ul*_HDUs)D3p#ti z0K!O^2%az#7D6DbM4;S%TI~BR4#_h~TQdU8D9wdLbcYo4IBzbzpp9>xB^dGiT!>L( zmX~Uif(+c)Dy+E*p%|PAOSDYfo(aR%4wo}w)4;fAp4!=j&GX>V!1(9$Azsb5Wcl*3 zF$Sd$M!@e_-rtj&ab^Wd}=n5Jaog?#8#$2PwT z0$wbD(MrHtJG(YA*uagiosEtxaAI5`TvZck2_nW8`R2m~MQ~dQ&?#9kzZjC#b<`IF zFQ|b!u;GPL7=8^w(wYzsmtyTU7_E*6bwR>(lb^mS+eP?fDIqqk0 znKvb-rX`KtS_9vxN`6oSp`5DGQVVx|^1E92+?Ua@9+q)7rOva|Q$`z{T8crnw845A z088J1GY%zMD|$-lD?R6VItjai-63jx@dmi5R@Vl4UZd))pW~4{s?M#0d%oG`{u25G ze~&fzmLx^s$~WPjT^$SYR8illp2_O8o{jLWZ*A3^AS%qKx9@~>-%jMayWpH(H@ydT z{|?XZhWoxbPwj!5zRGQTA>N;H@qKvY8*bkRw>fiS+SsJDF-h|21K=ERHx9y#0hiDK z7d6-12zN9iKR5zen!}mL;F7;1x|`stX8M}onr6N{4lNGf#Gd({m%XP3-cyUgnKFIU zFPj0itfwKom zp}6Z)xTqyoUWE#?-(k-UxaBwUwN8lAmebq?)tW(1H|+H1VcaeFwcpgW+ycTTx7@N% zfk9%2;Vu)I|E;rLvbM^W9(xBTfBjE;wHbSx*>t=w1li={-`nYxS!@l8sJq^4Cmci& z5ljpsLWoczjCg_wCk7KvB0|32>$+o7|HJ9Ja&1{B3Amu#e=UsSq^=(Y{r8gztC@6E zy)(kxfY||(dJE!P0n$Izaqr{uu2D)=?lZh@l(r9yZ!<~C9mUI^nxsE6br!nT0ehB# z30o{uvU=0}V3DePcbG2hz)iib!Prv;7B%rRtF+0N*a}h@ZV{!CN(3*6(iiFk6j0jo z*bmY+|7sj~-bPP}_hQ3uZPEcXS9vzh*`;wxK)u`@(tqs!Pu3hIeXo7L4UJNBW90H9 zkCpx=&pkqYiq7S^ulwgRV_Uvkc}RHtA>aL`RuG5>3)~~U1$=q!MYwcwz!Ek$8r9mY76L zCZ-TmiD|@iBAs}Sm_a;G%p@|1S;Py(i^ObV4l$R=B<2zGi7X