feat: add import/export support to Account with ACCOUNT client lazy creation
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
|
using NATS.Server.Imports;
|
||||||
using NATS.Server.Subscriptions;
|
using NATS.Server.Subscriptions;
|
||||||
|
|
||||||
namespace NATS.Server.Auth;
|
namespace NATS.Server.Auth;
|
||||||
@@ -12,6 +13,8 @@ public sealed class Account : IDisposable
|
|||||||
public Permissions? DefaultPermissions { get; set; }
|
public Permissions? DefaultPermissions { get; set; }
|
||||||
public int MaxConnections { get; set; } // 0 = unlimited
|
public int MaxConnections { get; set; } // 0 = unlimited
|
||||||
public int MaxSubscriptions { get; set; } // 0 = unlimited
|
public int MaxSubscriptions { get; set; } // 0 = unlimited
|
||||||
|
public ExportMap Exports { get; } = new();
|
||||||
|
public ImportMap Imports { get; } = new();
|
||||||
|
|
||||||
// JWT fields
|
// JWT fields
|
||||||
public string? Nkey { get; set; }
|
public string? Nkey { get; set; }
|
||||||
@@ -89,5 +92,77 @@ public sealed class Account : IDisposable
|
|||||||
Interlocked.Add(ref _outBytes, bytes);
|
Interlocked.Add(ref _outBytes, bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Internal (ACCOUNT) client for import/export message routing
|
||||||
|
private InternalClient? _internalClient;
|
||||||
|
|
||||||
|
public InternalClient GetOrCreateInternalClient(ulong clientId)
|
||||||
|
{
|
||||||
|
if (_internalClient != null) return _internalClient;
|
||||||
|
_internalClient = new InternalClient(clientId, ClientKind.Account, this);
|
||||||
|
return _internalClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable<Account>? approved)
|
||||||
|
{
|
||||||
|
var auth = new ExportAuth
|
||||||
|
{
|
||||||
|
ApprovedAccounts = approved != null ? new HashSet<string>(approved.Select(a => a.Name)) : null,
|
||||||
|
};
|
||||||
|
Exports.Services[subject] = new ServiceExport
|
||||||
|
{
|
||||||
|
Auth = auth,
|
||||||
|
Account = this,
|
||||||
|
ResponseType = responseType,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public void AddStreamExport(string subject, IEnumerable<Account>? approved)
|
||||||
|
{
|
||||||
|
var auth = new ExportAuth
|
||||||
|
{
|
||||||
|
ApprovedAccounts = approved != null ? new HashSet<string>(approved.Select(a => a.Name)) : null,
|
||||||
|
};
|
||||||
|
Exports.Streams[subject] = new StreamExport { Auth = auth };
|
||||||
|
}
|
||||||
|
|
||||||
|
public ServiceImport AddServiceImport(Account destination, string from, string to)
|
||||||
|
{
|
||||||
|
if (!destination.Exports.Services.TryGetValue(to, out var export))
|
||||||
|
throw new InvalidOperationException($"No service export found for '{to}' on account '{destination.Name}'");
|
||||||
|
|
||||||
|
if (!export.Auth.IsAuthorized(this))
|
||||||
|
throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'");
|
||||||
|
|
||||||
|
var si = new ServiceImport
|
||||||
|
{
|
||||||
|
DestinationAccount = destination,
|
||||||
|
From = from,
|
||||||
|
To = to,
|
||||||
|
Export = export,
|
||||||
|
ResponseType = export.ResponseType,
|
||||||
|
};
|
||||||
|
|
||||||
|
Imports.AddServiceImport(si);
|
||||||
|
return si;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void AddStreamImport(Account source, string from, string to)
|
||||||
|
{
|
||||||
|
if (!source.Exports.Streams.TryGetValue(from, out var export))
|
||||||
|
throw new InvalidOperationException($"No stream export found for '{from}' on account '{source.Name}'");
|
||||||
|
|
||||||
|
if (!export.Auth.IsAuthorized(this))
|
||||||
|
throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{from}' from '{source.Name}'");
|
||||||
|
|
||||||
|
var si = new StreamImport
|
||||||
|
{
|
||||||
|
SourceAccount = source,
|
||||||
|
From = from,
|
||||||
|
To = to,
|
||||||
|
};
|
||||||
|
|
||||||
|
Imports.Streams.Add(si);
|
||||||
|
}
|
||||||
|
|
||||||
public void Dispose() => SubList.Dispose();
|
public void Dispose() => SubList.Dispose();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
using NATS.Server;
|
using NATS.Server;
|
||||||
|
using NATS.Server.Imports;
|
||||||
|
|
||||||
namespace NATS.Server.Subscriptions;
|
namespace NATS.Server.Subscriptions;
|
||||||
|
|
||||||
@@ -10,4 +11,6 @@ public sealed class Subscription
|
|||||||
public long MessageCount; // Interlocked
|
public long MessageCount; // Interlocked
|
||||||
public long MaxMessages; // 0 = unlimited
|
public long MaxMessages; // 0 = unlimited
|
||||||
public INatsClient? Client { get; set; }
|
public INatsClient? Client { get; set; }
|
||||||
|
public ServiceImport? ServiceImport { get; set; }
|
||||||
|
public StreamImport? StreamImport { get; set; }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
using NATS.Server;
|
||||||
using NATS.Server.Auth;
|
using NATS.Server.Auth;
|
||||||
using NATS.Server.Imports;
|
using NATS.Server.Imports;
|
||||||
|
|
||||||
@@ -72,4 +73,62 @@ public class ImportExportTests
|
|||||||
map.Services.ShouldContainKey("requests.>");
|
map.Services.ShouldContainKey("requests.>");
|
||||||
map.Services["requests.>"].Count.ShouldBe(1);
|
map.Services["requests.>"].Count.ShouldBe(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Account_add_service_export_and_import()
|
||||||
|
{
|
||||||
|
var exporter = new Account("exporter");
|
||||||
|
var importer = new Account("importer");
|
||||||
|
|
||||||
|
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
|
||||||
|
exporter.Exports.Services.ShouldContainKey("api.>");
|
||||||
|
|
||||||
|
var si = importer.AddServiceImport(exporter, "requests.>", "api.>");
|
||||||
|
si.ShouldNotBeNull();
|
||||||
|
si.From.ShouldBe("requests.>");
|
||||||
|
si.To.ShouldBe("api.>");
|
||||||
|
si.DestinationAccount.ShouldBe(exporter);
|
||||||
|
importer.Imports.Services.ShouldContainKey("requests.>");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Account_add_stream_export_and_import()
|
||||||
|
{
|
||||||
|
var exporter = new Account("exporter");
|
||||||
|
var importer = new Account("importer");
|
||||||
|
|
||||||
|
exporter.AddStreamExport("events.>", null);
|
||||||
|
exporter.Exports.Streams.ShouldContainKey("events.>");
|
||||||
|
|
||||||
|
importer.AddStreamImport(exporter, "events.>", "imported.events.>");
|
||||||
|
importer.Imports.Streams.Count.ShouldBe(1);
|
||||||
|
importer.Imports.Streams[0].From.ShouldBe("events.>");
|
||||||
|
importer.Imports.Streams[0].To.ShouldBe("imported.events.>");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Account_service_import_auth_rejected()
|
||||||
|
{
|
||||||
|
var exporter = new Account("exporter");
|
||||||
|
var importer = new Account("importer");
|
||||||
|
|
||||||
|
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, [new Account("other")]);
|
||||||
|
|
||||||
|
Should.Throw<UnauthorizedAccessException>(() =>
|
||||||
|
importer.AddServiceImport(exporter, "requests.>", "api.>"));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Account_lazy_creates_internal_client()
|
||||||
|
{
|
||||||
|
var account = new Account("test");
|
||||||
|
var client = account.GetOrCreateInternalClient(99);
|
||||||
|
client.ShouldNotBeNull();
|
||||||
|
client.Kind.ShouldBe(ClientKind.Account);
|
||||||
|
client.Account.ShouldBe(account);
|
||||||
|
|
||||||
|
// Second call returns same instance
|
||||||
|
var client2 = account.GetOrCreateInternalClient(100);
|
||||||
|
client2.ShouldBeSameAs(client);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user