feat: add response routing for service import request-reply patterns
This commit is contained in:
64
src/NATS.Server/Imports/ResponseRouter.cs
Normal file
64
src/NATS.Server/Imports/ResponseRouter.cs
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
using System.Security.Cryptography;
|
||||||
|
using NATS.Server.Auth;
|
||||||
|
|
||||||
|
namespace NATS.Server.Imports;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Handles response routing for service imports.
|
||||||
|
/// Maps to Go's service reply prefix generation and response cleanup.
|
||||||
|
/// Reference: golang/nats-server/server/accounts.go — addRespServiceImport, removeRespServiceImport
|
||||||
|
/// </summary>
|
||||||
|
public static class ResponseRouter
|
||||||
|
{
|
||||||
|
private static readonly char[] Base62 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789".ToCharArray();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Generates a unique reply prefix for response routing.
|
||||||
|
/// Format: "_R_.{10 random base62 chars}."
|
||||||
|
/// </summary>
|
||||||
|
public static string GenerateReplyPrefix()
|
||||||
|
{
|
||||||
|
Span<byte> bytes = stackalloc byte[10];
|
||||||
|
RandomNumberGenerator.Fill(bytes);
|
||||||
|
var chars = new char[10];
|
||||||
|
for (int i = 0; i < 10; i++)
|
||||||
|
chars[i] = Base62[bytes[i] % 62];
|
||||||
|
return $"_R_.{new string(chars)}.";
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a response service import that maps the generated reply prefix
|
||||||
|
/// back to the original reply subject on the requesting account.
|
||||||
|
/// </summary>
|
||||||
|
public static ServiceImport CreateResponseImport(
|
||||||
|
Account exporterAccount,
|
||||||
|
ServiceImport originalImport,
|
||||||
|
string originalReply)
|
||||||
|
{
|
||||||
|
var replyPrefix = GenerateReplyPrefix();
|
||||||
|
|
||||||
|
var responseSi = new ServiceImport
|
||||||
|
{
|
||||||
|
DestinationAccount = exporterAccount,
|
||||||
|
From = replyPrefix + ">",
|
||||||
|
To = originalReply,
|
||||||
|
IsResponse = true,
|
||||||
|
ResponseType = originalImport.ResponseType,
|
||||||
|
Export = originalImport.Export,
|
||||||
|
TimestampTicks = DateTime.UtcNow.Ticks,
|
||||||
|
};
|
||||||
|
|
||||||
|
exporterAccount.Exports.Responses[replyPrefix] = responseSi;
|
||||||
|
return responseSi;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Removes a response import from the account's export map.
|
||||||
|
/// For Singleton responses, this is called after the first reply is delivered.
|
||||||
|
/// For Streamed/Chunked, it is called when the response stream ends.
|
||||||
|
/// </summary>
|
||||||
|
public static void CleanupResponse(Account account, string replyPrefix, ServiceImport responseSi)
|
||||||
|
{
|
||||||
|
account.Exports.Responses.Remove(replyPrefix);
|
||||||
|
}
|
||||||
|
}
|
||||||
127
tests/NATS.Server.Tests/ResponseRoutingTests.cs
Normal file
127
tests/NATS.Server.Tests/ResponseRoutingTests.cs
Normal file
@@ -0,0 +1,127 @@
|
|||||||
|
using NATS.Server.Auth;
|
||||||
|
using NATS.Server.Imports;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
public class ResponseRoutingTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void GenerateReplyPrefix_creates_unique_prefix()
|
||||||
|
{
|
||||||
|
var prefix1 = ResponseRouter.GenerateReplyPrefix();
|
||||||
|
var prefix2 = ResponseRouter.GenerateReplyPrefix();
|
||||||
|
|
||||||
|
prefix1.ShouldStartWith("_R_.");
|
||||||
|
prefix2.ShouldStartWith("_R_.");
|
||||||
|
prefix1.ShouldNotBe(prefix2);
|
||||||
|
prefix1.Length.ShouldBeGreaterThan(4);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void GenerateReplyPrefix_ends_with_dot()
|
||||||
|
{
|
||||||
|
var prefix = ResponseRouter.GenerateReplyPrefix();
|
||||||
|
|
||||||
|
prefix.ShouldEndWith(".");
|
||||||
|
// Format: "_R_." + 10 chars + "." = 15 chars
|
||||||
|
prefix.Length.ShouldBe(15);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Singleton_response_import_removed_after_delivery()
|
||||||
|
{
|
||||||
|
var exporter = new Account("exporter");
|
||||||
|
exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);
|
||||||
|
|
||||||
|
var replyPrefix = ResponseRouter.GenerateReplyPrefix();
|
||||||
|
var responseSi = new ServiceImport
|
||||||
|
{
|
||||||
|
DestinationAccount = exporter,
|
||||||
|
From = replyPrefix + ">",
|
||||||
|
To = "_INBOX.original.reply",
|
||||||
|
IsResponse = true,
|
||||||
|
ResponseType = ServiceResponseType.Singleton,
|
||||||
|
};
|
||||||
|
exporter.Exports.Responses[replyPrefix] = responseSi;
|
||||||
|
|
||||||
|
exporter.Exports.Responses.ShouldContainKey(replyPrefix);
|
||||||
|
|
||||||
|
// Simulate singleton delivery cleanup
|
||||||
|
ResponseRouter.CleanupResponse(exporter, replyPrefix, responseSi);
|
||||||
|
|
||||||
|
exporter.Exports.Responses.ShouldNotContainKey(replyPrefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void CreateResponseImport_registers_in_exporter_responses()
|
||||||
|
{
|
||||||
|
var exporter = new Account("exporter");
|
||||||
|
var importer = new Account("importer");
|
||||||
|
exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);
|
||||||
|
|
||||||
|
var originalSi = new ServiceImport
|
||||||
|
{
|
||||||
|
DestinationAccount = exporter,
|
||||||
|
From = "api.test",
|
||||||
|
To = "api.test",
|
||||||
|
Export = exporter.Exports.Services["api.test"],
|
||||||
|
ResponseType = ServiceResponseType.Singleton,
|
||||||
|
};
|
||||||
|
|
||||||
|
var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.abc123");
|
||||||
|
|
||||||
|
responseSi.IsResponse.ShouldBeTrue();
|
||||||
|
responseSi.ResponseType.ShouldBe(ServiceResponseType.Singleton);
|
||||||
|
responseSi.To.ShouldBe("_INBOX.abc123");
|
||||||
|
responseSi.DestinationAccount.ShouldBe(exporter);
|
||||||
|
responseSi.From.ShouldEndWith(">");
|
||||||
|
responseSi.Export.ShouldBe(originalSi.Export);
|
||||||
|
|
||||||
|
// Should be registered in the exporter's response map
|
||||||
|
exporter.Exports.Responses.Count.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void CreateResponseImport_preserves_streamed_response_type()
|
||||||
|
{
|
||||||
|
var exporter = new Account("exporter");
|
||||||
|
exporter.AddServiceExport("api.stream", ServiceResponseType.Streamed, null);
|
||||||
|
|
||||||
|
var originalSi = new ServiceImport
|
||||||
|
{
|
||||||
|
DestinationAccount = exporter,
|
||||||
|
From = "api.stream",
|
||||||
|
To = "api.stream",
|
||||||
|
Export = exporter.Exports.Services["api.stream"],
|
||||||
|
ResponseType = ServiceResponseType.Streamed,
|
||||||
|
};
|
||||||
|
|
||||||
|
var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.xyz789");
|
||||||
|
|
||||||
|
responseSi.ResponseType.ShouldBe(ServiceResponseType.Streamed);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Multiple_response_imports_each_get_unique_prefix()
|
||||||
|
{
|
||||||
|
var exporter = new Account("exporter");
|
||||||
|
exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);
|
||||||
|
|
||||||
|
var originalSi = new ServiceImport
|
||||||
|
{
|
||||||
|
DestinationAccount = exporter,
|
||||||
|
From = "api.test",
|
||||||
|
To = "api.test",
|
||||||
|
Export = exporter.Exports.Services["api.test"],
|
||||||
|
ResponseType = ServiceResponseType.Singleton,
|
||||||
|
};
|
||||||
|
|
||||||
|
var resp1 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply1");
|
||||||
|
var resp2 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply2");
|
||||||
|
|
||||||
|
exporter.Exports.Responses.Count.ShouldBe(2);
|
||||||
|
resp1.To.ShouldBe("_INBOX.reply1");
|
||||||
|
resp2.To.ShouldBe("_INBOX.reply2");
|
||||||
|
resp1.From.ShouldNotBe(resp2.From);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user