diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 7402d39..ba08743 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -141,6 +141,185 @@ internal sealed class JetStreamEngine(JetStream state) internal static bool ShouldSendUsageUpdate(DateTime lastUpdateUtc) => DateTime.UtcNow - lastUpdateUtc >= MinUsageUpdateWindow; + + internal Exception? CheckAccountLimits( + JetStreamAccountLimits selected, + StreamConfig config, + long currentReservation) => + CheckLimits(selected, config, checkServer: false, currentReservation, 0); + + internal Exception? CheckAllLimits( + JetStreamAccountLimits selected, + StreamConfig config, + long currentReservation, + long maxBytesOffset) => + CheckLimits(selected, config, checkServer: true, currentReservation, maxBytesOffset); + + internal Exception? CheckLimits( + JetStreamAccountLimits selected, + StreamConfig config, + bool checkServer, + long currentReservation, + long maxBytesOffset) + { + if (config.MaxConsumers > 0 && selected.MaxConsumers > 0 && config.MaxConsumers > selected.MaxConsumers) + return new InvalidOperationException("maximum consumers limit exceeded"); + + return CheckBytesLimits(selected, config.MaxBytes, config.Storage, checkServer, currentReservation, maxBytesOffset); + } + + internal Exception? CheckBytesLimits( + JetStreamAccountLimits selected, + long addBytes, + StorageType storage, + bool checkServer, + long currentReservation, + long maxBytesOffset) + { + if (addBytes < 0) + addBytes = 1; + + var totalBytes = addBytes + maxBytesOffset; + switch (storage) + { + case StorageType.MemoryStorage: + if (selected.MaxMemory >= 0 && currentReservation + totalBytes > selected.MaxMemory) + return new InvalidOperationException("insufficient memory resources"); + if (checkServer && Interlocked.Read(ref _state.MemReserved) + totalBytes > _state.Config.MaxMemory) + return new InvalidOperationException("insufficient memory resources"); + break; + + case StorageType.FileStorage: + if (selected.MaxStore >= 0 && currentReservation + totalBytes > selected.MaxStore) + return new InvalidOperationException("insufficient storage resources"); + if (checkServer && Interlocked.Read(ref _state.StoreReserved) + totalBytes > _state.Config.MaxStore) + return new InvalidOperationException("insufficient storage resources"); + break; + } + + return null; + } + + internal JsAccount? LookupAccount(Account? account) + { + if (account == null) + return null; + + _state.Lock.EnterReadLock(); + try + { + _state.Accounts.TryGetValue(account.Name, out var jsa); + return jsa; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal JetStreamStats UsageStats() + { + var stats = new JetStreamStats(); + _state.Lock.EnterReadLock(); + try + { + stats.Accounts = _state.Accounts.Count; + stats.ReservedMemory = (ulong)Math.Max(0, Interlocked.Read(ref _state.MemReserved)); + stats.ReservedStore = (ulong)Math.Max(0, Interlocked.Read(ref _state.StoreReserved)); + stats.Api.Level = JetStreamVersioning.JsApiLevel; + stats.Api.Total = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiTotal)); + stats.Api.Errors = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiErrors)); + stats.Api.Inflight = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiInflight)); + stats.Memory = (ulong)Math.Max(0, Interlocked.Read(ref _state.MemUsed)); + stats.Store = (ulong)Math.Max(0, Interlocked.Read(ref _state.StoreUsed)); + stats.HAAssets = 0; + } + finally + { + _state.Lock.ExitReadLock(); + } + + return stats; + } + + internal Exception? SufficientResources(Dictionary? limits) + { + if (limits == null || !_state.StandAlone) + return null; + + static (long MaxMem, long MaxStore) Totals(Dictionary source) + { + long mem = 0; + long store = 0; + foreach (var lim in source.Values) + { + if (lim.MaxMemory > 0) mem += lim.MaxMemory; + if (lim.MaxStore > 0) store += lim.MaxStore; + } + + return (mem, store); + } + + var (totalMem, totalStore) = Totals(limits); + if (Interlocked.Read(ref _state.MemReserved) + totalMem > _state.Config.MaxMemory) + return new InvalidOperationException("insufficient memory resources"); + if (Interlocked.Read(ref _state.StoreReserved) + totalStore > _state.Config.MaxStore) + return new InvalidOperationException("insufficient storage resources"); + + long reservedMem = 0; + long reservedStore = 0; + _state.Lock.EnterReadLock(); + try + { + foreach (var jsa in _state.Accounts.Values) + { + jsa.UsageLock.EnterReadLock(); + try + { + var (m, s) = Totals(jsa.Limits); + reservedMem += m; + reservedStore += s; + } + finally + { + jsa.UsageLock.ExitReadLock(); + } + } + } + finally + { + _state.Lock.ExitReadLock(); + } + + if (reservedMem + totalMem > _state.Config.MaxMemory) + return new InvalidOperationException("insufficient memory resources"); + if (reservedStore + totalStore > _state.Config.MaxStore) + return new InvalidOperationException("insufficient storage resources"); + + return null; + } + + internal void ReserveStreamResources(StreamConfig? cfg) + { + if (cfg == null || cfg.MaxBytes <= 0) + return; + + if (cfg.Storage == StorageType.MemoryStorage) + Interlocked.Add(ref _state.MemReserved, cfg.MaxBytes); + else if (cfg.Storage == StorageType.FileStorage) + Interlocked.Add(ref _state.StoreReserved, cfg.MaxBytes); + } + + internal void ReleaseStreamResources(StreamConfig? cfg) + { + if (cfg == null || cfg.MaxBytes <= 0) + return; + + if (cfg.Storage == StorageType.MemoryStorage) + Interlocked.Add(ref _state.MemReserved, -cfg.MaxBytes); + else if (cfg.Storage == StorageType.FileStorage) + Interlocked.Add(ref _state.StoreReserved, -cfg.MaxBytes); + } } internal sealed class StreamAssignmentView diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs index 450e9e2..7437ae5 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs @@ -176,6 +176,117 @@ internal sealed partial class JsAccount internal Account? Acc() => Account as Account; + internal (JetStreamAccountLimits Limits, string Tier, bool Found) SelectLimits(int replicas) + { + UsageLock.EnterReadLock(); + try + { + if (Limits.TryGetValue(string.Empty, out var selected)) + return (selected, string.Empty, true); + + var tier = JetStreamEngine.TierName(replicas); + if (Limits.TryGetValue(tier, out selected)) + return (selected, tier, true); + + return (new JetStreamAccountLimits(), string.Empty, false); + } + finally + { + UsageLock.ExitReadLock(); + } + } + + internal int CountStreams(string tier, StreamConfig cfg) + { + Lock.EnterReadLock(); + try + { + var count = 0; + foreach (var stream in Streams.Values.OfType()) + { + if ((!string.IsNullOrEmpty(tier) && !JetStreamEngine.IsSameTier(stream.Config, cfg)) || + string.Equals(stream.Config.Name, cfg.Name, StringComparison.Ordinal)) + continue; + count++; + } + + return count; + } + finally + { + Lock.ExitReadLock(); + } + } + + internal (ulong Mem, ulong Store) StorageTotals() + { + UsageLock.EnterReadLock(); + try + { + ulong mem = 0; + ulong store = 0; + foreach (var usage in Usage.Values) + { + mem += (ulong)Math.Max(0, usage.Total.Mem); + store += (ulong)Math.Max(0, usage.Total.Store); + } + + return (mem, store); + } + finally + { + UsageLock.ExitReadLock(); + } + } + + internal (bool Exceeded, JsApiError? Error) LimitsExceeded(StorageType storageType, string tier, int replicas) => + WouldExceedLimits(storageType, tier, replicas, string.Empty, null, null); + + internal (bool Exceeded, JsApiError? Error) WouldExceedLimits( + StorageType storageType, + string tier, + int replicas, + string subject, + byte[]? headers, + byte[]? payload) + { + UsageLock.EnterReadLock(); + try + { + if (!Limits.TryGetValue(tier, out var selected)) + return (true, JsApiErrors.NewJSNoLimitsError()); + if (!Usage.TryGetValue(tier, out var inUse)) + return (false, null); + + var r = Math.Max(1, replicas); + var lr = string.IsNullOrEmpty(tier) ? 1 : r; + var bytes = (subject.Length + (headers?.Length ?? 0) + (payload?.Length ?? 0)) * r; + + if (storageType == StorageType.MemoryStorage) + { + var totalMem = inUse.Total.Mem + bytes; + if (selected.MemoryMaxStreamBytes > 0 && totalMem > selected.MemoryMaxStreamBytes * lr) + return (true, null); + if (selected.MaxMemory >= 0 && totalMem > selected.MaxMemory * lr) + return (true, null); + } + else + { + var totalStore = inUse.Total.Store + bytes; + if (selected.StoreMaxStreamBytes > 0 && totalStore > selected.StoreMaxStreamBytes * lr) + return (true, null); + if (selected.MaxStore >= 0 && totalStore > selected.MaxStore * lr) + return (true, null); + } + + return (false, null); + } + finally + { + UsageLock.ExitReadLock(); + } + } + internal void Delete() { Lock.EnterWriteLock(); diff --git a/porting.db b/porting.db index 76bfcb7..82f20b1 100644 Binary files a/porting.db and b/porting.db differ