feat(auth): add account import/export cycle detection and JetStream limits (E4+E5)

E4: AccountImportExport with DFS cycle detection for service imports,
RemoveServiceImport/RemoveStreamImport, and ValidateImport authorization.
E5: AccountLimits record with MaxStorage/MaxConsumers/MaxAckPending,
TryReserveConsumer/ReleaseConsumer, TrackStorageDelta on Account.
20 new tests, all passing.
This commit is contained in:
Joseph Doherty
2026-02-24 15:25:12 -05:00
parent efd053ba60
commit 235971ddcc
5 changed files with 572 additions and 1 deletions

View File

@@ -18,6 +18,9 @@ public sealed class Account : IDisposable
public int MaxJetStreamStreams { get; set; } // 0 = unlimited
public string? JetStreamTier { get; set; }
/// <summary>Per-account JetStream resource limits (storage, consumers, ack pending).</summary>
public AccountLimits JetStreamLimits { get; set; } = AccountLimits.Unlimited;
// JWT fields
public string? Nkey { get; set; }
public string? Issuer { get; set; }
@@ -39,6 +42,8 @@ public sealed class Account : IDisposable
private readonly ConcurrentDictionary<ulong, byte> _clients = new();
private int _subscriptionCount;
private int _jetStreamStreamCount;
private int _consumerCount;
private long _storageUsed;
public Account(string name)
{
@@ -48,6 +53,8 @@ public sealed class Account : IDisposable
public int ClientCount => _clients.Count;
public int SubscriptionCount => Volatile.Read(ref _subscriptionCount);
public int JetStreamStreamCount => Volatile.Read(ref _jetStreamStreamCount);
public int ConsumerCount => Volatile.Read(ref _consumerCount);
public long StorageUsed => Interlocked.Read(ref _storageUsed);
/// <summary>Returns false if max connections exceeded.</summary>
public bool AddClient(ulong clientId)
@@ -73,9 +80,17 @@ public sealed class Account : IDisposable
Interlocked.Decrement(ref _subscriptionCount);
}
/// <summary>
/// Reserves a stream slot, checking both <see cref="MaxJetStreamStreams"/> (legacy)
/// and <see cref="JetStreamLimits"/>.<see cref="AccountLimits.MaxStreams"/>.
/// </summary>
public bool TryReserveStream()
{
if (MaxJetStreamStreams > 0 && Volatile.Read(ref _jetStreamStreamCount) >= MaxJetStreamStreams)
var effectiveMax = JetStreamLimits.MaxStreams > 0
? JetStreamLimits.MaxStreams
: MaxJetStreamStreams;
if (effectiveMax > 0 && Volatile.Read(ref _jetStreamStreamCount) >= effectiveMax)
return false;
Interlocked.Increment(ref _jetStreamStreamCount);
@@ -90,6 +105,45 @@ public sealed class Account : IDisposable
Interlocked.Decrement(ref _jetStreamStreamCount);
}
/// <summary>Reserves a consumer slot. Returns false if <see cref="AccountLimits.MaxConsumers"/> is exceeded.</summary>
public bool TryReserveConsumer()
{
var max = JetStreamLimits.MaxConsumers;
if (max > 0 && Volatile.Read(ref _consumerCount) >= max)
return false;
Interlocked.Increment(ref _consumerCount);
return true;
}
public void ReleaseConsumer()
{
if (Volatile.Read(ref _consumerCount) == 0)
return;
Interlocked.Decrement(ref _consumerCount);
}
/// <summary>
/// Adjusts the tracked storage usage by <paramref name="deltaBytes"/>.
/// Returns false if the positive delta would exceed <see cref="AccountLimits.MaxStorage"/>.
/// A negative delta always succeeds.
/// </summary>
public bool TrackStorageDelta(long deltaBytes)
{
var maxStorage = JetStreamLimits.MaxStorage;
if (deltaBytes > 0 && maxStorage > 0)
{
var current = Interlocked.Read(ref _storageUsed);
if (current + deltaBytes > maxStorage)
return false;
}
Interlocked.Add(ref _storageUsed, deltaBytes);
return true;
}
// Per-account message/byte stats
private long _inMsgs;
private long _outMsgs;
@@ -146,6 +200,12 @@ public sealed class Account : IDisposable
Exports.Streams[subject] = new StreamExport { Auth = auth };
}
/// <summary>
/// Adds a service import with cycle detection.
/// Go reference: accounts.go addServiceImport with checkForImportCycle.
/// </summary>
/// <exception cref="InvalidOperationException">Thrown if no export found or import would create a cycle.</exception>
/// <exception cref="UnauthorizedAccessException">Thrown if this account is not authorized.</exception>
public ServiceImport AddServiceImport(Account destination, string from, string to)
{
if (!destination.Exports.Services.TryGetValue(to, out var export))
@@ -154,6 +214,11 @@ public sealed class Account : IDisposable
if (!export.Auth.IsAuthorized(this))
throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'");
// Cycle detection: check if adding this import from destination would
// create a path back to this account.
if (AccountImportExport.DetectCycle(destination, this))
throw new InvalidOperationException("Import would create a cycle");
var si = new ServiceImport
{
DestinationAccount = destination,
@@ -167,6 +232,13 @@ public sealed class Account : IDisposable
return si;
}
/// <summary>Removes a service import by its 'from' subject.</summary>
/// <returns>True if the import was found and removed.</returns>
public bool RemoveServiceImport(string from)
{
return Imports.Services.Remove(from);
}
public void AddStreamImport(Account source, string from, string to)
{
if (!source.Exports.Streams.TryGetValue(from, out var export))
@@ -185,5 +257,16 @@ public sealed class Account : IDisposable
Imports.Streams.Add(si);
}
/// <summary>Removes a stream import by its 'from' subject.</summary>
/// <returns>True if the import was found and removed.</returns>
public bool RemoveStreamImport(string from)
{
var idx = Imports.Streams.FindIndex(s => string.Equals(s.From, from, StringComparison.Ordinal));
if (idx < 0)
return false;
Imports.Streams.RemoveAt(idx);
return true;
}
public void Dispose() => SubList.Dispose();
}

View File

@@ -0,0 +1,76 @@
// Ported from Go accounts.go:1500-2000 — cycle detection for service imports.
using NATS.Server.Imports;
namespace NATS.Server.Auth;
/// <summary>
/// Provides cycle detection and validation for cross-account service imports.
/// Go reference: accounts.go checkForImportCycle / addServiceImport.
/// </summary>
public static class AccountImportExport
{
/// <summary>
/// DFS through the service import graph to detect cycles.
/// Returns true if following service imports from <paramref name="from"/>
/// eventually leads back to <paramref name="to"/>.
/// </summary>
public static bool DetectCycle(Account from, Account to, HashSet<string>? visited = null)
{
ArgumentNullException.ThrowIfNull(from);
ArgumentNullException.ThrowIfNull(to);
visited ??= new HashSet<string>(StringComparer.Ordinal);
if (!visited.Add(from.Name))
return false; // Already visited, no new cycle found from this node
// Walk all service imports from the 'from' account
foreach (var kvp in from.Imports.Services)
{
foreach (var serviceImport in kvp.Value)
{
var dest = serviceImport.DestinationAccount;
// Direct cycle: import destination is the target account
if (string.Equals(dest.Name, to.Name, StringComparison.Ordinal))
return true;
// Indirect cycle: recursively check if destination leads back to target
if (DetectCycle(dest, to, visited))
return true;
}
}
return false;
}
/// <summary>
/// Validates that the import is authorized and does not create a cycle.
/// </summary>
/// <exception cref="UnauthorizedAccessException">Thrown when the importing account is not authorized.</exception>
/// <exception cref="InvalidOperationException">Thrown when the import would create a cycle.</exception>
public static void ValidateImport(Account importingAccount, Account exportingAccount, string exportSubject)
{
ArgumentNullException.ThrowIfNull(importingAccount);
ArgumentNullException.ThrowIfNull(exportingAccount);
// Check authorization first
if (exportingAccount.Exports.Services.TryGetValue(exportSubject, out var export))
{
if (!export.Auth.IsAuthorized(importingAccount))
throw new UnauthorizedAccessException(
$"Account '{importingAccount.Name}' not authorized to import '{exportSubject}' from '{exportingAccount.Name}'");
}
else
{
throw new InvalidOperationException(
$"No service export found for '{exportSubject}' on account '{exportingAccount.Name}'");
}
// Check for cycles: would importing from exportingAccount create a cycle
// back to importingAccount?
if (DetectCycle(exportingAccount, importingAccount))
throw new InvalidOperationException("Import would create a cycle");
}
}

View File

@@ -0,0 +1,32 @@
// Per-account JetStream resource limits.
// Go reference: accounts.go JetStreamAccountLimits struct.
namespace NATS.Server.Auth;
/// <summary>
/// Per-account limits on JetStream resources: storage, streams, consumers, and ack pending.
/// A value of 0 means unlimited for all fields.
/// </summary>
public sealed record AccountLimits
{
/// <summary>Maximum total storage in bytes (0 = unlimited).</summary>
public long MaxStorage { get; init; }
/// <summary>Maximum number of streams (0 = unlimited).</summary>
public int MaxStreams { get; init; }
/// <summary>Maximum number of consumers (0 = unlimited).</summary>
public int MaxConsumers { get; init; }
/// <summary>Maximum pending ack count per consumer (0 = unlimited).</summary>
public int MaxAckPending { get; init; }
/// <summary>Maximum memory-based storage in bytes (0 = unlimited).</summary>
public long MaxMemoryStorage { get; init; }
/// <summary>Maximum disk-based storage in bytes (0 = unlimited).</summary>
public long MaxDiskStorage { get; init; }
/// <summary>Default instance with all limits set to unlimited (0).</summary>
public static AccountLimits Unlimited { get; } = new();
}