From 12b8c9b4c568e650f5e8451e6d5937cb9a7705dd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 21:01:11 -0500 Subject: [PATCH] feat(batch27): implement jetstream engine state and account enable core --- .../Accounts/Account.JetStream.cs | 248 ++++++++++++++++++ .../JetStream/JetStreamEngine.cs | 47 ++++ .../NatsServer.JetStreamCore.cs | 75 ++++++ porting.db | Bin 6705152 -> 6709248 bytes 4 files changed, 370 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index c20d2d9..d04e8ae 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -2,6 +2,254 @@ namespace ZB.MOM.NatsNet.Server; public sealed partial class Account { + private static Dictionary DefaultJetStreamAccountTiers() + { + return new Dictionary(StringComparer.Ordinal) + { + [string.Empty] = new JetStreamAccountLimits + { + MaxMemory = -1, + MaxStore = -1, + MaxStreams = -1, + MaxConsumers = -1, + MaxAckPending = -1, + MemoryMaxStreamBytes = -1, + StoreMaxStreamBytes = -1, + }, + }; + } + + private static Dictionary ToTypedLimits(Dictionary limits) + { + var typed = new Dictionary(StringComparer.Ordinal); + foreach (var (tier, value) in limits) + { + if (value is JetStreamAccountLimits v) + typed[tier] = v; + } + + return typed; + } + + private static JetStreamAccountLimits SelectLimits(Dictionary limits) + { + if (limits.TryGetValue(string.Empty, out var selected)) + return selected; + + foreach (var (_, value) in limits) + return value; + + return new JetStreamAccountLimits(); + } + + internal void AssignJetStreamLimits(Dictionary limits) + { + _mu.EnterWriteLock(); + try + { + JetStreamLimits = limits; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? EnableJetStream(Dictionary? limits) + { + _mu.EnterReadLock(); + var server = Server as NatsServer; + _mu.ExitReadLock(); + + if (server == null) + return new InvalidOperationException("jetstream account not registered"); + if (ReferenceEquals(server.SystemAccount(), this)) + return new InvalidOperationException("jetstream can not be enabled on the system account"); + + limits ??= DefaultJetStreamAccountTiers(); + if (limits.Count == 0) + limits = DefaultJetStreamAccountTiers(); + + AssignJetStreamLimits(limits); + var typedLimits = ToTypedLimits(limits); + + var js = server.GetJetStreamState(); + if (js == null) + return new InvalidOperationException("jetstream not enabled"); + + js.Lock.EnterWriteLock(); + try + { + if (js.Accounts.TryGetValue(Name, out var existing)) + { + _mu.EnterWriteLock(); + JetStream = existing; + _mu.ExitWriteLock(); + return EnableAllJetStreamServiceImportsAndMappings(); + } + + var jsa = new JsAccount + { + Js = js, + Account = this, + StoreDir = Path.Combine(js.Config.StoreDir, Name), + }; + + foreach (var (tier, tierLimits) in typedLimits) + jsa.Limits[tier] = tierLimits; + + js.Accounts[Name] = jsa; + _mu.EnterWriteLock(); + JetStream = jsa; + _mu.ExitWriteLock(); + } + finally + { + js.Lock.ExitWriteLock(); + } + + return EnableAllJetStreamServiceImportsAndMappings(); + } + + internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg) + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa == null) + return (false, 0); + + jsa.UsageLock.EnterReadLock(); + try + { + var selected = SelectLimits(jsa.Limits); + var maxStreamBytes = cfg?.Storage == StorageType.MemoryStorage + ? selected.MemoryMaxStreamBytes + : selected.StoreMaxStreamBytes; + return (selected.MaxBytesRequired, maxStreamBytes); + } + finally + { + jsa.UsageLock.ExitReadLock(); + } + } + + internal int NumStreams() + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa == null) + return 0; + + jsa.Lock.EnterReadLock(); + try + { + return jsa.Streams.Count; + } + finally + { + jsa.Lock.ExitReadLock(); + } + } + + internal List Streams() => FilteredStreams(string.Empty); + + internal List FilteredStreams(string filter) + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa == null) + return []; + + jsa.Lock.EnterReadLock(); + try + { + var streams = new List(); + foreach (var stream in jsa.Streams.Values.OfType()) + { + if (string.IsNullOrWhiteSpace(filter)) + { + streams.Add(stream); + continue; + } + + foreach (var subject in stream.Config.Subjects ?? []) + { + if (Internal.DataStructures.SubscriptionIndex.SubjectsCollide(filter, subject)) + { + streams.Add(stream); + break; + } + } + } + + return streams; + } + finally + { + jsa.Lock.ExitReadLock(); + } + } + + internal (NatsStream? Stream, Exception? Error) LookupStream(string name) + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa == null) + return (null, new InvalidOperationException("jetstream not enabled for account")); + + jsa.Lock.EnterReadLock(); + try + { + if (jsa.Streams.TryGetValue(name, out var stream) && stream is NatsStream ns) + return (ns, null); + + return (null, new InvalidOperationException("stream not found")); + } + finally + { + jsa.Lock.ExitReadLock(); + } + } + + internal Exception? UpdateJetStreamLimits(Dictionary? limits) + { + _mu.EnterReadLock(); + var server = Server as NatsServer; + var jsa = JetStream; + _mu.ExitReadLock(); + + if (server == null) + return new InvalidOperationException("jetstream account not registered"); + if (server.GetJetStreamState() == null) + return new InvalidOperationException("jetstream not enabled"); + if (jsa == null) + return new InvalidOperationException("jetstream not enabled for account"); + + limits ??= DefaultJetStreamAccountTiers(); + if (limits.Count == 0) + limits = DefaultJetStreamAccountTiers(); + AssignJetStreamLimits(limits); + + var typed = ToTypedLimits(limits); + jsa.UsageLock.EnterWriteLock(); + try + { + jsa.Limits.Clear(); + foreach (var (tier, tierLimits) in typed) + jsa.Limits[tier] = tierLimits; + } + finally + { + jsa.UsageLock.ExitWriteLock(); + } + + return null; + } + internal Exception? EnableAllJetStreamServiceImportsAndMappings() { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs new file mode 100644 index 0000000..9313f00 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -0,0 +1,47 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed class JetStreamEngine(JetStream state) +{ + private readonly JetStream _state = state; + + internal void SetStarted() + { + _state.Lock.EnterWriteLock(); + try + { + _state.Started = DateTime.UtcNow; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool IsEnabled() => Interlocked.CompareExchange(ref _state.Disabled, 0, 0) == 0; + + internal void SetJetStreamStandAlone(bool isStandAlone) + { + _state.Lock.EnterWriteLock(); + try + { + _state.StandAlone = isStandAlone; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool IsShuttingDown() + { + _state.Lock.EnterReadLock(); + try + { + return _state.ShuttingDown; + } + finally + { + _state.Lock.ExitReadLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs index 774daa8..625632b 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs @@ -424,4 +424,79 @@ public sealed partial class NatsServer var js = _jetStream; return js != null && Interlocked.CompareExchange(ref js.Disabled, 0, 0) == 0; } + + public bool JetStreamEnabledForDomain() + { + if (JetStreamEnabled()) + return true; + + foreach (var value in _nodeToInfo.Values) + { + if (value is NodeInfo { Js: true }) + return true; + } + + return false; + } + + public JetStreamConfig? JetStreamConfig() + { + var js = _jetStream; + if (js == null) + return null; + + var cfg = js.Config; + return new JetStreamConfig + { + MaxMemory = cfg.MaxMemory, + MaxStore = cfg.MaxStore, + StoreDir = cfg.StoreDir, + SyncInterval = cfg.SyncInterval, + SyncAlways = cfg.SyncAlways, + Domain = cfg.Domain, + CompressOK = cfg.CompressOK, + UniqueTag = cfg.UniqueTag, + Strict = cfg.Strict, + }; + } + + public string StoreDir() + { + var js = _jetStream; + return js == null ? string.Empty : js.Config.StoreDir; + } + + public int JetStreamNumAccounts() + { + var js = _jetStream; + if (js == null) + return 0; + + js.Lock.EnterReadLock(); + try + { + return js.Accounts.Count; + } + finally + { + js.Lock.ExitReadLock(); + } + } + + public (long MemReserved, long StoreReserved, Exception? Error) JetStreamReservedResources() + { + var js = _jetStream; + if (js == null) + return (-1, -1, new InvalidOperationException("jetstream not enabled")); + + return ( + Interlocked.Read(ref js.MemReserved), + Interlocked.Read(ref js.StoreReserved), + null); + } + + internal JetStreamEngine? GetJetStream() => + _jetStream == null ? null : new JetStreamEngine(_jetStream); + + internal JetStream? GetJetStreamState() => _jetStream; } diff --git a/porting.db b/porting.db index 86c296c2d5e7301edc95f7c59fc42a237f63ca1a..da699df2eaf82bda55d6e0309f63fdf7020975c0 100644 GIT binary patch delta 4029 zcmaJ@Yfu~472bzls|PX&Fd)Pp;RXyfFG;*jAaT<;aa!VMOdvKwXd7$^un-ty9#Z07 zr^Gg-GvzvJoR+xmbUGc^_9Rv&c3NoqqchET@YKzZmg(Rbx0!gR4>y03{>b!d7cIM5 zT4v77`MBTt&bjxVJy)?Gzsbgy-el*U`nM>`AiM$X^jp_^J8J0hR~KuNi#2rM8oS2c zz{WMYxy=*1qc9p%uF0nVbVlIoLO#e=HtkL6i@-O%4 zd-RNrbuodV@Ng&?4eBRV=(Gw=sL&}D8dsr{Dm11-qbd}&CF>s0msp0P@XDw39d$Jk z6?a&LhE!-!g~BQnQlXd%sq;HYK6HihL;ry%2!M4*BwJtG1 zJW1jd@g#|H;z<%Gi6=>n5l@mBP5$gE)8jT>v6hqn8HA*Dh1pFr)^Z$gF4&!X`!4f6 zvwH|BHPvgs+qs-Z&nrOwB` z#$2Ems33L&-G)Q2bHB%T5m%059LM46^PCY6p5sb!>|LF~=xM2~ZSnZwE3L#DD6w&tmY~%aZ+m`vWs$y@#I-*=J`ciYe_63c*Mu z*sqTflq;junn6a>6h(g_qbY(O%4m|HRT+_MrtipzTs6HUBXZsJ4H=D7GZg(38IpUa z7i2{4o_>o_UYO~t8B3r=tcVmgcs9V!Dun}e#ZK0a1 zmo1MhC1#zylk}|O5Bu`@8RAzv@XWv};Yyli$no1(ooFZMh14%}J9`=r+W^%l;Go0p3- zx^}gQUsCk;$IkUabJhi4amBQdeH&;wPlF zTQ1(MUi-|&-&JILSij-urs6tSUpawxL zf;t2q1YQI_1oa3S5HuoaLePw$1wku4x zH-cvnJd0otg69x?3&FP$bRu{j!CnOW5bQ^A0Kq{7hY-Ag;4p$N1TP}!M$m)cB?Lzh z97WKJz>lC0K>$HNg5XUMbkI>kouTaevp&r_on^O8+HBTEtH<(@<$(FG<}tI8yU10L z=X{TG)fhH>WjL(Ap||RS>~GmpCPx2(E~Cz*W^c#Vcu8W2qg;$@#&z5)1pU#`NN_|S z!OQW!Rzr1Qq{Um0uNV7s@oy3(>GiIBD={HUg+oAT6ZHdT_&?n)dJ^6vd5Q8)F7q6d1C=nD-u`{oI zqpk2F7p%tfZo!JLyccvj>l<6VwV_16mN|o;C>5Gjtbmp^t!4FTSyLOVqV z)0a`3#}i8QYMG}u^in%Wv5sn46C12l))ecAmW7w~{hMTcNy{A9>h)+@c=>U^7cXrS zjN;j)Gk0&o-1~k&!A$SuMJ;o5BXgS7rDa7kSp$i~BI_At2SjlEXrPAmGy-0HFUY6$ zUeM|dZ|D_^=NGpp4oN-fP%4u4U7sd)pGW+i+vkr4URGxqNF3DGGPqeS1BnA#W?0RX zPIVx$U(3Ywg+Mv(6#|^RM^37~BC$`Je#bh6g(4gIg~FLlsR8~|@)P+v>yIq|A*1`M z*_0$>`5#n+a delta 1733 zcmY+C3v3is6ozN^H9OnR&bGVswcXxR9xbi(L0hmCDy8yjDHbTmtHAcLRY_Z$0*L}b z3(X-mCS8nh0tk^pj4`xO6FZ5Dpdm&QB`blb&}eH=Lt@ZK;v;I}=}wrGnf%HBoqO+p z=iWIzS5Me`-a28A2A!uQ$rU*Yi{(=z;i@8e-QlxE#@Qmd<&ynNM=>em@&cRVBkGz6 zrx*vu<*@AaT%GzFpH0ZW$On}h(t(y2JG$Bmq`N!PDvwE$yw{?;Ozjrk zVcKrdw@hm+`i806qOY0CEc%=&$D&J2X%=0SG)eYYIBewKm+6BNEhd%h+P$aQxs%=O zxO;(JR@^<$E)aLmvGc@ToAL0R?I*QzySGtV6t8S!XJ$0AGc&fbGc&fZGcz``Gcz_( z%CK!iWrQ!pjA&(NMzpXqBbwQn5l!sOh%h@dVu$h0u&puMeLk^(Yp#fq7_+UEX`&*n z);P0PAMhM07_+hLEC8?jPNA=z5c}b^tdIkGid3%M*D@0Ao-t2}+UZ`bD6TT+ zZpWwgUv0N-Gr1$CJiVLxck$&_Hk8uYY0iMr8xjX?o(#=2B5OwuwZqhGnP8!M( zI{5-3ovO}+bb_GRN6G)Fq2sIU3S9|`S@c_5P^X7Mv5~r4U1q&AQ_MDh_4!QEN!6oT zJ}nA~3^Sv`Ctjh~%UvM~FXwH>d}8|K@A6V&lK7r}uTjhpJz3*gnWc25au8h zAQU1LArvFbMJPciMJPjJf8^U&k v9SC8BCWK~$7KB!W$SAZqz$NPqlHOoshqu+}j^4bJ>u99B%}+b)+m8Pa9Gxt^