diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index d04e8ae..a62381e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -250,6 +250,162 @@ public sealed partial class Account return null; } + public JetStreamAccountStats JetStreamUsage() + { + _mu.EnterReadLock(); + var jsa = JetStream; + var accountName = Name; + var configuredLimits = JetStreamLimits; + _mu.ExitReadLock(); + + var stats = new JetStreamAccountStats(); + if (jsa == null) + return stats; + + var (js, _) = jsa.JetStreamAndClustered(); + if (js == null) + return stats; + + jsa.UsageLock.EnterReadLock(); + try + { + long mem = 0; + long store = 0; + foreach (var usage in jsa.Usage.Values) + { + mem += usage.Total.Mem; + store += usage.Total.Store; + } + + stats.Memory = (ulong)Math.Max(0, mem); + stats.Store = (ulong)Math.Max(0, store); + stats.Domain = js.Config.Domain; + stats.Api = new JetStreamApiStats + { + Level = JetStreamVersioning.JsApiLevel, + Total = jsa.ApiTotal, + Errors = jsa.ApiErrors, + }; + + if (jsa.Limits.TryGetValue(string.Empty, out var defaultTier)) + { + stats.Limits = defaultTier; + } + else + { + stats.Tiers = new Dictionary(StringComparer.Ordinal); + foreach (var (tier, usage) in jsa.Usage) + { + jsa.Limits.TryGetValue(tier, out var tierLimits); + stats.Tiers[tier] = new JetStreamTier + { + Memory = (ulong)Math.Max(0, usage.Total.Mem), + Store = (ulong)Math.Max(0, usage.Total.Store), + Limits = tierLimits ?? new JetStreamAccountLimits(), + }; + } + + if (configuredLimits != null) + { + foreach (var (tier, value) in configuredLimits) + { + if (stats.Tiers.ContainsKey(tier)) + continue; + if (value is not JetStreamAccountLimits lim) + continue; + stats.Tiers[tier] = new JetStreamTier { Limits = lim }; + } + } + } + } + finally + { + jsa.UsageLock.ExitReadLock(); + } + + var allStreams = Streams(); + stats.Streams = allStreams.Count; + foreach (var stream in allStreams) + stats.Consumers += stream.State().Consumers; + + if (stats.Tiers != null) + { + foreach (var stream in allStreams) + { + var tier = JetStreamEngine.TierName(stream.Config.Replicas); + if (!stats.Tiers.TryGetValue(tier, out var u)) + u = new JetStreamTier(); + u.Streams++; + u.Consumers += stream.State().Consumers; + stats.Tiers[tier] = u; + } + } + + if (stats.Tiers == null || stats.Tiers.Count == 0) + { + var (rmem, rstore) = jsa.ReservedStorage(string.Empty); + stats.ReservedMemory = rmem; + stats.ReservedStore = rstore; + } + else + { + foreach (var tier in stats.Tiers.Keys.ToArray()) + { + var tierStats = stats.Tiers[tier]; + (tierStats.ReservedMemory, tierStats.ReservedStore) = jsa.ReservedStorage(tier); + stats.Tiers[tier] = tierStats; + } + } + + _ = accountName; + return stats; + } + + internal Exception? DisableJetStream() => RemoveJetStream(); + + internal Exception? RemoveJetStream() + { + _mu.EnterWriteLock(); + var server = Server as NatsServer; + var jsa = JetStream; + JetStream = null; + _mu.ExitWriteLock(); + + if (server == null) + return new InvalidOperationException("jetstream account not registered"); + var js = server.GetJetStream(); + if (js == null) + return new InvalidOperationException("jetstream not enabled for account"); + + return js.DisableJetStream(jsa); + } + + internal bool JetStreamConfigured() + { + _mu.EnterReadLock(); + try + { + return JetStreamLimits != null && JetStreamLimits.Count > 0; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool JetStreamEnabled() + { + _mu.EnterReadLock(); + try + { + return JetStream != null; + } + finally + { + _mu.ExitReadLock(); + } + } + internal Exception? EnableAllJetStreamServiceImportsAndMappings() { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 9313f00..7402d39 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -3,6 +3,7 @@ namespace ZB.MOM.NatsNet.Server; internal sealed class JetStreamEngine(JetStream state) { private readonly JetStream _state = state; + private static readonly TimeSpan MinUsageUpdateWindow = TimeSpan.FromMilliseconds(250); internal void SetStarted() { @@ -44,4 +45,105 @@ internal sealed class JetStreamEngine(JetStream state) _state.Lock.ExitReadLock(); } } + + internal Exception? DisableJetStream(JsAccount? account) + { + if (account?.Account is not Account a) + return new InvalidOperationException("jetstream not enabled for account"); + + _state.Lock.EnterWriteLock(); + try + { + _state.Accounts.Remove(a.Name); + } + finally + { + _state.Lock.ExitWriteLock(); + } + + account.Delete(); + return null; + } + + internal bool WouldExceedLimits(StorageType storageType, int size) + { + var total = storageType == StorageType.MemoryStorage + ? Interlocked.Read(ref _state.MemUsed) + : Interlocked.Read(ref _state.StoreUsed); + var max = storageType == StorageType.MemoryStorage + ? _state.Config.MaxMemory + : _state.Config.MaxStore; + return total + size > max; + } + + internal bool LimitsExceeded(StorageType storageType) => WouldExceedLimits(storageType, 0); + + internal static string TierName(int replicas) => $"R{(replicas <= 0 ? 1 : replicas)}"; + + internal static bool IsSameTier(StreamConfig cfgA, StreamConfig cfgB) => + cfgA.Replicas == cfgB.Replicas; + + internal static Dictionary DiffCheckedLimits( + Dictionary a, + Dictionary b) + { + var diff = new Dictionary(StringComparer.Ordinal); + + foreach (var (tier, oldLimit) in a) + { + b.TryGetValue(tier, out var newLimit); + newLimit ??= new JetStreamAccountLimits(); + diff[tier] = new JetStreamAccountLimits + { + MaxMemory = newLimit.MaxMemory - oldLimit.MaxMemory, + MaxStore = newLimit.MaxStore - oldLimit.MaxStore, + }; + } + + foreach (var (tier, newLimit) in b) + { + if (a.ContainsKey(tier)) + continue; + + diff[tier] = new JetStreamAccountLimits + { + MaxMemory = newLimit.MaxMemory, + MaxStore = newLimit.MaxStore, + }; + } + + return diff; + } + + internal static (ulong Mem, ulong Store) ReservedStorage( + Dictionary streamAssignments, + string tier) + { + ulong mem = 0; + ulong store = 0; + + foreach (var assignment in streamAssignments.Values.OfType()) + { + var cfg = assignment.Config; + if (!string.IsNullOrEmpty(tier) && !string.Equals(tier, TierName(cfg.Replicas), StringComparison.Ordinal)) + continue; + if (cfg.MaxBytes <= 0) + continue; + + if (cfg.Storage == StorageType.FileStorage) + store += (ulong)cfg.MaxBytes; + else if (cfg.Storage == StorageType.MemoryStorage) + mem += (ulong)cfg.MaxBytes; + } + + return (mem, store); + } + + internal static bool ShouldSendUsageUpdate(DateTime lastUpdateUtc) => + DateTime.UtcNow - lastUpdateUtc >= MinUsageUpdateWindow; +} + +internal sealed class StreamAssignmentView +{ + public required StreamConfig Config { get; init; } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs new file mode 100644 index 0000000..450e9e2 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs @@ -0,0 +1,194 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class JsAccount +{ + internal (ulong Mem, ulong Store) ReservedStorage(string tier) + { + ulong mem = 0; + ulong store = 0; + + Lock.EnterReadLock(); + try + { + foreach (var stream in Streams.Values.OfType()) + { + var cfg = stream.Config; + if (!string.IsNullOrEmpty(tier) && !string.Equals(tier, JetStreamEngine.TierName(cfg.Replicas), StringComparison.Ordinal)) + continue; + if (cfg.MaxBytes <= 0) + continue; + + if (cfg.Storage == StorageType.FileStorage) + store += (ulong)cfg.MaxBytes; + else if (cfg.Storage == StorageType.MemoryStorage) + mem += (ulong)cfg.MaxBytes; + } + } + finally + { + Lock.ExitReadLock(); + } + + return (mem, store); + } + + internal void RemoteUpdateUsage(byte[] message) + { + if (message.Length < 16) + return; + + UsageLock.EnterWriteLock(); + try + { + if (!Usage.TryGetValue(string.Empty, out var usage)) + { + usage = new JsaStorage(); + Usage[string.Empty] = usage; + } + + usage.Total.Mem = BitConverter.ToInt64(message, 0); + usage.Total.Store = BitConverter.ToInt64(message, 8); + } + finally + { + UsageLock.ExitWriteLock(); + } + } + + internal void CheckAndSyncUsage(string tier, StorageType storageType) + { + if (Interlocked.CompareExchange(ref Sync, 1, 0) != 0) + return; + + try + { + Lock.EnterReadLock(); + try + { + long total = 0; + foreach (var stream in Streams.Values.OfType()) + { + if (!string.Equals(JetStreamEngine.TierName(stream.Config.Replicas), tier, StringComparison.Ordinal)) + continue; + if (stream.Config.Storage != storageType) + continue; + total += (long)stream.State().Bytes; + } + + UsageLock.EnterWriteLock(); + try + { + Usage.TryGetValue(tier, out var usage); + usage ??= new JsaStorage(); + Usage[tier] = usage; + + if (storageType == StorageType.MemoryStorage) + { + usage.Local.Mem = total; + usage.Total.Mem = total; + } + else + { + usage.Local.Store = total; + usage.Total.Store = total; + } + } + finally + { + UsageLock.ExitWriteLock(); + } + } + finally + { + Lock.ExitReadLock(); + } + } + finally + { + Interlocked.Exchange(ref Sync, 0); + } + } + + internal void UpdateUsage(string tier, StorageType storageType, long delta) + { + UsageLock.EnterWriteLock(); + try + { + Usage.TryGetValue(tier, out var usage); + usage ??= new JsaStorage(); + Usage[tier] = usage; + + if (storageType == StorageType.MemoryStorage) + { + usage.Local.Mem += delta; + usage.Total.Mem += delta; + } + else + { + usage.Local.Store += delta; + usage.Total.Store += delta; + } + } + finally + { + UsageLock.ExitWriteLock(); + } + } + + internal void SendClusterUsageUpdateTimer() + { + UsageLock.EnterWriteLock(); + try + { + SendClusterUsageUpdate(); + } + finally + { + UsageLock.ExitWriteLock(); + } + } + + internal void SendClusterUsageUpdate() + { + var now = DateTime.UtcNow; + if (!JetStreamEngine.ShouldSendUsageUpdate(LUpdate)) + return; + LUpdate = now; + + // Cluster bus publish is wired in later cluster sessions. + UsageApi = ApiTotal; + UsageErr = ApiErrors; + } + + internal (JetStream? JetStream, bool Clustered) JetStreamAndClustered() + { + Lock.EnterReadLock(); + try + { + var js = Js as JetStream; + return (js, js?.Cluster != null); + } + finally + { + Lock.ExitReadLock(); + } + } + + internal Account? Acc() => Account as Account; + + internal void Delete() + { + Lock.EnterWriteLock(); + try + { + Streams.Clear(); + Inflight.Clear(); + UpdatesSub = null; + UpdatesPub = string.Empty; + } + finally + { + Lock.ExitWriteLock(); + } + } +} diff --git a/porting.db b/porting.db index da699df..76bfcb7 100644 Binary files a/porting.db and b/porting.db differ