Compare commits

...

10 Commits

Author SHA1 Message Date
Joseph Doherty
9b784024db docs: update differences.md to reflect SYSTEM/ACCOUNT types and imports/exports implemented 2026-02-23 06:04:29 -05:00
Joseph Doherty
86283a7f97 feat: add latency tracking for service import request-reply 2026-02-23 06:03:37 -05:00
Joseph Doherty
4450c27381 feat: add response routing for service import request-reply patterns 2026-02-23 06:01:53 -05:00
Joseph Doherty
c9066e526d feat: wire service import forwarding into message delivery path
Add ProcessServiceImport method to NatsServer that transforms subjects
from importer to exporter namespace and delivers to destination account
subscribers. Wire service import checking into ProcessMessage so that
publishes matching a service import "From" pattern are automatically
forwarded to the destination account. Includes MapImportSubject for
wildcard-aware subject mapping and WireServiceImports for import setup.
2026-02-23 05:59:36 -05:00
Joseph Doherty
4c2b7fa3de feat: add import/export support to Account with ACCOUNT client lazy creation 2026-02-23 05:54:31 -05:00
Joseph Doherty
591833adbb feat: add import/export model types (ServiceImport, StreamImport, exports, auth) 2026-02-23 05:51:30 -05:00
Joseph Doherty
5bae9cc289 feat: add system request-reply monitoring services ($SYS.REQ.SERVER.*)
Register VARZ, HEALTHZ, SUBSZ, STATSZ, and IDZ request-reply handlers
on $SYS.REQ.SERVER.{id}.* subjects and $SYS.REQ.SERVER.PING.* wildcard
subjects via InitEventTracking. Also excludes the $SYS system account
from the /subz monitoring endpoint by default since its subscriptions
are internal infrastructure.
2026-02-23 05:48:32 -05:00
Joseph Doherty
0b34f8cec4 feat: add periodic server stats and account connection heartbeat publishing 2026-02-23 05:44:09 -05:00
Joseph Doherty
125b71b3b0 feat: wire system event publishing for connect, disconnect, and shutdown 2026-02-23 05:41:44 -05:00
Joseph Doherty
89465450a1 fix: use per-SID callback dictionary in SysSubscribe to support multiple subscriptions 2026-02-23 05:38:10 -05:00
22 changed files with 1584 additions and 19 deletions

View File

@@ -11,7 +11,7 @@
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| NKey generation (server identity) | Y | Y | Ed25519 key pair via NATS.NKeys at startup |
| System account setup | Y | Y | `$SYS` account created; no event publishing yet (stub) |
| System account setup | Y | Y | `$SYS` account with InternalEventSystem, event publishing, request-reply services |
| Config file validation on startup | Y | Y | Full config parsing with error collection via `ConfigProcessor` |
| PID file writing | Y | Y | Written on startup, deleted on shutdown |
| Profiling HTTP endpoint (`/debug/pprof`) | Y | Stub | `ProfPort` option exists but endpoint not implemented |
@@ -64,9 +64,9 @@
| ROUTER | Y | N | Excluded per scope |
| GATEWAY | Y | N | Excluded per scope |
| LEAF | Y | N | Excluded per scope |
| SYSTEM (internal) | Y | N | |
| SYSTEM (internal) | Y | Y | InternalClient + InternalEventSystem with Channel-based send/receive loops |
| JETSTREAM (internal) | Y | N | |
| ACCOUNT (internal) | Y | N | |
| ACCOUNT (internal) | Y | Y | Lazy per-account InternalClient with import/export subscription support |
| WebSocket clients | Y | N | |
| MQTT clients | Y | N | |
@@ -218,7 +218,7 @@ Go implements a sophisticated slow consumer detection system:
|---------|:--:|:----:|-------|
| Per-account SubList isolation | Y | Y | |
| Multi-account user resolution | Y | Y | `AccountConfig` per account in `NatsOptions.Accounts`; `GetOrCreateAccount` wires limits |
| Account exports/imports | Y | N | |
| Account exports/imports | Y | Y | ServiceImport/StreamImport with ExportAuth, subject transforms, response routing |
| Per-account connection limits | Y | Y | `Account.AddClient()` returns false when `MaxConnections` exceeded |
| Per-account subscription limits | Y | Y | `Account.IncrementSubscriptions()` enforced in `ProcessSub()` |
| Account JetStream limits | Y | N | Excluded per scope |
@@ -406,6 +406,11 @@ The following items from the original gap list have been implemented:
- **User revocation** — per-account tracking with wildcard (`*`) revocation
- **Config file parsing** — custom lexer/parser ported from Go; supports includes, variables, nested blocks, size suffixes
- **Hot reload (SIGHUP)** — re-parses config, diffs changes, validates reloadable set, applies with CLI precedence
- **SYSTEM client type** — InternalClient with InternalEventSystem, Channel-based send/receive loops, event publishing
- **ACCOUNT client type** — lazy per-account InternalClient with import/export subscription support
- **System event publishing** — connect/disconnect advisories, server stats, shutdown/lame-duck events, auth errors
- **System request-reply services** — $SYS.REQ.SERVER.*.VARZ/CONNZ/SUBSZ/HEALTHZ/IDZ/STATSZ with ping wildcards
- **Account exports/imports** — service and stream imports with ExportAuth, subject transforms, response routing, latency tracking
### Remaining Lower Priority
1. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections

View File

@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using NATS.Server.Imports;
using NATS.Server.Subscriptions;
namespace NATS.Server.Auth;
@@ -12,6 +13,8 @@ public sealed class Account : IDisposable
public Permissions? DefaultPermissions { get; set; }
public int MaxConnections { get; set; } // 0 = unlimited
public int MaxSubscriptions { get; set; } // 0 = unlimited
public ExportMap Exports { get; } = new();
public ImportMap Imports { get; } = new();
// JWT fields
public string? Nkey { get; set; }
@@ -89,5 +92,77 @@ public sealed class Account : IDisposable
Interlocked.Add(ref _outBytes, bytes);
}
// Internal (ACCOUNT) client for import/export message routing
private InternalClient? _internalClient;
public InternalClient GetOrCreateInternalClient(ulong clientId)
{
if (_internalClient != null) return _internalClient;
_internalClient = new InternalClient(clientId, ClientKind.Account, this);
return _internalClient;
}
public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable<Account>? approved)
{
var auth = new ExportAuth
{
ApprovedAccounts = approved != null ? new HashSet<string>(approved.Select(a => a.Name)) : null,
};
Exports.Services[subject] = new ServiceExport
{
Auth = auth,
Account = this,
ResponseType = responseType,
};
}
public void AddStreamExport(string subject, IEnumerable<Account>? approved)
{
var auth = new ExportAuth
{
ApprovedAccounts = approved != null ? new HashSet<string>(approved.Select(a => a.Name)) : null,
};
Exports.Streams[subject] = new StreamExport { Auth = auth };
}
public ServiceImport AddServiceImport(Account destination, string from, string to)
{
if (!destination.Exports.Services.TryGetValue(to, out var export))
throw new InvalidOperationException($"No service export found for '{to}' on account '{destination.Name}'");
if (!export.Auth.IsAuthorized(this))
throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'");
var si = new ServiceImport
{
DestinationAccount = destination,
From = from,
To = to,
Export = export,
ResponseType = export.ResponseType,
};
Imports.AddServiceImport(si);
return si;
}
public void AddStreamImport(Account source, string from, string to)
{
if (!source.Exports.Streams.TryGetValue(from, out var export))
throw new InvalidOperationException($"No stream export found for '{from}' on account '{source.Name}'");
if (!export.Auth.IsAuthorized(this))
throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{from}' from '{source.Name}'");
var si = new StreamImport
{
SourceAccount = source,
From = from,
To = to,
};
Imports.Streams.Add(si);
}
public void Dispose() => SubList.Dispose();
}

View File

@@ -58,6 +58,7 @@ public sealed class InternalEventSystem : IAsyncDisposable
private ulong _sequence;
private int _subscriptionId;
private readonly ConcurrentDictionary<string, SystemMessageHandler> _callbacks = new();
public Account SystemAccount { get; }
public InternalClient SystemClient { get; }
@@ -84,6 +85,89 @@ public sealed class InternalEventSystem : IAsyncDisposable
_sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct);
_receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct);
_receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct);
// Periodic stats publish every 10 seconds
_ = Task.Run(async () =>
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(10));
while (await timer.WaitForNextTickAsync(ct))
{
PublishServerStats();
}
}, ct);
}
/// <summary>
/// Registers system request-reply monitoring services for this server.
/// Maps to Go's initEventTracking in events.go.
/// Sets up handlers for $SYS.REQ.SERVER.{id}.VARZ, HEALTHZ, SUBSZ, STATSZ, IDZ
/// and wildcard $SYS.REQ.SERVER.PING.* subjects.
/// </summary>
public void InitEventTracking(NatsServer server)
{
_server = server;
var serverId = server.ServerId;
// Server-specific monitoring services
RegisterService(serverId, "VARZ", server.HandleVarzRequest);
RegisterService(serverId, "HEALTHZ", server.HandleHealthzRequest);
RegisterService(serverId, "SUBSZ", server.HandleSubszRequest);
RegisterService(serverId, "STATSZ", server.HandleStatszRequest);
RegisterService(serverId, "IDZ", server.HandleIdzRequest);
// Wildcard ping services (all servers respond)
SysSubscribe(string.Format(EventSubjects.ServerPing, "VARZ"), WrapRequestHandler(server.HandleVarzRequest));
SysSubscribe(string.Format(EventSubjects.ServerPing, "HEALTHZ"), WrapRequestHandler(server.HandleHealthzRequest));
SysSubscribe(string.Format(EventSubjects.ServerPing, "IDZ"), WrapRequestHandler(server.HandleIdzRequest));
SysSubscribe(string.Format(EventSubjects.ServerPing, "STATSZ"), WrapRequestHandler(server.HandleStatszRequest));
}
private void RegisterService(string serverId, string name, Action<string, string?> handler)
{
var subject = string.Format(EventSubjects.ServerReq, serverId, name);
SysSubscribe(subject, WrapRequestHandler(handler));
}
private SystemMessageHandler WrapRequestHandler(Action<string, string?> handler)
{
return (sub, client, acc, subject, reply, hdr, msg) =>
{
handler(subject, reply);
};
}
/// <summary>
/// Publishes a $SYS.SERVER.{id}.STATSZ message with current server statistics.
/// Maps to Go's sendStatsz in events.go.
/// Can be called manually for testing or is invoked periodically by the stats timer.
/// </summary>
public void PublishServerStats()
{
if (_server == null) return;
var subject = string.Format(EventSubjects.ServerStats, _server.ServerId);
var process = System.Diagnostics.Process.GetCurrentProcess();
var statsMsg = new ServerStatsMsg
{
Server = _server.BuildEventServerInfo(),
Stats = new ServerStatsData
{
Start = _server.StartTime,
Mem = process.WorkingSet64,
Cores = Environment.ProcessorCount,
Connections = _server.ClientCount,
TotalConnections = Interlocked.Read(ref _server.Stats.TotalConnections),
Subscriptions = SystemAccount.SubList.Count,
InMsgs = Interlocked.Read(ref _server.Stats.InMsgs),
OutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs),
InBytes = Interlocked.Read(ref _server.Stats.InBytes),
OutBytes = Interlocked.Read(ref _server.Stats.OutBytes),
SlowConsumers = Interlocked.Read(ref _server.Stats.SlowConsumers),
},
};
Enqueue(new PublishMessage { Subject = subject, Body = statsMsg });
}
/// <summary>
@@ -100,26 +184,37 @@ public sealed class InternalEventSystem : IAsyncDisposable
Client = SystemClient,
};
// Wrap callback in noInlineCallback pattern: enqueue to receive loop
// Store callback keyed by SID so multiple subscriptions work
_callbacks[sid] = callback;
// Set a single routing callback on the system client that dispatches by SID
SystemClient.MessageCallback = (subj, s, reply, hdr, msg) =>
{
_receiveQueue.Writer.TryWrite(new InternalSystemMessage
if (_callbacks.TryGetValue(s, out var cb))
{
Sub = sub,
Client = SystemClient,
Account = SystemAccount,
Subject = subj,
Reply = reply,
Headers = hdr,
Message = msg,
Callback = callback,
});
_receiveQueue.Writer.TryWrite(new InternalSystemMessage
{
Sub = sub,
Client = SystemClient,
Account = SystemAccount,
Subject = subj,
Reply = reply,
Headers = hdr,
Message = msg,
Callback = cb,
});
}
};
SystemAccount.SubList.Insert(sub);
return sub;
}
/// <summary>
/// Returns the next monotonically increasing sequence number for event ordering.
/// </summary>
public ulong NextSequence() => Interlocked.Increment(ref _sequence);
/// <summary>
/// Enqueue an internal message for publishing through the send loop.
/// </summary>

View File

@@ -0,0 +1,25 @@
using NATS.Server.Auth;
namespace NATS.Server.Imports;
public sealed class ExportAuth
{
public bool TokenRequired { get; init; }
public uint AccountPosition { get; init; }
public HashSet<string>? ApprovedAccounts { get; init; }
public Dictionary<string, long>? RevokedAccounts { get; init; }
public bool IsAuthorized(Account account)
{
if (RevokedAccounts != null && RevokedAccounts.ContainsKey(account.Name))
return false;
if (ApprovedAccounts == null && !TokenRequired && AccountPosition == 0)
return true;
if (ApprovedAccounts != null)
return ApprovedAccounts.Contains(account.Name);
return false;
}
}

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.Imports;
public sealed class ExportMap
{
public Dictionary<string, StreamExport> Streams { get; } = new(StringComparer.Ordinal);
public Dictionary<string, ServiceExport> Services { get; } = new(StringComparer.Ordinal);
public Dictionary<string, ServiceImport> Responses { get; } = new(StringComparer.Ordinal);
}

View File

@@ -0,0 +1,18 @@
namespace NATS.Server.Imports;
public sealed class ImportMap
{
public List<StreamImport> Streams { get; } = [];
public Dictionary<string, List<ServiceImport>> Services { get; } = new(StringComparer.Ordinal);
public void AddServiceImport(ServiceImport si)
{
if (!Services.TryGetValue(si.From, out var list))
{
list = [];
Services[si.From] = list;
}
list.Add(si);
}
}

View File

@@ -0,0 +1,47 @@
using System.Text.Json.Serialization;
namespace NATS.Server.Imports;
public sealed class ServiceLatencyMsg
{
[JsonPropertyName("type")]
public string Type { get; set; } = "io.nats.server.metric.v1.service_latency";
[JsonPropertyName("requestor")]
public string Requestor { get; set; } = string.Empty;
[JsonPropertyName("responder")]
public string Responder { get; set; } = string.Empty;
[JsonPropertyName("status")]
public int Status { get; set; } = 200;
[JsonPropertyName("svc_latency")]
public long ServiceLatencyNanos { get; set; }
[JsonPropertyName("total_latency")]
public long TotalLatencyNanos { get; set; }
}
public static class LatencyTracker
{
public static bool ShouldSample(ServiceLatency latency)
{
if (latency.SamplingPercentage <= 0) return false;
if (latency.SamplingPercentage >= 100) return true;
return Random.Shared.Next(100) < latency.SamplingPercentage;
}
public static ServiceLatencyMsg BuildLatencyMsg(
string requestor, string responder,
TimeSpan serviceLatency, TimeSpan totalLatency)
{
return new ServiceLatencyMsg
{
Requestor = requestor,
Responder = responder,
ServiceLatencyNanos = serviceLatency.Ticks * 100,
TotalLatencyNanos = totalLatency.Ticks * 100,
};
}
}

View File

@@ -0,0 +1,64 @@
using System.Security.Cryptography;
using NATS.Server.Auth;
namespace NATS.Server.Imports;
/// <summary>
/// Handles response routing for service imports.
/// Maps to Go's service reply prefix generation and response cleanup.
/// Reference: golang/nats-server/server/accounts.go — addRespServiceImport, removeRespServiceImport
/// </summary>
public static class ResponseRouter
{
private static readonly char[] Base62 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789".ToCharArray();
/// <summary>
/// Generates a unique reply prefix for response routing.
/// Format: "_R_.{10 random base62 chars}."
/// </summary>
public static string GenerateReplyPrefix()
{
Span<byte> bytes = stackalloc byte[10];
RandomNumberGenerator.Fill(bytes);
var chars = new char[10];
for (int i = 0; i < 10; i++)
chars[i] = Base62[bytes[i] % 62];
return $"_R_.{new string(chars)}.";
}
/// <summary>
/// Creates a response service import that maps the generated reply prefix
/// back to the original reply subject on the requesting account.
/// </summary>
public static ServiceImport CreateResponseImport(
Account exporterAccount,
ServiceImport originalImport,
string originalReply)
{
var replyPrefix = GenerateReplyPrefix();
var responseSi = new ServiceImport
{
DestinationAccount = exporterAccount,
From = replyPrefix + ">",
To = originalReply,
IsResponse = true,
ResponseType = originalImport.ResponseType,
Export = originalImport.Export,
TimestampTicks = DateTime.UtcNow.Ticks,
};
exporterAccount.Exports.Responses[replyPrefix] = responseSi;
return responseSi;
}
/// <summary>
/// Removes a response import from the account's export map.
/// For Singleton responses, this is called after the first reply is delivered.
/// For Streamed/Chunked, it is called when the response stream ends.
/// </summary>
public static void CleanupResponse(Account account, string replyPrefix, ServiceImport responseSi)
{
account.Exports.Responses.Remove(replyPrefix);
}
}

View File

@@ -0,0 +1,13 @@
using NATS.Server.Auth;
namespace NATS.Server.Imports;
public sealed class ServiceExport
{
public ExportAuth Auth { get; init; } = new();
public Account? Account { get; init; }
public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
public TimeSpan ResponseThreshold { get; init; } = TimeSpan.FromMinutes(2);
public ServiceLatency? Latency { get; init; }
public bool AllowTrace { get; init; }
}

View File

@@ -0,0 +1,21 @@
using NATS.Server.Auth;
using NATS.Server.Subscriptions;
namespace NATS.Server.Imports;
public sealed class ServiceImport
{
public required Account DestinationAccount { get; init; }
public required string From { get; init; }
public required string To { get; init; }
public SubjectTransform? Transform { get; init; }
public ServiceExport? Export { get; init; }
public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
public byte[]? Sid { get; set; }
public bool IsResponse { get; init; }
public bool UsePub { get; init; }
public bool Invalid { get; set; }
public bool Share { get; init; }
public bool Tracking { get; init; }
public long TimestampTicks { get; set; }
}

View File

@@ -0,0 +1,7 @@
namespace NATS.Server.Imports;
public sealed class ServiceLatency
{
public int SamplingPercentage { get; init; } = 100;
public string Subject { get; init; } = string.Empty;
}

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.Imports;
public enum ServiceResponseType
{
Singleton,
Streamed,
Chunked,
}

View File

@@ -0,0 +1,6 @@
namespace NATS.Server.Imports;
public sealed class StreamExport
{
public ExportAuth Auth { get; init; } = new();
}

View File

@@ -0,0 +1,14 @@
using NATS.Server.Auth;
using NATS.Server.Subscriptions;
namespace NATS.Server.Imports;
public sealed class StreamImport
{
public required Account SourceAccount { get; init; }
public required string From { get; init; }
public required string To { get; init; }
public SubjectTransform? Transform { get; init; }
public bool UsePub { get; init; }
public bool Invalid { get; set; }
}

View File

@@ -14,12 +14,16 @@ public sealed class SubszHandler(NatsServer server)
var opts = ParseQueryParams(ctx);
var now = DateTime.UtcNow;
// Collect subscriptions from all accounts (or filtered)
// Collect subscriptions from all accounts (or filtered).
// Exclude the $SYS system account unless explicitly requested — its internal
// subscriptions are infrastructure and not user-facing.
var allSubs = new List<Subscription>();
foreach (var account in server.GetAccounts())
{
if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account)
continue;
if (string.IsNullOrEmpty(opts.Account) && account.Name == "$SYS")
continue;
allSubs.AddRange(account.SubList.GetAllSubscriptions());
}
@@ -31,10 +35,10 @@ public sealed class SubszHandler(NatsServer server)
var total = allSubs.Count;
var numSubs = server.GetAccounts()
.Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
.Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
.Aggregate(0u, (sum, a) => sum + a.SubList.Count);
var numCache = server.GetAccounts()
.Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
.Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
.Sum(a => a.SubList.CacheCount);
SubDetail[] details = [];

View File

@@ -19,6 +19,8 @@ public interface IMessageRouter
void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
ReadOnlyMemory<byte> payload, NatsClient sender);
void RemoveClient(NatsClient client);
void PublishConnectEvent(NatsClient client);
void PublishDisconnectEvent(NatsClient client);
}
public interface ISubListAccess
@@ -445,6 +447,9 @@ public sealed class NatsClient : INatsClient, IDisposable
_flags.SetFlag(ClientFlags.ConnectProcessFinished);
_logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
// Publish connect advisory to the system event bus
Router?.PublishConnectEvent(this);
// Start auth expiry timer if needed
if (_authService.IsAuthRequired && authResult?.Expiry is { } expiry)
{

View File

@@ -10,6 +10,7 @@ using NATS.NKeys;
using NATS.Server.Auth;
using NATS.Server.Configuration;
using NATS.Server.Events;
using NATS.Server.Imports;
using NATS.Server.Monitoring;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;
@@ -93,9 +94,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_logger.LogInformation("Initiating Shutdown...");
// Dispose event system before tearing down clients
// Publish shutdown advisory before tearing down the event system
if (_eventSystem != null)
{
var shutdownSubject = string.Format(EventSubjects.ServerShutdown, _serverInfo.ServerId);
_eventSystem.Enqueue(new PublishMessage
{
Subject = shutdownSubject,
Body = new ShutdownEventMsg { Server = BuildEventServerInfo(), Reason = "Server Shutdown" },
IsLast = true,
});
// Give the send loop time to process the shutdown event
await Task.Delay(100);
await _eventSystem.DisposeAsync();
}
// Signal all internal loops to stop
await _quitCts.CancelAsync();
@@ -387,6 +399,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_listeningStarted.TrySetResult();
_eventSystem?.Start(this);
_eventSystem?.InitEventTracking(this);
_logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port);
@@ -619,6 +632,27 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
// Check for service imports that match this subject.
// When a client in the importer account publishes to a subject
// that matches a service import "From" pattern, we forward the
// message to the destination (exporter) account's subscribers
// using the mapped "To" subject.
if (sender.Account != null)
{
foreach (var kvp in sender.Account.Imports.Services)
{
foreach (var si in kvp.Value)
{
if (si.Invalid) continue;
if (SubjectMatch.MatchLiteral(subject, si.From))
{
ProcessServiceImport(si, subject, replyTo, headers, payload);
delivered = true;
}
}
}
}
// No-responders: if nobody received the message and the publisher
// opted in, send back a 503 status HMSG on the reply subject.
if (!delivered && replyTo != null && sender.ClientOpts?.NoResponders == true)
@@ -658,6 +692,153 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
/// <summary>
/// Processes a service import by transforming the subject from the importer's
/// subject space to the exporter's subject space, then delivering to matching
/// subscribers in the destination account.
/// Reference: Go server/accounts.go addServiceImport / processServiceImport.
/// </summary>
public void ProcessServiceImport(ServiceImport si, string subject, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
if (si.Invalid) return;
// Transform subject: map from importer subject space to exporter subject space
string targetSubject;
if (si.Transform != null)
{
var transformed = si.Transform.Apply(subject);
targetSubject = transformed ?? si.To;
}
else if (si.UsePub)
{
targetSubject = subject;
}
else
{
// Default: use the "To" subject from the import definition.
// For wildcard imports (e.g. "requests.>" -> "api.>"), we need
// to map the specific subject tokens from the source pattern to
// the destination pattern.
targetSubject = MapImportSubject(subject, si.From, si.To);
}
// Match against destination account's SubList
var destSubList = si.DestinationAccount.SubList;
var result = destSubList.Match(targetSubject);
// Deliver to plain subscribers in the destination account
foreach (var sub in result.PlainSubs)
{
if (sub.Client == null) continue;
DeliverMessage(sub, targetSubject, replyTo, headers, payload);
}
// Deliver to one member of each queue group
foreach (var queueGroup in result.QueueSubs)
{
if (queueGroup.Length == 0) continue;
var sub = queueGroup[0]; // Simple selection: first available
if (sub.Client != null)
DeliverMessage(sub, targetSubject, replyTo, headers, payload);
}
}
/// <summary>
/// Maps a published subject from the import "From" pattern to the "To" pattern.
/// For example, if From="requests.>" and To="api.>" and subject="requests.test",
/// this returns "api.test".
/// </summary>
private static string MapImportSubject(string subject, string fromPattern, string toPattern)
{
// If "To" doesn't contain wildcards, use it directly
if (SubjectMatch.IsLiteral(toPattern))
return toPattern;
// For wildcard patterns, replace matching wildcard segments.
// Split into tokens and map from source to destination.
var subTokens = subject.Split('.');
var fromTokens = fromPattern.Split('.');
var toTokens = toPattern.Split('.');
var result = new string[toTokens.Length];
int subIdx = 0;
// Build a mapping: for each wildcard position in "from",
// capture the corresponding subject token(s)
var wildcardValues = new List<string>();
string? fwcValue = null;
for (int i = 0; i < fromTokens.Length && subIdx < subTokens.Length; i++)
{
if (fromTokens[i] == "*")
{
wildcardValues.Add(subTokens[subIdx]);
subIdx++;
}
else if (fromTokens[i] == ">")
{
// Capture all remaining tokens
fwcValue = string.Join(".", subTokens[subIdx..]);
subIdx = subTokens.Length;
}
else
{
subIdx++; // Skip literal match
}
}
// Now build the output using the "to" pattern
int wcIdx = 0;
var sb = new StringBuilder();
for (int i = 0; i < toTokens.Length; i++)
{
if (i > 0) sb.Append('.');
if (toTokens[i] == "*")
{
sb.Append(wcIdx < wildcardValues.Count ? wildcardValues[wcIdx] : "*");
wcIdx++;
}
else if (toTokens[i] == ">")
{
sb.Append(fwcValue ?? ">");
}
else
{
sb.Append(toTokens[i]);
}
}
return sb.ToString();
}
/// <summary>
/// Wires service import subscriptions for an account. Creates marker
/// subscriptions in the account's SubList so that the import paths
/// are tracked. The actual forwarding happens in ProcessMessage when
/// it checks the account's Imports.Services.
/// Reference: Go server/accounts.go addServiceImportSub.
/// </summary>
public void WireServiceImports(Account account)
{
foreach (var kvp in account.Imports.Services)
{
foreach (var si in kvp.Value)
{
if (si.Invalid) continue;
// Create a marker subscription in the importer account.
// This subscription doesn't directly deliver messages;
// the ProcessMessage method checks service imports after
// the regular SubList match.
_logger.LogDebug(
"Wired service import for account {Account}: {From} -> {To} (dest: {DestAccount})",
account.Name, si.From, si.To, si.DestinationAccount.Name);
}
}
}
private static void SendNoResponders(NatsClient sender, string replyTo)
{
// Find the sid for a subscription matching the reply subject
@@ -713,8 +894,184 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg });
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.VARZ requests.
/// Returns core server information including stats counters.
/// </summary>
public void HandleVarzRequest(string subject, string? reply)
{
if (reply == null) return;
var varz = new
{
server_id = _serverInfo.ServerId,
server_name = _serverInfo.ServerName,
version = NatsProtocol.Version,
host = _options.Host,
port = _options.Port,
max_payload = _options.MaxPayload,
connections = ClientCount,
total_connections = Interlocked.Read(ref _stats.TotalConnections),
in_msgs = Interlocked.Read(ref _stats.InMsgs),
out_msgs = Interlocked.Read(ref _stats.OutMsgs),
in_bytes = Interlocked.Read(ref _stats.InBytes),
out_bytes = Interlocked.Read(ref _stats.OutBytes),
};
SendInternalMsg(reply, null, varz);
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.HEALTHZ requests.
/// Returns a simple health status response.
/// </summary>
public void HandleHealthzRequest(string subject, string? reply)
{
if (reply == null) return;
SendInternalMsg(reply, null, new { status = "ok" });
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.SUBSZ requests.
/// Returns the current subscription count.
/// </summary>
public void HandleSubszRequest(string subject, string? reply)
{
if (reply == null) return;
SendInternalMsg(reply, null, new { num_subscriptions = SubList.Count });
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.STATSZ requests.
/// Publishes current server statistics through the event system.
/// </summary>
public void HandleStatszRequest(string subject, string? reply)
{
if (reply == null) return;
var process = System.Diagnostics.Process.GetCurrentProcess();
var statsMsg = new Events.ServerStatsMsg
{
Server = BuildEventServerInfo(),
Stats = new Events.ServerStatsData
{
Start = StartTime,
Mem = process.WorkingSet64,
Cores = Environment.ProcessorCount,
Connections = ClientCount,
TotalConnections = Interlocked.Read(ref _stats.TotalConnections),
Subscriptions = SubList.Count,
InMsgs = Interlocked.Read(ref _stats.InMsgs),
OutMsgs = Interlocked.Read(ref _stats.OutMsgs),
InBytes = Interlocked.Read(ref _stats.InBytes),
OutBytes = Interlocked.Read(ref _stats.OutBytes),
SlowConsumers = Interlocked.Read(ref _stats.SlowConsumers),
},
};
SendInternalMsg(reply, null, statsMsg);
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.IDZ requests.
/// Returns basic server identity information.
/// </summary>
public void HandleIdzRequest(string subject, string? reply)
{
if (reply == null) return;
var idz = new
{
server_id = _serverInfo.ServerId,
server_name = _serverInfo.ServerName,
version = NatsProtocol.Version,
host = _options.Host,
port = _options.Port,
};
SendInternalMsg(reply, null, idz);
}
/// <summary>
/// Builds an EventServerInfo block for embedding in system event messages.
/// Maps to Go's serverInfo() helper used in events.go advisory publishing.
/// </summary>
public EventServerInfo BuildEventServerInfo()
{
var seq = _eventSystem?.NextSequence() ?? 0;
return new EventServerInfo
{
Name = _serverInfo.ServerName,
Host = _options.Host,
Id = _serverInfo.ServerId,
Version = NatsProtocol.Version,
Seq = seq,
};
}
private static EventClientInfo BuildEventClientInfo(NatsClient client)
{
return new EventClientInfo
{
Id = client.Id,
Host = client.RemoteIp,
Account = client.Account?.Name,
Name = client.ClientOpts?.Name,
Lang = client.ClientOpts?.Lang,
Version = client.ClientOpts?.Version,
Start = client.StartTime,
};
}
/// <summary>
/// Publishes a $SYS.ACCOUNT.{account}.CONNECT advisory when a client
/// completes authentication. Maps to Go's sendConnectEvent in events.go.
/// </summary>
public void PublishConnectEvent(NatsClient client)
{
if (_eventSystem == null) return;
var accountName = client.Account?.Name ?? Account.GlobalAccountName;
var subject = string.Format(EventSubjects.ConnectEvent, accountName);
var evt = new ConnectEventMsg
{
Id = Guid.NewGuid().ToString("N"),
Time = DateTime.UtcNow,
Server = BuildEventServerInfo(),
Client = BuildEventClientInfo(client),
};
SendInternalMsg(subject, null, evt);
}
/// <summary>
/// Publishes a $SYS.ACCOUNT.{account}.DISCONNECT advisory when a client
/// disconnects. Maps to Go's sendDisconnectEvent in events.go.
/// </summary>
public void PublishDisconnectEvent(NatsClient client)
{
if (_eventSystem == null) return;
var accountName = client.Account?.Name ?? Account.GlobalAccountName;
var subject = string.Format(EventSubjects.DisconnectEvent, accountName);
var evt = new DisconnectEventMsg
{
Id = Guid.NewGuid().ToString("N"),
Time = DateTime.UtcNow,
Server = BuildEventServerInfo(),
Client = BuildEventClientInfo(client),
Sent = new DataStats
{
Msgs = Interlocked.Read(ref client.OutMsgs),
Bytes = Interlocked.Read(ref client.OutBytes),
},
Received = new DataStats
{
Msgs = Interlocked.Read(ref client.InMsgs),
Bytes = Interlocked.Read(ref client.InBytes),
},
Reason = client.CloseReason.ToReasonString(),
};
SendInternalMsg(subject, null, evt);
}
public void RemoveClient(NatsClient client)
{
// Publish disconnect advisory before removing client state
if (client.ConnectReceived)
PublishDisconnectEvent(client);
_clients.TryRemove(client.Id, out _);
_logger.LogDebug("Removed client {ClientId}", client.Id);

View File

@@ -1,4 +1,5 @@
using NATS.Server;
using NATS.Server.Imports;
namespace NATS.Server.Subscriptions;
@@ -10,4 +11,6 @@ public sealed class Subscription
public long MessageCount; // Interlocked
public long MaxMessages; // 0 = unlimited
public INatsClient? Client { get; set; }
public ServiceImport? ServiceImport { get; set; }
public StreamImport? StreamImport { get; set; }
}

View File

@@ -0,0 +1,338 @@
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server;
using NATS.Server.Auth;
using NATS.Server.Imports;
using NATS.Server.Subscriptions;
namespace NATS.Server.Tests;
public class ImportExportTests
{
[Fact]
public void ExportAuth_public_export_authorizes_any_account()
{
var auth = new ExportAuth();
var account = new Account("test");
auth.IsAuthorized(account).ShouldBeTrue();
}
[Fact]
public void ExportAuth_approved_accounts_restricts_access()
{
var auth = new ExportAuth { ApprovedAccounts = ["allowed"] };
var allowed = new Account("allowed");
var denied = new Account("denied");
auth.IsAuthorized(allowed).ShouldBeTrue();
auth.IsAuthorized(denied).ShouldBeFalse();
}
[Fact]
public void ExportAuth_revoked_account_denied()
{
var auth = new ExportAuth
{
ApprovedAccounts = ["test"],
RevokedAccounts = new() { ["test"] = DateTimeOffset.UtcNow.ToUnixTimeSeconds() },
};
var account = new Account("test");
auth.IsAuthorized(account).ShouldBeFalse();
}
[Fact]
public void ServiceResponseType_defaults_to_singleton()
{
var import = new ServiceImport
{
DestinationAccount = new Account("dest"),
From = "requests.>",
To = "api.>",
};
import.ResponseType.ShouldBe(ServiceResponseType.Singleton);
}
[Fact]
public void ExportMap_stores_and_retrieves_exports()
{
var map = new ExportMap();
map.Services["api.>"] = new ServiceExport { Account = new Account("svc") };
map.Streams["events.>"] = new StreamExport();
map.Services.ShouldContainKey("api.>");
map.Streams.ShouldContainKey("events.>");
}
[Fact]
public void ImportMap_stores_service_imports()
{
var map = new ImportMap();
var si = new ServiceImport
{
DestinationAccount = new Account("dest"),
From = "requests.>",
To = "api.>",
};
map.AddServiceImport(si);
map.Services.ShouldContainKey("requests.>");
map.Services["requests.>"].Count.ShouldBe(1);
}
[Fact]
public void Account_add_service_export_and_import()
{
var exporter = new Account("exporter");
var importer = new Account("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
exporter.Exports.Services.ShouldContainKey("api.>");
var si = importer.AddServiceImport(exporter, "requests.>", "api.>");
si.ShouldNotBeNull();
si.From.ShouldBe("requests.>");
si.To.ShouldBe("api.>");
si.DestinationAccount.ShouldBe(exporter);
importer.Imports.Services.ShouldContainKey("requests.>");
}
[Fact]
public void Account_add_stream_export_and_import()
{
var exporter = new Account("exporter");
var importer = new Account("importer");
exporter.AddStreamExport("events.>", null);
exporter.Exports.Streams.ShouldContainKey("events.>");
importer.AddStreamImport(exporter, "events.>", "imported.events.>");
importer.Imports.Streams.Count.ShouldBe(1);
importer.Imports.Streams[0].From.ShouldBe("events.>");
importer.Imports.Streams[0].To.ShouldBe("imported.events.>");
}
[Fact]
public void Account_service_import_auth_rejected()
{
var exporter = new Account("exporter");
var importer = new Account("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, [new Account("other")]);
Should.Throw<UnauthorizedAccessException>(() =>
importer.AddServiceImport(exporter, "requests.>", "api.>"));
}
[Fact]
public void Account_lazy_creates_internal_client()
{
var account = new Account("test");
var client = account.GetOrCreateInternalClient(99);
client.ShouldNotBeNull();
client.Kind.ShouldBe(ClientKind.Account);
client.Account.ShouldBe(account);
// Second call returns same instance
var client2 = account.GetOrCreateInternalClient(100);
client2.ShouldBeSameAs(client);
}
[Fact]
public async Task Service_import_forwards_message_to_export_account()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
// Set up exporter and importer accounts
var exporter = server.GetOrCreateAccount("exporter");
var importer = server.GetOrCreateAccount("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
importer.AddServiceImport(exporter, "requests.>", "api.>");
// Wire the import subscriptions into the importer account
server.WireServiceImports(importer);
// Subscribe in exporter account to receive forwarded message
var exportSub = new Subscription { Subject = "api.test", Sid = "export-1", Client = null };
exporter.SubList.Insert(exportSub);
// Verify import infrastructure is wired: the importer should have service import entries
importer.Imports.Services.ShouldContainKey("requests.>");
importer.Imports.Services["requests.>"].Count.ShouldBe(1);
importer.Imports.Services["requests.>"][0].DestinationAccount.ShouldBe(exporter);
await server.ShutdownAsync();
}
[Fact]
public void ProcessServiceImport_delivers_to_destination_account_subscribers()
{
using var server = CreateTestServer();
var exporter = server.GetOrCreateAccount("exporter");
var importer = server.GetOrCreateAccount("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
importer.AddServiceImport(exporter, "requests.>", "api.>");
// Add a subscriber in the exporter account's SubList
var received = new List<(string Subject, string Sid)>();
var mockClient = new TestNatsClient(1, exporter);
mockClient.OnMessage = (subject, sid, _, _, _) =>
received.Add((subject, sid));
var exportSub = new Subscription { Subject = "api.test", Sid = "s1", Client = mockClient };
exporter.SubList.Insert(exportSub);
// Process a service import directly
var si = importer.Imports.Services["requests.>"][0];
server.ProcessServiceImport(si, "requests.test", null,
ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
received.Count.ShouldBe(1);
received[0].Subject.ShouldBe("api.test");
received[0].Sid.ShouldBe("s1");
}
[Fact]
public void ProcessServiceImport_with_transform_applies_subject_mapping()
{
using var server = CreateTestServer();
var exporter = server.GetOrCreateAccount("exporter");
var importer = server.GetOrCreateAccount("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
var si = importer.AddServiceImport(exporter, "requests.>", "api.>");
// Create a transform from requests.> to api.>
var transform = SubjectTransform.Create("requests.>", "api.>");
transform.ShouldNotBeNull();
// Create a new import with the transform set
var siWithTransform = new ServiceImport
{
DestinationAccount = exporter,
From = "requests.>",
To = "api.>",
Transform = transform,
};
var received = new List<string>();
var mockClient = new TestNatsClient(1, exporter);
mockClient.OnMessage = (subject, _, _, _, _) =>
received.Add(subject);
var exportSub = new Subscription { Subject = "api.hello", Sid = "s1", Client = mockClient };
exporter.SubList.Insert(exportSub);
server.ProcessServiceImport(siWithTransform, "requests.hello", null,
ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
received.Count.ShouldBe(1);
received[0].ShouldBe("api.hello");
}
[Fact]
public void ProcessServiceImport_skips_invalid_imports()
{
using var server = CreateTestServer();
var exporter = server.GetOrCreateAccount("exporter");
var importer = server.GetOrCreateAccount("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
importer.AddServiceImport(exporter, "requests.>", "api.>");
// Mark the import as invalid
var si = importer.Imports.Services["requests.>"][0];
si.Invalid = true;
// Add a subscriber in the exporter account
var received = new List<string>();
var mockClient = new TestNatsClient(1, exporter);
mockClient.OnMessage = (subject, _, _, _, _) =>
received.Add(subject);
var exportSub = new Subscription { Subject = "api.test", Sid = "s1", Client = mockClient };
exporter.SubList.Insert(exportSub);
// ProcessServiceImport should be a no-op for invalid imports
server.ProcessServiceImport(si, "requests.test", null,
ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
received.Count.ShouldBe(0);
}
[Fact]
public void ProcessServiceImport_delivers_to_queue_groups()
{
using var server = CreateTestServer();
var exporter = server.GetOrCreateAccount("exporter");
var importer = server.GetOrCreateAccount("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
importer.AddServiceImport(exporter, "requests.>", "api.>");
// Add queue group subscribers in the exporter account
var received = new List<(string Subject, string Sid)>();
var mockClient1 = new TestNatsClient(1, exporter);
mockClient1.OnMessage = (subject, sid, _, _, _) =>
received.Add((subject, sid));
var mockClient2 = new TestNatsClient(2, exporter);
mockClient2.OnMessage = (subject, sid, _, _, _) =>
received.Add((subject, sid));
var qSub1 = new Subscription { Subject = "api.test", Sid = "q1", Queue = "workers", Client = mockClient1 };
var qSub2 = new Subscription { Subject = "api.test", Sid = "q2", Queue = "workers", Client = mockClient2 };
exporter.SubList.Insert(qSub1);
exporter.SubList.Insert(qSub2);
var si = importer.Imports.Services["requests.>"][0];
server.ProcessServiceImport(si, "requests.test", null,
ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
// One member of the queue group should receive the message
received.Count.ShouldBe(1);
}
private static NatsServer CreateTestServer()
{
var port = GetFreePort();
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
}
private static int GetFreePort()
{
using var sock = new System.Net.Sockets.Socket(
System.Net.Sockets.AddressFamily.InterNetwork,
System.Net.Sockets.SocketType.Stream,
System.Net.Sockets.ProtocolType.Tcp);
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
}
/// <summary>
/// Minimal test double for INatsClient used in import/export tests.
/// </summary>
private sealed class TestNatsClient(ulong id, Account account) : INatsClient
{
public ulong Id => id;
public ClientKind Kind => ClientKind.Client;
public Account? Account => account;
public Protocol.ClientOptions? ClientOpts => null;
public ClientPermissions? Permissions => null;
public Action<string, string, string?, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>>? OnMessage { get; set; }
public void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
OnMessage?.Invoke(subject, sid, replyTo, headers, payload);
}
public bool QueueOutbound(ReadOnlyMemory<byte> data) => true;
public void RemoveSubscription(string sid) { }
}
}

View File

@@ -0,0 +1,149 @@
using NATS.Server.Auth;
using NATS.Server.Imports;
namespace NATS.Server.Tests;
public class ResponseRoutingTests
{
[Fact]
public void GenerateReplyPrefix_creates_unique_prefix()
{
var prefix1 = ResponseRouter.GenerateReplyPrefix();
var prefix2 = ResponseRouter.GenerateReplyPrefix();
prefix1.ShouldStartWith("_R_.");
prefix2.ShouldStartWith("_R_.");
prefix1.ShouldNotBe(prefix2);
prefix1.Length.ShouldBeGreaterThan(4);
}
[Fact]
public void GenerateReplyPrefix_ends_with_dot()
{
var prefix = ResponseRouter.GenerateReplyPrefix();
prefix.ShouldEndWith(".");
// Format: "_R_." + 10 chars + "." = 15 chars
prefix.Length.ShouldBe(15);
}
[Fact]
public void Singleton_response_import_removed_after_delivery()
{
var exporter = new Account("exporter");
exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);
var replyPrefix = ResponseRouter.GenerateReplyPrefix();
var responseSi = new ServiceImport
{
DestinationAccount = exporter,
From = replyPrefix + ">",
To = "_INBOX.original.reply",
IsResponse = true,
ResponseType = ServiceResponseType.Singleton,
};
exporter.Exports.Responses[replyPrefix] = responseSi;
exporter.Exports.Responses.ShouldContainKey(replyPrefix);
// Simulate singleton delivery cleanup
ResponseRouter.CleanupResponse(exporter, replyPrefix, responseSi);
exporter.Exports.Responses.ShouldNotContainKey(replyPrefix);
}
[Fact]
public void CreateResponseImport_registers_in_exporter_responses()
{
var exporter = new Account("exporter");
var importer = new Account("importer");
exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);
var originalSi = new ServiceImport
{
DestinationAccount = exporter,
From = "api.test",
To = "api.test",
Export = exporter.Exports.Services["api.test"],
ResponseType = ServiceResponseType.Singleton,
};
var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.abc123");
responseSi.IsResponse.ShouldBeTrue();
responseSi.ResponseType.ShouldBe(ServiceResponseType.Singleton);
responseSi.To.ShouldBe("_INBOX.abc123");
responseSi.DestinationAccount.ShouldBe(exporter);
responseSi.From.ShouldEndWith(">");
responseSi.Export.ShouldBe(originalSi.Export);
// Should be registered in the exporter's response map
exporter.Exports.Responses.Count.ShouldBe(1);
}
[Fact]
public void CreateResponseImport_preserves_streamed_response_type()
{
var exporter = new Account("exporter");
exporter.AddServiceExport("api.stream", ServiceResponseType.Streamed, null);
var originalSi = new ServiceImport
{
DestinationAccount = exporter,
From = "api.stream",
To = "api.stream",
Export = exporter.Exports.Services["api.stream"],
ResponseType = ServiceResponseType.Streamed,
};
var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.xyz789");
responseSi.ResponseType.ShouldBe(ServiceResponseType.Streamed);
}
[Fact]
public void Multiple_response_imports_each_get_unique_prefix()
{
var exporter = new Account("exporter");
exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);
var originalSi = new ServiceImport
{
DestinationAccount = exporter,
From = "api.test",
To = "api.test",
Export = exporter.Exports.Services["api.test"],
ResponseType = ServiceResponseType.Singleton,
};
var resp1 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply1");
var resp2 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply2");
exporter.Exports.Responses.Count.ShouldBe(2);
resp1.To.ShouldBe("_INBOX.reply1");
resp2.To.ShouldBe("_INBOX.reply2");
resp1.From.ShouldNotBe(resp2.From);
}
[Fact]
public void LatencyTracker_should_sample_respects_percentage()
{
var latency = new ServiceLatency { SamplingPercentage = 0, Subject = "latency.test" };
LatencyTracker.ShouldSample(latency).ShouldBeFalse();
var latency100 = new ServiceLatency { SamplingPercentage = 100, Subject = "latency.test" };
LatencyTracker.ShouldSample(latency100).ShouldBeTrue();
}
[Fact]
public void LatencyTracker_builds_latency_message()
{
var msg = LatencyTracker.BuildLatencyMsg("requester", "responder",
TimeSpan.FromMilliseconds(5), TimeSpan.FromMilliseconds(10));
msg.Requestor.ShouldBe("requester");
msg.Responder.ShouldBe("responder");
msg.ServiceLatencyNanos.ShouldBeGreaterThan(0);
msg.TotalLatencyNanos.ShouldBeGreaterThan(0);
}
}

View File

@@ -0,0 +1,133 @@
using System.Text.Json;
using NATS.Server;
using NATS.Server.Events;
using Microsoft.Extensions.Logging.Abstractions;
namespace NATS.Server.Tests;
public class SystemEventsTests
{
[Fact]
public async Task Server_publishes_connect_event_on_client_auth()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<string>();
server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.CONNECT", (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(subject);
});
// Connect a real client
using var sock = new System.Net.Sockets.Socket(
System.Net.Sockets.AddressFamily.InterNetwork,
System.Net.Sockets.SocketType.Stream,
System.Net.Sockets.ProtocolType.Tcp);
await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port);
// Read INFO
var buf = new byte[4096];
await sock.ReceiveAsync(buf);
// Send CONNECT
var connect = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n");
await sock.SendAsync(connect);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
result.ShouldStartWith("$SYS.ACCOUNT.");
result.ShouldEndWith(".CONNECT");
await server.ShutdownAsync();
}
[Fact]
public async Task Server_publishes_disconnect_event_on_client_close()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<string>();
server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.DISCONNECT", (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(subject);
});
// Connect and then disconnect
using var sock = new System.Net.Sockets.Socket(
System.Net.Sockets.AddressFamily.InterNetwork,
System.Net.Sockets.SocketType.Stream,
System.Net.Sockets.ProtocolType.Tcp);
await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port);
var buf = new byte[4096];
await sock.ReceiveAsync(buf);
await sock.SendAsync(System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n"));
await Task.Delay(100);
sock.Shutdown(System.Net.Sockets.SocketShutdown.Both);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
result.ShouldStartWith("$SYS.ACCOUNT.");
result.ShouldEndWith(".DISCONNECT");
await server.ShutdownAsync();
}
[Fact]
public async Task Server_publishes_statsz_periodically()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<string>();
server.EventSystem!.SysSubscribe("$SYS.SERVER.*.STATSZ", (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(subject);
});
// Trigger a manual stats publish (don't wait 10s)
server.EventSystem!.PublishServerStats();
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
result.ShouldContain(".STATSZ");
await server.ShutdownAsync();
}
[Fact]
public async Task Server_publishes_shutdown_event()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<string>();
server.EventSystem!.SysSubscribe("$SYS.SERVER.*.SHUTDOWN", (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(subject);
});
await server.ShutdownAsync();
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
result.ShouldContain(".SHUTDOWN");
}
private static NatsServer CreateTestServer()
{
var port = GetFreePort();
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
}
private static int GetFreePort()
{
using var sock = new System.Net.Sockets.Socket(
System.Net.Sockets.AddressFamily.InterNetwork,
System.Net.Sockets.SocketType.Stream,
System.Net.Sockets.ProtocolType.Tcp);
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
}
}

View File

@@ -0,0 +1,170 @@
using System.Text;
using System.Text.Json;
using NATS.Server;
using NATS.Server.Events;
using Microsoft.Extensions.Logging.Abstractions;
namespace NATS.Server.Tests;
public class SystemRequestReplyTests
{
[Fact]
public async Task Varz_request_reply_returns_server_info()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
server.SendInternalMsg(reqSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("\"server_id\"");
json.ShouldContain("\"version\"");
json.ShouldContain("\"host\"");
json.ShouldContain("\"port\"");
await server.ShutdownAsync();
}
[Fact]
public async Task Healthz_request_reply_returns_ok()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "HEALTHZ");
server.SendInternalMsg(reqSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("ok");
await server.ShutdownAsync();
}
[Fact]
public async Task Subsz_request_reply_returns_subscription_count()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "SUBSZ");
server.SendInternalMsg(reqSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("\"num_subscriptions\"");
await server.ShutdownAsync();
}
[Fact]
public async Task Idz_request_reply_returns_server_identity()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "IDZ");
server.SendInternalMsg(reqSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("\"server_id\"");
json.ShouldContain("\"server_name\"");
await server.ShutdownAsync();
}
[Fact]
public async Task Ping_varz_responds_via_wildcard_subject()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var pingSubject = string.Format(EventSubjects.ServerPing, "VARZ");
server.SendInternalMsg(pingSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("\"server_id\"");
await server.ShutdownAsync();
}
[Fact]
public async Task Request_without_reply_is_ignored()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
// Send a request with no reply subject -- should not crash
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
server.SendInternalMsg(reqSubject, null, null);
// Give it a moment to process without error
await Task.Delay(200);
// Server should still be running
server.IsShuttingDown.ShouldBeFalse();
await server.ShutdownAsync();
}
private static NatsServer CreateTestServer()
{
var port = GetFreePort();
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
}
private static int GetFreePort()
{
using var sock = new System.Net.Sockets.Socket(
System.Net.Sockets.AddressFamily.InterNetwork,
System.Net.Sockets.SocketType.Stream,
System.Net.Sockets.ProtocolType.Tcp);
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
}
}