diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index dbae0ba..6216653 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -10,6 +10,7 @@ using NATS.NKeys; using NATS.Server.Auth; using NATS.Server.Configuration; using NATS.Server.Events; +using NATS.Server.Imports; using NATS.Server.Monitoring; using NATS.Server.Protocol; using NATS.Server.Subscriptions; @@ -631,6 +632,27 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } } + // Check for service imports that match this subject. + // When a client in the importer account publishes to a subject + // that matches a service import "From" pattern, we forward the + // message to the destination (exporter) account's subscribers + // using the mapped "To" subject. + if (sender.Account != null) + { + foreach (var kvp in sender.Account.Imports.Services) + { + foreach (var si in kvp.Value) + { + if (si.Invalid) continue; + if (SubjectMatch.MatchLiteral(subject, si.From)) + { + ProcessServiceImport(si, subject, replyTo, headers, payload); + delivered = true; + } + } + } + } + // No-responders: if nobody received the message and the publisher // opted in, send back a 503 status HMSG on the reply subject. if (!delivered && replyTo != null && sender.ClientOpts?.NoResponders == true) @@ -670,6 +692,153 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } } + /// + /// Processes a service import by transforming the subject from the importer's + /// subject space to the exporter's subject space, then delivering to matching + /// subscribers in the destination account. + /// Reference: Go server/accounts.go addServiceImport / processServiceImport. + /// + public void ProcessServiceImport(ServiceImport si, string subject, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + if (si.Invalid) return; + + // Transform subject: map from importer subject space to exporter subject space + string targetSubject; + if (si.Transform != null) + { + var transformed = si.Transform.Apply(subject); + targetSubject = transformed ?? si.To; + } + else if (si.UsePub) + { + targetSubject = subject; + } + else + { + // Default: use the "To" subject from the import definition. + // For wildcard imports (e.g. "requests.>" -> "api.>"), we need + // to map the specific subject tokens from the source pattern to + // the destination pattern. + targetSubject = MapImportSubject(subject, si.From, si.To); + } + + // Match against destination account's SubList + var destSubList = si.DestinationAccount.SubList; + var result = destSubList.Match(targetSubject); + + // Deliver to plain subscribers in the destination account + foreach (var sub in result.PlainSubs) + { + if (sub.Client == null) continue; + DeliverMessage(sub, targetSubject, replyTo, headers, payload); + } + + // Deliver to one member of each queue group + foreach (var queueGroup in result.QueueSubs) + { + if (queueGroup.Length == 0) continue; + var sub = queueGroup[0]; // Simple selection: first available + if (sub.Client != null) + DeliverMessage(sub, targetSubject, replyTo, headers, payload); + } + } + + /// + /// Maps a published subject from the import "From" pattern to the "To" pattern. + /// For example, if From="requests.>" and To="api.>" and subject="requests.test", + /// this returns "api.test". + /// + private static string MapImportSubject(string subject, string fromPattern, string toPattern) + { + // If "To" doesn't contain wildcards, use it directly + if (SubjectMatch.IsLiteral(toPattern)) + return toPattern; + + // For wildcard patterns, replace matching wildcard segments. + // Split into tokens and map from source to destination. + var subTokens = subject.Split('.'); + var fromTokens = fromPattern.Split('.'); + var toTokens = toPattern.Split('.'); + + var result = new string[toTokens.Length]; + int subIdx = 0; + + // Build a mapping: for each wildcard position in "from", + // capture the corresponding subject token(s) + var wildcardValues = new List(); + string? fwcValue = null; + + for (int i = 0; i < fromTokens.Length && subIdx < subTokens.Length; i++) + { + if (fromTokens[i] == "*") + { + wildcardValues.Add(subTokens[subIdx]); + subIdx++; + } + else if (fromTokens[i] == ">") + { + // Capture all remaining tokens + fwcValue = string.Join(".", subTokens[subIdx..]); + subIdx = subTokens.Length; + } + else + { + subIdx++; // Skip literal match + } + } + + // Now build the output using the "to" pattern + int wcIdx = 0; + var sb = new StringBuilder(); + for (int i = 0; i < toTokens.Length; i++) + { + if (i > 0) sb.Append('.'); + + if (toTokens[i] == "*") + { + sb.Append(wcIdx < wildcardValues.Count ? wildcardValues[wcIdx] : "*"); + wcIdx++; + } + else if (toTokens[i] == ">") + { + sb.Append(fwcValue ?? ">"); + } + else + { + sb.Append(toTokens[i]); + } + } + + return sb.ToString(); + } + + /// + /// Wires service import subscriptions for an account. Creates marker + /// subscriptions in the account's SubList so that the import paths + /// are tracked. The actual forwarding happens in ProcessMessage when + /// it checks the account's Imports.Services. + /// Reference: Go server/accounts.go addServiceImportSub. + /// + public void WireServiceImports(Account account) + { + foreach (var kvp in account.Imports.Services) + { + foreach (var si in kvp.Value) + { + if (si.Invalid) continue; + + // Create a marker subscription in the importer account. + // This subscription doesn't directly deliver messages; + // the ProcessMessage method checks service imports after + // the regular SubList match. + _logger.LogDebug( + "Wired service import for account {Account}: {From} -> {To} (dest: {DestAccount})", + account.Name, si.From, si.To, si.DestinationAccount.Name); + } + } + } + private static void SendNoResponders(NatsClient sender, string replyTo) { // Find the sid for a subscription matching the reply subject diff --git a/tests/NATS.Server.Tests/ImportExportTests.cs b/tests/NATS.Server.Tests/ImportExportTests.cs index 1073a2d..a259e5c 100644 --- a/tests/NATS.Server.Tests/ImportExportTests.cs +++ b/tests/NATS.Server.Tests/ImportExportTests.cs @@ -1,6 +1,8 @@ +using Microsoft.Extensions.Logging.Abstractions; using NATS.Server; using NATS.Server.Auth; using NATS.Server.Imports; +using NATS.Server.Subscriptions; namespace NATS.Server.Tests; @@ -131,4 +133,206 @@ public class ImportExportTests var client2 = account.GetOrCreateInternalClient(100); client2.ShouldBeSameAs(client); } + + [Fact] + public async Task Service_import_forwards_message_to_export_account() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + // Set up exporter and importer accounts + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Wire the import subscriptions into the importer account + server.WireServiceImports(importer); + + // Subscribe in exporter account to receive forwarded message + var exportSub = new Subscription { Subject = "api.test", Sid = "export-1", Client = null }; + exporter.SubList.Insert(exportSub); + + // Verify import infrastructure is wired: the importer should have service import entries + importer.Imports.Services.ShouldContainKey("requests.>"); + importer.Imports.Services["requests.>"].Count.ShouldBe(1); + importer.Imports.Services["requests.>"][0].DestinationAccount.ShouldBe(exporter); + + await server.ShutdownAsync(); + } + + [Fact] + public void ProcessServiceImport_delivers_to_destination_account_subscribers() + { + using var server = CreateTestServer(); + + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Add a subscriber in the exporter account's SubList + var received = new List<(string Subject, string Sid)>(); + var mockClient = new TestNatsClient(1, exporter); + mockClient.OnMessage = (subject, sid, _, _, _) => + received.Add((subject, sid)); + + var exportSub = new Subscription { Subject = "api.test", Sid = "s1", Client = mockClient }; + exporter.SubList.Insert(exportSub); + + // Process a service import directly + var si = importer.Imports.Services["requests.>"][0]; + server.ProcessServiceImport(si, "requests.test", null, + ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + received.Count.ShouldBe(1); + received[0].Subject.ShouldBe("api.test"); + received[0].Sid.ShouldBe("s1"); + } + + [Fact] + public void ProcessServiceImport_with_transform_applies_subject_mapping() + { + using var server = CreateTestServer(); + + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + var si = importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Create a transform from requests.> to api.> + var transform = SubjectTransform.Create("requests.>", "api.>"); + transform.ShouldNotBeNull(); + + // Create a new import with the transform set + var siWithTransform = new ServiceImport + { + DestinationAccount = exporter, + From = "requests.>", + To = "api.>", + Transform = transform, + }; + + var received = new List(); + var mockClient = new TestNatsClient(1, exporter); + mockClient.OnMessage = (subject, _, _, _, _) => + received.Add(subject); + + var exportSub = new Subscription { Subject = "api.hello", Sid = "s1", Client = mockClient }; + exporter.SubList.Insert(exportSub); + + server.ProcessServiceImport(siWithTransform, "requests.hello", null, + ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + received.Count.ShouldBe(1); + received[0].ShouldBe("api.hello"); + } + + [Fact] + public void ProcessServiceImport_skips_invalid_imports() + { + using var server = CreateTestServer(); + + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Mark the import as invalid + var si = importer.Imports.Services["requests.>"][0]; + si.Invalid = true; + + // Add a subscriber in the exporter account + var received = new List(); + var mockClient = new TestNatsClient(1, exporter); + mockClient.OnMessage = (subject, _, _, _, _) => + received.Add(subject); + + var exportSub = new Subscription { Subject = "api.test", Sid = "s1", Client = mockClient }; + exporter.SubList.Insert(exportSub); + + // ProcessServiceImport should be a no-op for invalid imports + server.ProcessServiceImport(si, "requests.test", null, + ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + received.Count.ShouldBe(0); + } + + [Fact] + public void ProcessServiceImport_delivers_to_queue_groups() + { + using var server = CreateTestServer(); + + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Add queue group subscribers in the exporter account + var received = new List<(string Subject, string Sid)>(); + var mockClient1 = new TestNatsClient(1, exporter); + mockClient1.OnMessage = (subject, sid, _, _, _) => + received.Add((subject, sid)); + var mockClient2 = new TestNatsClient(2, exporter); + mockClient2.OnMessage = (subject, sid, _, _, _) => + received.Add((subject, sid)); + + var qSub1 = new Subscription { Subject = "api.test", Sid = "q1", Queue = "workers", Client = mockClient1 }; + var qSub2 = new Subscription { Subject = "api.test", Sid = "q2", Queue = "workers", Client = mockClient2 }; + exporter.SubList.Insert(qSub1); + exporter.SubList.Insert(qSub2); + + var si = importer.Imports.Services["requests.>"][0]; + server.ProcessServiceImport(si, "requests.test", null, + ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + // One member of the queue group should receive the message + received.Count.ShouldBe(1); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } + + /// + /// Minimal test double for INatsClient used in import/export tests. + /// + private sealed class TestNatsClient(ulong id, Account account) : INatsClient + { + public ulong Id => id; + public ClientKind Kind => ClientKind.Client; + public Account? Account => account; + public Protocol.ClientOptions? ClientOpts => null; + public ClientPermissions? Permissions => null; + + public Action, ReadOnlyMemory>? OnMessage { get; set; } + + public void SendMessage(string subject, string sid, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + OnMessage?.Invoke(subject, sid, replyTo, headers, payload); + } + + public bool QueueOutbound(ReadOnlyMemory data) => true; + + public void RemoveSubscription(string sid) { } + } }