feat(batch27): implement jsaccount limit checks and resource accounting

This commit is contained in:
Joseph Doherty
2026-02-28 21:10:32 -05:00
parent 6726c07749
commit 68f32c7b85
3 changed files with 290 additions and 0 deletions

View File

@@ -141,6 +141,185 @@ internal sealed class JetStreamEngine(JetStream state)
internal static bool ShouldSendUsageUpdate(DateTime lastUpdateUtc) =>
DateTime.UtcNow - lastUpdateUtc >= MinUsageUpdateWindow;
internal Exception? CheckAccountLimits(
JetStreamAccountLimits selected,
StreamConfig config,
long currentReservation) =>
CheckLimits(selected, config, checkServer: false, currentReservation, 0);
internal Exception? CheckAllLimits(
JetStreamAccountLimits selected,
StreamConfig config,
long currentReservation,
long maxBytesOffset) =>
CheckLimits(selected, config, checkServer: true, currentReservation, maxBytesOffset);
internal Exception? CheckLimits(
JetStreamAccountLimits selected,
StreamConfig config,
bool checkServer,
long currentReservation,
long maxBytesOffset)
{
if (config.MaxConsumers > 0 && selected.MaxConsumers > 0 && config.MaxConsumers > selected.MaxConsumers)
return new InvalidOperationException("maximum consumers limit exceeded");
return CheckBytesLimits(selected, config.MaxBytes, config.Storage, checkServer, currentReservation, maxBytesOffset);
}
internal Exception? CheckBytesLimits(
JetStreamAccountLimits selected,
long addBytes,
StorageType storage,
bool checkServer,
long currentReservation,
long maxBytesOffset)
{
if (addBytes < 0)
addBytes = 1;
var totalBytes = addBytes + maxBytesOffset;
switch (storage)
{
case StorageType.MemoryStorage:
if (selected.MaxMemory >= 0 && currentReservation + totalBytes > selected.MaxMemory)
return new InvalidOperationException("insufficient memory resources");
if (checkServer && Interlocked.Read(ref _state.MemReserved) + totalBytes > _state.Config.MaxMemory)
return new InvalidOperationException("insufficient memory resources");
break;
case StorageType.FileStorage:
if (selected.MaxStore >= 0 && currentReservation + totalBytes > selected.MaxStore)
return new InvalidOperationException("insufficient storage resources");
if (checkServer && Interlocked.Read(ref _state.StoreReserved) + totalBytes > _state.Config.MaxStore)
return new InvalidOperationException("insufficient storage resources");
break;
}
return null;
}
internal JsAccount? LookupAccount(Account? account)
{
if (account == null)
return null;
_state.Lock.EnterReadLock();
try
{
_state.Accounts.TryGetValue(account.Name, out var jsa);
return jsa;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal JetStreamStats UsageStats()
{
var stats = new JetStreamStats();
_state.Lock.EnterReadLock();
try
{
stats.Accounts = _state.Accounts.Count;
stats.ReservedMemory = (ulong)Math.Max(0, Interlocked.Read(ref _state.MemReserved));
stats.ReservedStore = (ulong)Math.Max(0, Interlocked.Read(ref _state.StoreReserved));
stats.Api.Level = JetStreamVersioning.JsApiLevel;
stats.Api.Total = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiTotal));
stats.Api.Errors = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiErrors));
stats.Api.Inflight = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiInflight));
stats.Memory = (ulong)Math.Max(0, Interlocked.Read(ref _state.MemUsed));
stats.Store = (ulong)Math.Max(0, Interlocked.Read(ref _state.StoreUsed));
stats.HAAssets = 0;
}
finally
{
_state.Lock.ExitReadLock();
}
return stats;
}
internal Exception? SufficientResources(Dictionary<string, JetStreamAccountLimits>? limits)
{
if (limits == null || !_state.StandAlone)
return null;
static (long MaxMem, long MaxStore) Totals(Dictionary<string, JetStreamAccountLimits> source)
{
long mem = 0;
long store = 0;
foreach (var lim in source.Values)
{
if (lim.MaxMemory > 0) mem += lim.MaxMemory;
if (lim.MaxStore > 0) store += lim.MaxStore;
}
return (mem, store);
}
var (totalMem, totalStore) = Totals(limits);
if (Interlocked.Read(ref _state.MemReserved) + totalMem > _state.Config.MaxMemory)
return new InvalidOperationException("insufficient memory resources");
if (Interlocked.Read(ref _state.StoreReserved) + totalStore > _state.Config.MaxStore)
return new InvalidOperationException("insufficient storage resources");
long reservedMem = 0;
long reservedStore = 0;
_state.Lock.EnterReadLock();
try
{
foreach (var jsa in _state.Accounts.Values)
{
jsa.UsageLock.EnterReadLock();
try
{
var (m, s) = Totals(jsa.Limits);
reservedMem += m;
reservedStore += s;
}
finally
{
jsa.UsageLock.ExitReadLock();
}
}
}
finally
{
_state.Lock.ExitReadLock();
}
if (reservedMem + totalMem > _state.Config.MaxMemory)
return new InvalidOperationException("insufficient memory resources");
if (reservedStore + totalStore > _state.Config.MaxStore)
return new InvalidOperationException("insufficient storage resources");
return null;
}
internal void ReserveStreamResources(StreamConfig? cfg)
{
if (cfg == null || cfg.MaxBytes <= 0)
return;
if (cfg.Storage == StorageType.MemoryStorage)
Interlocked.Add(ref _state.MemReserved, cfg.MaxBytes);
else if (cfg.Storage == StorageType.FileStorage)
Interlocked.Add(ref _state.StoreReserved, cfg.MaxBytes);
}
internal void ReleaseStreamResources(StreamConfig? cfg)
{
if (cfg == null || cfg.MaxBytes <= 0)
return;
if (cfg.Storage == StorageType.MemoryStorage)
Interlocked.Add(ref _state.MemReserved, -cfg.MaxBytes);
else if (cfg.Storage == StorageType.FileStorage)
Interlocked.Add(ref _state.StoreReserved, -cfg.MaxBytes);
}
}
internal sealed class StreamAssignmentView

View File

@@ -176,6 +176,117 @@ internal sealed partial class JsAccount
internal Account? Acc() => Account as Account;
internal (JetStreamAccountLimits Limits, string Tier, bool Found) SelectLimits(int replicas)
{
UsageLock.EnterReadLock();
try
{
if (Limits.TryGetValue(string.Empty, out var selected))
return (selected, string.Empty, true);
var tier = JetStreamEngine.TierName(replicas);
if (Limits.TryGetValue(tier, out selected))
return (selected, tier, true);
return (new JetStreamAccountLimits(), string.Empty, false);
}
finally
{
UsageLock.ExitReadLock();
}
}
internal int CountStreams(string tier, StreamConfig cfg)
{
Lock.EnterReadLock();
try
{
var count = 0;
foreach (var stream in Streams.Values.OfType<NatsStream>())
{
if ((!string.IsNullOrEmpty(tier) && !JetStreamEngine.IsSameTier(stream.Config, cfg)) ||
string.Equals(stream.Config.Name, cfg.Name, StringComparison.Ordinal))
continue;
count++;
}
return count;
}
finally
{
Lock.ExitReadLock();
}
}
internal (ulong Mem, ulong Store) StorageTotals()
{
UsageLock.EnterReadLock();
try
{
ulong mem = 0;
ulong store = 0;
foreach (var usage in Usage.Values)
{
mem += (ulong)Math.Max(0, usage.Total.Mem);
store += (ulong)Math.Max(0, usage.Total.Store);
}
return (mem, store);
}
finally
{
UsageLock.ExitReadLock();
}
}
internal (bool Exceeded, JsApiError? Error) LimitsExceeded(StorageType storageType, string tier, int replicas) =>
WouldExceedLimits(storageType, tier, replicas, string.Empty, null, null);
internal (bool Exceeded, JsApiError? Error) WouldExceedLimits(
StorageType storageType,
string tier,
int replicas,
string subject,
byte[]? headers,
byte[]? payload)
{
UsageLock.EnterReadLock();
try
{
if (!Limits.TryGetValue(tier, out var selected))
return (true, JsApiErrors.NewJSNoLimitsError());
if (!Usage.TryGetValue(tier, out var inUse))
return (false, null);
var r = Math.Max(1, replicas);
var lr = string.IsNullOrEmpty(tier) ? 1 : r;
var bytes = (subject.Length + (headers?.Length ?? 0) + (payload?.Length ?? 0)) * r;
if (storageType == StorageType.MemoryStorage)
{
var totalMem = inUse.Total.Mem + bytes;
if (selected.MemoryMaxStreamBytes > 0 && totalMem > selected.MemoryMaxStreamBytes * lr)
return (true, null);
if (selected.MaxMemory >= 0 && totalMem > selected.MaxMemory * lr)
return (true, null);
}
else
{
var totalStore = inUse.Total.Store + bytes;
if (selected.StoreMaxStreamBytes > 0 && totalStore > selected.StoreMaxStreamBytes * lr)
return (true, null);
if (selected.MaxStore >= 0 && totalStore > selected.MaxStore * lr)
return (true, null);
}
return (false, null);
}
finally
{
UsageLock.ExitReadLock();
}
}
internal void Delete()
{
Lock.EnterWriteLock();

Binary file not shown.