diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index c20d2d9..d04e8ae 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -2,6 +2,254 @@ namespace ZB.MOM.NatsNet.Server; public sealed partial class Account { + private static Dictionary DefaultJetStreamAccountTiers() + { + return new Dictionary(StringComparer.Ordinal) + { + [string.Empty] = new JetStreamAccountLimits + { + MaxMemory = -1, + MaxStore = -1, + MaxStreams = -1, + MaxConsumers = -1, + MaxAckPending = -1, + MemoryMaxStreamBytes = -1, + StoreMaxStreamBytes = -1, + }, + }; + } + + private static Dictionary ToTypedLimits(Dictionary limits) + { + var typed = new Dictionary(StringComparer.Ordinal); + foreach (var (tier, value) in limits) + { + if (value is JetStreamAccountLimits v) + typed[tier] = v; + } + + return typed; + } + + private static JetStreamAccountLimits SelectLimits(Dictionary limits) + { + if (limits.TryGetValue(string.Empty, out var selected)) + return selected; + + foreach (var (_, value) in limits) + return value; + + return new JetStreamAccountLimits(); + } + + internal void AssignJetStreamLimits(Dictionary limits) + { + _mu.EnterWriteLock(); + try + { + JetStreamLimits = limits; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? EnableJetStream(Dictionary? limits) + { + _mu.EnterReadLock(); + var server = Server as NatsServer; + _mu.ExitReadLock(); + + if (server == null) + return new InvalidOperationException("jetstream account not registered"); + if (ReferenceEquals(server.SystemAccount(), this)) + return new InvalidOperationException("jetstream can not be enabled on the system account"); + + limits ??= DefaultJetStreamAccountTiers(); + if (limits.Count == 0) + limits = DefaultJetStreamAccountTiers(); + + AssignJetStreamLimits(limits); + var typedLimits = ToTypedLimits(limits); + + var js = server.GetJetStreamState(); + if (js == null) + return new InvalidOperationException("jetstream not enabled"); + + js.Lock.EnterWriteLock(); + try + { + if (js.Accounts.TryGetValue(Name, out var existing)) + { + _mu.EnterWriteLock(); + JetStream = existing; + _mu.ExitWriteLock(); + return EnableAllJetStreamServiceImportsAndMappings(); + } + + var jsa = new JsAccount + { + Js = js, + Account = this, + StoreDir = Path.Combine(js.Config.StoreDir, Name), + }; + + foreach (var (tier, tierLimits) in typedLimits) + jsa.Limits[tier] = tierLimits; + + js.Accounts[Name] = jsa; + _mu.EnterWriteLock(); + JetStream = jsa; + _mu.ExitWriteLock(); + } + finally + { + js.Lock.ExitWriteLock(); + } + + return EnableAllJetStreamServiceImportsAndMappings(); + } + + internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg) + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa == null) + return (false, 0); + + jsa.UsageLock.EnterReadLock(); + try + { + var selected = SelectLimits(jsa.Limits); + var maxStreamBytes = cfg?.Storage == StorageType.MemoryStorage + ? selected.MemoryMaxStreamBytes + : selected.StoreMaxStreamBytes; + return (selected.MaxBytesRequired, maxStreamBytes); + } + finally + { + jsa.UsageLock.ExitReadLock(); + } + } + + internal int NumStreams() + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa == null) + return 0; + + jsa.Lock.EnterReadLock(); + try + { + return jsa.Streams.Count; + } + finally + { + jsa.Lock.ExitReadLock(); + } + } + + internal List Streams() => FilteredStreams(string.Empty); + + internal List FilteredStreams(string filter) + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa == null) + return []; + + jsa.Lock.EnterReadLock(); + try + { + var streams = new List(); + foreach (var stream in jsa.Streams.Values.OfType()) + { + if (string.IsNullOrWhiteSpace(filter)) + { + streams.Add(stream); + continue; + } + + foreach (var subject in stream.Config.Subjects ?? []) + { + if (Internal.DataStructures.SubscriptionIndex.SubjectsCollide(filter, subject)) + { + streams.Add(stream); + break; + } + } + } + + return streams; + } + finally + { + jsa.Lock.ExitReadLock(); + } + } + + internal (NatsStream? Stream, Exception? Error) LookupStream(string name) + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa == null) + return (null, new InvalidOperationException("jetstream not enabled for account")); + + jsa.Lock.EnterReadLock(); + try + { + if (jsa.Streams.TryGetValue(name, out var stream) && stream is NatsStream ns) + return (ns, null); + + return (null, new InvalidOperationException("stream not found")); + } + finally + { + jsa.Lock.ExitReadLock(); + } + } + + internal Exception? UpdateJetStreamLimits(Dictionary? limits) + { + _mu.EnterReadLock(); + var server = Server as NatsServer; + var jsa = JetStream; + _mu.ExitReadLock(); + + if (server == null) + return new InvalidOperationException("jetstream account not registered"); + if (server.GetJetStreamState() == null) + return new InvalidOperationException("jetstream not enabled"); + if (jsa == null) + return new InvalidOperationException("jetstream not enabled for account"); + + limits ??= DefaultJetStreamAccountTiers(); + if (limits.Count == 0) + limits = DefaultJetStreamAccountTiers(); + AssignJetStreamLimits(limits); + + var typed = ToTypedLimits(limits); + jsa.UsageLock.EnterWriteLock(); + try + { + jsa.Limits.Clear(); + foreach (var (tier, tierLimits) in typed) + jsa.Limits[tier] = tierLimits; + } + finally + { + jsa.UsageLock.ExitWriteLock(); + } + + return null; + } + internal Exception? EnableAllJetStreamServiceImportsAndMappings() { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs new file mode 100644 index 0000000..9313f00 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -0,0 +1,47 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed class JetStreamEngine(JetStream state) +{ + private readonly JetStream _state = state; + + internal void SetStarted() + { + _state.Lock.EnterWriteLock(); + try + { + _state.Started = DateTime.UtcNow; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool IsEnabled() => Interlocked.CompareExchange(ref _state.Disabled, 0, 0) == 0; + + internal void SetJetStreamStandAlone(bool isStandAlone) + { + _state.Lock.EnterWriteLock(); + try + { + _state.StandAlone = isStandAlone; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool IsShuttingDown() + { + _state.Lock.EnterReadLock(); + try + { + return _state.ShuttingDown; + } + finally + { + _state.Lock.ExitReadLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs index 774daa8..625632b 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs @@ -424,4 +424,79 @@ public sealed partial class NatsServer var js = _jetStream; return js != null && Interlocked.CompareExchange(ref js.Disabled, 0, 0) == 0; } + + public bool JetStreamEnabledForDomain() + { + if (JetStreamEnabled()) + return true; + + foreach (var value in _nodeToInfo.Values) + { + if (value is NodeInfo { Js: true }) + return true; + } + + return false; + } + + public JetStreamConfig? JetStreamConfig() + { + var js = _jetStream; + if (js == null) + return null; + + var cfg = js.Config; + return new JetStreamConfig + { + MaxMemory = cfg.MaxMemory, + MaxStore = cfg.MaxStore, + StoreDir = cfg.StoreDir, + SyncInterval = cfg.SyncInterval, + SyncAlways = cfg.SyncAlways, + Domain = cfg.Domain, + CompressOK = cfg.CompressOK, + UniqueTag = cfg.UniqueTag, + Strict = cfg.Strict, + }; + } + + public string StoreDir() + { + var js = _jetStream; + return js == null ? string.Empty : js.Config.StoreDir; + } + + public int JetStreamNumAccounts() + { + var js = _jetStream; + if (js == null) + return 0; + + js.Lock.EnterReadLock(); + try + { + return js.Accounts.Count; + } + finally + { + js.Lock.ExitReadLock(); + } + } + + public (long MemReserved, long StoreReserved, Exception? Error) JetStreamReservedResources() + { + var js = _jetStream; + if (js == null) + return (-1, -1, new InvalidOperationException("jetstream not enabled")); + + return ( + Interlocked.Read(ref js.MemReserved), + Interlocked.Read(ref js.StoreReserved), + null); + } + + internal JetStreamEngine? GetJetStream() => + _jetStream == null ? null : new JetStreamEngine(_jetStream); + + internal JetStream? GetJetStreamState() => _jetStream; } diff --git a/porting.db b/porting.db index 86c296c..da699df 100644 Binary files a/porting.db and b/porting.db differ