From 6726c077499ceb7dd06985e8355836debdc0363f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 21:07:24 -0500 Subject: [PATCH] feat(batch27): implement jetstream usage foundations and limit helpers --- .../Accounts/Account.JetStream.cs | 156 ++++++++++++++ .../JetStream/JetStreamEngine.cs | 102 +++++++++ .../JetStream/JsAccount.Core.cs | 194 ++++++++++++++++++ porting.db | Bin 6709248 -> 6713344 bytes 4 files changed, 452 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index d04e8ae..a62381e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -250,6 +250,162 @@ public sealed partial class Account return null; } + public JetStreamAccountStats JetStreamUsage() + { + _mu.EnterReadLock(); + var jsa = JetStream; + var accountName = Name; + var configuredLimits = JetStreamLimits; + _mu.ExitReadLock(); + + var stats = new JetStreamAccountStats(); + if (jsa == null) + return stats; + + var (js, _) = jsa.JetStreamAndClustered(); + if (js == null) + return stats; + + jsa.UsageLock.EnterReadLock(); + try + { + long mem = 0; + long store = 0; + foreach (var usage in jsa.Usage.Values) + { + mem += usage.Total.Mem; + store += usage.Total.Store; + } + + stats.Memory = (ulong)Math.Max(0, mem); + stats.Store = (ulong)Math.Max(0, store); + stats.Domain = js.Config.Domain; + stats.Api = new JetStreamApiStats + { + Level = JetStreamVersioning.JsApiLevel, + Total = jsa.ApiTotal, + Errors = jsa.ApiErrors, + }; + + if (jsa.Limits.TryGetValue(string.Empty, out var defaultTier)) + { + stats.Limits = defaultTier; + } + else + { + stats.Tiers = new Dictionary(StringComparer.Ordinal); + foreach (var (tier, usage) in jsa.Usage) + { + jsa.Limits.TryGetValue(tier, out var tierLimits); + stats.Tiers[tier] = new JetStreamTier + { + Memory = (ulong)Math.Max(0, usage.Total.Mem), + Store = (ulong)Math.Max(0, usage.Total.Store), + Limits = tierLimits ?? new JetStreamAccountLimits(), + }; + } + + if (configuredLimits != null) + { + foreach (var (tier, value) in configuredLimits) + { + if (stats.Tiers.ContainsKey(tier)) + continue; + if (value is not JetStreamAccountLimits lim) + continue; + stats.Tiers[tier] = new JetStreamTier { Limits = lim }; + } + } + } + } + finally + { + jsa.UsageLock.ExitReadLock(); + } + + var allStreams = Streams(); + stats.Streams = allStreams.Count; + foreach (var stream in allStreams) + stats.Consumers += stream.State().Consumers; + + if (stats.Tiers != null) + { + foreach (var stream in allStreams) + { + var tier = JetStreamEngine.TierName(stream.Config.Replicas); + if (!stats.Tiers.TryGetValue(tier, out var u)) + u = new JetStreamTier(); + u.Streams++; + u.Consumers += stream.State().Consumers; + stats.Tiers[tier] = u; + } + } + + if (stats.Tiers == null || stats.Tiers.Count == 0) + { + var (rmem, rstore) = jsa.ReservedStorage(string.Empty); + stats.ReservedMemory = rmem; + stats.ReservedStore = rstore; + } + else + { + foreach (var tier in stats.Tiers.Keys.ToArray()) + { + var tierStats = stats.Tiers[tier]; + (tierStats.ReservedMemory, tierStats.ReservedStore) = jsa.ReservedStorage(tier); + stats.Tiers[tier] = tierStats; + } + } + + _ = accountName; + return stats; + } + + internal Exception? DisableJetStream() => RemoveJetStream(); + + internal Exception? RemoveJetStream() + { + _mu.EnterWriteLock(); + var server = Server as NatsServer; + var jsa = JetStream; + JetStream = null; + _mu.ExitWriteLock(); + + if (server == null) + return new InvalidOperationException("jetstream account not registered"); + var js = server.GetJetStream(); + if (js == null) + return new InvalidOperationException("jetstream not enabled for account"); + + return js.DisableJetStream(jsa); + } + + internal bool JetStreamConfigured() + { + _mu.EnterReadLock(); + try + { + return JetStreamLimits != null && JetStreamLimits.Count > 0; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool JetStreamEnabled() + { + _mu.EnterReadLock(); + try + { + return JetStream != null; + } + finally + { + _mu.ExitReadLock(); + } + } + internal Exception? EnableAllJetStreamServiceImportsAndMappings() { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 9313f00..7402d39 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -3,6 +3,7 @@ namespace ZB.MOM.NatsNet.Server; internal sealed class JetStreamEngine(JetStream state) { private readonly JetStream _state = state; + private static readonly TimeSpan MinUsageUpdateWindow = TimeSpan.FromMilliseconds(250); internal void SetStarted() { @@ -44,4 +45,105 @@ internal sealed class JetStreamEngine(JetStream state) _state.Lock.ExitReadLock(); } } + + internal Exception? DisableJetStream(JsAccount? account) + { + if (account?.Account is not Account a) + return new InvalidOperationException("jetstream not enabled for account"); + + _state.Lock.EnterWriteLock(); + try + { + _state.Accounts.Remove(a.Name); + } + finally + { + _state.Lock.ExitWriteLock(); + } + + account.Delete(); + return null; + } + + internal bool WouldExceedLimits(StorageType storageType, int size) + { + var total = storageType == StorageType.MemoryStorage + ? Interlocked.Read(ref _state.MemUsed) + : Interlocked.Read(ref _state.StoreUsed); + var max = storageType == StorageType.MemoryStorage + ? _state.Config.MaxMemory + : _state.Config.MaxStore; + return total + size > max; + } + + internal bool LimitsExceeded(StorageType storageType) => WouldExceedLimits(storageType, 0); + + internal static string TierName(int replicas) => $"R{(replicas <= 0 ? 1 : replicas)}"; + + internal static bool IsSameTier(StreamConfig cfgA, StreamConfig cfgB) => + cfgA.Replicas == cfgB.Replicas; + + internal static Dictionary DiffCheckedLimits( + Dictionary a, + Dictionary b) + { + var diff = new Dictionary(StringComparer.Ordinal); + + foreach (var (tier, oldLimit) in a) + { + b.TryGetValue(tier, out var newLimit); + newLimit ??= new JetStreamAccountLimits(); + diff[tier] = new JetStreamAccountLimits + { + MaxMemory = newLimit.MaxMemory - oldLimit.MaxMemory, + MaxStore = newLimit.MaxStore - oldLimit.MaxStore, + }; + } + + foreach (var (tier, newLimit) in b) + { + if (a.ContainsKey(tier)) + continue; + + diff[tier] = new JetStreamAccountLimits + { + MaxMemory = newLimit.MaxMemory, + MaxStore = newLimit.MaxStore, + }; + } + + return diff; + } + + internal static (ulong Mem, ulong Store) ReservedStorage( + Dictionary streamAssignments, + string tier) + { + ulong mem = 0; + ulong store = 0; + + foreach (var assignment in streamAssignments.Values.OfType()) + { + var cfg = assignment.Config; + if (!string.IsNullOrEmpty(tier) && !string.Equals(tier, TierName(cfg.Replicas), StringComparison.Ordinal)) + continue; + if (cfg.MaxBytes <= 0) + continue; + + if (cfg.Storage == StorageType.FileStorage) + store += (ulong)cfg.MaxBytes; + else if (cfg.Storage == StorageType.MemoryStorage) + mem += (ulong)cfg.MaxBytes; + } + + return (mem, store); + } + + internal static bool ShouldSendUsageUpdate(DateTime lastUpdateUtc) => + DateTime.UtcNow - lastUpdateUtc >= MinUsageUpdateWindow; +} + +internal sealed class StreamAssignmentView +{ + public required StreamConfig Config { get; init; } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs new file mode 100644 index 0000000..450e9e2 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs @@ -0,0 +1,194 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class JsAccount +{ + internal (ulong Mem, ulong Store) ReservedStorage(string tier) + { + ulong mem = 0; + ulong store = 0; + + Lock.EnterReadLock(); + try + { + foreach (var stream in Streams.Values.OfType()) + { + var cfg = stream.Config; + if (!string.IsNullOrEmpty(tier) && !string.Equals(tier, JetStreamEngine.TierName(cfg.Replicas), StringComparison.Ordinal)) + continue; + if (cfg.MaxBytes <= 0) + continue; + + if (cfg.Storage == StorageType.FileStorage) + store += (ulong)cfg.MaxBytes; + else if (cfg.Storage == StorageType.MemoryStorage) + mem += (ulong)cfg.MaxBytes; + } + } + finally + { + Lock.ExitReadLock(); + } + + return (mem, store); + } + + internal void RemoteUpdateUsage(byte[] message) + { + if (message.Length < 16) + return; + + UsageLock.EnterWriteLock(); + try + { + if (!Usage.TryGetValue(string.Empty, out var usage)) + { + usage = new JsaStorage(); + Usage[string.Empty] = usage; + } + + usage.Total.Mem = BitConverter.ToInt64(message, 0); + usage.Total.Store = BitConverter.ToInt64(message, 8); + } + finally + { + UsageLock.ExitWriteLock(); + } + } + + internal void CheckAndSyncUsage(string tier, StorageType storageType) + { + if (Interlocked.CompareExchange(ref Sync, 1, 0) != 0) + return; + + try + { + Lock.EnterReadLock(); + try + { + long total = 0; + foreach (var stream in Streams.Values.OfType()) + { + if (!string.Equals(JetStreamEngine.TierName(stream.Config.Replicas), tier, StringComparison.Ordinal)) + continue; + if (stream.Config.Storage != storageType) + continue; + total += (long)stream.State().Bytes; + } + + UsageLock.EnterWriteLock(); + try + { + Usage.TryGetValue(tier, out var usage); + usage ??= new JsaStorage(); + Usage[tier] = usage; + + if (storageType == StorageType.MemoryStorage) + { + usage.Local.Mem = total; + usage.Total.Mem = total; + } + else + { + usage.Local.Store = total; + usage.Total.Store = total; + } + } + finally + { + UsageLock.ExitWriteLock(); + } + } + finally + { + Lock.ExitReadLock(); + } + } + finally + { + Interlocked.Exchange(ref Sync, 0); + } + } + + internal void UpdateUsage(string tier, StorageType storageType, long delta) + { + UsageLock.EnterWriteLock(); + try + { + Usage.TryGetValue(tier, out var usage); + usage ??= new JsaStorage(); + Usage[tier] = usage; + + if (storageType == StorageType.MemoryStorage) + { + usage.Local.Mem += delta; + usage.Total.Mem += delta; + } + else + { + usage.Local.Store += delta; + usage.Total.Store += delta; + } + } + finally + { + UsageLock.ExitWriteLock(); + } + } + + internal void SendClusterUsageUpdateTimer() + { + UsageLock.EnterWriteLock(); + try + { + SendClusterUsageUpdate(); + } + finally + { + UsageLock.ExitWriteLock(); + } + } + + internal void SendClusterUsageUpdate() + { + var now = DateTime.UtcNow; + if (!JetStreamEngine.ShouldSendUsageUpdate(LUpdate)) + return; + LUpdate = now; + + // Cluster bus publish is wired in later cluster sessions. + UsageApi = ApiTotal; + UsageErr = ApiErrors; + } + + internal (JetStream? JetStream, bool Clustered) JetStreamAndClustered() + { + Lock.EnterReadLock(); + try + { + var js = Js as JetStream; + return (js, js?.Cluster != null); + } + finally + { + Lock.ExitReadLock(); + } + } + + internal Account? Acc() => Account as Account; + + internal void Delete() + { + Lock.EnterWriteLock(); + try + { + Streams.Clear(); + Inflight.Clear(); + UpdatesSub = null; + UpdatesPub = string.Empty; + } + finally + { + Lock.ExitWriteLock(); + } + } +} diff --git a/porting.db b/porting.db index da699df2eaf82bda55d6e0309f63fdf7020975c0..76bfcb7a4f9d1ddd24b15756ceb17d1bff9a010a 100644 GIT binary patch delta 4250 zcmbVOYitu&7M>Y`*fBP{s}#tKS0J(T zI0;Dz38K0J)2_1JqCQkw5W;Gqg|<>3&|Q_?{n16NlmZ8Z@Xu0~*w?L46t&b(-Sm{>sn0Tu4lr zLPuv;3+up_+$I?bDRrBte-7dtfq zQ(dG$75#k|`JHO)C%ZMt@9e%?nv{HCk0yFy4|z)-8#&0PwPi1{(mOjGa%rwaWBPof zwlt1k9o<(!>r>qs^y|IkUDc3J_o>@R|Gkey<*3x;-|Qz#RHa_uPfp2E?_*9UeWRN! zP(}aLO^zjcTtFueXoh`yfSgoPma0ZvI7mKFWqY-U6sn@vd&pHeDoy=ruR4kmf)P7L z92iN$i1U0TnV&XWKH{3G=y$zMzw*1r=uN+?Aigc&+9LK^ksI3<3WwU;L)*lo4D~2z zlA&D+nqa6!K}Q%`r=W3$)+p#OLp2H-W2i_$qYULKXvAtlE;;_tc9&T+bDVIRHnzGJ zZDihn!Wm*{w}OzNpDSpPp-l=JV5m_+{R};zpgxAYLswc|%u<-8GD~BY&MbpjCbKMN z^O$8b%V9R3SuV3YX72cv*8Bc&R<$Od=jW@=0zy6lAb1cKAS^^EK)4@a5yE1GLWCs< zMF>j~iV;c>N)gHsya+yoa)b(mN`xweYJ_D7H3+o`bqLE5Rv@fIcmQD)LOntQ!h;B_ z5gtN#7-0>z1-pv6S6gw|8*!+En7XC zY-`L;5`{mRx$676kJ5$(;o3xH=R(dgi1t)23%m7&EPU<`F&9SKAu1 z=$YfjOi|A~tY+p(GfHTh^-OxEI#59`FO*kCnSX+9(z8d6bp<_>-r5lQ5nW!-KBU%_ zS0GUVr)LfuGEemCnM20R9z7F{nFsaEL1X3tJ#)aA*{x^xtC`ALo!GBu(rf2}#yvf; zPtT63b*(ddAa2_f{*PGbWWO9K=5ILH6UoP&e{=ehqK@A>a_qk(7l^~wY<<&m+w!;= zGk+8J0ROQ$n;phH-C^tykJ2gHw0!0$roP{zIf^y29YrRZYalwQ7H#|F-bLpdh)zhN zmc*s8tX6j!Et^BBIR>h<{#dYtrbL@uQV*&rUlq=qx!~q9{cHoxap?}t(BGlHD*D52 z%%$#m207@pB~7{1zcrAauDhyz756%eEQ1VV)YBa($5ygX4+GmArsP8HLq9u8*; z6}7qtZ7#ZLM!Lgl-5UB+PG_pY!VXE>Q`+;=X{ywprrCGCHbkGJNb(Cw_)}!`(nwmtW~pm1N6g^(2Q<*m9ERP{*FXTV@_F@ XSLxFH!6x}PN9wW9r{2vl+fMrqw9^2j delta 1640 zcmZwGYfM{p7zXfj+Mc%m_Oz#5)-j>uDmN7vOc@FT2FwlFWNh<-$jYVYhq6Jr--{^B zX$T{RH=ASz5VN2`M~p`em}Xn%bV>~FgN`M_O#HYk@ym1@@u8=jq={*M$@84I|Ce)4 zsCVt0*6TZ`9g7quIZoejHYT5+yyPrM=R1d{(&edizU~X{ns$K0c2RA6 z(ax(a3+;WiWm>fImw)oxeaY4;Zf8qtTdgpF6z)&`NG5;kW7=&Rf9jS0p3(fp@>(&= z^01pWknZ~vZ517S_HRN77F+@p7pZO$lBXful?U!C4c zh0UyzuEa(AX z4;!b_Ol4-v7XA;`DnW0lm_Mc z8mU^K3o|xRBaBnpj4heUrfm{GN{6Ry*}|F4AFf>dhNKg8;!Q4!%G^?7)aJ!{SA)yr zan%bWL2Z~mn6{+|{vSMLd}gzcthgnxK?sCG7=*(%*qm>N2-pFSK_og&F)LakPaEJ3o;=KcEi)~ z3}nL|$bo0!Imm@P*bDhk0EMs*il7)uU_U$$2jC!-!V6Fahu|>02<31DD&Qy_gX8cL zybQ0vt56BAK^43X)o=owPy@A42le2R^)U?_>2_u^a?(-jCzgLKW#UKXf6d2C^CqJl O8^hmavazz^zVIJZGZg6n