From 235971ddccdab4105b9856276dae13945cff996d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Feb 2026 15:25:12 -0500 Subject: [PATCH] feat(auth): add account import/export cycle detection and JetStream limits (E4+E5) E4: AccountImportExport with DFS cycle detection for service imports, RemoveServiceImport/RemoveStreamImport, and ValidateImport authorization. E5: AccountLimits record with MaxStorage/MaxConsumers/MaxAckPending, TryReserveConsumer/ReleaseConsumer, TrackStorageDelta on Account. 20 new tests, all passing. --- src/NATS.Server/Auth/Account.cs | 85 ++++++- src/NATS.Server/Auth/AccountImportExport.cs | 76 +++++++ src/NATS.Server/Auth/AccountLimits.cs | 32 +++ .../Auth/AccountImportExportTests.cs | 211 ++++++++++++++++++ .../Auth/AccountLimitsTests.cs | 169 ++++++++++++++ 5 files changed, 572 insertions(+), 1 deletion(-) create mode 100644 src/NATS.Server/Auth/AccountImportExport.cs create mode 100644 src/NATS.Server/Auth/AccountLimits.cs create mode 100644 tests/NATS.Server.Tests/Auth/AccountImportExportTests.cs create mode 100644 tests/NATS.Server.Tests/Auth/AccountLimitsTests.cs diff --git a/src/NATS.Server/Auth/Account.cs b/src/NATS.Server/Auth/Account.cs index 3e7210f..f7b1da6 100644 --- a/src/NATS.Server/Auth/Account.cs +++ b/src/NATS.Server/Auth/Account.cs @@ -18,6 +18,9 @@ public sealed class Account : IDisposable public int MaxJetStreamStreams { get; set; } // 0 = unlimited public string? JetStreamTier { get; set; } + /// Per-account JetStream resource limits (storage, consumers, ack pending). + public AccountLimits JetStreamLimits { get; set; } = AccountLimits.Unlimited; + // JWT fields public string? Nkey { get; set; } public string? Issuer { get; set; } @@ -39,6 +42,8 @@ public sealed class Account : IDisposable private readonly ConcurrentDictionary _clients = new(); private int _subscriptionCount; private int _jetStreamStreamCount; + private int _consumerCount; + private long _storageUsed; public Account(string name) { @@ -48,6 +53,8 @@ public sealed class Account : IDisposable public int ClientCount => _clients.Count; public int SubscriptionCount => Volatile.Read(ref _subscriptionCount); public int JetStreamStreamCount => Volatile.Read(ref _jetStreamStreamCount); + public int ConsumerCount => Volatile.Read(ref _consumerCount); + public long StorageUsed => Interlocked.Read(ref _storageUsed); /// Returns false if max connections exceeded. public bool AddClient(ulong clientId) @@ -73,9 +80,17 @@ public sealed class Account : IDisposable Interlocked.Decrement(ref _subscriptionCount); } + /// + /// Reserves a stream slot, checking both (legacy) + /// and .. + /// public bool TryReserveStream() { - if (MaxJetStreamStreams > 0 && Volatile.Read(ref _jetStreamStreamCount) >= MaxJetStreamStreams) + var effectiveMax = JetStreamLimits.MaxStreams > 0 + ? JetStreamLimits.MaxStreams + : MaxJetStreamStreams; + + if (effectiveMax > 0 && Volatile.Read(ref _jetStreamStreamCount) >= effectiveMax) return false; Interlocked.Increment(ref _jetStreamStreamCount); @@ -90,6 +105,45 @@ public sealed class Account : IDisposable Interlocked.Decrement(ref _jetStreamStreamCount); } + /// Reserves a consumer slot. Returns false if is exceeded. + public bool TryReserveConsumer() + { + var max = JetStreamLimits.MaxConsumers; + if (max > 0 && Volatile.Read(ref _consumerCount) >= max) + return false; + + Interlocked.Increment(ref _consumerCount); + return true; + } + + public void ReleaseConsumer() + { + if (Volatile.Read(ref _consumerCount) == 0) + return; + + Interlocked.Decrement(ref _consumerCount); + } + + /// + /// Adjusts the tracked storage usage by . + /// Returns false if the positive delta would exceed . + /// A negative delta always succeeds. + /// + public bool TrackStorageDelta(long deltaBytes) + { + var maxStorage = JetStreamLimits.MaxStorage; + + if (deltaBytes > 0 && maxStorage > 0) + { + var current = Interlocked.Read(ref _storageUsed); + if (current + deltaBytes > maxStorage) + return false; + } + + Interlocked.Add(ref _storageUsed, deltaBytes); + return true; + } + // Per-account message/byte stats private long _inMsgs; private long _outMsgs; @@ -146,6 +200,12 @@ public sealed class Account : IDisposable Exports.Streams[subject] = new StreamExport { Auth = auth }; } + /// + /// Adds a service import with cycle detection. + /// Go reference: accounts.go addServiceImport with checkForImportCycle. + /// + /// Thrown if no export found or import would create a cycle. + /// Thrown if this account is not authorized. public ServiceImport AddServiceImport(Account destination, string from, string to) { if (!destination.Exports.Services.TryGetValue(to, out var export)) @@ -154,6 +214,11 @@ public sealed class Account : IDisposable if (!export.Auth.IsAuthorized(this)) throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'"); + // Cycle detection: check if adding this import from destination would + // create a path back to this account. + if (AccountImportExport.DetectCycle(destination, this)) + throw new InvalidOperationException("Import would create a cycle"); + var si = new ServiceImport { DestinationAccount = destination, @@ -167,6 +232,13 @@ public sealed class Account : IDisposable return si; } + /// Removes a service import by its 'from' subject. + /// True if the import was found and removed. + public bool RemoveServiceImport(string from) + { + return Imports.Services.Remove(from); + } + public void AddStreamImport(Account source, string from, string to) { if (!source.Exports.Streams.TryGetValue(from, out var export)) @@ -185,5 +257,16 @@ public sealed class Account : IDisposable Imports.Streams.Add(si); } + /// Removes a stream import by its 'from' subject. + /// True if the import was found and removed. + public bool RemoveStreamImport(string from) + { + var idx = Imports.Streams.FindIndex(s => string.Equals(s.From, from, StringComparison.Ordinal)); + if (idx < 0) + return false; + Imports.Streams.RemoveAt(idx); + return true; + } + public void Dispose() => SubList.Dispose(); } diff --git a/src/NATS.Server/Auth/AccountImportExport.cs b/src/NATS.Server/Auth/AccountImportExport.cs new file mode 100644 index 0000000..eae4238 --- /dev/null +++ b/src/NATS.Server/Auth/AccountImportExport.cs @@ -0,0 +1,76 @@ +// Ported from Go accounts.go:1500-2000 — cycle detection for service imports. + +using NATS.Server.Imports; + +namespace NATS.Server.Auth; + +/// +/// Provides cycle detection and validation for cross-account service imports. +/// Go reference: accounts.go checkForImportCycle / addServiceImport. +/// +public static class AccountImportExport +{ + /// + /// DFS through the service import graph to detect cycles. + /// Returns true if following service imports from + /// eventually leads back to . + /// + public static bool DetectCycle(Account from, Account to, HashSet? visited = null) + { + ArgumentNullException.ThrowIfNull(from); + ArgumentNullException.ThrowIfNull(to); + + visited ??= new HashSet(StringComparer.Ordinal); + + if (!visited.Add(from.Name)) + return false; // Already visited, no new cycle found from this node + + // Walk all service imports from the 'from' account + foreach (var kvp in from.Imports.Services) + { + foreach (var serviceImport in kvp.Value) + { + var dest = serviceImport.DestinationAccount; + + // Direct cycle: import destination is the target account + if (string.Equals(dest.Name, to.Name, StringComparison.Ordinal)) + return true; + + // Indirect cycle: recursively check if destination leads back to target + if (DetectCycle(dest, to, visited)) + return true; + } + } + + return false; + } + + /// + /// Validates that the import is authorized and does not create a cycle. + /// + /// Thrown when the importing account is not authorized. + /// Thrown when the import would create a cycle. + public static void ValidateImport(Account importingAccount, Account exportingAccount, string exportSubject) + { + ArgumentNullException.ThrowIfNull(importingAccount); + ArgumentNullException.ThrowIfNull(exportingAccount); + + // Check authorization first + if (exportingAccount.Exports.Services.TryGetValue(exportSubject, out var export)) + { + if (!export.Auth.IsAuthorized(importingAccount)) + throw new UnauthorizedAccessException( + $"Account '{importingAccount.Name}' not authorized to import '{exportSubject}' from '{exportingAccount.Name}'"); + } + else + { + throw new InvalidOperationException( + $"No service export found for '{exportSubject}' on account '{exportingAccount.Name}'"); + } + + // Check for cycles: would importing from exportingAccount create a cycle + // back to importingAccount? + if (DetectCycle(exportingAccount, importingAccount)) + throw new InvalidOperationException("Import would create a cycle"); + } +} diff --git a/src/NATS.Server/Auth/AccountLimits.cs b/src/NATS.Server/Auth/AccountLimits.cs new file mode 100644 index 0000000..b1d3a61 --- /dev/null +++ b/src/NATS.Server/Auth/AccountLimits.cs @@ -0,0 +1,32 @@ +// Per-account JetStream resource limits. +// Go reference: accounts.go JetStreamAccountLimits struct. + +namespace NATS.Server.Auth; + +/// +/// Per-account limits on JetStream resources: storage, streams, consumers, and ack pending. +/// A value of 0 means unlimited for all fields. +/// +public sealed record AccountLimits +{ + /// Maximum total storage in bytes (0 = unlimited). + public long MaxStorage { get; init; } + + /// Maximum number of streams (0 = unlimited). + public int MaxStreams { get; init; } + + /// Maximum number of consumers (0 = unlimited). + public int MaxConsumers { get; init; } + + /// Maximum pending ack count per consumer (0 = unlimited). + public int MaxAckPending { get; init; } + + /// Maximum memory-based storage in bytes (0 = unlimited). + public long MaxMemoryStorage { get; init; } + + /// Maximum disk-based storage in bytes (0 = unlimited). + public long MaxDiskStorage { get; init; } + + /// Default instance with all limits set to unlimited (0). + public static AccountLimits Unlimited { get; } = new(); +} diff --git a/tests/NATS.Server.Tests/Auth/AccountImportExportTests.cs b/tests/NATS.Server.Tests/Auth/AccountImportExportTests.cs new file mode 100644 index 0000000..e76e021 --- /dev/null +++ b/tests/NATS.Server.Tests/Auth/AccountImportExportTests.cs @@ -0,0 +1,211 @@ +// Tests for account import/export cycle detection. +// Go reference: accounts_test.go TestAccountImportCycleDetection. + +using NATS.Server.Auth; +using NATS.Server.Imports; + +namespace NATS.Server.Tests.Auth; + +public class AccountImportExportTests +{ + private static Account CreateAccount(string name) => new(name); + + private static void SetupServiceExport(Account exporter, string subject, IEnumerable? approved = null) + { + exporter.AddServiceExport(subject, ServiceResponseType.Singleton, approved); + } + + [Fact] + public void AddServiceImport_NoCycle_Succeeds() + { + // A exports "svc.foo", B imports from A — no cycle + var a = CreateAccount("A"); + var b = CreateAccount("B"); + + SetupServiceExport(a, "svc.foo"); // public export (no approved list) + + var import = b.AddServiceImport(a, "svc.foo", "svc.foo"); + + import.ShouldNotBeNull(); + import.DestinationAccount.Name.ShouldBe("A"); + import.From.ShouldBe("svc.foo"); + b.Imports.Services.ShouldContainKey("svc.foo"); + } + + [Fact] + public void AddServiceImport_DirectCycle_Throws() + { + // A exports "svc.foo", B exports "svc.bar" + // B imports "svc.foo" from A (ok) + // A imports "svc.bar" from B — creates cycle A->B->A + var a = CreateAccount("A"); + var b = CreateAccount("B"); + + SetupServiceExport(a, "svc.foo"); + SetupServiceExport(b, "svc.bar"); + + b.AddServiceImport(a, "svc.foo", "svc.foo"); + + Should.Throw(() => a.AddServiceImport(b, "svc.bar", "svc.bar")) + .Message.ShouldContain("cycle"); + } + + [Fact] + public void AddServiceImport_IndirectCycle_A_B_C_A_Throws() + { + // A->B->C, then C->A creates indirect cycle + var a = CreateAccount("A"); + var b = CreateAccount("B"); + var c = CreateAccount("C"); + + SetupServiceExport(a, "svc.a"); + SetupServiceExport(b, "svc.b"); + SetupServiceExport(c, "svc.c"); + + // B imports from A + b.AddServiceImport(a, "svc.a", "svc.a"); + // C imports from B + c.AddServiceImport(b, "svc.b", "svc.b"); + // A imports from C — would create C->B->A->C cycle + Should.Throw(() => a.AddServiceImport(c, "svc.c", "svc.c")) + .Message.ShouldContain("cycle"); + } + + [Fact] + public void DetectCycle_NoCycle_ReturnsFalse() + { + var a = CreateAccount("A"); + var b = CreateAccount("B"); + var c = CreateAccount("C"); + + SetupServiceExport(a, "svc.a"); + SetupServiceExport(b, "svc.b"); + + // A imports from B, B imports from C — linear chain, no cycle back to A + // For this test we manually add imports without cycle check via ImportMap + b.Imports.AddServiceImport(new ServiceImport + { + DestinationAccount = a, + From = "svc.a", + To = "svc.a", + }); + + // Check: does following imports from A lead back to C? No. + AccountImportExport.DetectCycle(a, c).ShouldBeFalse(); + } + + [Fact] + public void DetectCycle_DirectCycle_ReturnsTrue() + { + var a = CreateAccount("A"); + var b = CreateAccount("B"); + + // A has import pointing to B + a.Imports.AddServiceImport(new ServiceImport + { + DestinationAccount = b, + From = "svc.x", + To = "svc.x", + }); + + // Does following from A lead to B? Yes. + AccountImportExport.DetectCycle(a, b).ShouldBeTrue(); + } + + [Fact] + public void DetectCycle_IndirectCycle_ReturnsTrue() + { + var a = CreateAccount("A"); + var b = CreateAccount("B"); + var c = CreateAccount("C"); + + // A -> B -> C (imports) + a.Imports.AddServiceImport(new ServiceImport + { + DestinationAccount = b, + From = "svc.1", + To = "svc.1", + }); + b.Imports.AddServiceImport(new ServiceImport + { + DestinationAccount = c, + From = "svc.2", + To = "svc.2", + }); + + // Does following from A lead to C? Yes, via B. + AccountImportExport.DetectCycle(a, c).ShouldBeTrue(); + } + + [Fact] + public void RemoveServiceImport_ExistingImport_Succeeds() + { + var a = CreateAccount("A"); + var b = CreateAccount("B"); + + SetupServiceExport(a, "svc.foo"); + b.AddServiceImport(a, "svc.foo", "svc.foo"); + + b.Imports.Services.ShouldContainKey("svc.foo"); + + b.RemoveServiceImport("svc.foo").ShouldBeTrue(); + b.Imports.Services.ShouldNotContainKey("svc.foo"); + + // Removing again returns false + b.RemoveServiceImport("svc.foo").ShouldBeFalse(); + } + + [Fact] + public void RemoveStreamImport_ExistingImport_Succeeds() + { + var a = CreateAccount("A"); + var b = CreateAccount("B"); + + a.AddStreamExport("stream.data", null); // public + b.AddStreamImport(a, "stream.data", "imported.data"); + + b.Imports.Streams.Count.ShouldBe(1); + + b.RemoveStreamImport("stream.data").ShouldBeTrue(); + b.Imports.Streams.Count.ShouldBe(0); + + // Removing again returns false + b.RemoveStreamImport("stream.data").ShouldBeFalse(); + } + + [Fact] + public void ValidateImport_UnauthorizedAccount_Throws() + { + var exporter = CreateAccount("Exporter"); + var importer = CreateAccount("Importer"); + var approved = CreateAccount("Approved"); + + // Export only approves "Approved" account, not "Importer" + SetupServiceExport(exporter, "svc.restricted", [approved]); + + Should.Throw( + () => AccountImportExport.ValidateImport(importer, exporter, "svc.restricted")) + .Message.ShouldContain("not authorized"); + } + + [Fact] + public void AddStreamImport_NoCycleCheck_Succeeds() + { + // Stream imports do not require cycle detection (unlike service imports). + // Even with a "circular" stream import topology, it should succeed. + var a = CreateAccount("A"); + var b = CreateAccount("B"); + + a.AddStreamExport("stream.a", null); + b.AddStreamExport("stream.b", null); + + // B imports stream from A + b.AddStreamImport(a, "stream.a", "imported.a"); + + // A imports stream from B — no cycle check for streams + a.AddStreamImport(b, "stream.b", "imported.b"); + + a.Imports.Streams.Count.ShouldBe(1); + b.Imports.Streams.Count.ShouldBe(1); + } +} diff --git a/tests/NATS.Server.Tests/Auth/AccountLimitsTests.cs b/tests/NATS.Server.Tests/Auth/AccountLimitsTests.cs new file mode 100644 index 0000000..3506706 --- /dev/null +++ b/tests/NATS.Server.Tests/Auth/AccountLimitsTests.cs @@ -0,0 +1,169 @@ +// Tests for per-account JetStream resource limits. +// Go reference: accounts_test.go TestAccountLimits, TestJetStreamLimits. + +using NATS.Server.Auth; + +namespace NATS.Server.Tests.Auth; + +public class AccountLimitsTests +{ + [Fact] + public void TryReserveConsumer_UnderLimit_ReturnsTrue() + { + var account = new Account("test") + { + JetStreamLimits = new AccountLimits { MaxConsumers = 3 }, + }; + + account.TryReserveConsumer().ShouldBeTrue(); + account.TryReserveConsumer().ShouldBeTrue(); + account.TryReserveConsumer().ShouldBeTrue(); + account.ConsumerCount.ShouldBe(3); + } + + [Fact] + public void TryReserveConsumer_AtLimit_ReturnsFalse() + { + var account = new Account("test") + { + JetStreamLimits = new AccountLimits { MaxConsumers = 2 }, + }; + + account.TryReserveConsumer().ShouldBeTrue(); + account.TryReserveConsumer().ShouldBeTrue(); + account.TryReserveConsumer().ShouldBeFalse(); + account.ConsumerCount.ShouldBe(2); + } + + [Fact] + public void ReleaseConsumer_DecrementsCount() + { + var account = new Account("test") + { + JetStreamLimits = new AccountLimits { MaxConsumers = 2 }, + }; + + account.TryReserveConsumer().ShouldBeTrue(); + account.TryReserveConsumer().ShouldBeTrue(); + account.ConsumerCount.ShouldBe(2); + + account.ReleaseConsumer(); + account.ConsumerCount.ShouldBe(1); + + // Now we can reserve again + account.TryReserveConsumer().ShouldBeTrue(); + account.ConsumerCount.ShouldBe(2); + } + + [Fact] + public void TrackStorageDelta_UnderLimit_ReturnsTrue() + { + var account = new Account("test") + { + JetStreamLimits = new AccountLimits { MaxStorage = 1000 }, + }; + + account.TrackStorageDelta(500).ShouldBeTrue(); + account.StorageUsed.ShouldBe(500); + + account.TrackStorageDelta(400).ShouldBeTrue(); + account.StorageUsed.ShouldBe(900); + } + + [Fact] + public void TrackStorageDelta_ExceedsLimit_ReturnsFalse() + { + var account = new Account("test") + { + JetStreamLimits = new AccountLimits { MaxStorage = 1000 }, + }; + + account.TrackStorageDelta(800).ShouldBeTrue(); + account.TrackStorageDelta(300).ShouldBeFalse(); // 800 + 300 = 1100 > 1000 + account.StorageUsed.ShouldBe(800); // unchanged + } + + [Fact] + public void TrackStorageDelta_NegativeDelta_ReducesUsage() + { + var account = new Account("test") + { + JetStreamLimits = new AccountLimits { MaxStorage = 1000 }, + }; + + account.TrackStorageDelta(800).ShouldBeTrue(); + account.TrackStorageDelta(-300).ShouldBeTrue(); // negative always succeeds + account.StorageUsed.ShouldBe(500); + + // Now we have room again + account.TrackStorageDelta(400).ShouldBeTrue(); + account.StorageUsed.ShouldBe(900); + } + + [Fact] + public void MaxStorage_Zero_Unlimited() + { + var account = new Account("test") + { + JetStreamLimits = new AccountLimits { MaxStorage = 0 }, // unlimited + }; + + // Should accept any amount + account.TrackStorageDelta(long.MaxValue / 2).ShouldBeTrue(); + account.StorageUsed.ShouldBe(long.MaxValue / 2); + } + + [Fact] + public void Limits_DefaultValues_AllUnlimited() + { + var limits = AccountLimits.Unlimited; + + limits.MaxStorage.ShouldBe(0); + limits.MaxStreams.ShouldBe(0); + limits.MaxConsumers.ShouldBe(0); + limits.MaxAckPending.ShouldBe(0); + limits.MaxMemoryStorage.ShouldBe(0); + limits.MaxDiskStorage.ShouldBe(0); + + // Account defaults to unlimited + var account = new Account("test"); + account.JetStreamLimits.ShouldBe(AccountLimits.Unlimited); + } + + [Fact] + public void TryReserveStream_WithLimits_RespectsNewLimits() + { + // JetStreamLimits.MaxStreams should take precedence over MaxJetStreamStreams + var account = new Account("test") + { + MaxJetStreamStreams = 10, // legacy field + JetStreamLimits = new AccountLimits { MaxStreams = 2 }, // new limit overrides + }; + + account.TryReserveStream().ShouldBeTrue(); + account.TryReserveStream().ShouldBeTrue(); + account.TryReserveStream().ShouldBeFalse(); // limited to 2 by JetStreamLimits + account.JetStreamStreamCount.ShouldBe(2); + } + + [Fact] + public void EvictOldestClient_WhenMaxConnectionsExceeded() + { + var account = new Account("test") + { + MaxConnections = 2, + }; + + account.AddClient(1).ShouldBeTrue(); + account.AddClient(2).ShouldBeTrue(); + account.AddClient(3).ShouldBeFalse(); // at limit + account.ClientCount.ShouldBe(2); + + // Remove oldest, then new one can connect + account.RemoveClient(1); + account.ClientCount.ShouldBe(1); + + account.AddClient(3).ShouldBeTrue(); + account.ClientCount.ShouldBe(2); + } +}