diff --git a/src/NATS.Server/Auth/Account.cs b/src/NATS.Server/Auth/Account.cs index bce25e1..078f438 100644 --- a/src/NATS.Server/Auth/Account.cs +++ b/src/NATS.Server/Auth/Account.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using NATS.Server.Imports; using NATS.Server.Subscriptions; namespace NATS.Server.Auth; @@ -12,6 +13,8 @@ public sealed class Account : IDisposable public Permissions? DefaultPermissions { get; set; } public int MaxConnections { get; set; } // 0 = unlimited public int MaxSubscriptions { get; set; } // 0 = unlimited + public ExportMap Exports { get; } = new(); + public ImportMap Imports { get; } = new(); // JWT fields public string? Nkey { get; set; } @@ -89,5 +92,77 @@ public sealed class Account : IDisposable Interlocked.Add(ref _outBytes, bytes); } + // Internal (ACCOUNT) client for import/export message routing + private InternalClient? _internalClient; + + public InternalClient GetOrCreateInternalClient(ulong clientId) + { + if (_internalClient != null) return _internalClient; + _internalClient = new InternalClient(clientId, ClientKind.Account, this); + return _internalClient; + } + + public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable? approved) + { + var auth = new ExportAuth + { + ApprovedAccounts = approved != null ? new HashSet(approved.Select(a => a.Name)) : null, + }; + Exports.Services[subject] = new ServiceExport + { + Auth = auth, + Account = this, + ResponseType = responseType, + }; + } + + public void AddStreamExport(string subject, IEnumerable? approved) + { + var auth = new ExportAuth + { + ApprovedAccounts = approved != null ? new HashSet(approved.Select(a => a.Name)) : null, + }; + Exports.Streams[subject] = new StreamExport { Auth = auth }; + } + + public ServiceImport AddServiceImport(Account destination, string from, string to) + { + if (!destination.Exports.Services.TryGetValue(to, out var export)) + throw new InvalidOperationException($"No service export found for '{to}' on account '{destination.Name}'"); + + if (!export.Auth.IsAuthorized(this)) + throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'"); + + var si = new ServiceImport + { + DestinationAccount = destination, + From = from, + To = to, + Export = export, + ResponseType = export.ResponseType, + }; + + Imports.AddServiceImport(si); + return si; + } + + public void AddStreamImport(Account source, string from, string to) + { + if (!source.Exports.Streams.TryGetValue(from, out var export)) + throw new InvalidOperationException($"No stream export found for '{from}' on account '{source.Name}'"); + + if (!export.Auth.IsAuthorized(this)) + throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{from}' from '{source.Name}'"); + + var si = new StreamImport + { + SourceAccount = source, + From = from, + To = to, + }; + + Imports.Streams.Add(si); + } + public void Dispose() => SubList.Dispose(); } diff --git a/src/NATS.Server/Subscriptions/Subscription.cs b/src/NATS.Server/Subscriptions/Subscription.cs index 0a13604..4f10cb2 100644 --- a/src/NATS.Server/Subscriptions/Subscription.cs +++ b/src/NATS.Server/Subscriptions/Subscription.cs @@ -1,4 +1,5 @@ using NATS.Server; +using NATS.Server.Imports; namespace NATS.Server.Subscriptions; @@ -10,4 +11,6 @@ public sealed class Subscription public long MessageCount; // Interlocked public long MaxMessages; // 0 = unlimited public INatsClient? Client { get; set; } + public ServiceImport? ServiceImport { get; set; } + public StreamImport? StreamImport { get; set; } } diff --git a/tests/NATS.Server.Tests/ImportExportTests.cs b/tests/NATS.Server.Tests/ImportExportTests.cs index 2945a27..1073a2d 100644 --- a/tests/NATS.Server.Tests/ImportExportTests.cs +++ b/tests/NATS.Server.Tests/ImportExportTests.cs @@ -1,3 +1,4 @@ +using NATS.Server; using NATS.Server.Auth; using NATS.Server.Imports; @@ -72,4 +73,62 @@ public class ImportExportTests map.Services.ShouldContainKey("requests.>"); map.Services["requests.>"].Count.ShouldBe(1); } + + [Fact] + public void Account_add_service_export_and_import() + { + var exporter = new Account("exporter"); + var importer = new Account("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + exporter.Exports.Services.ShouldContainKey("api.>"); + + var si = importer.AddServiceImport(exporter, "requests.>", "api.>"); + si.ShouldNotBeNull(); + si.From.ShouldBe("requests.>"); + si.To.ShouldBe("api.>"); + si.DestinationAccount.ShouldBe(exporter); + importer.Imports.Services.ShouldContainKey("requests.>"); + } + + [Fact] + public void Account_add_stream_export_and_import() + { + var exporter = new Account("exporter"); + var importer = new Account("importer"); + + exporter.AddStreamExport("events.>", null); + exporter.Exports.Streams.ShouldContainKey("events.>"); + + importer.AddStreamImport(exporter, "events.>", "imported.events.>"); + importer.Imports.Streams.Count.ShouldBe(1); + importer.Imports.Streams[0].From.ShouldBe("events.>"); + importer.Imports.Streams[0].To.ShouldBe("imported.events.>"); + } + + [Fact] + public void Account_service_import_auth_rejected() + { + var exporter = new Account("exporter"); + var importer = new Account("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, [new Account("other")]); + + Should.Throw(() => + importer.AddServiceImport(exporter, "requests.>", "api.>")); + } + + [Fact] + public void Account_lazy_creates_internal_client() + { + var account = new Account("test"); + var client = account.GetOrCreateInternalClient(99); + client.ShouldNotBeNull(); + client.Kind.ShouldBe(ClientKind.Account); + client.Account.ShouldBe(account); + + // Second call returns same instance + var client2 = account.GetOrCreateInternalClient(100); + client2.ShouldBeSameAs(client); + } }