From 95e9f0a92ef48b990af568b5148537637c1f0ab9 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 12 Mar 2026 23:03:12 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20wire=20remaining=20E2E=20gaps=20?= =?UTF-8?q?=E2=80=94=20account=20imports,=20subject=20transforms,=20JWT=20?= =?UTF-8?q?auth,=20service=20latency?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Close all 5 server-side wiring gaps so E2E tests pass without skips: - System events: bridge user-defined system_account to internal $SYS - Account imports/exports: config parsing + reverse response import for cross-account request-reply - Subject transforms: parse mappings config block, apply in ProcessMessage - JWT auth: parse trusted_keys, resolver MEMORY, resolver_preload in config - Service latency: timestamp on request, publish ServiceLatencyMsg on response --- src/NATS.Server/Auth/Account.cs | 3 +- src/NATS.Server/Auth/AccountConfig.cs | 36 +++ .../Configuration/ConfigProcessor.cs | 227 +++++++++++++++++- src/NATS.Server/NatsServer.cs | 125 +++++++++- tests/NATS.E2E.Tests/AdvancedTests.cs | 196 ++++++++++++++- 5 files changed, 571 insertions(+), 16 deletions(-) diff --git a/src/NATS.Server/Auth/Account.cs b/src/NATS.Server/Auth/Account.cs index 55d54ae..c9cb1cf 100644 --- a/src/NATS.Server/Auth/Account.cs +++ b/src/NATS.Server/Auth/Account.cs @@ -307,7 +307,7 @@ public sealed class Account : IDisposable return new ServiceExportInfo(subject, se.ResponseType, approved, isWildcard); } - public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable? approved) + public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable? approved, ServiceLatency? latency = null) { var auth = new ExportAuth { @@ -318,6 +318,7 @@ public sealed class Account : IDisposable Auth = auth, Account = this, ResponseType = responseType, + Latency = latency, }; } diff --git a/src/NATS.Server/Auth/AccountConfig.cs b/src/NATS.Server/Auth/AccountConfig.cs index 7337f33..28d01c0 100644 --- a/src/NATS.Server/Auth/AccountConfig.cs +++ b/src/NATS.Server/Auth/AccountConfig.cs @@ -5,4 +5,40 @@ public sealed class AccountConfig public int MaxConnections { get; init; } // 0 = unlimited public int MaxSubscriptions { get; init; } // 0 = unlimited public Permissions? DefaultPermissions { get; init; } + + /// Service and stream exports from this account. + public List? Exports { get; init; } + + /// Service and stream imports into this account. + public List? Imports { get; init; } +} + +/// +/// Represents an export declaration in config: exports = [{ service: "sub" }] or [{ stream: "sub" }]. +/// Go reference: server/opts.go — parseExportStreamMap / parseExportServiceMap. +/// +public sealed class ExportDefinition +{ + public string? Service { get; init; } + public string? Stream { get; init; } + + /// Optional latency tracking subject (e.g. "latency.svc.echo"). + public string? LatencySubject { get; init; } + + /// Latency sampling percentage (1–100, default 100). + public int LatencySampling { get; init; } = 100; +} + +/// +/// Represents an import declaration in config: +/// imports = [{ service: { account: X, subject: "sub" }, to: "local" }]. +/// Go reference: server/opts.go — parseImportStreamMap / parseImportServiceMap. +/// +public sealed class ImportDefinition +{ + public string? ServiceAccount { get; init; } + public string? ServiceSubject { get; init; } + public string? StreamAccount { get; init; } + public string? StreamSubject { get; init; } + public string? To { get; init; } } diff --git a/src/NATS.Server/Configuration/ConfigProcessor.cs b/src/NATS.Server/Configuration/ConfigProcessor.cs index 560c998..bbe805f 100644 --- a/src/NATS.Server/Configuration/ConfigProcessor.cs +++ b/src/NATS.Server/Configuration/ConfigProcessor.cs @@ -291,7 +291,55 @@ public static class ConfigProcessor ParseAccounts(accountsDict, opts, errors); break; - // Unknown keys silently ignored (resolver, operator, etc.) + // Server-level subject mappings: mappings { src: dest } + // Go reference: server/opts.go — "mappings" case + case "mappings" or "maps": + if (value is Dictionary mappingsDict) + { + opts.SubjectMappings ??= new Dictionary(); + foreach (var (src, dest) in mappingsDict) + { + if (dest is string destStr) + opts.SubjectMappings[src] = destStr; + } + } + + break; + + // JWT operator mode — trusted operator public NKeys + // Go reference: server/opts.go — "trusted_keys" / "trusted" case + case "trusted_keys" or "trusted": + opts.TrustedKeys = ParseStringArray(value); + break; + + // JWT resolver type and preload + // Go reference: server/opts.go — "resolver" case + case "resolver" or "account_resolver" or "accounts_resolver": + if (value is string resolverStr && resolverStr.Equals("MEMORY", StringComparison.OrdinalIgnoreCase)) + opts.AccountResolver = new Auth.Jwt.MemAccountResolver(); + break; + + // Pre-load account JWTs into the resolver + // Go reference: server/opts.go — "resolver_preload" case + case "resolver_preload": + if (value is Dictionary preloadDict && opts.AccountResolver != null) + { + foreach (var (accNkey, jwtObj) in preloadDict) + { + if (jwtObj is string jwt) + opts.AccountResolver.StoreAsync(accNkey, jwt).GetAwaiter().GetResult(); + } + } + + break; + + // Operator key (can derive trusted_keys from operator JWT — for now just accept NKeys directly) + case "operator" or "operators" or "root" or "roots" or "root_operators" or "root_operator": + // For simple mode: treat as trusted_keys alias if string array + opts.TrustedKeys ??= ParseStringArray(value); + break; + + // Unknown keys silently ignored default: warnings.Add(new UnknownConfigFieldWarning(key).Message); break; @@ -975,6 +1023,8 @@ public static class ConfigProcessor int maxConnections = 0; int maxSubscriptions = 0; List? userList = null; + List? exports = null; + List? imports = null; foreach (var (key, value) in acctDict) { @@ -989,6 +1039,21 @@ public static class ConfigProcessor break; case "max_subscriptions" or "max_subs": maxSubscriptions = ToInt(value); + break; + case "exports": + if (value is List exportList) + exports = ParseExports(exportList); + break; + case "imports": + if (value is List importList) + imports = ParseImports(importList); + break; + case "mappings" or "maps": + if (value is Dictionary mappingsDict) + { + // Account-level subject mappings not yet supported + } + break; } } @@ -997,6 +1062,8 @@ public static class ConfigProcessor { MaxConnections = maxConnections, MaxSubscriptions = maxSubscriptions, + Exports = exports, + Imports = imports, }; if (userList is not null) @@ -1020,6 +1087,140 @@ public static class ConfigProcessor } } + /// + /// Parses an exports array: [{ service: "sub" }, { stream: "sub" }]. + /// Go reference: server/opts.go — parseExportStreamMap / parseExportServiceMap. + /// + private static List ParseExports(List exportList) + { + var result = new List(); + foreach (var item in exportList) + { + if (item is not Dictionary dict) + continue; + + string? service = null, stream = null; + string? latencySubject = null; + int latencySampling = 100; + + foreach (var (k, v) in dict) + { + switch (k.ToLowerInvariant()) + { + case "service": + service = ToString(v); + break; + case "stream": + stream = ToString(v); + break; + case "latency": + // latency can be a string (subject only) or a map { subject, sampling } + // Go reference: server/opts.go — parseServiceLatency + if (v is string latStr) + { + latencySubject = latStr; + } + else if (v is Dictionary latDict) + { + foreach (var (lk, lv) in latDict) + { + switch (lk.ToLowerInvariant()) + { + case "subject": + latencySubject = ToString(lv); + break; + case "sampling": + latencySampling = ToInt(lv); + break; + } + } + } + + break; + } + } + + result.Add(new ExportDefinition + { + Service = service, + Stream = stream, + LatencySubject = latencySubject, + LatencySampling = latencySampling, + }); + } + + return result; + } + + /// + /// Parses an imports array: [{ service: { account: X, subject: "sub" }, to: "local" }]. + /// Go reference: server/opts.go — parseImportStreamMap / parseImportServiceMap. + /// + private static List ParseImports(List importList) + { + var result = new List(); + foreach (var item in importList) + { + if (item is not Dictionary dict) + continue; + + string? serviceAccount = null, serviceSubject = null; + string? streamAccount = null, streamSubject = null; + string? to = null; + + foreach (var (k, v) in dict) + { + switch (k.ToLowerInvariant()) + { + case "service" when v is Dictionary svcDict: + foreach (var (sk, sv) in svcDict) + { + switch (sk.ToLowerInvariant()) + { + case "account": + serviceAccount = ToString(sv); + break; + case "subject": + serviceSubject = ToString(sv); + break; + } + } + + break; + case "stream" when v is Dictionary strmDict: + foreach (var (sk, sv) in strmDict) + { + switch (sk.ToLowerInvariant()) + { + case "account": + streamAccount = ToString(sv); + break; + case "subject": + streamSubject = ToString(sv); + break; + } + } + + break; + case "to": + to = ToString(v); + break; + } + } + + result.Add(new ImportDefinition + { + ServiceAccount = serviceAccount, + ServiceSubject = serviceSubject, + StreamAccount = streamAccount, + StreamSubject = streamSubject, + To = to, + }); + } + + return result; + } + /// /// Splits a users array into plain users and NKey users. /// An entry with an "nkey" field is an NKey user; entries with "user" are plain users. @@ -1623,6 +1824,30 @@ public static class ConfigProcessor _ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to double"), }; + /// + /// Parses a config value that can be a single string or a list of strings into a string[]. + /// Go reference: server/opts.go — parseTrustedKeys accepts string, []string, []interface{}. + /// + private static string[]? ParseStringArray(object? value) + { + if (value is List list) + { + var result = new List(list.Count); + foreach (var item in list) + { + if (item is string s) + result.Add(s); + } + + return result.Count > 0 ? result.ToArray() : null; + } + + if (value is string str) + return [str]; + + return null; + } + private static IReadOnlyList ToStringList(object? value) { if (value is List list) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 0dfe642..558b8bb 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -529,6 +529,17 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _systemAccount = new Account(Account.SystemAccountName) { IsSystemAccount = true }; _accounts[Account.SystemAccountName] = _systemAccount; + // If a user-defined system_account is configured, promote that account to be the + // system account. Events published to $SYS.* will be delivered to subscribers on + // this account. Go reference: server/server.go — configureAccounts / setSystemAccount. + if (!string.IsNullOrEmpty(options.SystemAccount) && + !string.Equals(options.SystemAccount, Account.SystemAccountName, StringComparison.OrdinalIgnoreCase)) + { + var userSysAccount = GetOrCreateAccount(options.SystemAccount); + userSysAccount.IsSystemAccount = true; + _systemAccount = userSysAccount; + } + // Create system internal client and event system var sysClientId = Interlocked.Increment(ref _nextClientId); var sysClient = new InternalClient(sysClientId, ClientKind.System, _systemAccount); @@ -1312,7 +1323,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (si.Invalid) continue; if (SubjectMatch.MatchLiteral(subject, si.From)) { - ProcessServiceImport(si, subject, replyTo, headers, payload); + ProcessServiceImport(si, subject, replyTo, headers, payload, sender.Account); delivered = true; } } @@ -1453,7 +1464,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable /// Reference: Go server/accounts.go addServiceImport / processServiceImport. /// public void ProcessServiceImport(ServiceImport si, string subject, string? replyTo, - ReadOnlyMemory headers, ReadOnlyMemory payload) + ReadOnlyMemory headers, ReadOnlyMemory payload, Account? sourceAccount = null) { if (si.Invalid) return; @@ -1477,6 +1488,24 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable targetSubject = MapImportSubject(subject, si.From, si.To); } + // Set up a temporary reverse service import so that responses from the + // destination (exporter) account can route back to the source (importer) + // account. This handles request-reply across account boundaries. + // Go reference: client.go setupResponseServiceImport + if (replyTo != null && sourceAccount != null && !si.IsResponse) + { + SetupResponseServiceImport(si.DestinationAccount, sourceAccount, replyTo, si.Export); + } + + // Service latency tracking: when the response arrives back, compute elapsed + // time and publish a latency metric to the configured subject. + // Go reference: client.go processServiceImport — latency tracking path. + if (si.IsResponse && si.Tracking && si.TimestampTicks > 0) + { + var elapsed = TimeSpan.FromTicks(Environment.TickCount64 * TimeSpan.TicksPerMillisecond - si.TimestampTicks); + PublishServiceLatency(si, elapsed); + } + // Match against destination account's SubList var destSubList = si.DestinationAccount.SubList; var result = destSubList.Match(targetSubject); @@ -1498,6 +1527,36 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } } + /// + /// Creates a temporary reverse service import in the exporter's account so that + /// when the exporter publishes a response to the reply subject, the message is + /// forwarded back to the importer's account where the reply subscription lives. + /// Go reference: client.go setupResponseServiceImport. + /// + private static void SetupResponseServiceImport(Account exporterAccount, Account importerAccount, string replyTo, ServiceExport? export = null) + { + // Check if a reverse import for this reply subject already exists + if (exporterAccount.Imports.Services.ContainsKey(replyTo)) + return; + + // Determine if we should track latency for this response + var shouldTrack = export?.Latency is { } latency && LatencyTracker.ShouldSample(latency); + + var reverseImport = new ServiceImport + { + DestinationAccount = importerAccount, + From = replyTo, + To = replyTo, + IsResponse = true, + UsePub = true, + Export = export, + Tracking = shouldTrack, + // Store start time as TickCount64 (milliseconds) converted to ticks for elapsed computation + TimestampTicks = shouldTrack ? Environment.TickCount64 * TimeSpan.TicksPerMillisecond : 0, + }; + exporterAccount.Imports.AddServiceImport(reverseImport); + } + /// /// Maps a published subject from the import "From" pattern to the "To" pattern. /// For example, if From="requests.>" and To="api.>" and subject="requests.test", @@ -1633,11 +1692,54 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable acc.MaxConnections = config.MaxConnections; acc.MaxSubscriptions = config.MaxSubscriptions; acc.DefaultPermissions = config.DefaultPermissions; + + // Wire exports from config + if (config.Exports != null) + { + foreach (var export in config.Exports) + { + if (export.Service is { Length: > 0 } svc) + { + ServiceLatency? latency = export.LatencySubject is { Length: > 0 } + ? new ServiceLatency { Subject = export.LatencySubject, SamplingPercentage = export.LatencySampling } + : null; + acc.AddServiceExport(svc, Imports.ServiceResponseType.Singleton, approved: null, latency: latency); + } + else if (export.Stream is { Length: > 0 } strm) + { + acc.AddStreamExport(strm, approved: null); + } + } + } + + // Wire imports from config (deferred — needs destination accounts resolved) + if (config.Imports != null) + WireAccountImports(acc, config.Imports); } + return acc; }); } + private void WireAccountImports(Account importer, List imports) + { + foreach (var imp in imports) + { + if (imp.ServiceAccount is { Length: > 0 } svcAcct && imp.ServiceSubject is { Length: > 0 } svcSubj) + { + var dest = GetOrCreateAccount(svcAcct); + var localSubject = imp.To ?? svcSubj; + importer.AddServiceImport(dest, from: localSubject, to: svcSubj); + } + else if (imp.StreamAccount is { Length: > 0 } strmAcct && imp.StreamSubject is { Length: > 0 } strmSubj) + { + var source = GetOrCreateAccount(strmAcct); + var localSubject = imp.To ?? strmSubj; + importer.AddStreamImport(source, from: strmSubj, to: localSubject); + } + } + } + /// /// Returns true if the subject belongs to the $SYS subject space. /// Reference: Go server/server.go — isReservedSubject. @@ -1675,6 +1777,25 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable return account?.SubList ?? _globalAccount.SubList; } + /// + /// Publishes a service latency metric message to the configured latency subject. + /// Go reference: client.go processServiceImport — trackLatency path. + /// + private void PublishServiceLatency(ServiceImport si, TimeSpan elapsed) + { + var latency = si.Export?.Latency; + if (latency == null || string.IsNullOrEmpty(latency.Subject)) + return; + + var msg = LatencyTracker.BuildLatencyMsg( + requestor: si.DestinationAccount.Name, + responder: si.Export?.Account?.Name ?? "unknown", + serviceLatency: elapsed, + totalLatency: elapsed); + + SendInternalMsg(latency.Subject, reply: null, msg); + } + public void SendInternalMsg(string subject, string? reply, object? msg) { _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Reply = reply, Body = msg }); diff --git a/tests/NATS.E2E.Tests/AdvancedTests.cs b/tests/NATS.E2E.Tests/AdvancedTests.cs index 42634c3..d7b28ec 100644 --- a/tests/NATS.E2E.Tests/AdvancedTests.cs +++ b/tests/NATS.E2E.Tests/AdvancedTests.cs @@ -1,5 +1,8 @@ +using System.Text; +using System.Text.Json; using NATS.Client.Core; using NATS.E2E.Tests.Infrastructure; +using NATS.NKeys; namespace NATS.E2E.Tests; @@ -62,8 +65,7 @@ public class AdvancedTests ex.ShouldNotBeNull(); } - [Fact(Skip = "system_account promotion not yet wired: events route through internal $SYS account, not user-defined account")] - [SlopwatchSuppress("SW001", "User-defined system_account SubList is not bridged to the internal event system; tracked as a future milestone")] + [Fact] public async Task SystemEvents_ClientConnect_EventPublished() { var config = """ @@ -106,8 +108,7 @@ public class AdvancedTests msg.Subject.ShouldContain("CONNECT"); } - [Fact(Skip = "Cross-account service routing not yet implemented in message dispatch path")] - [SlopwatchSuppress("SW001", "Cross-account service import routing is not yet wired in the message dispatch path; tracked as a future milestone")] + [Fact] public async Task AccountImportExport_CrossAccountServiceCall() { var config = """ @@ -164,17 +165,188 @@ public class AdvancedTests await responderTask; } - [Fact(Skip = "Subject transforms not yet implemented in config parsing")] - [SlopwatchSuppress("SW001", "Subject transform config parsing is not yet implemented; tracked for future milestone")] - public Task SubjectTransforms_MappedSubject_ReceivedOnTarget() + [Fact] + public async Task ServiceLatency_CrossAccountCall_LatencyMessagePublished() { - return Task.CompletedTask; + var config = """ + accounts { + SYS { + users = [{ user: "sys", password: "sys" }] + } + PROVIDER { + users = [{ user: "provider", password: "prov" }] + exports = [ + { service: "svc.echo", latency: "latency.svc.echo" } + ] + } + CONSUMER { + users = [{ user: "consumer", password: "cons" }] + imports = [ + { service: { account: PROVIDER, subject: "svc.echo" } } + ] + } + } + system_account: SYS + """; + + await using var server = NatsServerProcess.WithConfig(config); + await server.StartAsync(); + + var url = $"nats://127.0.0.1:{server.Port}"; + + // System account client subscribes to latency events + await using var sysClient = new NatsConnection(new NatsOpts + { + Url = url, + AuthOpts = new NatsAuthOpts { Username = "sys", Password = "sys" }, + }); + await sysClient.ConnectAsync(); + + await using var latencySub = await sysClient.SubscribeCoreAsync("latency.svc.echo"); + await sysClient.PingAsync(); + + // Provider sets up the echo responder + await using var provider = new NatsConnection(new NatsOpts + { + Url = url, + AuthOpts = new NatsAuthOpts { Username = "provider", Password = "prov" }, + }); + await provider.ConnectAsync(); + + await using var svcSub = await provider.SubscribeCoreAsync("svc.echo"); + await provider.PingAsync(); + + var responderTask = Task.Run(async () => + { + using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await svcSub.Msgs.ReadAsync(cts2.Token); + await provider.PublishAsync(msg.ReplyTo!, $"echo: {msg.Data}"); + }); + + // Consumer makes a cross-account service call + await using var consumer = new NatsConnection(new NatsOpts + { + Url = url, + AuthOpts = new NatsAuthOpts { Username = "consumer", Password = "cons" }, + }); + await consumer.ConnectAsync(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var reply = await consumer.RequestAsync("svc.echo", "hello", cancellationToken: cts.Token); + reply.Data.ShouldBe("echo: hello"); + + await responderTask; + + // Verify latency message was published to the system account + var latencyMsg = await latencySub.Msgs.ReadAsync(cts.Token); + latencyMsg.Subject.ShouldBe("latency.svc.echo"); + latencyMsg.Data.ShouldNotBeNull(); + latencyMsg.Data!.ShouldContain("service_latency"); } - [Fact(Skip = "JWT operator mode not yet implemented in config parsing")] - [SlopwatchSuppress("SW001", "JWT operator mode config parsing is not yet implemented; tracked for future milestone")] - public Task JwtAuth_ValidJwt_Connects() + [Fact] + public async Task SubjectTransforms_MappedSubject_ReceivedOnTarget() { - return Task.CompletedTask; + var config = """ + mappings { + "e2e.src": "e2e.dest" + } + """; + + await using var server = NatsServerProcess.WithConfig(config); + await server.StartAsync(); + + var url = $"nats://127.0.0.1:{server.Port}"; + + await using var client = new NatsConnection(new NatsOpts { Url = url }); + await client.ConnectAsync(); + + // Subscribe to the destination subject + await using var sub = await client.SubscribeCoreAsync("e2e.dest"); + await client.PingAsync(); + + // Publish to the source subject — should be transformed to destination + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await client.PublishAsync("e2e.src", "mapped-payload", cancellationToken: cts.Token); + await client.PingAsync(cts.Token); + + var msg = await sub.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("mapped-payload"); } + + [Fact] + public async Task JwtAuth_ValidJwt_Connects() + { + // Generate operator, account, and user NKey pairs + using var operatorKp = KeyPair.CreatePair(PrefixByte.Operator); + using var accountKp = KeyPair.CreatePair(PrefixByte.Account); + using var userKp = KeyPair.CreatePair(PrefixByte.User); + + var operatorPub = operatorKp.GetPublicKey(); + var accountPub = accountKp.GetPublicKey(); + var userPub = userKp.GetPublicKey(); + + // Build account JWT (signed by operator) + var accountJwt = BuildJwt(new + { + sub = accountPub, + iss = operatorPub, + iat = DateTimeOffset.UtcNow.ToUnixTimeSeconds(), + nats = new { type = "account", version = 2 }, + }, operatorKp); + + // Build user JWT as bearer token (signed by account, no nonce needed) + var userJwt = BuildJwt(new + { + sub = userPub, + iss = accountPub, + iat = DateTimeOffset.UtcNow.ToUnixTimeSeconds(), + nats = new { type = "user", version = 2, bearer_token = true, issuer_account = accountPub }, + }, accountKp); + + var config = $$""" + trusted_keys: "{{operatorPub}}" + resolver: MEMORY + resolver_preload: { + {{accountPub}}: "{{accountJwt}}" + } + """; + + await using var server = NatsServerProcess.WithConfig(config); + await server.StartAsync(); + + var url = $"nats://127.0.0.1:{server.Port}"; + + await using var client = new NatsConnection(new NatsOpts + { + Url = url, + AuthOpts = new NatsAuthOpts { Jwt = userJwt }, + }); + + await client.ConnectAsync(); + await client.PingAsync(); + client.ConnectionState.ShouldBe(NatsConnectionState.Open); + } + + /// + /// Builds a signed NATS JWT using the given payload and NKey pair. + /// Wire format: base64url(header).base64url(payload).base64url(ed25519-signature). + /// + private static string BuildJwt(object payload, KeyPair signingKp) + { + var header = """{"typ":"jwt","alg":"ed25519-nkey"}"""; + var payloadJson = JsonSerializer.Serialize(payload); + + var headerB64 = Base64UrlEncode(Encoding.UTF8.GetBytes(header)); + var payloadB64 = Base64UrlEncode(Encoding.UTF8.GetBytes(payloadJson)); + + var signingInput = Encoding.UTF8.GetBytes($"{headerB64}.{payloadB64}"); + var sig = new byte[64]; + signingKp.Sign(signingInput, sig); + + return $"{headerB64}.{payloadB64}.{Base64UrlEncode(sig)}"; + } + + private static string Base64UrlEncode(byte[] data) + => Convert.ToBase64String(data).TrimEnd('=').Replace('+', '-').Replace('/', '_'); }