diff --git a/src/NATS.Server/Auth/Account.cs b/src/NATS.Server/Auth/Account.cs
index 3e7210f..f7b1da6 100644
--- a/src/NATS.Server/Auth/Account.cs
+++ b/src/NATS.Server/Auth/Account.cs
@@ -18,6 +18,9 @@ public sealed class Account : IDisposable
public int MaxJetStreamStreams { get; set; } // 0 = unlimited
public string? JetStreamTier { get; set; }
+ /// Per-account JetStream resource limits (storage, consumers, ack pending).
+ public AccountLimits JetStreamLimits { get; set; } = AccountLimits.Unlimited;
+
// JWT fields
public string? Nkey { get; set; }
public string? Issuer { get; set; }
@@ -39,6 +42,8 @@ public sealed class Account : IDisposable
private readonly ConcurrentDictionary _clients = new();
private int _subscriptionCount;
private int _jetStreamStreamCount;
+ private int _consumerCount;
+ private long _storageUsed;
public Account(string name)
{
@@ -48,6 +53,8 @@ public sealed class Account : IDisposable
public int ClientCount => _clients.Count;
public int SubscriptionCount => Volatile.Read(ref _subscriptionCount);
public int JetStreamStreamCount => Volatile.Read(ref _jetStreamStreamCount);
+ public int ConsumerCount => Volatile.Read(ref _consumerCount);
+ public long StorageUsed => Interlocked.Read(ref _storageUsed);
/// Returns false if max connections exceeded.
public bool AddClient(ulong clientId)
@@ -73,9 +80,17 @@ public sealed class Account : IDisposable
Interlocked.Decrement(ref _subscriptionCount);
}
+ ///
+ /// Reserves a stream slot, checking both (legacy)
+ /// and ..
+ ///
public bool TryReserveStream()
{
- if (MaxJetStreamStreams > 0 && Volatile.Read(ref _jetStreamStreamCount) >= MaxJetStreamStreams)
+ var effectiveMax = JetStreamLimits.MaxStreams > 0
+ ? JetStreamLimits.MaxStreams
+ : MaxJetStreamStreams;
+
+ if (effectiveMax > 0 && Volatile.Read(ref _jetStreamStreamCount) >= effectiveMax)
return false;
Interlocked.Increment(ref _jetStreamStreamCount);
@@ -90,6 +105,45 @@ public sealed class Account : IDisposable
Interlocked.Decrement(ref _jetStreamStreamCount);
}
+ /// Reserves a consumer slot. Returns false if is exceeded.
+ public bool TryReserveConsumer()
+ {
+ var max = JetStreamLimits.MaxConsumers;
+ if (max > 0 && Volatile.Read(ref _consumerCount) >= max)
+ return false;
+
+ Interlocked.Increment(ref _consumerCount);
+ return true;
+ }
+
+ public void ReleaseConsumer()
+ {
+ if (Volatile.Read(ref _consumerCount) == 0)
+ return;
+
+ Interlocked.Decrement(ref _consumerCount);
+ }
+
+ ///
+ /// Adjusts the tracked storage usage by .
+ /// Returns false if the positive delta would exceed .
+ /// A negative delta always succeeds.
+ ///
+ public bool TrackStorageDelta(long deltaBytes)
+ {
+ var maxStorage = JetStreamLimits.MaxStorage;
+
+ if (deltaBytes > 0 && maxStorage > 0)
+ {
+ var current = Interlocked.Read(ref _storageUsed);
+ if (current + deltaBytes > maxStorage)
+ return false;
+ }
+
+ Interlocked.Add(ref _storageUsed, deltaBytes);
+ return true;
+ }
+
// Per-account message/byte stats
private long _inMsgs;
private long _outMsgs;
@@ -146,6 +200,12 @@ public sealed class Account : IDisposable
Exports.Streams[subject] = new StreamExport { Auth = auth };
}
+ ///
+ /// Adds a service import with cycle detection.
+ /// Go reference: accounts.go addServiceImport with checkForImportCycle.
+ ///
+ /// Thrown if no export found or import would create a cycle.
+ /// Thrown if this account is not authorized.
public ServiceImport AddServiceImport(Account destination, string from, string to)
{
if (!destination.Exports.Services.TryGetValue(to, out var export))
@@ -154,6 +214,11 @@ public sealed class Account : IDisposable
if (!export.Auth.IsAuthorized(this))
throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'");
+ // Cycle detection: check if adding this import from destination would
+ // create a path back to this account.
+ if (AccountImportExport.DetectCycle(destination, this))
+ throw new InvalidOperationException("Import would create a cycle");
+
var si = new ServiceImport
{
DestinationAccount = destination,
@@ -167,6 +232,13 @@ public sealed class Account : IDisposable
return si;
}
+ /// Removes a service import by its 'from' subject.
+ /// True if the import was found and removed.
+ public bool RemoveServiceImport(string from)
+ {
+ return Imports.Services.Remove(from);
+ }
+
public void AddStreamImport(Account source, string from, string to)
{
if (!source.Exports.Streams.TryGetValue(from, out var export))
@@ -185,5 +257,16 @@ public sealed class Account : IDisposable
Imports.Streams.Add(si);
}
+ /// Removes a stream import by its 'from' subject.
+ /// True if the import was found and removed.
+ public bool RemoveStreamImport(string from)
+ {
+ var idx = Imports.Streams.FindIndex(s => string.Equals(s.From, from, StringComparison.Ordinal));
+ if (idx < 0)
+ return false;
+ Imports.Streams.RemoveAt(idx);
+ return true;
+ }
+
public void Dispose() => SubList.Dispose();
}
diff --git a/src/NATS.Server/Auth/AccountImportExport.cs b/src/NATS.Server/Auth/AccountImportExport.cs
new file mode 100644
index 0000000..eae4238
--- /dev/null
+++ b/src/NATS.Server/Auth/AccountImportExport.cs
@@ -0,0 +1,76 @@
+// Ported from Go accounts.go:1500-2000 — cycle detection for service imports.
+
+using NATS.Server.Imports;
+
+namespace NATS.Server.Auth;
+
+///
+/// Provides cycle detection and validation for cross-account service imports.
+/// Go reference: accounts.go checkForImportCycle / addServiceImport.
+///
+public static class AccountImportExport
+{
+ ///
+ /// DFS through the service import graph to detect cycles.
+ /// Returns true if following service imports from
+ /// eventually leads back to .
+ ///
+ public static bool DetectCycle(Account from, Account to, HashSet? visited = null)
+ {
+ ArgumentNullException.ThrowIfNull(from);
+ ArgumentNullException.ThrowIfNull(to);
+
+ visited ??= new HashSet(StringComparer.Ordinal);
+
+ if (!visited.Add(from.Name))
+ return false; // Already visited, no new cycle found from this node
+
+ // Walk all service imports from the 'from' account
+ foreach (var kvp in from.Imports.Services)
+ {
+ foreach (var serviceImport in kvp.Value)
+ {
+ var dest = serviceImport.DestinationAccount;
+
+ // Direct cycle: import destination is the target account
+ if (string.Equals(dest.Name, to.Name, StringComparison.Ordinal))
+ return true;
+
+ // Indirect cycle: recursively check if destination leads back to target
+ if (DetectCycle(dest, to, visited))
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ ///
+ /// Validates that the import is authorized and does not create a cycle.
+ ///
+ /// Thrown when the importing account is not authorized.
+ /// Thrown when the import would create a cycle.
+ public static void ValidateImport(Account importingAccount, Account exportingAccount, string exportSubject)
+ {
+ ArgumentNullException.ThrowIfNull(importingAccount);
+ ArgumentNullException.ThrowIfNull(exportingAccount);
+
+ // Check authorization first
+ if (exportingAccount.Exports.Services.TryGetValue(exportSubject, out var export))
+ {
+ if (!export.Auth.IsAuthorized(importingAccount))
+ throw new UnauthorizedAccessException(
+ $"Account '{importingAccount.Name}' not authorized to import '{exportSubject}' from '{exportingAccount.Name}'");
+ }
+ else
+ {
+ throw new InvalidOperationException(
+ $"No service export found for '{exportSubject}' on account '{exportingAccount.Name}'");
+ }
+
+ // Check for cycles: would importing from exportingAccount create a cycle
+ // back to importingAccount?
+ if (DetectCycle(exportingAccount, importingAccount))
+ throw new InvalidOperationException("Import would create a cycle");
+ }
+}
diff --git a/src/NATS.Server/Auth/AccountLimits.cs b/src/NATS.Server/Auth/AccountLimits.cs
new file mode 100644
index 0000000..b1d3a61
--- /dev/null
+++ b/src/NATS.Server/Auth/AccountLimits.cs
@@ -0,0 +1,32 @@
+// Per-account JetStream resource limits.
+// Go reference: accounts.go JetStreamAccountLimits struct.
+
+namespace NATS.Server.Auth;
+
+///
+/// Per-account limits on JetStream resources: storage, streams, consumers, and ack pending.
+/// A value of 0 means unlimited for all fields.
+///
+public sealed record AccountLimits
+{
+ /// Maximum total storage in bytes (0 = unlimited).
+ public long MaxStorage { get; init; }
+
+ /// Maximum number of streams (0 = unlimited).
+ public int MaxStreams { get; init; }
+
+ /// Maximum number of consumers (0 = unlimited).
+ public int MaxConsumers { get; init; }
+
+ /// Maximum pending ack count per consumer (0 = unlimited).
+ public int MaxAckPending { get; init; }
+
+ /// Maximum memory-based storage in bytes (0 = unlimited).
+ public long MaxMemoryStorage { get; init; }
+
+ /// Maximum disk-based storage in bytes (0 = unlimited).
+ public long MaxDiskStorage { get; init; }
+
+ /// Default instance with all limits set to unlimited (0).
+ public static AccountLimits Unlimited { get; } = new();
+}
diff --git a/tests/NATS.Server.Tests/Auth/AccountImportExportTests.cs b/tests/NATS.Server.Tests/Auth/AccountImportExportTests.cs
new file mode 100644
index 0000000..e76e021
--- /dev/null
+++ b/tests/NATS.Server.Tests/Auth/AccountImportExportTests.cs
@@ -0,0 +1,211 @@
+// Tests for account import/export cycle detection.
+// Go reference: accounts_test.go TestAccountImportCycleDetection.
+
+using NATS.Server.Auth;
+using NATS.Server.Imports;
+
+namespace NATS.Server.Tests.Auth;
+
+public class AccountImportExportTests
+{
+ private static Account CreateAccount(string name) => new(name);
+
+ private static void SetupServiceExport(Account exporter, string subject, IEnumerable? approved = null)
+ {
+ exporter.AddServiceExport(subject, ServiceResponseType.Singleton, approved);
+ }
+
+ [Fact]
+ public void AddServiceImport_NoCycle_Succeeds()
+ {
+ // A exports "svc.foo", B imports from A — no cycle
+ var a = CreateAccount("A");
+ var b = CreateAccount("B");
+
+ SetupServiceExport(a, "svc.foo"); // public export (no approved list)
+
+ var import = b.AddServiceImport(a, "svc.foo", "svc.foo");
+
+ import.ShouldNotBeNull();
+ import.DestinationAccount.Name.ShouldBe("A");
+ import.From.ShouldBe("svc.foo");
+ b.Imports.Services.ShouldContainKey("svc.foo");
+ }
+
+ [Fact]
+ public void AddServiceImport_DirectCycle_Throws()
+ {
+ // A exports "svc.foo", B exports "svc.bar"
+ // B imports "svc.foo" from A (ok)
+ // A imports "svc.bar" from B — creates cycle A->B->A
+ var a = CreateAccount("A");
+ var b = CreateAccount("B");
+
+ SetupServiceExport(a, "svc.foo");
+ SetupServiceExport(b, "svc.bar");
+
+ b.AddServiceImport(a, "svc.foo", "svc.foo");
+
+ Should.Throw(() => a.AddServiceImport(b, "svc.bar", "svc.bar"))
+ .Message.ShouldContain("cycle");
+ }
+
+ [Fact]
+ public void AddServiceImport_IndirectCycle_A_B_C_A_Throws()
+ {
+ // A->B->C, then C->A creates indirect cycle
+ var a = CreateAccount("A");
+ var b = CreateAccount("B");
+ var c = CreateAccount("C");
+
+ SetupServiceExport(a, "svc.a");
+ SetupServiceExport(b, "svc.b");
+ SetupServiceExport(c, "svc.c");
+
+ // B imports from A
+ b.AddServiceImport(a, "svc.a", "svc.a");
+ // C imports from B
+ c.AddServiceImport(b, "svc.b", "svc.b");
+ // A imports from C — would create C->B->A->C cycle
+ Should.Throw(() => a.AddServiceImport(c, "svc.c", "svc.c"))
+ .Message.ShouldContain("cycle");
+ }
+
+ [Fact]
+ public void DetectCycle_NoCycle_ReturnsFalse()
+ {
+ var a = CreateAccount("A");
+ var b = CreateAccount("B");
+ var c = CreateAccount("C");
+
+ SetupServiceExport(a, "svc.a");
+ SetupServiceExport(b, "svc.b");
+
+ // A imports from B, B imports from C — linear chain, no cycle back to A
+ // For this test we manually add imports without cycle check via ImportMap
+ b.Imports.AddServiceImport(new ServiceImport
+ {
+ DestinationAccount = a,
+ From = "svc.a",
+ To = "svc.a",
+ });
+
+ // Check: does following imports from A lead back to C? No.
+ AccountImportExport.DetectCycle(a, c).ShouldBeFalse();
+ }
+
+ [Fact]
+ public void DetectCycle_DirectCycle_ReturnsTrue()
+ {
+ var a = CreateAccount("A");
+ var b = CreateAccount("B");
+
+ // A has import pointing to B
+ a.Imports.AddServiceImport(new ServiceImport
+ {
+ DestinationAccount = b,
+ From = "svc.x",
+ To = "svc.x",
+ });
+
+ // Does following from A lead to B? Yes.
+ AccountImportExport.DetectCycle(a, b).ShouldBeTrue();
+ }
+
+ [Fact]
+ public void DetectCycle_IndirectCycle_ReturnsTrue()
+ {
+ var a = CreateAccount("A");
+ var b = CreateAccount("B");
+ var c = CreateAccount("C");
+
+ // A -> B -> C (imports)
+ a.Imports.AddServiceImport(new ServiceImport
+ {
+ DestinationAccount = b,
+ From = "svc.1",
+ To = "svc.1",
+ });
+ b.Imports.AddServiceImport(new ServiceImport
+ {
+ DestinationAccount = c,
+ From = "svc.2",
+ To = "svc.2",
+ });
+
+ // Does following from A lead to C? Yes, via B.
+ AccountImportExport.DetectCycle(a, c).ShouldBeTrue();
+ }
+
+ [Fact]
+ public void RemoveServiceImport_ExistingImport_Succeeds()
+ {
+ var a = CreateAccount("A");
+ var b = CreateAccount("B");
+
+ SetupServiceExport(a, "svc.foo");
+ b.AddServiceImport(a, "svc.foo", "svc.foo");
+
+ b.Imports.Services.ShouldContainKey("svc.foo");
+
+ b.RemoveServiceImport("svc.foo").ShouldBeTrue();
+ b.Imports.Services.ShouldNotContainKey("svc.foo");
+
+ // Removing again returns false
+ b.RemoveServiceImport("svc.foo").ShouldBeFalse();
+ }
+
+ [Fact]
+ public void RemoveStreamImport_ExistingImport_Succeeds()
+ {
+ var a = CreateAccount("A");
+ var b = CreateAccount("B");
+
+ a.AddStreamExport("stream.data", null); // public
+ b.AddStreamImport(a, "stream.data", "imported.data");
+
+ b.Imports.Streams.Count.ShouldBe(1);
+
+ b.RemoveStreamImport("stream.data").ShouldBeTrue();
+ b.Imports.Streams.Count.ShouldBe(0);
+
+ // Removing again returns false
+ b.RemoveStreamImport("stream.data").ShouldBeFalse();
+ }
+
+ [Fact]
+ public void ValidateImport_UnauthorizedAccount_Throws()
+ {
+ var exporter = CreateAccount("Exporter");
+ var importer = CreateAccount("Importer");
+ var approved = CreateAccount("Approved");
+
+ // Export only approves "Approved" account, not "Importer"
+ SetupServiceExport(exporter, "svc.restricted", [approved]);
+
+ Should.Throw(
+ () => AccountImportExport.ValidateImport(importer, exporter, "svc.restricted"))
+ .Message.ShouldContain("not authorized");
+ }
+
+ [Fact]
+ public void AddStreamImport_NoCycleCheck_Succeeds()
+ {
+ // Stream imports do not require cycle detection (unlike service imports).
+ // Even with a "circular" stream import topology, it should succeed.
+ var a = CreateAccount("A");
+ var b = CreateAccount("B");
+
+ a.AddStreamExport("stream.a", null);
+ b.AddStreamExport("stream.b", null);
+
+ // B imports stream from A
+ b.AddStreamImport(a, "stream.a", "imported.a");
+
+ // A imports stream from B — no cycle check for streams
+ a.AddStreamImport(b, "stream.b", "imported.b");
+
+ a.Imports.Streams.Count.ShouldBe(1);
+ b.Imports.Streams.Count.ShouldBe(1);
+ }
+}
diff --git a/tests/NATS.Server.Tests/Auth/AccountLimitsTests.cs b/tests/NATS.Server.Tests/Auth/AccountLimitsTests.cs
new file mode 100644
index 0000000..3506706
--- /dev/null
+++ b/tests/NATS.Server.Tests/Auth/AccountLimitsTests.cs
@@ -0,0 +1,169 @@
+// Tests for per-account JetStream resource limits.
+// Go reference: accounts_test.go TestAccountLimits, TestJetStreamLimits.
+
+using NATS.Server.Auth;
+
+namespace NATS.Server.Tests.Auth;
+
+public class AccountLimitsTests
+{
+ [Fact]
+ public void TryReserveConsumer_UnderLimit_ReturnsTrue()
+ {
+ var account = new Account("test")
+ {
+ JetStreamLimits = new AccountLimits { MaxConsumers = 3 },
+ };
+
+ account.TryReserveConsumer().ShouldBeTrue();
+ account.TryReserveConsumer().ShouldBeTrue();
+ account.TryReserveConsumer().ShouldBeTrue();
+ account.ConsumerCount.ShouldBe(3);
+ }
+
+ [Fact]
+ public void TryReserveConsumer_AtLimit_ReturnsFalse()
+ {
+ var account = new Account("test")
+ {
+ JetStreamLimits = new AccountLimits { MaxConsumers = 2 },
+ };
+
+ account.TryReserveConsumer().ShouldBeTrue();
+ account.TryReserveConsumer().ShouldBeTrue();
+ account.TryReserveConsumer().ShouldBeFalse();
+ account.ConsumerCount.ShouldBe(2);
+ }
+
+ [Fact]
+ public void ReleaseConsumer_DecrementsCount()
+ {
+ var account = new Account("test")
+ {
+ JetStreamLimits = new AccountLimits { MaxConsumers = 2 },
+ };
+
+ account.TryReserveConsumer().ShouldBeTrue();
+ account.TryReserveConsumer().ShouldBeTrue();
+ account.ConsumerCount.ShouldBe(2);
+
+ account.ReleaseConsumer();
+ account.ConsumerCount.ShouldBe(1);
+
+ // Now we can reserve again
+ account.TryReserveConsumer().ShouldBeTrue();
+ account.ConsumerCount.ShouldBe(2);
+ }
+
+ [Fact]
+ public void TrackStorageDelta_UnderLimit_ReturnsTrue()
+ {
+ var account = new Account("test")
+ {
+ JetStreamLimits = new AccountLimits { MaxStorage = 1000 },
+ };
+
+ account.TrackStorageDelta(500).ShouldBeTrue();
+ account.StorageUsed.ShouldBe(500);
+
+ account.TrackStorageDelta(400).ShouldBeTrue();
+ account.StorageUsed.ShouldBe(900);
+ }
+
+ [Fact]
+ public void TrackStorageDelta_ExceedsLimit_ReturnsFalse()
+ {
+ var account = new Account("test")
+ {
+ JetStreamLimits = new AccountLimits { MaxStorage = 1000 },
+ };
+
+ account.TrackStorageDelta(800).ShouldBeTrue();
+ account.TrackStorageDelta(300).ShouldBeFalse(); // 800 + 300 = 1100 > 1000
+ account.StorageUsed.ShouldBe(800); // unchanged
+ }
+
+ [Fact]
+ public void TrackStorageDelta_NegativeDelta_ReducesUsage()
+ {
+ var account = new Account("test")
+ {
+ JetStreamLimits = new AccountLimits { MaxStorage = 1000 },
+ };
+
+ account.TrackStorageDelta(800).ShouldBeTrue();
+ account.TrackStorageDelta(-300).ShouldBeTrue(); // negative always succeeds
+ account.StorageUsed.ShouldBe(500);
+
+ // Now we have room again
+ account.TrackStorageDelta(400).ShouldBeTrue();
+ account.StorageUsed.ShouldBe(900);
+ }
+
+ [Fact]
+ public void MaxStorage_Zero_Unlimited()
+ {
+ var account = new Account("test")
+ {
+ JetStreamLimits = new AccountLimits { MaxStorage = 0 }, // unlimited
+ };
+
+ // Should accept any amount
+ account.TrackStorageDelta(long.MaxValue / 2).ShouldBeTrue();
+ account.StorageUsed.ShouldBe(long.MaxValue / 2);
+ }
+
+ [Fact]
+ public void Limits_DefaultValues_AllUnlimited()
+ {
+ var limits = AccountLimits.Unlimited;
+
+ limits.MaxStorage.ShouldBe(0);
+ limits.MaxStreams.ShouldBe(0);
+ limits.MaxConsumers.ShouldBe(0);
+ limits.MaxAckPending.ShouldBe(0);
+ limits.MaxMemoryStorage.ShouldBe(0);
+ limits.MaxDiskStorage.ShouldBe(0);
+
+ // Account defaults to unlimited
+ var account = new Account("test");
+ account.JetStreamLimits.ShouldBe(AccountLimits.Unlimited);
+ }
+
+ [Fact]
+ public void TryReserveStream_WithLimits_RespectsNewLimits()
+ {
+ // JetStreamLimits.MaxStreams should take precedence over MaxJetStreamStreams
+ var account = new Account("test")
+ {
+ MaxJetStreamStreams = 10, // legacy field
+ JetStreamLimits = new AccountLimits { MaxStreams = 2 }, // new limit overrides
+ };
+
+ account.TryReserveStream().ShouldBeTrue();
+ account.TryReserveStream().ShouldBeTrue();
+ account.TryReserveStream().ShouldBeFalse(); // limited to 2 by JetStreamLimits
+ account.JetStreamStreamCount.ShouldBe(2);
+ }
+
+ [Fact]
+ public void EvictOldestClient_WhenMaxConnectionsExceeded()
+ {
+ var account = new Account("test")
+ {
+ MaxConnections = 2,
+ };
+
+ account.AddClient(1).ShouldBeTrue();
+ account.AddClient(2).ShouldBeTrue();
+ account.AddClient(3).ShouldBeFalse(); // at limit
+ account.ClientCount.ShouldBe(2);
+
+ // Remove oldest, then new one can connect
+ account.RemoveClient(1);
+ account.ClientCount.ShouldBe(1);
+
+ account.AddClient(3).ShouldBeTrue();
+ account.ClientCount.ShouldBe(2);
+ }
+}