From 4450c273812078c4eb9666809a2573760edf38c3 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 06:01:53 -0500 Subject: [PATCH] feat: add response routing for service import request-reply patterns --- src/NATS.Server/Imports/ResponseRouter.cs | 64 +++++++++ .../NATS.Server.Tests/ResponseRoutingTests.cs | 127 ++++++++++++++++++ 2 files changed, 191 insertions(+) create mode 100644 src/NATS.Server/Imports/ResponseRouter.cs create mode 100644 tests/NATS.Server.Tests/ResponseRoutingTests.cs diff --git a/src/NATS.Server/Imports/ResponseRouter.cs b/src/NATS.Server/Imports/ResponseRouter.cs new file mode 100644 index 0000000..1b8ca98 --- /dev/null +++ b/src/NATS.Server/Imports/ResponseRouter.cs @@ -0,0 +1,64 @@ +using System.Security.Cryptography; +using NATS.Server.Auth; + +namespace NATS.Server.Imports; + +/// +/// Handles response routing for service imports. +/// Maps to Go's service reply prefix generation and response cleanup. +/// Reference: golang/nats-server/server/accounts.go — addRespServiceImport, removeRespServiceImport +/// +public static class ResponseRouter +{ + private static readonly char[] Base62 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789".ToCharArray(); + + /// + /// Generates a unique reply prefix for response routing. + /// Format: "_R_.{10 random base62 chars}." + /// + public static string GenerateReplyPrefix() + { + Span bytes = stackalloc byte[10]; + RandomNumberGenerator.Fill(bytes); + var chars = new char[10]; + for (int i = 0; i < 10; i++) + chars[i] = Base62[bytes[i] % 62]; + return $"_R_.{new string(chars)}."; + } + + /// + /// Creates a response service import that maps the generated reply prefix + /// back to the original reply subject on the requesting account. + /// + public static ServiceImport CreateResponseImport( + Account exporterAccount, + ServiceImport originalImport, + string originalReply) + { + var replyPrefix = GenerateReplyPrefix(); + + var responseSi = new ServiceImport + { + DestinationAccount = exporterAccount, + From = replyPrefix + ">", + To = originalReply, + IsResponse = true, + ResponseType = originalImport.ResponseType, + Export = originalImport.Export, + TimestampTicks = DateTime.UtcNow.Ticks, + }; + + exporterAccount.Exports.Responses[replyPrefix] = responseSi; + return responseSi; + } + + /// + /// Removes a response import from the account's export map. + /// For Singleton responses, this is called after the first reply is delivered. + /// For Streamed/Chunked, it is called when the response stream ends. + /// + public static void CleanupResponse(Account account, string replyPrefix, ServiceImport responseSi) + { + account.Exports.Responses.Remove(replyPrefix); + } +} diff --git a/tests/NATS.Server.Tests/ResponseRoutingTests.cs b/tests/NATS.Server.Tests/ResponseRoutingTests.cs new file mode 100644 index 0000000..6f29340 --- /dev/null +++ b/tests/NATS.Server.Tests/ResponseRoutingTests.cs @@ -0,0 +1,127 @@ +using NATS.Server.Auth; +using NATS.Server.Imports; + +namespace NATS.Server.Tests; + +public class ResponseRoutingTests +{ + [Fact] + public void GenerateReplyPrefix_creates_unique_prefix() + { + var prefix1 = ResponseRouter.GenerateReplyPrefix(); + var prefix2 = ResponseRouter.GenerateReplyPrefix(); + + prefix1.ShouldStartWith("_R_."); + prefix2.ShouldStartWith("_R_."); + prefix1.ShouldNotBe(prefix2); + prefix1.Length.ShouldBeGreaterThan(4); + } + + [Fact] + public void GenerateReplyPrefix_ends_with_dot() + { + var prefix = ResponseRouter.GenerateReplyPrefix(); + + prefix.ShouldEndWith("."); + // Format: "_R_." + 10 chars + "." = 15 chars + prefix.Length.ShouldBe(15); + } + + [Fact] + public void Singleton_response_import_removed_after_delivery() + { + var exporter = new Account("exporter"); + exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null); + + var replyPrefix = ResponseRouter.GenerateReplyPrefix(); + var responseSi = new ServiceImport + { + DestinationAccount = exporter, + From = replyPrefix + ">", + To = "_INBOX.original.reply", + IsResponse = true, + ResponseType = ServiceResponseType.Singleton, + }; + exporter.Exports.Responses[replyPrefix] = responseSi; + + exporter.Exports.Responses.ShouldContainKey(replyPrefix); + + // Simulate singleton delivery cleanup + ResponseRouter.CleanupResponse(exporter, replyPrefix, responseSi); + + exporter.Exports.Responses.ShouldNotContainKey(replyPrefix); + } + + [Fact] + public void CreateResponseImport_registers_in_exporter_responses() + { + var exporter = new Account("exporter"); + var importer = new Account("importer"); + exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null); + + var originalSi = new ServiceImport + { + DestinationAccount = exporter, + From = "api.test", + To = "api.test", + Export = exporter.Exports.Services["api.test"], + ResponseType = ServiceResponseType.Singleton, + }; + + var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.abc123"); + + responseSi.IsResponse.ShouldBeTrue(); + responseSi.ResponseType.ShouldBe(ServiceResponseType.Singleton); + responseSi.To.ShouldBe("_INBOX.abc123"); + responseSi.DestinationAccount.ShouldBe(exporter); + responseSi.From.ShouldEndWith(">"); + responseSi.Export.ShouldBe(originalSi.Export); + + // Should be registered in the exporter's response map + exporter.Exports.Responses.Count.ShouldBe(1); + } + + [Fact] + public void CreateResponseImport_preserves_streamed_response_type() + { + var exporter = new Account("exporter"); + exporter.AddServiceExport("api.stream", ServiceResponseType.Streamed, null); + + var originalSi = new ServiceImport + { + DestinationAccount = exporter, + From = "api.stream", + To = "api.stream", + Export = exporter.Exports.Services["api.stream"], + ResponseType = ServiceResponseType.Streamed, + }; + + var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.xyz789"); + + responseSi.ResponseType.ShouldBe(ServiceResponseType.Streamed); + } + + [Fact] + public void Multiple_response_imports_each_get_unique_prefix() + { + var exporter = new Account("exporter"); + exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null); + + var originalSi = new ServiceImport + { + DestinationAccount = exporter, + From = "api.test", + To = "api.test", + Export = exporter.Exports.Services["api.test"], + ResponseType = ServiceResponseType.Singleton, + }; + + var resp1 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply1"); + var resp2 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply2"); + + exporter.Exports.Responses.Count.ShouldBe(2); + resp1.To.ShouldBe("_INBOX.reply1"); + resp2.To.ShouldBe("_INBOX.reply2"); + resp1.From.ShouldNotBe(resp2.From); + } +}