feat: wire remaining E2E gaps — account imports, subject transforms, JWT auth, service latency
Close all 5 server-side wiring gaps so E2E tests pass without skips: - System events: bridge user-defined system_account to internal $SYS - Account imports/exports: config parsing + reverse response import for cross-account request-reply - Subject transforms: parse mappings config block, apply in ProcessMessage - JWT auth: parse trusted_keys, resolver MEMORY, resolver_preload in config - Service latency: timestamp on request, publish ServiceLatencyMsg on response
This commit is contained in:
@@ -307,7 +307,7 @@ public sealed class Account : IDisposable
|
|||||||
return new ServiceExportInfo(subject, se.ResponseType, approved, isWildcard);
|
return new ServiceExportInfo(subject, se.ResponseType, approved, isWildcard);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable<Account>? approved)
|
public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable<Account>? approved, ServiceLatency? latency = null)
|
||||||
{
|
{
|
||||||
var auth = new ExportAuth
|
var auth = new ExportAuth
|
||||||
{
|
{
|
||||||
@@ -318,6 +318,7 @@ public sealed class Account : IDisposable
|
|||||||
Auth = auth,
|
Auth = auth,
|
||||||
Account = this,
|
Account = this,
|
||||||
ResponseType = responseType,
|
ResponseType = responseType,
|
||||||
|
Latency = latency,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,4 +5,40 @@ public sealed class AccountConfig
|
|||||||
public int MaxConnections { get; init; } // 0 = unlimited
|
public int MaxConnections { get; init; } // 0 = unlimited
|
||||||
public int MaxSubscriptions { get; init; } // 0 = unlimited
|
public int MaxSubscriptions { get; init; } // 0 = unlimited
|
||||||
public Permissions? DefaultPermissions { get; init; }
|
public Permissions? DefaultPermissions { get; init; }
|
||||||
|
|
||||||
|
/// <summary>Service and stream exports from this account.</summary>
|
||||||
|
public List<ExportDefinition>? Exports { get; init; }
|
||||||
|
|
||||||
|
/// <summary>Service and stream imports into this account.</summary>
|
||||||
|
public List<ImportDefinition>? Imports { get; init; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Represents an export declaration in config: exports = [{ service: "sub" }] or [{ stream: "sub" }].
|
||||||
|
/// Go reference: server/opts.go — parseExportStreamMap / parseExportServiceMap.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class ExportDefinition
|
||||||
|
{
|
||||||
|
public string? Service { get; init; }
|
||||||
|
public string? Stream { get; init; }
|
||||||
|
|
||||||
|
/// <summary>Optional latency tracking subject (e.g. "latency.svc.echo").</summary>
|
||||||
|
public string? LatencySubject { get; init; }
|
||||||
|
|
||||||
|
/// <summary>Latency sampling percentage (1–100, default 100).</summary>
|
||||||
|
public int LatencySampling { get; init; } = 100;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Represents an import declaration in config:
|
||||||
|
/// imports = [{ service: { account: X, subject: "sub" }, to: "local" }].
|
||||||
|
/// Go reference: server/opts.go — parseImportStreamMap / parseImportServiceMap.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class ImportDefinition
|
||||||
|
{
|
||||||
|
public string? ServiceAccount { get; init; }
|
||||||
|
public string? ServiceSubject { get; init; }
|
||||||
|
public string? StreamAccount { get; init; }
|
||||||
|
public string? StreamSubject { get; init; }
|
||||||
|
public string? To { get; init; }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -291,7 +291,55 @@ public static class ConfigProcessor
|
|||||||
ParseAccounts(accountsDict, opts, errors);
|
ParseAccounts(accountsDict, opts, errors);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
// Unknown keys silently ignored (resolver, operator, etc.)
|
// Server-level subject mappings: mappings { src: dest }
|
||||||
|
// Go reference: server/opts.go — "mappings" case
|
||||||
|
case "mappings" or "maps":
|
||||||
|
if (value is Dictionary<string, object?> mappingsDict)
|
||||||
|
{
|
||||||
|
opts.SubjectMappings ??= new Dictionary<string, string>();
|
||||||
|
foreach (var (src, dest) in mappingsDict)
|
||||||
|
{
|
||||||
|
if (dest is string destStr)
|
||||||
|
opts.SubjectMappings[src] = destStr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
|
||||||
|
// JWT operator mode — trusted operator public NKeys
|
||||||
|
// Go reference: server/opts.go — "trusted_keys" / "trusted" case
|
||||||
|
case "trusted_keys" or "trusted":
|
||||||
|
opts.TrustedKeys = ParseStringArray(value);
|
||||||
|
break;
|
||||||
|
|
||||||
|
// JWT resolver type and preload
|
||||||
|
// Go reference: server/opts.go — "resolver" case
|
||||||
|
case "resolver" or "account_resolver" or "accounts_resolver":
|
||||||
|
if (value is string resolverStr && resolverStr.Equals("MEMORY", StringComparison.OrdinalIgnoreCase))
|
||||||
|
opts.AccountResolver = new Auth.Jwt.MemAccountResolver();
|
||||||
|
break;
|
||||||
|
|
||||||
|
// Pre-load account JWTs into the resolver
|
||||||
|
// Go reference: server/opts.go — "resolver_preload" case
|
||||||
|
case "resolver_preload":
|
||||||
|
if (value is Dictionary<string, object?> preloadDict && opts.AccountResolver != null)
|
||||||
|
{
|
||||||
|
foreach (var (accNkey, jwtObj) in preloadDict)
|
||||||
|
{
|
||||||
|
if (jwtObj is string jwt)
|
||||||
|
opts.AccountResolver.StoreAsync(accNkey, jwt).GetAwaiter().GetResult();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
|
||||||
|
// Operator key (can derive trusted_keys from operator JWT — for now just accept NKeys directly)
|
||||||
|
case "operator" or "operators" or "root" or "roots" or "root_operators" or "root_operator":
|
||||||
|
// For simple mode: treat as trusted_keys alias if string array
|
||||||
|
opts.TrustedKeys ??= ParseStringArray(value);
|
||||||
|
break;
|
||||||
|
|
||||||
|
// Unknown keys silently ignored
|
||||||
default:
|
default:
|
||||||
warnings.Add(new UnknownConfigFieldWarning(key).Message);
|
warnings.Add(new UnknownConfigFieldWarning(key).Message);
|
||||||
break;
|
break;
|
||||||
@@ -975,6 +1023,8 @@ public static class ConfigProcessor
|
|||||||
int maxConnections = 0;
|
int maxConnections = 0;
|
||||||
int maxSubscriptions = 0;
|
int maxSubscriptions = 0;
|
||||||
List<object?>? userList = null;
|
List<object?>? userList = null;
|
||||||
|
List<ExportDefinition>? exports = null;
|
||||||
|
List<ImportDefinition>? imports = null;
|
||||||
|
|
||||||
foreach (var (key, value) in acctDict)
|
foreach (var (key, value) in acctDict)
|
||||||
{
|
{
|
||||||
@@ -989,6 +1039,21 @@ public static class ConfigProcessor
|
|||||||
break;
|
break;
|
||||||
case "max_subscriptions" or "max_subs":
|
case "max_subscriptions" or "max_subs":
|
||||||
maxSubscriptions = ToInt(value);
|
maxSubscriptions = ToInt(value);
|
||||||
|
break;
|
||||||
|
case "exports":
|
||||||
|
if (value is List<object?> exportList)
|
||||||
|
exports = ParseExports(exportList);
|
||||||
|
break;
|
||||||
|
case "imports":
|
||||||
|
if (value is List<object?> importList)
|
||||||
|
imports = ParseImports(importList);
|
||||||
|
break;
|
||||||
|
case "mappings" or "maps":
|
||||||
|
if (value is Dictionary<string, object?> mappingsDict)
|
||||||
|
{
|
||||||
|
// Account-level subject mappings not yet supported
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -997,6 +1062,8 @@ public static class ConfigProcessor
|
|||||||
{
|
{
|
||||||
MaxConnections = maxConnections,
|
MaxConnections = maxConnections,
|
||||||
MaxSubscriptions = maxSubscriptions,
|
MaxSubscriptions = maxSubscriptions,
|
||||||
|
Exports = exports,
|
||||||
|
Imports = imports,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (userList is not null)
|
if (userList is not null)
|
||||||
@@ -1020,6 +1087,140 @@ public static class ConfigProcessor
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Parses an exports array: [{ service: "sub" }, { stream: "sub" }].
|
||||||
|
/// Go reference: server/opts.go — parseExportStreamMap / parseExportServiceMap.
|
||||||
|
/// </summary>
|
||||||
|
private static List<ExportDefinition> ParseExports(List<object?> exportList)
|
||||||
|
{
|
||||||
|
var result = new List<ExportDefinition>();
|
||||||
|
foreach (var item in exportList)
|
||||||
|
{
|
||||||
|
if (item is not Dictionary<string, object?> dict)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
string? service = null, stream = null;
|
||||||
|
string? latencySubject = null;
|
||||||
|
int latencySampling = 100;
|
||||||
|
|
||||||
|
foreach (var (k, v) in dict)
|
||||||
|
{
|
||||||
|
switch (k.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "service":
|
||||||
|
service = ToString(v);
|
||||||
|
break;
|
||||||
|
case "stream":
|
||||||
|
stream = ToString(v);
|
||||||
|
break;
|
||||||
|
case "latency":
|
||||||
|
// latency can be a string (subject only) or a map { subject, sampling }
|
||||||
|
// Go reference: server/opts.go — parseServiceLatency
|
||||||
|
if (v is string latStr)
|
||||||
|
{
|
||||||
|
latencySubject = latStr;
|
||||||
|
}
|
||||||
|
else if (v is Dictionary<string, object?> latDict)
|
||||||
|
{
|
||||||
|
foreach (var (lk, lv) in latDict)
|
||||||
|
{
|
||||||
|
switch (lk.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "subject":
|
||||||
|
latencySubject = ToString(lv);
|
||||||
|
break;
|
||||||
|
case "sampling":
|
||||||
|
latencySampling = ToInt(lv);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result.Add(new ExportDefinition
|
||||||
|
{
|
||||||
|
Service = service,
|
||||||
|
Stream = stream,
|
||||||
|
LatencySubject = latencySubject,
|
||||||
|
LatencySampling = latencySampling,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Parses an imports array: [{ service: { account: X, subject: "sub" }, to: "local" }].
|
||||||
|
/// Go reference: server/opts.go — parseImportStreamMap / parseImportServiceMap.
|
||||||
|
/// </summary>
|
||||||
|
private static List<ImportDefinition> ParseImports(List<object?> importList)
|
||||||
|
{
|
||||||
|
var result = new List<ImportDefinition>();
|
||||||
|
foreach (var item in importList)
|
||||||
|
{
|
||||||
|
if (item is not Dictionary<string, object?> dict)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
string? serviceAccount = null, serviceSubject = null;
|
||||||
|
string? streamAccount = null, streamSubject = null;
|
||||||
|
string? to = null;
|
||||||
|
|
||||||
|
foreach (var (k, v) in dict)
|
||||||
|
{
|
||||||
|
switch (k.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "service" when v is Dictionary<string, object?> svcDict:
|
||||||
|
foreach (var (sk, sv) in svcDict)
|
||||||
|
{
|
||||||
|
switch (sk.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "account":
|
||||||
|
serviceAccount = ToString(sv);
|
||||||
|
break;
|
||||||
|
case "subject":
|
||||||
|
serviceSubject = ToString(sv);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
case "stream" when v is Dictionary<string, object?> strmDict:
|
||||||
|
foreach (var (sk, sv) in strmDict)
|
||||||
|
{
|
||||||
|
switch (sk.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "account":
|
||||||
|
streamAccount = ToString(sv);
|
||||||
|
break;
|
||||||
|
case "subject":
|
||||||
|
streamSubject = ToString(sv);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
case "to":
|
||||||
|
to = ToString(v);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result.Add(new ImportDefinition
|
||||||
|
{
|
||||||
|
ServiceAccount = serviceAccount,
|
||||||
|
ServiceSubject = serviceSubject,
|
||||||
|
StreamAccount = streamAccount,
|
||||||
|
StreamSubject = streamSubject,
|
||||||
|
To = to,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Splits a users array into plain users and NKey users.
|
/// Splits a users array into plain users and NKey users.
|
||||||
/// An entry with an "nkey" field is an NKey user; entries with "user" are plain users.
|
/// An entry with an "nkey" field is an NKey user; entries with "user" are plain users.
|
||||||
@@ -1623,6 +1824,30 @@ public static class ConfigProcessor
|
|||||||
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to double"),
|
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to double"),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Parses a config value that can be a single string or a list of strings into a string[].
|
||||||
|
/// Go reference: server/opts.go — parseTrustedKeys accepts string, []string, []interface{}.
|
||||||
|
/// </summary>
|
||||||
|
private static string[]? ParseStringArray(object? value)
|
||||||
|
{
|
||||||
|
if (value is List<object?> list)
|
||||||
|
{
|
||||||
|
var result = new List<string>(list.Count);
|
||||||
|
foreach (var item in list)
|
||||||
|
{
|
||||||
|
if (item is string s)
|
||||||
|
result.Add(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result.Count > 0 ? result.ToArray() : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value is string str)
|
||||||
|
return [str];
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
private static IReadOnlyList<string> ToStringList(object? value)
|
private static IReadOnlyList<string> ToStringList(object? value)
|
||||||
{
|
{
|
||||||
if (value is List<object?> list)
|
if (value is List<object?> list)
|
||||||
|
|||||||
@@ -529,6 +529,17 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
_systemAccount = new Account(Account.SystemAccountName) { IsSystemAccount = true };
|
_systemAccount = new Account(Account.SystemAccountName) { IsSystemAccount = true };
|
||||||
_accounts[Account.SystemAccountName] = _systemAccount;
|
_accounts[Account.SystemAccountName] = _systemAccount;
|
||||||
|
|
||||||
|
// If a user-defined system_account is configured, promote that account to be the
|
||||||
|
// system account. Events published to $SYS.* will be delivered to subscribers on
|
||||||
|
// this account. Go reference: server/server.go — configureAccounts / setSystemAccount.
|
||||||
|
if (!string.IsNullOrEmpty(options.SystemAccount) &&
|
||||||
|
!string.Equals(options.SystemAccount, Account.SystemAccountName, StringComparison.OrdinalIgnoreCase))
|
||||||
|
{
|
||||||
|
var userSysAccount = GetOrCreateAccount(options.SystemAccount);
|
||||||
|
userSysAccount.IsSystemAccount = true;
|
||||||
|
_systemAccount = userSysAccount;
|
||||||
|
}
|
||||||
|
|
||||||
// Create system internal client and event system
|
// Create system internal client and event system
|
||||||
var sysClientId = Interlocked.Increment(ref _nextClientId);
|
var sysClientId = Interlocked.Increment(ref _nextClientId);
|
||||||
var sysClient = new InternalClient(sysClientId, ClientKind.System, _systemAccount);
|
var sysClient = new InternalClient(sysClientId, ClientKind.System, _systemAccount);
|
||||||
@@ -1312,7 +1323,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
if (si.Invalid) continue;
|
if (si.Invalid) continue;
|
||||||
if (SubjectMatch.MatchLiteral(subject, si.From))
|
if (SubjectMatch.MatchLiteral(subject, si.From))
|
||||||
{
|
{
|
||||||
ProcessServiceImport(si, subject, replyTo, headers, payload);
|
ProcessServiceImport(si, subject, replyTo, headers, payload, sender.Account);
|
||||||
delivered = true;
|
delivered = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1453,7 +1464,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
/// Reference: Go server/accounts.go addServiceImport / processServiceImport.
|
/// Reference: Go server/accounts.go addServiceImport / processServiceImport.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public void ProcessServiceImport(ServiceImport si, string subject, string? replyTo,
|
public void ProcessServiceImport(ServiceImport si, string subject, string? replyTo,
|
||||||
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
|
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, Account? sourceAccount = null)
|
||||||
{
|
{
|
||||||
if (si.Invalid) return;
|
if (si.Invalid) return;
|
||||||
|
|
||||||
@@ -1477,6 +1488,24 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
targetSubject = MapImportSubject(subject, si.From, si.To);
|
targetSubject = MapImportSubject(subject, si.From, si.To);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set up a temporary reverse service import so that responses from the
|
||||||
|
// destination (exporter) account can route back to the source (importer)
|
||||||
|
// account. This handles request-reply across account boundaries.
|
||||||
|
// Go reference: client.go setupResponseServiceImport
|
||||||
|
if (replyTo != null && sourceAccount != null && !si.IsResponse)
|
||||||
|
{
|
||||||
|
SetupResponseServiceImport(si.DestinationAccount, sourceAccount, replyTo, si.Export);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Service latency tracking: when the response arrives back, compute elapsed
|
||||||
|
// time and publish a latency metric to the configured subject.
|
||||||
|
// Go reference: client.go processServiceImport — latency tracking path.
|
||||||
|
if (si.IsResponse && si.Tracking && si.TimestampTicks > 0)
|
||||||
|
{
|
||||||
|
var elapsed = TimeSpan.FromTicks(Environment.TickCount64 * TimeSpan.TicksPerMillisecond - si.TimestampTicks);
|
||||||
|
PublishServiceLatency(si, elapsed);
|
||||||
|
}
|
||||||
|
|
||||||
// Match against destination account's SubList
|
// Match against destination account's SubList
|
||||||
var destSubList = si.DestinationAccount.SubList;
|
var destSubList = si.DestinationAccount.SubList;
|
||||||
var result = destSubList.Match(targetSubject);
|
var result = destSubList.Match(targetSubject);
|
||||||
@@ -1498,6 +1527,36 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a temporary reverse service import in the exporter's account so that
|
||||||
|
/// when the exporter publishes a response to the reply subject, the message is
|
||||||
|
/// forwarded back to the importer's account where the reply subscription lives.
|
||||||
|
/// Go reference: client.go setupResponseServiceImport.
|
||||||
|
/// </summary>
|
||||||
|
private static void SetupResponseServiceImport(Account exporterAccount, Account importerAccount, string replyTo, ServiceExport? export = null)
|
||||||
|
{
|
||||||
|
// Check if a reverse import for this reply subject already exists
|
||||||
|
if (exporterAccount.Imports.Services.ContainsKey(replyTo))
|
||||||
|
return;
|
||||||
|
|
||||||
|
// Determine if we should track latency for this response
|
||||||
|
var shouldTrack = export?.Latency is { } latency && LatencyTracker.ShouldSample(latency);
|
||||||
|
|
||||||
|
var reverseImport = new ServiceImport
|
||||||
|
{
|
||||||
|
DestinationAccount = importerAccount,
|
||||||
|
From = replyTo,
|
||||||
|
To = replyTo,
|
||||||
|
IsResponse = true,
|
||||||
|
UsePub = true,
|
||||||
|
Export = export,
|
||||||
|
Tracking = shouldTrack,
|
||||||
|
// Store start time as TickCount64 (milliseconds) converted to ticks for elapsed computation
|
||||||
|
TimestampTicks = shouldTrack ? Environment.TickCount64 * TimeSpan.TicksPerMillisecond : 0,
|
||||||
|
};
|
||||||
|
exporterAccount.Imports.AddServiceImport(reverseImport);
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Maps a published subject from the import "From" pattern to the "To" pattern.
|
/// Maps a published subject from the import "From" pattern to the "To" pattern.
|
||||||
/// For example, if From="requests.>" and To="api.>" and subject="requests.test",
|
/// For example, if From="requests.>" and To="api.>" and subject="requests.test",
|
||||||
@@ -1633,11 +1692,54 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
acc.MaxConnections = config.MaxConnections;
|
acc.MaxConnections = config.MaxConnections;
|
||||||
acc.MaxSubscriptions = config.MaxSubscriptions;
|
acc.MaxSubscriptions = config.MaxSubscriptions;
|
||||||
acc.DefaultPermissions = config.DefaultPermissions;
|
acc.DefaultPermissions = config.DefaultPermissions;
|
||||||
|
|
||||||
|
// Wire exports from config
|
||||||
|
if (config.Exports != null)
|
||||||
|
{
|
||||||
|
foreach (var export in config.Exports)
|
||||||
|
{
|
||||||
|
if (export.Service is { Length: > 0 } svc)
|
||||||
|
{
|
||||||
|
ServiceLatency? latency = export.LatencySubject is { Length: > 0 }
|
||||||
|
? new ServiceLatency { Subject = export.LatencySubject, SamplingPercentage = export.LatencySampling }
|
||||||
|
: null;
|
||||||
|
acc.AddServiceExport(svc, Imports.ServiceResponseType.Singleton, approved: null, latency: latency);
|
||||||
|
}
|
||||||
|
else if (export.Stream is { Length: > 0 } strm)
|
||||||
|
{
|
||||||
|
acc.AddStreamExport(strm, approved: null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wire imports from config (deferred — needs destination accounts resolved)
|
||||||
|
if (config.Imports != null)
|
||||||
|
WireAccountImports(acc, config.Imports);
|
||||||
}
|
}
|
||||||
|
|
||||||
return acc;
|
return acc;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void WireAccountImports(Account importer, List<Auth.ImportDefinition> imports)
|
||||||
|
{
|
||||||
|
foreach (var imp in imports)
|
||||||
|
{
|
||||||
|
if (imp.ServiceAccount is { Length: > 0 } svcAcct && imp.ServiceSubject is { Length: > 0 } svcSubj)
|
||||||
|
{
|
||||||
|
var dest = GetOrCreateAccount(svcAcct);
|
||||||
|
var localSubject = imp.To ?? svcSubj;
|
||||||
|
importer.AddServiceImport(dest, from: localSubject, to: svcSubj);
|
||||||
|
}
|
||||||
|
else if (imp.StreamAccount is { Length: > 0 } strmAcct && imp.StreamSubject is { Length: > 0 } strmSubj)
|
||||||
|
{
|
||||||
|
var source = GetOrCreateAccount(strmAcct);
|
||||||
|
var localSubject = imp.To ?? strmSubj;
|
||||||
|
importer.AddStreamImport(source, from: strmSubj, to: localSubject);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Returns true if the subject belongs to the $SYS subject space.
|
/// Returns true if the subject belongs to the $SYS subject space.
|
||||||
/// Reference: Go server/server.go — isReservedSubject.
|
/// Reference: Go server/server.go — isReservedSubject.
|
||||||
@@ -1675,6 +1777,25 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
return account?.SubList ?? _globalAccount.SubList;
|
return account?.SubList ?? _globalAccount.SubList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Publishes a service latency metric message to the configured latency subject.
|
||||||
|
/// Go reference: client.go processServiceImport — trackLatency path.
|
||||||
|
/// </summary>
|
||||||
|
private void PublishServiceLatency(ServiceImport si, TimeSpan elapsed)
|
||||||
|
{
|
||||||
|
var latency = si.Export?.Latency;
|
||||||
|
if (latency == null || string.IsNullOrEmpty(latency.Subject))
|
||||||
|
return;
|
||||||
|
|
||||||
|
var msg = LatencyTracker.BuildLatencyMsg(
|
||||||
|
requestor: si.DestinationAccount.Name,
|
||||||
|
responder: si.Export?.Account?.Name ?? "unknown",
|
||||||
|
serviceLatency: elapsed,
|
||||||
|
totalLatency: elapsed);
|
||||||
|
|
||||||
|
SendInternalMsg(latency.Subject, reply: null, msg);
|
||||||
|
}
|
||||||
|
|
||||||
public void SendInternalMsg(string subject, string? reply, object? msg)
|
public void SendInternalMsg(string subject, string? reply, object? msg)
|
||||||
{
|
{
|
||||||
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Reply = reply, Body = msg });
|
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Reply = reply, Body = msg });
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
|
using System.Text;
|
||||||
|
using System.Text.Json;
|
||||||
using NATS.Client.Core;
|
using NATS.Client.Core;
|
||||||
using NATS.E2E.Tests.Infrastructure;
|
using NATS.E2E.Tests.Infrastructure;
|
||||||
|
using NATS.NKeys;
|
||||||
|
|
||||||
namespace NATS.E2E.Tests;
|
namespace NATS.E2E.Tests;
|
||||||
|
|
||||||
@@ -62,8 +65,7 @@ public class AdvancedTests
|
|||||||
ex.ShouldNotBeNull();
|
ex.ShouldNotBeNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact(Skip = "system_account promotion not yet wired: events route through internal $SYS account, not user-defined account")]
|
[Fact]
|
||||||
[SlopwatchSuppress("SW001", "User-defined system_account SubList is not bridged to the internal event system; tracked as a future milestone")]
|
|
||||||
public async Task SystemEvents_ClientConnect_EventPublished()
|
public async Task SystemEvents_ClientConnect_EventPublished()
|
||||||
{
|
{
|
||||||
var config = """
|
var config = """
|
||||||
@@ -106,8 +108,7 @@ public class AdvancedTests
|
|||||||
msg.Subject.ShouldContain("CONNECT");
|
msg.Subject.ShouldContain("CONNECT");
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact(Skip = "Cross-account service routing not yet implemented in message dispatch path")]
|
[Fact]
|
||||||
[SlopwatchSuppress("SW001", "Cross-account service import routing is not yet wired in the message dispatch path; tracked as a future milestone")]
|
|
||||||
public async Task AccountImportExport_CrossAccountServiceCall()
|
public async Task AccountImportExport_CrossAccountServiceCall()
|
||||||
{
|
{
|
||||||
var config = """
|
var config = """
|
||||||
@@ -164,17 +165,188 @@ public class AdvancedTests
|
|||||||
await responderTask;
|
await responderTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact(Skip = "Subject transforms not yet implemented in config parsing")]
|
[Fact]
|
||||||
[SlopwatchSuppress("SW001", "Subject transform config parsing is not yet implemented; tracked for future milestone")]
|
public async Task ServiceLatency_CrossAccountCall_LatencyMessagePublished()
|
||||||
public Task SubjectTransforms_MappedSubject_ReceivedOnTarget()
|
|
||||||
{
|
{
|
||||||
return Task.CompletedTask;
|
var config = """
|
||||||
|
accounts {
|
||||||
|
SYS {
|
||||||
|
users = [{ user: "sys", password: "sys" }]
|
||||||
|
}
|
||||||
|
PROVIDER {
|
||||||
|
users = [{ user: "provider", password: "prov" }]
|
||||||
|
exports = [
|
||||||
|
{ service: "svc.echo", latency: "latency.svc.echo" }
|
||||||
|
]
|
||||||
|
}
|
||||||
|
CONSUMER {
|
||||||
|
users = [{ user: "consumer", password: "cons" }]
|
||||||
|
imports = [
|
||||||
|
{ service: { account: PROVIDER, subject: "svc.echo" } }
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
system_account: SYS
|
||||||
|
""";
|
||||||
|
|
||||||
|
await using var server = NatsServerProcess.WithConfig(config);
|
||||||
|
await server.StartAsync();
|
||||||
|
|
||||||
|
var url = $"nats://127.0.0.1:{server.Port}";
|
||||||
|
|
||||||
|
// System account client subscribes to latency events
|
||||||
|
await using var sysClient = new NatsConnection(new NatsOpts
|
||||||
|
{
|
||||||
|
Url = url,
|
||||||
|
AuthOpts = new NatsAuthOpts { Username = "sys", Password = "sys" },
|
||||||
|
});
|
||||||
|
await sysClient.ConnectAsync();
|
||||||
|
|
||||||
|
await using var latencySub = await sysClient.SubscribeCoreAsync<string>("latency.svc.echo");
|
||||||
|
await sysClient.PingAsync();
|
||||||
|
|
||||||
|
// Provider sets up the echo responder
|
||||||
|
await using var provider = new NatsConnection(new NatsOpts
|
||||||
|
{
|
||||||
|
Url = url,
|
||||||
|
AuthOpts = new NatsAuthOpts { Username = "provider", Password = "prov" },
|
||||||
|
});
|
||||||
|
await provider.ConnectAsync();
|
||||||
|
|
||||||
|
await using var svcSub = await provider.SubscribeCoreAsync<string>("svc.echo");
|
||||||
|
await provider.PingAsync();
|
||||||
|
|
||||||
|
var responderTask = Task.Run(async () =>
|
||||||
|
{
|
||||||
|
using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||||
|
var msg = await svcSub.Msgs.ReadAsync(cts2.Token);
|
||||||
|
await provider.PublishAsync(msg.ReplyTo!, $"echo: {msg.Data}");
|
||||||
|
});
|
||||||
|
|
||||||
|
// Consumer makes a cross-account service call
|
||||||
|
await using var consumer = new NatsConnection(new NatsOpts
|
||||||
|
{
|
||||||
|
Url = url,
|
||||||
|
AuthOpts = new NatsAuthOpts { Username = "consumer", Password = "cons" },
|
||||||
|
});
|
||||||
|
await consumer.ConnectAsync();
|
||||||
|
|
||||||
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||||
|
var reply = await consumer.RequestAsync<string, string>("svc.echo", "hello", cancellationToken: cts.Token);
|
||||||
|
reply.Data.ShouldBe("echo: hello");
|
||||||
|
|
||||||
|
await responderTask;
|
||||||
|
|
||||||
|
// Verify latency message was published to the system account
|
||||||
|
var latencyMsg = await latencySub.Msgs.ReadAsync(cts.Token);
|
||||||
|
latencyMsg.Subject.ShouldBe("latency.svc.echo");
|
||||||
|
latencyMsg.Data.ShouldNotBeNull();
|
||||||
|
latencyMsg.Data!.ShouldContain("service_latency");
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact(Skip = "JWT operator mode not yet implemented in config parsing")]
|
[Fact]
|
||||||
[SlopwatchSuppress("SW001", "JWT operator mode config parsing is not yet implemented; tracked for future milestone")]
|
public async Task SubjectTransforms_MappedSubject_ReceivedOnTarget()
|
||||||
public Task JwtAuth_ValidJwt_Connects()
|
|
||||||
{
|
{
|
||||||
return Task.CompletedTask;
|
var config = """
|
||||||
|
mappings {
|
||||||
|
"e2e.src": "e2e.dest"
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
|
||||||
|
await using var server = NatsServerProcess.WithConfig(config);
|
||||||
|
await server.StartAsync();
|
||||||
|
|
||||||
|
var url = $"nats://127.0.0.1:{server.Port}";
|
||||||
|
|
||||||
|
await using var client = new NatsConnection(new NatsOpts { Url = url });
|
||||||
|
await client.ConnectAsync();
|
||||||
|
|
||||||
|
// Subscribe to the destination subject
|
||||||
|
await using var sub = await client.SubscribeCoreAsync<string>("e2e.dest");
|
||||||
|
await client.PingAsync();
|
||||||
|
|
||||||
|
// Publish to the source subject — should be transformed to destination
|
||||||
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||||
|
await client.PublishAsync("e2e.src", "mapped-payload", cancellationToken: cts.Token);
|
||||||
|
await client.PingAsync(cts.Token);
|
||||||
|
|
||||||
|
var msg = await sub.Msgs.ReadAsync(cts.Token);
|
||||||
|
msg.Data.ShouldBe("mapped-payload");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task JwtAuth_ValidJwt_Connects()
|
||||||
|
{
|
||||||
|
// Generate operator, account, and user NKey pairs
|
||||||
|
using var operatorKp = KeyPair.CreatePair(PrefixByte.Operator);
|
||||||
|
using var accountKp = KeyPair.CreatePair(PrefixByte.Account);
|
||||||
|
using var userKp = KeyPair.CreatePair(PrefixByte.User);
|
||||||
|
|
||||||
|
var operatorPub = operatorKp.GetPublicKey();
|
||||||
|
var accountPub = accountKp.GetPublicKey();
|
||||||
|
var userPub = userKp.GetPublicKey();
|
||||||
|
|
||||||
|
// Build account JWT (signed by operator)
|
||||||
|
var accountJwt = BuildJwt(new
|
||||||
|
{
|
||||||
|
sub = accountPub,
|
||||||
|
iss = operatorPub,
|
||||||
|
iat = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
|
||||||
|
nats = new { type = "account", version = 2 },
|
||||||
|
}, operatorKp);
|
||||||
|
|
||||||
|
// Build user JWT as bearer token (signed by account, no nonce needed)
|
||||||
|
var userJwt = BuildJwt(new
|
||||||
|
{
|
||||||
|
sub = userPub,
|
||||||
|
iss = accountPub,
|
||||||
|
iat = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
|
||||||
|
nats = new { type = "user", version = 2, bearer_token = true, issuer_account = accountPub },
|
||||||
|
}, accountKp);
|
||||||
|
|
||||||
|
var config = $$"""
|
||||||
|
trusted_keys: "{{operatorPub}}"
|
||||||
|
resolver: MEMORY
|
||||||
|
resolver_preload: {
|
||||||
|
{{accountPub}}: "{{accountJwt}}"
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
|
||||||
|
await using var server = NatsServerProcess.WithConfig(config);
|
||||||
|
await server.StartAsync();
|
||||||
|
|
||||||
|
var url = $"nats://127.0.0.1:{server.Port}";
|
||||||
|
|
||||||
|
await using var client = new NatsConnection(new NatsOpts
|
||||||
|
{
|
||||||
|
Url = url,
|
||||||
|
AuthOpts = new NatsAuthOpts { Jwt = userJwt },
|
||||||
|
});
|
||||||
|
|
||||||
|
await client.ConnectAsync();
|
||||||
|
await client.PingAsync();
|
||||||
|
client.ConnectionState.ShouldBe(NatsConnectionState.Open);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builds a signed NATS JWT using the given payload and NKey pair.
|
||||||
|
/// Wire format: base64url(header).base64url(payload).base64url(ed25519-signature).
|
||||||
|
/// </summary>
|
||||||
|
private static string BuildJwt(object payload, KeyPair signingKp)
|
||||||
|
{
|
||||||
|
var header = """{"typ":"jwt","alg":"ed25519-nkey"}""";
|
||||||
|
var payloadJson = JsonSerializer.Serialize(payload);
|
||||||
|
|
||||||
|
var headerB64 = Base64UrlEncode(Encoding.UTF8.GetBytes(header));
|
||||||
|
var payloadB64 = Base64UrlEncode(Encoding.UTF8.GetBytes(payloadJson));
|
||||||
|
|
||||||
|
var signingInput = Encoding.UTF8.GetBytes($"{headerB64}.{payloadB64}");
|
||||||
|
var sig = new byte[64];
|
||||||
|
signingKp.Sign(signingInput, sig);
|
||||||
|
|
||||||
|
return $"{headerB64}.{payloadB64}.{Base64UrlEncode(sig)}";
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string Base64UrlEncode(byte[] data)
|
||||||
|
=> Convert.ToBase64String(data).TrimEnd('=').Replace('+', '-').Replace('/', '_');
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user