From 68f32c7b85efb8bc0656f2222db53a38eee091c3 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 21:10:32 -0500 Subject: [PATCH] feat(batch27): implement jsaccount limit checks and resource accounting --- .../JetStream/JetStreamEngine.cs | 179 ++++++++++++++++++ .../JetStream/JsAccount.Core.cs | 111 +++++++++++ porting.db | Bin 6713344 -> 6717440 bytes 3 files changed, 290 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 7402d39..ba08743 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -141,6 +141,185 @@ internal sealed class JetStreamEngine(JetStream state) internal static bool ShouldSendUsageUpdate(DateTime lastUpdateUtc) => DateTime.UtcNow - lastUpdateUtc >= MinUsageUpdateWindow; + + internal Exception? CheckAccountLimits( + JetStreamAccountLimits selected, + StreamConfig config, + long currentReservation) => + CheckLimits(selected, config, checkServer: false, currentReservation, 0); + + internal Exception? CheckAllLimits( + JetStreamAccountLimits selected, + StreamConfig config, + long currentReservation, + long maxBytesOffset) => + CheckLimits(selected, config, checkServer: true, currentReservation, maxBytesOffset); + + internal Exception? CheckLimits( + JetStreamAccountLimits selected, + StreamConfig config, + bool checkServer, + long currentReservation, + long maxBytesOffset) + { + if (config.MaxConsumers > 0 && selected.MaxConsumers > 0 && config.MaxConsumers > selected.MaxConsumers) + return new InvalidOperationException("maximum consumers limit exceeded"); + + return CheckBytesLimits(selected, config.MaxBytes, config.Storage, checkServer, currentReservation, maxBytesOffset); + } + + internal Exception? CheckBytesLimits( + JetStreamAccountLimits selected, + long addBytes, + StorageType storage, + bool checkServer, + long currentReservation, + long maxBytesOffset) + { + if (addBytes < 0) + addBytes = 1; + + var totalBytes = addBytes + maxBytesOffset; + switch (storage) + { + case StorageType.MemoryStorage: + if (selected.MaxMemory >= 0 && currentReservation + totalBytes > selected.MaxMemory) + return new InvalidOperationException("insufficient memory resources"); + if (checkServer && Interlocked.Read(ref _state.MemReserved) + totalBytes > _state.Config.MaxMemory) + return new InvalidOperationException("insufficient memory resources"); + break; + + case StorageType.FileStorage: + if (selected.MaxStore >= 0 && currentReservation + totalBytes > selected.MaxStore) + return new InvalidOperationException("insufficient storage resources"); + if (checkServer && Interlocked.Read(ref _state.StoreReserved) + totalBytes > _state.Config.MaxStore) + return new InvalidOperationException("insufficient storage resources"); + break; + } + + return null; + } + + internal JsAccount? LookupAccount(Account? account) + { + if (account == null) + return null; + + _state.Lock.EnterReadLock(); + try + { + _state.Accounts.TryGetValue(account.Name, out var jsa); + return jsa; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal JetStreamStats UsageStats() + { + var stats = new JetStreamStats(); + _state.Lock.EnterReadLock(); + try + { + stats.Accounts = _state.Accounts.Count; + stats.ReservedMemory = (ulong)Math.Max(0, Interlocked.Read(ref _state.MemReserved)); + stats.ReservedStore = (ulong)Math.Max(0, Interlocked.Read(ref _state.StoreReserved)); + stats.Api.Level = JetStreamVersioning.JsApiLevel; + stats.Api.Total = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiTotal)); + stats.Api.Errors = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiErrors)); + stats.Api.Inflight = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiInflight)); + stats.Memory = (ulong)Math.Max(0, Interlocked.Read(ref _state.MemUsed)); + stats.Store = (ulong)Math.Max(0, Interlocked.Read(ref _state.StoreUsed)); + stats.HAAssets = 0; + } + finally + { + _state.Lock.ExitReadLock(); + } + + return stats; + } + + internal Exception? SufficientResources(Dictionary? limits) + { + if (limits == null || !_state.StandAlone) + return null; + + static (long MaxMem, long MaxStore) Totals(Dictionary source) + { + long mem = 0; + long store = 0; + foreach (var lim in source.Values) + { + if (lim.MaxMemory > 0) mem += lim.MaxMemory; + if (lim.MaxStore > 0) store += lim.MaxStore; + } + + return (mem, store); + } + + var (totalMem, totalStore) = Totals(limits); + if (Interlocked.Read(ref _state.MemReserved) + totalMem > _state.Config.MaxMemory) + return new InvalidOperationException("insufficient memory resources"); + if (Interlocked.Read(ref _state.StoreReserved) + totalStore > _state.Config.MaxStore) + return new InvalidOperationException("insufficient storage resources"); + + long reservedMem = 0; + long reservedStore = 0; + _state.Lock.EnterReadLock(); + try + { + foreach (var jsa in _state.Accounts.Values) + { + jsa.UsageLock.EnterReadLock(); + try + { + var (m, s) = Totals(jsa.Limits); + reservedMem += m; + reservedStore += s; + } + finally + { + jsa.UsageLock.ExitReadLock(); + } + } + } + finally + { + _state.Lock.ExitReadLock(); + } + + if (reservedMem + totalMem > _state.Config.MaxMemory) + return new InvalidOperationException("insufficient memory resources"); + if (reservedStore + totalStore > _state.Config.MaxStore) + return new InvalidOperationException("insufficient storage resources"); + + return null; + } + + internal void ReserveStreamResources(StreamConfig? cfg) + { + if (cfg == null || cfg.MaxBytes <= 0) + return; + + if (cfg.Storage == StorageType.MemoryStorage) + Interlocked.Add(ref _state.MemReserved, cfg.MaxBytes); + else if (cfg.Storage == StorageType.FileStorage) + Interlocked.Add(ref _state.StoreReserved, cfg.MaxBytes); + } + + internal void ReleaseStreamResources(StreamConfig? cfg) + { + if (cfg == null || cfg.MaxBytes <= 0) + return; + + if (cfg.Storage == StorageType.MemoryStorage) + Interlocked.Add(ref _state.MemReserved, -cfg.MaxBytes); + else if (cfg.Storage == StorageType.FileStorage) + Interlocked.Add(ref _state.StoreReserved, -cfg.MaxBytes); + } } internal sealed class StreamAssignmentView diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs index 450e9e2..7437ae5 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs @@ -176,6 +176,117 @@ internal sealed partial class JsAccount internal Account? Acc() => Account as Account; + internal (JetStreamAccountLimits Limits, string Tier, bool Found) SelectLimits(int replicas) + { + UsageLock.EnterReadLock(); + try + { + if (Limits.TryGetValue(string.Empty, out var selected)) + return (selected, string.Empty, true); + + var tier = JetStreamEngine.TierName(replicas); + if (Limits.TryGetValue(tier, out selected)) + return (selected, tier, true); + + return (new JetStreamAccountLimits(), string.Empty, false); + } + finally + { + UsageLock.ExitReadLock(); + } + } + + internal int CountStreams(string tier, StreamConfig cfg) + { + Lock.EnterReadLock(); + try + { + var count = 0; + foreach (var stream in Streams.Values.OfType()) + { + if ((!string.IsNullOrEmpty(tier) && !JetStreamEngine.IsSameTier(stream.Config, cfg)) || + string.Equals(stream.Config.Name, cfg.Name, StringComparison.Ordinal)) + continue; + count++; + } + + return count; + } + finally + { + Lock.ExitReadLock(); + } + } + + internal (ulong Mem, ulong Store) StorageTotals() + { + UsageLock.EnterReadLock(); + try + { + ulong mem = 0; + ulong store = 0; + foreach (var usage in Usage.Values) + { + mem += (ulong)Math.Max(0, usage.Total.Mem); + store += (ulong)Math.Max(0, usage.Total.Store); + } + + return (mem, store); + } + finally + { + UsageLock.ExitReadLock(); + } + } + + internal (bool Exceeded, JsApiError? Error) LimitsExceeded(StorageType storageType, string tier, int replicas) => + WouldExceedLimits(storageType, tier, replicas, string.Empty, null, null); + + internal (bool Exceeded, JsApiError? Error) WouldExceedLimits( + StorageType storageType, + string tier, + int replicas, + string subject, + byte[]? headers, + byte[]? payload) + { + UsageLock.EnterReadLock(); + try + { + if (!Limits.TryGetValue(tier, out var selected)) + return (true, JsApiErrors.NewJSNoLimitsError()); + if (!Usage.TryGetValue(tier, out var inUse)) + return (false, null); + + var r = Math.Max(1, replicas); + var lr = string.IsNullOrEmpty(tier) ? 1 : r; + var bytes = (subject.Length + (headers?.Length ?? 0) + (payload?.Length ?? 0)) * r; + + if (storageType == StorageType.MemoryStorage) + { + var totalMem = inUse.Total.Mem + bytes; + if (selected.MemoryMaxStreamBytes > 0 && totalMem > selected.MemoryMaxStreamBytes * lr) + return (true, null); + if (selected.MaxMemory >= 0 && totalMem > selected.MaxMemory * lr) + return (true, null); + } + else + { + var totalStore = inUse.Total.Store + bytes; + if (selected.StoreMaxStreamBytes > 0 && totalStore > selected.StoreMaxStreamBytes * lr) + return (true, null); + if (selected.MaxStore >= 0 && totalStore > selected.MaxStore * lr) + return (true, null); + } + + return (false, null); + } + finally + { + UsageLock.ExitReadLock(); + } + } + internal void Delete() { Lock.EnterWriteLock(); diff --git a/porting.db b/porting.db index 76bfcb7a4f9d1ddd24b15756ceb17d1bff9a010a..82f20b196d9dc9409eeceec36407b3af5935ec2d 100644 GIT binary patch delta 4266 zcmZ`)Yj9J?6~4RoUR|#(OKTw<2g|l(i~(b8BR_k`KnY2w?!;iK@DL8~jOPGv9NJC57bajQLDN0l^F%co5HrOT{2zP=U%|cjH~1jl zgSX+e*n^+N({L3Yg@<9I+$Ud@d*svjP5cAteOUjYClk8*(041_rG3&aX|oZh-AlWf z_E+V1%30-v(xH4$c~#k}G$>1yI%SelrsOMD#bCH^xNW#>_#+&5P)E)&xlMje-Y&0~ zee!&Hx;$Pkl5Mg{7NonYU3B^6NK8J?gsiMthtGb(ErqPw~-0K zl&=thTer|J!xa~(cOM{r8&$kqfR0l*3;O%eB-l5H-T@E7#a45B;=j5F;iE`0b>VKf z{|j0GZ(A`{8%69+qx>#>Iq<&ZZgUN+Jx+&(FUiH|*A9+)qUZ=mr=y5JYvEWFy}?mi z6dmU1wI~X4v^|R2I9eY?hdA;LAVYJy`ji>JRVE$8Jp$bJ;A*&h5)=4!FU^Mgwb%sC zDY&P#mN&n?7XJ)mVLN0^!!N>&I`ab<@&H{`gVS;2^SMMEiZ^ZE;*&emdj+eu>fozX zZFNNbtQ+4X6T!4hsL{T(@|9CtweZy_ZPmJupYns}A0l%Ee zXQN?dsBpLAGUg_7leo#;4BQmeaXF>$mC@*|kRdEY$Sy22k1;i*Gvfv0khI*?B9gD% zQ5G2v%RiTT#h;@4;IN}&s9Z-H!Qqv2pg%we^sFN#(7%-s$z5Gh1$HkvY!w|s%`V5@ ze2ajdZgQ39xiK0FA9%?qIO`?nBVu|y#SB06ky1@;4U2gluXD+hP44o%^04G{AK4B! zeB`W7@`n|qSht^{p6u3n->N6Wb>4s0lRXjdK0m3{IWGFis)$1?!WSTUI`8WN@&s6N z^0X31Xbw|2!ClV1HlAQ@)0Wm3$z!nmUWyDeR+1BuNcKizg^>-UKo@CI1Nl+ZTds3> z8cB0x3r#j+70K6m=dU9Bb>7U?q)_vwgtKU>-kSyJ8^i{Ut4Xb{ir&?vNEbv2lBLlq z2z)z8M(Vt0gXB9p?dmlTw}>2L`x>%Z=Pg}JB5jCxy=%!fomX6^_wN2`9pRSBEsdLz zTe|wyIuq_UC`?$O%m&3{J^|-F=5FZqn2l<^*IX!MjediGdi5n0!BwLTFB@~Mn^F^n$aRg$1qyV=vYQe7%gSAjL~t7IvFiz zw1Uw}MynWgF*=@6H>1^zPGGc#(TR*sVstX2Qy8sfbSk4?V{{s$PcS;2(I*+5!RSmz zXEFK|qjiiv&FE}KpJ8+kqjMR3meF~P&S!K1qYD{*j?qPoE@pHIqe~fG#_02mex1?f zjCvULGU{V=1*7$h`cKnO8i)n%^gN>?y6B9V6}8LuFbcQ6@mcp?vy z@$wUSt;u+fL|#jbXN%ThPvk*Hjkg@m4G$<<&AAV`O;RXNBk#~R=Kwff@{Wc6l>v*X zsysaSecsJ08k{aWct7&z#2S{HxKVd3p%^bGk=GUDDG!rTKdui9O>5mrq>3!mFPQh6 z^G$6gOL}*Dh4EdZBkkqXzoohfqnJ1gM)s(=ku!iHsmBAYgl?4gMfInv8J z;KB`Gv8GfsrLzo9*}PU*UlmXu(bpu=AA#ED6%l=8=Q7Cnm2aqCK1|y_H#8w~Ao|^* zc0@)tc$RyIgbyV{bh;CFACOlkkOZ8)8O&jNe z4e!OLKA;|-O#SIgOSIf=aPr_{Sq^t^9AvBtbi(2Z@ZXhz{DxjCY&f}Vu|w6r@*zF#4&vA*$W}VVCxUwu!shbL@D&h7HJf6}d=h6-5n|u?r zGb!gs|H!=`P?jW7c4Nr3prYI3;-4G;{CO3d_dVy3~+$V6T!#v2&G{|9gp B8Danc delta 2748 zcmai!ZE#f88OQHAd-m?0WOMIj6Cpxgb|FLzF$r%K6p}2&@X`n&Mg_vlZnB#Y3?U-L z8QW>VSb`dC5GRjigu#YV$_yE0&`T|2DdtNZ%cS_F(^81e#7<#U40a5nw*TEr&Tc+6 zncwF5o#&kAIcLwk*|X=cYtQhnT)orWM>I`uIT)y5N8S!sma;dx-Ym7>EM?(indW58 zZ}jP!#$2W_MvYgbwFkclfh}~kV zkEef~{#p9psBEv*OnYvcI@P|#b7=SzR>(1ozuBGcY-f}4&!!#QJKAEqVrbK+8Or{X zniFZr)sN?dgjFl^z!oagVMXKSx&zBm_AM;lVUE-A_JkHYCzN?QwT#HJFMi6VbNcWO zn?=46Hb6zU*j##|!nEw~M%Yzm;rsQC-&MS@uyp&H87TrtW~9o?s^@*l+D)Rs17nkKSQ^ zAujSQ8dXMkD;=n{zNA%*7u!9I|Aq;3G4H3bcdP<>FN<4ttBWs6r_HAR5~)~0MdY_qa;uv%r0 zc{}}VsnR+m=PO$aD^yks%T~6=>rdXUSOrtJD^|kP?TQsJb-QAD>J5o%ufJ0pR7D@b z)OVFt!A>e$20N^5DXd3X2=;Sjm9P$F6)^SEh;o>EX~aC(a`nWDxlr}QiaD_4naW_v zGnK-UXDUfNCgIh5ioLy>Z{3uaF{I@^v+E~~?t_T35>YRrv_$kv`&K{yOfKm*HfoC# zBl{4|O+>#yG%FFkfXJ!Qg~+M#JR+yYUYb0>A7Anc{`EK|4j^($^dNFdyo|^x@e(4Z z#EXcW68r6w1AJr9d%_pQnBHRhhIkF5;KJ-``{-J;E3G?tg##B5zzuZZv4dB|N)XuD{~GEdDKYKElpZsJCeo1tYS8Mn9gCOV%l` zNj(ucgL(&D=@f{_H)t#(|3v;8&)w`uqb!YQ4SK6d{(<FX0s4HaoaKzoXqBf@%HM941x}UO+vK5mmAiefZ<+k zyDJv7$J_49Gih6U{E&ydiL*%hwtaki#XICr>DLFHgqQxZL(X)1JQD6b9eEQ>V z>7kLZEQy~qgZ_KX?d}#G5~P82-~$=9=rF$jQr9nP4f@Z>6E+7tXY~{M>-tM{A#5I} zp|C0JXv8dZ8>#QQyN}9_TT^xSYw=P0T(fz|%?7$_+Du@9Nx%;>K^C|NOa@axHn<)8vof)H2=mVqkp2&e|j!3wYvtO7p< zHQ-UO8ms}eU@fQvkAZsdICug)3D$x2U;}6X8^I>98EgSz5CM}D86)~)6r*0?Nj U1NYY)w1WZmfv|5xo9{{gA4p+3s{jB1