From dcf1df44d1aa2d84d2bc48033e1b4335244446d0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 19:47:10 -0500 Subject: [PATCH] feat(batch19): implement account trace/counter/export core methods --- .../ZB.MOM.NatsNet.Server/Accounts/Account.cs | 319 ++++++++++++++++++ porting.db | Bin 6664192 -> 6668288 bytes 2 files changed, 319 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index 2e3371f..fa0308b 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -475,6 +475,29 @@ public sealed class Account : INatsAccount } } + /// + /// Sets account-level message trace destination subject. + /// Mirrors Go (a *Account) setTraceDest(dest string). + /// + internal void SetTraceDest(string dest) => SetMessageTraceDestination(dest); + + /// + /// Returns trace destination and sampling. + /// Mirrors Go (a *Account) getTraceDestAndSampling() (string, int). + /// + internal (string Destination, int Sampling) GetTraceDestAndSampling() + { + _mu.EnterReadLock(); + try + { + return (_traceDest, _traceDestSampling); + } + finally + { + _mu.ExitReadLock(); + } + } + // ------------------------------------------------------------------------- // Factory // ------------------------------------------------------------------------- @@ -502,6 +525,12 @@ public sealed class Account : INatsAccount /// public override string ToString() => Name; + /// + /// Returns the account name. + /// Mirrors Go (a *Account) String() string. + /// + public string String() => Name; + // ------------------------------------------------------------------------- // Shallow copy for config reload // ------------------------------------------------------------------------- @@ -966,6 +995,12 @@ public sealed class Account : INatsAccount internal int NumLocalConnectionsLocked() => (_clients?.Count ?? 0) - _sysclients - _nleafs; + /// + /// Returns local non-system, non-leaf client count. Lock must be held. + /// Mirrors Go (a *Account) numLocalConnections() int. + /// + internal int NumLocalConnectionsInternal() => NumLocalConnectionsLocked(); + /// /// Returns all local connections including leaf nodes (minus system clients). /// Mirrors Go (a *Account) numLocalAndLeafConnections() int. @@ -1049,6 +1084,13 @@ public sealed class Account : INatsAccount return _nleafs + _nrleafs >= MaxLeafNodes; } + /// + /// Returns true if total leaf-node count reached the configured maximum. + /// Lock must be held by the caller. + /// Mirrors Go (a *Account) maxTotalLeafNodesReached() bool. + /// + internal bool MaxTotalLeafNodesReachedInternal() => MaxTotalLeafNodesReachedLocked(); + /// /// Returns the total leaf-node count (local + remote). /// Mirrors Go (a *Account) NumLeafNodes() int. @@ -1115,6 +1157,93 @@ public sealed class Account : INatsAccount } } + /// + /// Returns true when there is at least one matching subscription for . + /// Mirrors Go (a *Account) SubscriptionInterest(subject string) bool. + /// + public bool SubscriptionInterest(string subject) => Interest(subject) > 0; + + /// + /// Returns total number of plain and queue subscriptions matching . + /// Mirrors Go (a *Account) Interest(subject string) int. + /// + public int Interest(string subject) + { + _mu.EnterReadLock(); + try + { + if (Sublist == null) + return 0; + + var (np, nq) = Sublist.NumInterest(subject); + return np + nq; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Increments the leaf-node count for a remote cluster. + /// Mirrors Go (a *Account) registerLeafNodeCluster(cluster string). + /// + internal void RegisterLeafNodeCluster(string cluster) + { + _mu.EnterWriteLock(); + try + { + _leafClusters ??= new Dictionary(StringComparer.Ordinal); + _leafClusters.TryGetValue(cluster, out var current); + _leafClusters[cluster] = current + 1; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Returns true when this account already tracks one or more leaf nodes from . + /// Mirrors Go (a *Account) hasLeafNodeCluster(cluster string) bool. + /// + internal bool HasLeafNodeCluster(string cluster) + { + _mu.EnterReadLock(); + try + { + return _leafClusters != null && + _leafClusters.TryGetValue(cluster, out var count) && + count > 0; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + /// Returns true when the account is leaf-cluster isolated to . + /// Mirrors Go (a *Account) isLeafNodeClusterIsolated(cluster string) bool. + /// + internal bool IsLeafNodeClusterIsolated(string cluster) + { + _mu.EnterReadLock(); + try + { + if (string.IsNullOrEmpty(cluster)) + return false; + if (_leafClusters == null || _leafClusters.Count > 1) + return false; + + return _leafClusters.TryGetValue(cluster, out var count) && count == (ulong)_nleafs; + } + finally + { + _mu.ExitReadLock(); + } + } + // ------------------------------------------------------------------------- // Subscription limit error throttle // ------------------------------------------------------------------------- @@ -1460,6 +1589,167 @@ public sealed class Account : INatsAccount } } + // ------------------------------------------------------------------------- + // Service export configuration + // ------------------------------------------------------------------------- + + /// + /// Configures an exported service with singleton response semantics. + /// Mirrors Go (a *Account) AddServiceExport(subject string, accounts []*Account) error. + /// + public Exception? AddServiceExport(string subject, IReadOnlyList? accounts = null) => + AddServiceExportWithResponseAndAccountPos(subject, ServiceRespType.Singleton, accounts, 0); + + /// + /// Configures an exported service with singleton response semantics and account-position auth. + /// Mirrors Go (a *Account) addServiceExportWithAccountPos(...). + /// + public Exception? AddServiceExportWithAccountPos(string subject, IReadOnlyList? accounts, uint accountPos) => + AddServiceExportWithResponseAndAccountPos(subject, ServiceRespType.Singleton, accounts, accountPos); + + /// + /// Configures an exported service with explicit response type. + /// Mirrors Go (a *Account) AddServiceExportWithResponse(...). + /// + public Exception? AddServiceExportWithResponse(string subject, ServiceRespType respType, IReadOnlyList? accounts = null) => + AddServiceExportWithResponseAndAccountPos(subject, respType, accounts, 0); + + /// + /// Configures an exported service with explicit response type and account-position auth. + /// Mirrors Go (a *Account) addServiceExportWithResponseAndAccountPos(...). + /// + public Exception? AddServiceExportWithResponseAndAccountPos(string subject, ServiceRespType respType, IReadOnlyList? accounts, uint accountPos) + { + if (!SubscriptionIndex.IsValidSubject(subject)) + return ServerErrors.ErrBadSubject; + + _mu.EnterWriteLock(); + try + { + Exports.Services ??= new Dictionary(StringComparer.Ordinal); + + if (!Exports.Services.TryGetValue(subject, out var serviceExport) || serviceExport == null) + serviceExport = new ServiceExportEntry(); + + if (respType != ServiceRespType.Singleton) + serviceExport.ResponseType = respType; + + if (accounts != null || accountPos > 0) + { + var authErr = SetExportAuth(serviceExport, subject, accounts, accountPos); + if (authErr != null) + return authErr; + } + + serviceExport.Account = this; + serviceExport.ResponseThreshold = ServerConstants.DefaultServiceExportResponseThreshold; + Exports.Services[subject] = serviceExport; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Enables latency tracking for with default sampling. + /// Mirrors Go (a *Account) TrackServiceExport(service, results string) error. + /// + public Exception? TrackServiceExport(string service, string results) => + TrackServiceExportWithSampling(service, results, ServerConstants.DefaultServiceLatencySampling); + + /// + /// Enables latency tracking for with explicit sampling. + /// Mirrors Go (a *Account) TrackServiceExportWithSampling(...). + /// + public Exception? TrackServiceExportWithSampling(string service, string results, int sampling) + { + if (sampling != 0 && (sampling < 1 || sampling > 100)) + return ServerErrors.ErrBadSampling; + if (!SubscriptionIndex.IsValidPublishSubject(results)) + return ServerErrors.ErrBadPublishSubject; + if (IsExportService(results)) + return ServerErrors.ErrBadPublishSubject; + + _mu.EnterWriteLock(); + try + { + if (Exports.Services == null) + return ServerErrors.ErrMissingService; + if (!Exports.Services.TryGetValue(service, out var serviceExport)) + return ServerErrors.ErrMissingService; + + serviceExport ??= new ServiceExportEntry(); + if (serviceExport.ResponseType != ServiceRespType.Singleton) + return ServerErrors.ErrBadServiceType; + + serviceExport.Latency = new InternalServiceLatency + { + Sampling = sampling, + Subject = results, + }; + Exports.Services[service] = serviceExport; + + if (Imports.Services != null) + { + foreach (var imports in Imports.Services.Values) + { + foreach (var import in imports) + { + if (import?.Account?.Name != Name) + continue; + if (SubjectTransform.IsSubsetMatch(SubjectTransform.TokenizeSubject(import.To), service)) + import.Latency = serviceExport.Latency; + } + } + } + + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Disables latency tracking for the exported service. + /// Mirrors Go (a *Account) UnTrackServiceExport(service string). + /// + public void UnTrackServiceExport(string service) + { + _mu.EnterWriteLock(); + try + { + if (Exports.Services == null || !Exports.Services.TryGetValue(service, out var serviceExport) || serviceExport?.Latency == null) + return; + + serviceExport.Latency = null; + + if (Imports.Services == null) + return; + + foreach (var imports in Imports.Services.Values) + { + foreach (var import in imports) + { + if (import?.Account?.Name != Name) + continue; + if (SubjectTransform.IsSubsetMatch(SubjectTransform.TokenizeSubject(import.To), service)) + { + import.Latency = null; + import.M1 = null; + } + } + } + } + finally + { + _mu.ExitWriteLock(); + } + } + // ------------------------------------------------------------------------- // Export checks // ------------------------------------------------------------------------- @@ -2146,6 +2436,35 @@ public sealed class Account : INatsAccount return false; } + /// + /// Applies account-based authorization rules to an export descriptor. + /// Mirrors Go setExportAuth(&se.exportAuth, ...). + /// + private static Exception? SetExportAuth(ExportAuth auth, string subject, IReadOnlyList? accounts, uint accountPos) + { + if (!SubscriptionIndex.IsValidSubject(subject)) + return ServerErrors.ErrBadSubject; + + auth.AccountPosition = accountPos; + + if (accounts == null || accounts.Count == 0) + { + auth.Approved = null; + return null; + } + + var approved = new Dictionary(accounts.Count, StringComparer.Ordinal); + foreach (var account in accounts) + { + if (account == null) + continue; + approved[account.Name] = account; + } + + auth.Approved = approved; + return null; + } + // ------------------------------------------------------------------------- // Export equality helpers // ------------------------------------------------------------------------- diff --git a/porting.db b/porting.db index 15af1aedbf22df00ef4836cbe138d3384227974a..244b04efbc21679711b6161fa1ef46a44e7cd434 100644 GIT binary patch delta 7043 zcmc(jdr(x@9mnsvcVG9i=k9`n1a@UL1{HklA}o)@`l!YD3`8&x41t9eX)%j{Y0~Ma zYwegenTR&wx02MscG9u76CZWFkx>L=eE?GJ)M->I#7FC7+W5+(9b;;L`?_~mZ~xnY zKRyiKJ(u(Sp5MLa+~2ZkxQ*Mi3Aol&Z7aj*R&BSUwuAI}QIr7x{@$ z&M-`7DUoG24i?I)3ss)1j-kqs)qYeKS?xuo ztgu~5|AL&*l$pvHSFL-syQ#@t%k*PCMOpRvS1qevyfRN#XHjLy>I^E2tWKj6VeV~` zywUV~rcnO;1S(~Q9#qN<-Kdlqx=<-Ojz#5=FrzZ(epJewdr>KKwlnce#H2g@i<55M zCYJr&Lr#?R9qdVx!_$ZKHoZnKiXVvYie9lrtmFR9UE=z={+BhJUY&28WsITu&L z6>@o824~^K!@puh0vSixo6`MXv%@^AZHA&wHk%qbD+xcZYa!`lc99{j*3EN?Y%`cT z*=5q9kJ)8J&s&&pSdnS-nk_n36n-QmgN>LI!Ly&`MM2QNq5r;qm43N?F8>gQhe^_S z{!4yX=i~p(pX3kgey;m5zlU$<-{Cj#jl7F5fhw=r39ZDOE4n~J%u`EGM_qC% zN}Y&OJyEJVN_9o4V=(@qIW=={)Q9bnijCUeYzCy?H#y-$L%ayd-(J z$F=BINyhVR7GaI`aQ6aRDebzz_6c~<=qh|=TRnp;!XX+&i|7zNVnBFAK;k6NpmFE* zWKALaU*clQ#Rv41@l)Y8pKeIiJ<#rf2{w`s2YTu6ApYO16&klt9v1e|1h_mzpMp&n zlMI^doC1e*q>tthI5=Ohwk_fGZA*DA{9u4m80x1+u%C;(7D~_2e*s%eCxtIdS0;jA z4$wPruXBMFy0_7IICqr_FgS>=1An5=DA!5ve@ZQSsl1!c=GyKUbyf`%_l7W0yhQKt z1~Ltw)BF&DQN8R55s^dj6#uug_;om4N+Zh)@8g(8~#%W-T@ z^DazaNVzu=4s520aBP5%1wPIy!WRR$pu!=#3^rV)6QbvuuH*yV5bcN<{mT$dix^$= zS9%&Ymumvmf)$_9tcbzwpHYBoy`jOSmuO~SFqq6+=Usrwa=h`7RzVY`r!UZjI{4`& z`Ulwjw%!W%MOdrK%XCJ>guctv2MKlBpj*im%q`zJ@Lr+4fx7{oK^nP&M?-Xips8i2 z5-;-!dmJ1c!S@kknCLna^-k*ASK$TLVfG8vhIOhqz~Ok^4| z9hrg5MC^zI$wFo!*+>rJL~@ZlBp)e2o<*KRW+UH0<{)#CdB}X^yT}6Mdq^R&5P2S1 zge*puAWM;D$a3TbqzEZSUPM+PB}gf<5_t(JL&}ktkqTrL@(NOkyoyvI)rbqJksj@< z)!vzsz#L*snPM^hp>bE-ABBGk34ETRQolvFS9@ObwZ_8DXV;K-nM3{)=B;G!N?)14 zSXpaxqId1A)$Zz+^-b<(Jc}I(<q>}w~t`5(8E^hTN+5VF6t%Pt~_jY{Ee!+oXF zjLJP|cnGS$t{Mk@SInt#|EI&uDX=@ybB&5;L?wfsPNy?GaV$LRR6Lb_J7k%YA4|*C zDwYwAbjUNy5d%*Pezgpg)~(_h-64l;9oaFkwT5kLRcw{kH)LtgjiqIcilx%ihAm_I z+`3dOBbwEqCpN3GTxPY3XG9ko@^r*>omDEH5$$Er6WhyJuJctD&xpP;70Tm3?fG`t9MdWBS1BQI584D;1 zr)fk??-^{ykPuNDgfe$zOo4RNRK^!DUy!$ChH46wn#pNKdv4F$;rgp#J~PjEpWpso zmfM4}GT_Sk?cDCk#wv#4S65r-ld6{7#2B*pqk}Q(!5Ctz4Xof9hH#2OekAuRd*x2~ zw0u-aQ_7VhB|)~Uq1Q;J46#f!KZ%2^T6W65XwN)#e?JK!YVQE)WIZKMQzs*DD0;hivu%sbV^}@? zfDCdh!$9pYIZ=v}1h|@{0h2zI9CEC!_v*3NsHZP5?bp*DOrPm#m)m_k*elk1_b}z@ z=`NGSFw6CL2b(E+x{WD8Pd{Ob)YHZBOGwe!)3+^!t6{&A4#P^vkbGN#t)#@3&s=d! zm)+8jZt0R+(iYNZx!~wQ;t268bsN>;mfGD?n_D{LmcDmOs#|JyOGn+(e(E*;a5avm zIy^!Gnz;dPOf}WA=g7=P`G5V;T-w0>!u4_8+-1{_lXa|2O1BoiRamsW(6m?T;v`r2 zT*T&GuS^=g$~ zWxFZ^aaE*|USq-?TOfzA3e>6wtbDZU5LTjA)sKIqw0m4qrF!;)urNg%cH1?v6eH&_ zagJ*euV0JCYuXZ0I9A#ck|$Q$5+1`av58GhU7D{SoH$;bIoiaImqwe8>~c*<950J= z$Bq}px#PQ7Id^X{7bNlE|&m9ydMm(6SzF|)GsI|lk6EBw-fUu* zLBtc!aL0FyLEYKP?(j9&NJW_V)v9!jjU$kGlg(8RWGdIi%Ay_?amav-h({)5Mgnq- z@lcC;q?(`o6_AJb6RbqdyV zbds~V_#FKMI{pc^KwA~{f|xGKL;7_p!PfIMOq<-E=)ngX7@jvYdweL?a9;}*<~^p3 zWP}*_tBQ#}kd8QZ+OKZAMGJVy>7wT$q{FD~1d_XP7q~^)Tv(pu#gL@;A>G^J3w7P} zUC3*HHfr82Jczc}o*w!w4DK>pAT||G_}zUPt=$d}?^8R}RXYy|$9ie7^M<><^rGW% zQE;^nH>^!!yiU8c1H;)*s5tISr_u}|ka2`FLgEeD`rjeZH|aFzVjFJK1Mb}*=cBM1 zdEoo0znNWQILJNbVj;F{{m$wPvwC)rzQ=OO<^J%2O8wzhKTU%Y(Gm)a252mdeo8%| zVt}S=XNJ)MI^DTaa@kDF){+vNgyu+xZL<1zzQ?mVH&vp3reiytC29@s)hD1qGrY zWJSR!1WiGqXetUr(@;2yK#?d4O-Ik8XcU8D(F-UJ%|P*JCYps_LlqZQ~4v=U{bRcJNJ qL2J-jv<~H>^(YT*Q2FNkCnH9pL@^2k7kiJW_&?4J8KREcy8i`wQW)F-