feat(batch27): implement jetstream usage foundations and limit helpers

This commit is contained in:
Joseph Doherty
2026-02-28 21:07:24 -05:00
parent 12b8c9b4c5
commit 6726c07749
4 changed files with 452 additions and 0 deletions

View File

@@ -250,6 +250,162 @@ public sealed partial class Account
return null;
}
public JetStreamAccountStats JetStreamUsage()
{
_mu.EnterReadLock();
var jsa = JetStream;
var accountName = Name;
var configuredLimits = JetStreamLimits;
_mu.ExitReadLock();
var stats = new JetStreamAccountStats();
if (jsa == null)
return stats;
var (js, _) = jsa.JetStreamAndClustered();
if (js == null)
return stats;
jsa.UsageLock.EnterReadLock();
try
{
long mem = 0;
long store = 0;
foreach (var usage in jsa.Usage.Values)
{
mem += usage.Total.Mem;
store += usage.Total.Store;
}
stats.Memory = (ulong)Math.Max(0, mem);
stats.Store = (ulong)Math.Max(0, store);
stats.Domain = js.Config.Domain;
stats.Api = new JetStreamApiStats
{
Level = JetStreamVersioning.JsApiLevel,
Total = jsa.ApiTotal,
Errors = jsa.ApiErrors,
};
if (jsa.Limits.TryGetValue(string.Empty, out var defaultTier))
{
stats.Limits = defaultTier;
}
else
{
stats.Tiers = new Dictionary<string, JetStreamTier>(StringComparer.Ordinal);
foreach (var (tier, usage) in jsa.Usage)
{
jsa.Limits.TryGetValue(tier, out var tierLimits);
stats.Tiers[tier] = new JetStreamTier
{
Memory = (ulong)Math.Max(0, usage.Total.Mem),
Store = (ulong)Math.Max(0, usage.Total.Store),
Limits = tierLimits ?? new JetStreamAccountLimits(),
};
}
if (configuredLimits != null)
{
foreach (var (tier, value) in configuredLimits)
{
if (stats.Tiers.ContainsKey(tier))
continue;
if (value is not JetStreamAccountLimits lim)
continue;
stats.Tiers[tier] = new JetStreamTier { Limits = lim };
}
}
}
}
finally
{
jsa.UsageLock.ExitReadLock();
}
var allStreams = Streams();
stats.Streams = allStreams.Count;
foreach (var stream in allStreams)
stats.Consumers += stream.State().Consumers;
if (stats.Tiers != null)
{
foreach (var stream in allStreams)
{
var tier = JetStreamEngine.TierName(stream.Config.Replicas);
if (!stats.Tiers.TryGetValue(tier, out var u))
u = new JetStreamTier();
u.Streams++;
u.Consumers += stream.State().Consumers;
stats.Tiers[tier] = u;
}
}
if (stats.Tiers == null || stats.Tiers.Count == 0)
{
var (rmem, rstore) = jsa.ReservedStorage(string.Empty);
stats.ReservedMemory = rmem;
stats.ReservedStore = rstore;
}
else
{
foreach (var tier in stats.Tiers.Keys.ToArray())
{
var tierStats = stats.Tiers[tier];
(tierStats.ReservedMemory, tierStats.ReservedStore) = jsa.ReservedStorage(tier);
stats.Tiers[tier] = tierStats;
}
}
_ = accountName;
return stats;
}
internal Exception? DisableJetStream() => RemoveJetStream();
internal Exception? RemoveJetStream()
{
_mu.EnterWriteLock();
var server = Server as NatsServer;
var jsa = JetStream;
JetStream = null;
_mu.ExitWriteLock();
if (server == null)
return new InvalidOperationException("jetstream account not registered");
var js = server.GetJetStream();
if (js == null)
return new InvalidOperationException("jetstream not enabled for account");
return js.DisableJetStream(jsa);
}
internal bool JetStreamConfigured()
{
_mu.EnterReadLock();
try
{
return JetStreamLimits != null && JetStreamLimits.Count > 0;
}
finally
{
_mu.ExitReadLock();
}
}
internal bool JetStreamEnabled()
{
_mu.EnterReadLock();
try
{
return JetStream != null;
}
finally
{
_mu.ExitReadLock();
}
}
internal Exception? EnableAllJetStreamServiceImportsAndMappings()
{
_mu.EnterReadLock();

View File

@@ -3,6 +3,7 @@ namespace ZB.MOM.NatsNet.Server;
internal sealed class JetStreamEngine(JetStream state)
{
private readonly JetStream _state = state;
private static readonly TimeSpan MinUsageUpdateWindow = TimeSpan.FromMilliseconds(250);
internal void SetStarted()
{
@@ -44,4 +45,105 @@ internal sealed class JetStreamEngine(JetStream state)
_state.Lock.ExitReadLock();
}
}
internal Exception? DisableJetStream(JsAccount? account)
{
if (account?.Account is not Account a)
return new InvalidOperationException("jetstream not enabled for account");
_state.Lock.EnterWriteLock();
try
{
_state.Accounts.Remove(a.Name);
}
finally
{
_state.Lock.ExitWriteLock();
}
account.Delete();
return null;
}
internal bool WouldExceedLimits(StorageType storageType, int size)
{
var total = storageType == StorageType.MemoryStorage
? Interlocked.Read(ref _state.MemUsed)
: Interlocked.Read(ref _state.StoreUsed);
var max = storageType == StorageType.MemoryStorage
? _state.Config.MaxMemory
: _state.Config.MaxStore;
return total + size > max;
}
internal bool LimitsExceeded(StorageType storageType) => WouldExceedLimits(storageType, 0);
internal static string TierName(int replicas) => $"R{(replicas <= 0 ? 1 : replicas)}";
internal static bool IsSameTier(StreamConfig cfgA, StreamConfig cfgB) =>
cfgA.Replicas == cfgB.Replicas;
internal static Dictionary<string, JetStreamAccountLimits> DiffCheckedLimits(
Dictionary<string, JetStreamAccountLimits> a,
Dictionary<string, JetStreamAccountLimits> b)
{
var diff = new Dictionary<string, JetStreamAccountLimits>(StringComparer.Ordinal);
foreach (var (tier, oldLimit) in a)
{
b.TryGetValue(tier, out var newLimit);
newLimit ??= new JetStreamAccountLimits();
diff[tier] = new JetStreamAccountLimits
{
MaxMemory = newLimit.MaxMemory - oldLimit.MaxMemory,
MaxStore = newLimit.MaxStore - oldLimit.MaxStore,
};
}
foreach (var (tier, newLimit) in b)
{
if (a.ContainsKey(tier))
continue;
diff[tier] = new JetStreamAccountLimits
{
MaxMemory = newLimit.MaxMemory,
MaxStore = newLimit.MaxStore,
};
}
return diff;
}
internal static (ulong Mem, ulong Store) ReservedStorage(
Dictionary<string, object?> streamAssignments,
string tier)
{
ulong mem = 0;
ulong store = 0;
foreach (var assignment in streamAssignments.Values.OfType<StreamAssignmentView>())
{
var cfg = assignment.Config;
if (!string.IsNullOrEmpty(tier) && !string.Equals(tier, TierName(cfg.Replicas), StringComparison.Ordinal))
continue;
if (cfg.MaxBytes <= 0)
continue;
if (cfg.Storage == StorageType.FileStorage)
store += (ulong)cfg.MaxBytes;
else if (cfg.Storage == StorageType.MemoryStorage)
mem += (ulong)cfg.MaxBytes;
}
return (mem, store);
}
internal static bool ShouldSendUsageUpdate(DateTime lastUpdateUtc) =>
DateTime.UtcNow - lastUpdateUtc >= MinUsageUpdateWindow;
}
internal sealed class StreamAssignmentView
{
public required StreamConfig Config { get; init; }
}

View File

@@ -0,0 +1,194 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class JsAccount
{
internal (ulong Mem, ulong Store) ReservedStorage(string tier)
{
ulong mem = 0;
ulong store = 0;
Lock.EnterReadLock();
try
{
foreach (var stream in Streams.Values.OfType<NatsStream>())
{
var cfg = stream.Config;
if (!string.IsNullOrEmpty(tier) && !string.Equals(tier, JetStreamEngine.TierName(cfg.Replicas), StringComparison.Ordinal))
continue;
if (cfg.MaxBytes <= 0)
continue;
if (cfg.Storage == StorageType.FileStorage)
store += (ulong)cfg.MaxBytes;
else if (cfg.Storage == StorageType.MemoryStorage)
mem += (ulong)cfg.MaxBytes;
}
}
finally
{
Lock.ExitReadLock();
}
return (mem, store);
}
internal void RemoteUpdateUsage(byte[] message)
{
if (message.Length < 16)
return;
UsageLock.EnterWriteLock();
try
{
if (!Usage.TryGetValue(string.Empty, out var usage))
{
usage = new JsaStorage();
Usage[string.Empty] = usage;
}
usage.Total.Mem = BitConverter.ToInt64(message, 0);
usage.Total.Store = BitConverter.ToInt64(message, 8);
}
finally
{
UsageLock.ExitWriteLock();
}
}
internal void CheckAndSyncUsage(string tier, StorageType storageType)
{
if (Interlocked.CompareExchange(ref Sync, 1, 0) != 0)
return;
try
{
Lock.EnterReadLock();
try
{
long total = 0;
foreach (var stream in Streams.Values.OfType<NatsStream>())
{
if (!string.Equals(JetStreamEngine.TierName(stream.Config.Replicas), tier, StringComparison.Ordinal))
continue;
if (stream.Config.Storage != storageType)
continue;
total += (long)stream.State().Bytes;
}
UsageLock.EnterWriteLock();
try
{
Usage.TryGetValue(tier, out var usage);
usage ??= new JsaStorage();
Usage[tier] = usage;
if (storageType == StorageType.MemoryStorage)
{
usage.Local.Mem = total;
usage.Total.Mem = total;
}
else
{
usage.Local.Store = total;
usage.Total.Store = total;
}
}
finally
{
UsageLock.ExitWriteLock();
}
}
finally
{
Lock.ExitReadLock();
}
}
finally
{
Interlocked.Exchange(ref Sync, 0);
}
}
internal void UpdateUsage(string tier, StorageType storageType, long delta)
{
UsageLock.EnterWriteLock();
try
{
Usage.TryGetValue(tier, out var usage);
usage ??= new JsaStorage();
Usage[tier] = usage;
if (storageType == StorageType.MemoryStorage)
{
usage.Local.Mem += delta;
usage.Total.Mem += delta;
}
else
{
usage.Local.Store += delta;
usage.Total.Store += delta;
}
}
finally
{
UsageLock.ExitWriteLock();
}
}
internal void SendClusterUsageUpdateTimer()
{
UsageLock.EnterWriteLock();
try
{
SendClusterUsageUpdate();
}
finally
{
UsageLock.ExitWriteLock();
}
}
internal void SendClusterUsageUpdate()
{
var now = DateTime.UtcNow;
if (!JetStreamEngine.ShouldSendUsageUpdate(LUpdate))
return;
LUpdate = now;
// Cluster bus publish is wired in later cluster sessions.
UsageApi = ApiTotal;
UsageErr = ApiErrors;
}
internal (JetStream? JetStream, bool Clustered) JetStreamAndClustered()
{
Lock.EnterReadLock();
try
{
var js = Js as JetStream;
return (js, js?.Cluster != null);
}
finally
{
Lock.ExitReadLock();
}
}
internal Account? Acc() => Account as Account;
internal void Delete()
{
Lock.EnterWriteLock();
try
{
Streams.Clear();
Inflight.Clear();
UpdatesSub = null;
UpdatesPub = string.Empty;
}
finally
{
Lock.ExitWriteLock();
}
}
}

Binary file not shown.