diff --git a/differences.md b/differences.md index 33a6453..08eeef1 100644 --- a/differences.md +++ b/differences.md @@ -11,7 +11,7 @@ | Feature | Go | .NET | Notes | |---------|:--:|:----:|-------| | NKey generation (server identity) | Y | Y | Ed25519 key pair via NATS.NKeys at startup | -| System account setup | Y | Y | `$SYS` account created; no event publishing yet (stub) | +| System account setup | Y | Y | `$SYS` account with InternalEventSystem, event publishing, request-reply services | | Config file validation on startup | Y | Y | Full config parsing with error collection via `ConfigProcessor` | | PID file writing | Y | Y | Written on startup, deleted on shutdown | | Profiling HTTP endpoint (`/debug/pprof`) | Y | Stub | `ProfPort` option exists but endpoint not implemented | @@ -64,9 +64,9 @@ | ROUTER | Y | N | Excluded per scope | | GATEWAY | Y | N | Excluded per scope | | LEAF | Y | N | Excluded per scope | -| SYSTEM (internal) | Y | N | | +| SYSTEM (internal) | Y | Y | InternalClient + InternalEventSystem with Channel-based send/receive loops | | JETSTREAM (internal) | Y | N | | -| ACCOUNT (internal) | Y | N | | +| ACCOUNT (internal) | Y | Y | Lazy per-account InternalClient with import/export subscription support | | WebSocket clients | Y | Y | Custom frame parser, permessage-deflate compression, origin checking, cookie auth | | MQTT clients | Y | N | | @@ -218,7 +218,7 @@ Go implements a sophisticated slow consumer detection system: |---------|:--:|:----:|-------| | Per-account SubList isolation | Y | Y | | | Multi-account user resolution | Y | Y | `AccountConfig` per account in `NatsOptions.Accounts`; `GetOrCreateAccount` wires limits | -| Account exports/imports | Y | N | | +| Account exports/imports | Y | Y | ServiceImport/StreamImport with ExportAuth, subject transforms, response routing | | Per-account connection limits | Y | Y | `Account.AddClient()` returns false when `MaxConnections` exceeded | | Per-account subscription limits | Y | Y | `Account.IncrementSubscriptions()` enforced in `ProcessSub()` | | Account JetStream limits | Y | N | Excluded per scope | @@ -407,6 +407,11 @@ The following items from the original gap list have been implemented: - **User revocation** — per-account tracking with wildcard (`*`) revocation - **Config file parsing** — custom lexer/parser ported from Go; supports includes, variables, nested blocks, size suffixes - **Hot reload (SIGHUP)** — re-parses config, diffs changes, validates reloadable set, applies with CLI precedence +- **SYSTEM client type** — InternalClient with InternalEventSystem, Channel-based send/receive loops, event publishing +- **ACCOUNT client type** — lazy per-account InternalClient with import/export subscription support +- **System event publishing** — connect/disconnect advisories, server stats, shutdown/lame-duck events, auth errors +- **System request-reply services** — $SYS.REQ.SERVER.*.VARZ/CONNZ/SUBSZ/HEALTHZ/IDZ/STATSZ with ping wildcards +- **Account exports/imports** — service and stream imports with ExportAuth, subject transforms, response routing, latency tracking ### Remaining Lower Priority 1. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections diff --git a/src/NATS.Server/Auth/Account.cs b/src/NATS.Server/Auth/Account.cs index bce25e1..078f438 100644 --- a/src/NATS.Server/Auth/Account.cs +++ b/src/NATS.Server/Auth/Account.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using NATS.Server.Imports; using NATS.Server.Subscriptions; namespace NATS.Server.Auth; @@ -12,6 +13,8 @@ public sealed class Account : IDisposable public Permissions? DefaultPermissions { get; set; } public int MaxConnections { get; set; } // 0 = unlimited public int MaxSubscriptions { get; set; } // 0 = unlimited + public ExportMap Exports { get; } = new(); + public ImportMap Imports { get; } = new(); // JWT fields public string? Nkey { get; set; } @@ -89,5 +92,77 @@ public sealed class Account : IDisposable Interlocked.Add(ref _outBytes, bytes); } + // Internal (ACCOUNT) client for import/export message routing + private InternalClient? _internalClient; + + public InternalClient GetOrCreateInternalClient(ulong clientId) + { + if (_internalClient != null) return _internalClient; + _internalClient = new InternalClient(clientId, ClientKind.Account, this); + return _internalClient; + } + + public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable? approved) + { + var auth = new ExportAuth + { + ApprovedAccounts = approved != null ? new HashSet(approved.Select(a => a.Name)) : null, + }; + Exports.Services[subject] = new ServiceExport + { + Auth = auth, + Account = this, + ResponseType = responseType, + }; + } + + public void AddStreamExport(string subject, IEnumerable? approved) + { + var auth = new ExportAuth + { + ApprovedAccounts = approved != null ? new HashSet(approved.Select(a => a.Name)) : null, + }; + Exports.Streams[subject] = new StreamExport { Auth = auth }; + } + + public ServiceImport AddServiceImport(Account destination, string from, string to) + { + if (!destination.Exports.Services.TryGetValue(to, out var export)) + throw new InvalidOperationException($"No service export found for '{to}' on account '{destination.Name}'"); + + if (!export.Auth.IsAuthorized(this)) + throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'"); + + var si = new ServiceImport + { + DestinationAccount = destination, + From = from, + To = to, + Export = export, + ResponseType = export.ResponseType, + }; + + Imports.AddServiceImport(si); + return si; + } + + public void AddStreamImport(Account source, string from, string to) + { + if (!source.Exports.Streams.TryGetValue(from, out var export)) + throw new InvalidOperationException($"No stream export found for '{from}' on account '{source.Name}'"); + + if (!export.Auth.IsAuthorized(this)) + throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{from}' from '{source.Name}'"); + + var si = new StreamImport + { + SourceAccount = source, + From = from, + To = to, + }; + + Imports.Streams.Add(si); + } + public void Dispose() => SubList.Dispose(); } diff --git a/src/NATS.Server/ClientKind.cs b/src/NATS.Server/ClientKind.cs new file mode 100644 index 0000000..4a238ca --- /dev/null +++ b/src/NATS.Server/ClientKind.cs @@ -0,0 +1,22 @@ +namespace NATS.Server; + +/// +/// Identifies the type of a client connection. +/// Maps to Go's client kind constants in client.go:45-65. +/// +public enum ClientKind +{ + Client, + Router, + Gateway, + Leaf, + System, + JetStream, + Account, +} + +public static class ClientKindExtensions +{ + public static bool IsInternal(this ClientKind kind) => + kind is ClientKind.System or ClientKind.JetStream or ClientKind.Account; +} diff --git a/src/NATS.Server/Events/EventJsonContext.cs b/src/NATS.Server/Events/EventJsonContext.cs new file mode 100644 index 0000000..7ac4ed2 --- /dev/null +++ b/src/NATS.Server/Events/EventJsonContext.cs @@ -0,0 +1,12 @@ +using System.Text.Json.Serialization; + +namespace NATS.Server.Events; + +[JsonSerializable(typeof(ConnectEventMsg))] +[JsonSerializable(typeof(DisconnectEventMsg))] +[JsonSerializable(typeof(AccountNumConns))] +[JsonSerializable(typeof(ServerStatsMsg))] +[JsonSerializable(typeof(ShutdownEventMsg))] +[JsonSerializable(typeof(LameDuckEventMsg))] +[JsonSerializable(typeof(AuthErrorEventMsg))] +internal partial class EventJsonContext : JsonSerializerContext; diff --git a/src/NATS.Server/Events/EventSubjects.cs b/src/NATS.Server/Events/EventSubjects.cs new file mode 100644 index 0000000..f29460f --- /dev/null +++ b/src/NATS.Server/Events/EventSubjects.cs @@ -0,0 +1,49 @@ +using NATS.Server.Auth; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Events; + +/// +/// System event subject patterns. +/// Maps to Go events.go:41-97 subject constants. +/// +public static class EventSubjects +{ + // Account-scoped events + public const string ConnectEvent = "$SYS.ACCOUNT.{0}.CONNECT"; + public const string DisconnectEvent = "$SYS.ACCOUNT.{0}.DISCONNECT"; + public const string AccountConnsNew = "$SYS.ACCOUNT.{0}.SERVER.CONNS"; + public const string AccountConnsOld = "$SYS.SERVER.ACCOUNT.{0}.CONNS"; + + // Server-scoped events + public const string ServerStats = "$SYS.SERVER.{0}.STATSZ"; + public const string ServerShutdown = "$SYS.SERVER.{0}.SHUTDOWN"; + public const string ServerLameDuck = "$SYS.SERVER.{0}.LAMEDUCK"; + public const string AuthError = "$SYS.SERVER.{0}.CLIENT.AUTH.ERR"; + public const string AuthErrorAccount = "$SYS.ACCOUNT.CLIENT.AUTH.ERR"; + + // Request-reply subjects (server-specific) + public const string ServerReq = "$SYS.REQ.SERVER.{0}.{1}"; + + // Wildcard ping subjects (all servers respond) + public const string ServerPing = "$SYS.REQ.SERVER.PING.{0}"; + + // Account-scoped request subjects + public const string AccountReq = "$SYS.REQ.ACCOUNT.{0}.{1}"; + + // Inbox for responses + public const string InboxResponse = "$SYS._INBOX_.{0}"; +} + +/// +/// Callback signature for system message handlers. +/// Maps to Go's sysMsgHandler type in events.go:109. +/// +public delegate void SystemMessageHandler( + Subscription? sub, + INatsClient? client, + Account? account, + string subject, + string? reply, + ReadOnlyMemory headers, + ReadOnlyMemory message); diff --git a/src/NATS.Server/Events/EventTypes.cs b/src/NATS.Server/Events/EventTypes.cs new file mode 100644 index 0000000..9da36bb --- /dev/null +++ b/src/NATS.Server/Events/EventTypes.cs @@ -0,0 +1,270 @@ +using System.Text.Json.Serialization; + +namespace NATS.Server.Events; + +/// +/// Server identity block embedded in all system events. +/// +public sealed class EventServerInfo +{ + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + [JsonPropertyName("host")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Host { get; set; } + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("cluster")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Cluster { get; set; } + + [JsonPropertyName("domain")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Domain { get; set; } + + [JsonPropertyName("ver")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Version { get; set; } + + [JsonPropertyName("seq")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public ulong Seq { get; set; } + + [JsonPropertyName("tags")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public Dictionary? Tags { get; set; } +} + +/// +/// Client identity block for connect/disconnect events. +/// +public sealed class EventClientInfo +{ + [JsonPropertyName("start")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTime Start { get; set; } + + [JsonPropertyName("stop")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTime Stop { get; set; } + + [JsonPropertyName("host")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Host { get; set; } + + [JsonPropertyName("id")] + public ulong Id { get; set; } + + [JsonPropertyName("acc")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Account { get; set; } + + [JsonPropertyName("name")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Name { get; set; } + + [JsonPropertyName("lang")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Lang { get; set; } + + [JsonPropertyName("ver")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Version { get; set; } + + [JsonPropertyName("rtt")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public long RttNanos { get; set; } +} + +public sealed class DataStats +{ + [JsonPropertyName("msgs")] + public long Msgs { get; set; } + + [JsonPropertyName("bytes")] + public long Bytes { get; set; } +} + +/// Client connect advisory. Go events.go:155-160. +public sealed class ConnectEventMsg +{ + public const string EventType = "io.nats.server.advisory.v1.client_connect"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("client")] + public EventClientInfo Client { get; set; } = new(); +} + +/// Client disconnect advisory. Go events.go:167-174. +public sealed class DisconnectEventMsg +{ + public const string EventType = "io.nats.server.advisory.v1.client_disconnect"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("client")] + public EventClientInfo Client { get; set; } = new(); + + [JsonPropertyName("sent")] + public DataStats Sent { get; set; } = new(); + + [JsonPropertyName("received")] + public DataStats Received { get; set; } = new(); + + [JsonPropertyName("reason")] + public string Reason { get; set; } = string.Empty; +} + +/// Account connection count heartbeat. Go events.go:210-214. +public sealed class AccountNumConns +{ + public const string EventType = "io.nats.server.advisory.v1.account_connections"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("acc")] + public string AccountName { get; set; } = string.Empty; + + [JsonPropertyName("conns")] + public int Connections { get; set; } + + [JsonPropertyName("total_conns")] + public long TotalConnections { get; set; } + + [JsonPropertyName("subs")] + public int Subscriptions { get; set; } + + [JsonPropertyName("sent")] + public DataStats Sent { get; set; } = new(); + + [JsonPropertyName("received")] + public DataStats Received { get; set; } = new(); +} + +/// Server stats broadcast. Go events.go:150-153. +public sealed class ServerStatsMsg +{ + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("statsz")] + public ServerStatsData Stats { get; set; } = new(); +} + +public sealed class ServerStatsData +{ + [JsonPropertyName("start")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTime Start { get; set; } + + [JsonPropertyName("mem")] + public long Mem { get; set; } + + [JsonPropertyName("cores")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int Cores { get; set; } + + [JsonPropertyName("connections")] + public int Connections { get; set; } + + [JsonPropertyName("total_connections")] + public long TotalConnections { get; set; } + + [JsonPropertyName("active_accounts")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int ActiveAccounts { get; set; } + + [JsonPropertyName("subscriptions")] + public long Subscriptions { get; set; } + + [JsonPropertyName("in_msgs")] + public long InMsgs { get; set; } + + [JsonPropertyName("out_msgs")] + public long OutMsgs { get; set; } + + [JsonPropertyName("in_bytes")] + public long InBytes { get; set; } + + [JsonPropertyName("out_bytes")] + public long OutBytes { get; set; } + + [JsonPropertyName("slow_consumers")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public long SlowConsumers { get; set; } +} + +/// Server shutdown notification. +public sealed class ShutdownEventMsg +{ + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("reason")] + public string Reason { get; set; } = string.Empty; +} + +/// Lame duck mode notification. +public sealed class LameDuckEventMsg +{ + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); +} + +/// Auth error advisory. +public sealed class AuthErrorEventMsg +{ + public const string EventType = "io.nats.server.advisory.v1.client_auth"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("client")] + public EventClientInfo Client { get; set; } = new(); + + [JsonPropertyName("reason")] + public string Reason { get; set; } = string.Empty; +} diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs new file mode 100644 index 0000000..caac5dd --- /dev/null +++ b/src/NATS.Server/Events/InternalEventSystem.cs @@ -0,0 +1,333 @@ +using System.Collections.Concurrent; +using System.Security.Cryptography; +using System.Text; +using System.Text.Json; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using NATS.Server.Auth; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Events; + +/// +/// Internal publish message queued for the send loop. +/// +public sealed class PublishMessage +{ + public InternalClient? Client { get; init; } + public required string Subject { get; init; } + public string? Reply { get; init; } + public byte[]? Headers { get; init; } + public object? Body { get; init; } + public bool Echo { get; init; } + public bool IsLast { get; init; } +} + +/// +/// Internal received message queued for the receive loop. +/// +public sealed class InternalSystemMessage +{ + public required Subscription? Sub { get; init; } + public required INatsClient? Client { get; init; } + public required Account? Account { get; init; } + public required string Subject { get; init; } + public required string? Reply { get; init; } + public required ReadOnlyMemory Headers { get; init; } + public required ReadOnlyMemory Message { get; init; } + public required SystemMessageHandler Callback { get; init; } +} + +/// +/// Manages the server's internal event system with Channel-based send/receive loops. +/// Maps to Go's internal struct in events.go:124-147 and the goroutines +/// internalSendLoop (events.go:495) and internalReceiveLoop (events.go:476). +/// +public sealed class InternalEventSystem : IAsyncDisposable +{ + private readonly ILogger _logger; + private readonly Channel _sendQueue; + private readonly Channel _receiveQueue; + private readonly Channel _receiveQueuePings; + private readonly CancellationTokenSource _cts = new(); + + private Task? _sendLoop; + private Task? _receiveLoop; + private Task? _receiveLoopPings; + private NatsServer? _server; + + private ulong _sequence; + private int _subscriptionId; + private readonly ConcurrentDictionary _callbacks = new(); + + public Account SystemAccount { get; } + public InternalClient SystemClient { get; } + public string ServerHash { get; } + + public InternalEventSystem(Account systemAccount, InternalClient systemClient, string serverName, ILogger logger) + { + _logger = logger; + SystemAccount = systemAccount; + SystemClient = systemClient; + + // Hash server name for inbox routing (matches Go's shash) + ServerHash = Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(serverName)))[..8].ToLowerInvariant(); + + _sendQueue = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + _receiveQueue = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + _receiveQueuePings = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + } + + public void Start(NatsServer server) + { + _server = server; + var ct = _cts.Token; + _sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct); + _receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct); + _receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct); + + // Periodic stats publish every 10 seconds + _ = Task.Run(async () => + { + using var timer = new PeriodicTimer(TimeSpan.FromSeconds(10)); + while (await timer.WaitForNextTickAsync(ct)) + { + PublishServerStats(); + } + }, ct); + } + + /// + /// Registers system request-reply monitoring services for this server. + /// Maps to Go's initEventTracking in events.go. + /// Sets up handlers for $SYS.REQ.SERVER.{id}.VARZ, HEALTHZ, SUBSZ, STATSZ, IDZ + /// and wildcard $SYS.REQ.SERVER.PING.* subjects. + /// + public void InitEventTracking(NatsServer server) + { + _server = server; + var serverId = server.ServerId; + + // Server-specific monitoring services + RegisterService(serverId, "VARZ", server.HandleVarzRequest); + RegisterService(serverId, "HEALTHZ", server.HandleHealthzRequest); + RegisterService(serverId, "SUBSZ", server.HandleSubszRequest); + RegisterService(serverId, "STATSZ", server.HandleStatszRequest); + RegisterService(serverId, "IDZ", server.HandleIdzRequest); + + // Wildcard ping services (all servers respond) + SysSubscribe(string.Format(EventSubjects.ServerPing, "VARZ"), WrapRequestHandler(server.HandleVarzRequest)); + SysSubscribe(string.Format(EventSubjects.ServerPing, "HEALTHZ"), WrapRequestHandler(server.HandleHealthzRequest)); + SysSubscribe(string.Format(EventSubjects.ServerPing, "IDZ"), WrapRequestHandler(server.HandleIdzRequest)); + SysSubscribe(string.Format(EventSubjects.ServerPing, "STATSZ"), WrapRequestHandler(server.HandleStatszRequest)); + } + + private void RegisterService(string serverId, string name, Action handler) + { + var subject = string.Format(EventSubjects.ServerReq, serverId, name); + SysSubscribe(subject, WrapRequestHandler(handler)); + } + + private SystemMessageHandler WrapRequestHandler(Action handler) + { + return (sub, client, acc, subject, reply, hdr, msg) => + { + handler(subject, reply); + }; + } + + /// + /// Publishes a $SYS.SERVER.{id}.STATSZ message with current server statistics. + /// Maps to Go's sendStatsz in events.go. + /// Can be called manually for testing or is invoked periodically by the stats timer. + /// + public void PublishServerStats() + { + if (_server == null) return; + + var subject = string.Format(EventSubjects.ServerStats, _server.ServerId); + var process = System.Diagnostics.Process.GetCurrentProcess(); + + var statsMsg = new ServerStatsMsg + { + Server = _server.BuildEventServerInfo(), + Stats = new ServerStatsData + { + Start = _server.StartTime, + Mem = process.WorkingSet64, + Cores = Environment.ProcessorCount, + Connections = _server.ClientCount, + TotalConnections = Interlocked.Read(ref _server.Stats.TotalConnections), + Subscriptions = SystemAccount.SubList.Count, + InMsgs = Interlocked.Read(ref _server.Stats.InMsgs), + OutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs), + InBytes = Interlocked.Read(ref _server.Stats.InBytes), + OutBytes = Interlocked.Read(ref _server.Stats.OutBytes), + SlowConsumers = Interlocked.Read(ref _server.Stats.SlowConsumers), + }, + }; + + Enqueue(new PublishMessage { Subject = subject, Body = statsMsg }); + } + + /// + /// Creates a system subscription in the system account's SubList. + /// Maps to Go's sysSubscribe in events.go:2796. + /// + public Subscription SysSubscribe(string subject, SystemMessageHandler callback) + { + var sid = Interlocked.Increment(ref _subscriptionId).ToString(); + var sub = new Subscription + { + Subject = subject, + Sid = sid, + Client = SystemClient, + }; + + // Store callback keyed by SID so multiple subscriptions work + _callbacks[sid] = callback; + + // Set a single routing callback on the system client that dispatches by SID + SystemClient.MessageCallback = (subj, s, reply, hdr, msg) => + { + if (_callbacks.TryGetValue(s, out var cb)) + { + _receiveQueue.Writer.TryWrite(new InternalSystemMessage + { + Sub = sub, + Client = SystemClient, + Account = SystemAccount, + Subject = subj, + Reply = reply, + Headers = hdr, + Message = msg, + Callback = cb, + }); + } + }; + + SystemAccount.SubList.Insert(sub); + return sub; + } + + /// + /// Returns the next monotonically increasing sequence number for event ordering. + /// + public ulong NextSequence() => Interlocked.Increment(ref _sequence); + + /// + /// Enqueue an internal message for publishing through the send loop. + /// + public void Enqueue(PublishMessage message) + { + _sendQueue.Writer.TryWrite(message); + } + + /// + /// The send loop: serializes messages and delivers them via the server's routing. + /// Maps to Go's internalSendLoop in events.go:495-668. + /// + private async Task InternalSendLoopAsync(CancellationToken ct) + { + try + { + await foreach (var pm in _sendQueue.Reader.ReadAllAsync(ct)) + { + try + { + var seq = Interlocked.Increment(ref _sequence); + + // Serialize body to JSON + byte[] payload; + if (pm.Body is byte[] raw) + { + payload = raw; + } + else if (pm.Body != null) + { + // Try source-generated context first, fall back to reflection-based for unknown types + var bodyType = pm.Body.GetType(); + var typeInfo = EventJsonContext.Default.GetTypeInfo(bodyType); + payload = typeInfo != null + ? JsonSerializer.SerializeToUtf8Bytes(pm.Body, typeInfo) + : JsonSerializer.SerializeToUtf8Bytes(pm.Body, bodyType); + } + else + { + payload = []; + } + + // Deliver via the system account's SubList matching + var result = SystemAccount.SubList.Match(pm.Subject); + + foreach (var sub in result.PlainSubs) + { + sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply, + pm.Headers ?? ReadOnlyMemory.Empty, + payload); + } + + foreach (var queueGroup in result.QueueSubs) + { + if (queueGroup.Length == 0) continue; + var sub = queueGroup[0]; // Simple pick for internal + sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply, + pm.Headers ?? ReadOnlyMemory.Empty, + payload); + } + + if (pm.IsLast) + break; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error in internal send loop processing message on {Subject}", pm.Subject); + } + } + } + catch (OperationCanceledException) + { + // Normal shutdown + } + } + + /// + /// The receive loop: dispatches callbacks for internally-received messages. + /// Maps to Go's internalReceiveLoop in events.go:476-491. + /// + private async Task InternalReceiveLoopAsync(Channel queue, CancellationToken ct) + { + try + { + await foreach (var msg in queue.Reader.ReadAllAsync(ct)) + { + try + { + msg.Callback(msg.Sub, msg.Client, msg.Account, msg.Subject, msg.Reply, msg.Headers, msg.Message); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error in internal receive loop processing {Subject}", msg.Subject); + } + } + } + catch (OperationCanceledException) + { + // Normal shutdown + } + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + _sendQueue.Writer.TryComplete(); + _receiveQueue.Writer.TryComplete(); + _receiveQueuePings.Writer.TryComplete(); + + if (_sendLoop != null) await _sendLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + if (_receiveLoop != null) await _receiveLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + if (_receiveLoopPings != null) await _receiveLoopPings.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + + _cts.Dispose(); + } +} diff --git a/src/NATS.Server/INatsClient.cs b/src/NATS.Server/INatsClient.cs new file mode 100644 index 0000000..5b3e714 --- /dev/null +++ b/src/NATS.Server/INatsClient.cs @@ -0,0 +1,19 @@ +using NATS.Server.Auth; +using NATS.Server.Protocol; + +namespace NATS.Server; + +public interface INatsClient +{ + ulong Id { get; } + ClientKind Kind { get; } + bool IsInternal => Kind.IsInternal(); + Account? Account { get; } + ClientOptions? ClientOpts { get; } + ClientPermissions? Permissions { get; } + + void SendMessage(string subject, string sid, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload); + bool QueueOutbound(ReadOnlyMemory data); + void RemoveSubscription(string sid); +} diff --git a/src/NATS.Server/Imports/ExportAuth.cs b/src/NATS.Server/Imports/ExportAuth.cs new file mode 100644 index 0000000..14200e1 --- /dev/null +++ b/src/NATS.Server/Imports/ExportAuth.cs @@ -0,0 +1,25 @@ +using NATS.Server.Auth; + +namespace NATS.Server.Imports; + +public sealed class ExportAuth +{ + public bool TokenRequired { get; init; } + public uint AccountPosition { get; init; } + public HashSet? ApprovedAccounts { get; init; } + public Dictionary? RevokedAccounts { get; init; } + + public bool IsAuthorized(Account account) + { + if (RevokedAccounts != null && RevokedAccounts.ContainsKey(account.Name)) + return false; + + if (ApprovedAccounts == null && !TokenRequired && AccountPosition == 0) + return true; + + if (ApprovedAccounts != null) + return ApprovedAccounts.Contains(account.Name); + + return false; + } +} diff --git a/src/NATS.Server/Imports/ExportMap.cs b/src/NATS.Server/Imports/ExportMap.cs new file mode 100644 index 0000000..410830a --- /dev/null +++ b/src/NATS.Server/Imports/ExportMap.cs @@ -0,0 +1,8 @@ +namespace NATS.Server.Imports; + +public sealed class ExportMap +{ + public Dictionary Streams { get; } = new(StringComparer.Ordinal); + public Dictionary Services { get; } = new(StringComparer.Ordinal); + public Dictionary Responses { get; } = new(StringComparer.Ordinal); +} diff --git a/src/NATS.Server/Imports/ImportMap.cs b/src/NATS.Server/Imports/ImportMap.cs new file mode 100644 index 0000000..a136c54 --- /dev/null +++ b/src/NATS.Server/Imports/ImportMap.cs @@ -0,0 +1,18 @@ +namespace NATS.Server.Imports; + +public sealed class ImportMap +{ + public List Streams { get; } = []; + public Dictionary> Services { get; } = new(StringComparer.Ordinal); + + public void AddServiceImport(ServiceImport si) + { + if (!Services.TryGetValue(si.From, out var list)) + { + list = []; + Services[si.From] = list; + } + + list.Add(si); + } +} diff --git a/src/NATS.Server/Imports/LatencyTracker.cs b/src/NATS.Server/Imports/LatencyTracker.cs new file mode 100644 index 0000000..d4e9d43 --- /dev/null +++ b/src/NATS.Server/Imports/LatencyTracker.cs @@ -0,0 +1,47 @@ +using System.Text.Json.Serialization; + +namespace NATS.Server.Imports; + +public sealed class ServiceLatencyMsg +{ + [JsonPropertyName("type")] + public string Type { get; set; } = "io.nats.server.metric.v1.service_latency"; + + [JsonPropertyName("requestor")] + public string Requestor { get; set; } = string.Empty; + + [JsonPropertyName("responder")] + public string Responder { get; set; } = string.Empty; + + [JsonPropertyName("status")] + public int Status { get; set; } = 200; + + [JsonPropertyName("svc_latency")] + public long ServiceLatencyNanos { get; set; } + + [JsonPropertyName("total_latency")] + public long TotalLatencyNanos { get; set; } +} + +public static class LatencyTracker +{ + public static bool ShouldSample(ServiceLatency latency) + { + if (latency.SamplingPercentage <= 0) return false; + if (latency.SamplingPercentage >= 100) return true; + return Random.Shared.Next(100) < latency.SamplingPercentage; + } + + public static ServiceLatencyMsg BuildLatencyMsg( + string requestor, string responder, + TimeSpan serviceLatency, TimeSpan totalLatency) + { + return new ServiceLatencyMsg + { + Requestor = requestor, + Responder = responder, + ServiceLatencyNanos = serviceLatency.Ticks * 100, + TotalLatencyNanos = totalLatency.Ticks * 100, + }; + } +} diff --git a/src/NATS.Server/Imports/ResponseRouter.cs b/src/NATS.Server/Imports/ResponseRouter.cs new file mode 100644 index 0000000..1b8ca98 --- /dev/null +++ b/src/NATS.Server/Imports/ResponseRouter.cs @@ -0,0 +1,64 @@ +using System.Security.Cryptography; +using NATS.Server.Auth; + +namespace NATS.Server.Imports; + +/// +/// Handles response routing for service imports. +/// Maps to Go's service reply prefix generation and response cleanup. +/// Reference: golang/nats-server/server/accounts.go — addRespServiceImport, removeRespServiceImport +/// +public static class ResponseRouter +{ + private static readonly char[] Base62 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789".ToCharArray(); + + /// + /// Generates a unique reply prefix for response routing. + /// Format: "_R_.{10 random base62 chars}." + /// + public static string GenerateReplyPrefix() + { + Span bytes = stackalloc byte[10]; + RandomNumberGenerator.Fill(bytes); + var chars = new char[10]; + for (int i = 0; i < 10; i++) + chars[i] = Base62[bytes[i] % 62]; + return $"_R_.{new string(chars)}."; + } + + /// + /// Creates a response service import that maps the generated reply prefix + /// back to the original reply subject on the requesting account. + /// + public static ServiceImport CreateResponseImport( + Account exporterAccount, + ServiceImport originalImport, + string originalReply) + { + var replyPrefix = GenerateReplyPrefix(); + + var responseSi = new ServiceImport + { + DestinationAccount = exporterAccount, + From = replyPrefix + ">", + To = originalReply, + IsResponse = true, + ResponseType = originalImport.ResponseType, + Export = originalImport.Export, + TimestampTicks = DateTime.UtcNow.Ticks, + }; + + exporterAccount.Exports.Responses[replyPrefix] = responseSi; + return responseSi; + } + + /// + /// Removes a response import from the account's export map. + /// For Singleton responses, this is called after the first reply is delivered. + /// For Streamed/Chunked, it is called when the response stream ends. + /// + public static void CleanupResponse(Account account, string replyPrefix, ServiceImport responseSi) + { + account.Exports.Responses.Remove(replyPrefix); + } +} diff --git a/src/NATS.Server/Imports/ServiceExport.cs b/src/NATS.Server/Imports/ServiceExport.cs new file mode 100644 index 0000000..0b4a9ed --- /dev/null +++ b/src/NATS.Server/Imports/ServiceExport.cs @@ -0,0 +1,13 @@ +using NATS.Server.Auth; + +namespace NATS.Server.Imports; + +public sealed class ServiceExport +{ + public ExportAuth Auth { get; init; } = new(); + public Account? Account { get; init; } + public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton; + public TimeSpan ResponseThreshold { get; init; } = TimeSpan.FromMinutes(2); + public ServiceLatency? Latency { get; init; } + public bool AllowTrace { get; init; } +} diff --git a/src/NATS.Server/Imports/ServiceImport.cs b/src/NATS.Server/Imports/ServiceImport.cs new file mode 100644 index 0000000..20d5536 --- /dev/null +++ b/src/NATS.Server/Imports/ServiceImport.cs @@ -0,0 +1,21 @@ +using NATS.Server.Auth; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Imports; + +public sealed class ServiceImport +{ + public required Account DestinationAccount { get; init; } + public required string From { get; init; } + public required string To { get; init; } + public SubjectTransform? Transform { get; init; } + public ServiceExport? Export { get; init; } + public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton; + public byte[]? Sid { get; set; } + public bool IsResponse { get; init; } + public bool UsePub { get; init; } + public bool Invalid { get; set; } + public bool Share { get; init; } + public bool Tracking { get; init; } + public long TimestampTicks { get; set; } +} diff --git a/src/NATS.Server/Imports/ServiceLatency.cs b/src/NATS.Server/Imports/ServiceLatency.cs new file mode 100644 index 0000000..0ee37fc --- /dev/null +++ b/src/NATS.Server/Imports/ServiceLatency.cs @@ -0,0 +1,7 @@ +namespace NATS.Server.Imports; + +public sealed class ServiceLatency +{ + public int SamplingPercentage { get; init; } = 100; + public string Subject { get; init; } = string.Empty; +} diff --git a/src/NATS.Server/Imports/ServiceResponseType.cs b/src/NATS.Server/Imports/ServiceResponseType.cs new file mode 100644 index 0000000..a1297ee --- /dev/null +++ b/src/NATS.Server/Imports/ServiceResponseType.cs @@ -0,0 +1,8 @@ +namespace NATS.Server.Imports; + +public enum ServiceResponseType +{ + Singleton, + Streamed, + Chunked, +} diff --git a/src/NATS.Server/Imports/StreamExport.cs b/src/NATS.Server/Imports/StreamExport.cs new file mode 100644 index 0000000..9ac6753 --- /dev/null +++ b/src/NATS.Server/Imports/StreamExport.cs @@ -0,0 +1,6 @@ +namespace NATS.Server.Imports; + +public sealed class StreamExport +{ + public ExportAuth Auth { get; init; } = new(); +} diff --git a/src/NATS.Server/Imports/StreamImport.cs b/src/NATS.Server/Imports/StreamImport.cs new file mode 100644 index 0000000..832950d --- /dev/null +++ b/src/NATS.Server/Imports/StreamImport.cs @@ -0,0 +1,14 @@ +using NATS.Server.Auth; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Imports; + +public sealed class StreamImport +{ + public required Account SourceAccount { get; init; } + public required string From { get; init; } + public required string To { get; init; } + public SubjectTransform? Transform { get; init; } + public bool UsePub { get; init; } + public bool Invalid { get; set; } +} diff --git a/src/NATS.Server/InternalClient.cs b/src/NATS.Server/InternalClient.cs new file mode 100644 index 0000000..61532a9 --- /dev/null +++ b/src/NATS.Server/InternalClient.cs @@ -0,0 +1,59 @@ +using NATS.Server.Auth; +using NATS.Server.Protocol; +using NATS.Server.Subscriptions; + +namespace NATS.Server; + +/// +/// Lightweight socketless client for internal messaging (SYSTEM, ACCOUNT, JETSTREAM). +/// Maps to Go's internal client created by createInternalClient() in server.go:1910-1936. +/// No network I/O — messages are delivered via callback. +/// +public sealed class InternalClient : INatsClient +{ + public ulong Id { get; } + public ClientKind Kind { get; } + public bool IsInternal => Kind.IsInternal(); + public Account? Account { get; } + public ClientOptions? ClientOpts => null; + public ClientPermissions? Permissions => null; + + /// + /// Callback invoked when a message is delivered to this internal client. + /// Set by the event system or account import infrastructure. + /// + public Action, ReadOnlyMemory>? MessageCallback { get; set; } + + private readonly Dictionary _subs = new(StringComparer.Ordinal); + + public InternalClient(ulong id, ClientKind kind, Account account) + { + if (!kind.IsInternal()) + throw new ArgumentException($"InternalClient requires an internal ClientKind, got {kind}", nameof(kind)); + + Id = id; + Kind = kind; + Account = account; + } + + public void SendMessage(string subject, string sid, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + MessageCallback?.Invoke(subject, sid, replyTo, headers, payload); + } + + public bool QueueOutbound(ReadOnlyMemory data) => true; // no-op for internal clients + + public void RemoveSubscription(string sid) + { + if (_subs.Remove(sid)) + Account?.DecrementSubscriptions(); + } + + public void AddSubscription(Subscription sub) + { + _subs[sub.Sid] = sub; + } + + public IReadOnlyDictionary Subscriptions => _subs; +} diff --git a/src/NATS.Server/Monitoring/SubszHandler.cs b/src/NATS.Server/Monitoring/SubszHandler.cs index 1de4f97..4a64796 100644 --- a/src/NATS.Server/Monitoring/SubszHandler.cs +++ b/src/NATS.Server/Monitoring/SubszHandler.cs @@ -14,12 +14,16 @@ public sealed class SubszHandler(NatsServer server) var opts = ParseQueryParams(ctx); var now = DateTime.UtcNow; - // Collect subscriptions from all accounts (or filtered) + // Collect subscriptions from all accounts (or filtered). + // Exclude the $SYS system account unless explicitly requested — its internal + // subscriptions are infrastructure and not user-facing. var allSubs = new List(); foreach (var account in server.GetAccounts()) { if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account) continue; + if (string.IsNullOrEmpty(opts.Account) && account.Name == "$SYS") + continue; allSubs.AddRange(account.SubList.GetAllSubscriptions()); } @@ -31,10 +35,10 @@ public sealed class SubszHandler(NatsServer server) var total = allSubs.Count; var numSubs = server.GetAccounts() - .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account) + .Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account) .Aggregate(0u, (sum, a) => sum + a.SubList.Count); var numCache = server.GetAccounts() - .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account) + .Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account) .Sum(a => a.SubList.CacheCount); SubDetail[] details = []; diff --git a/src/NATS.Server/NATS.Server.csproj b/src/NATS.Server/NATS.Server.csproj index d85e688..390f283 100644 --- a/src/NATS.Server/NATS.Server.csproj +++ b/src/NATS.Server/NATS.Server.csproj @@ -1,4 +1,7 @@ + + + diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 620f275..0f6a47a 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -20,6 +20,8 @@ public interface IMessageRouter void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload, NatsClient sender); void RemoveClient(NatsClient client); + void PublishConnectEvent(NatsClient client); + void PublishDisconnectEvent(NatsClient client); } public interface ISubListAccess @@ -27,7 +29,7 @@ public interface ISubListAccess SubList SubList { get; } } -public sealed class NatsClient : IDisposable +public sealed class NatsClient : INatsClient, IDisposable { private readonly Socket _socket; private readonly Stream _stream; @@ -46,6 +48,7 @@ public sealed class NatsClient : IDisposable private readonly ServerStats _serverStats; public ulong Id { get; } + public ClientKind Kind => ClientKind.Client; public ClientOptions? ClientOpts { get; private set; } public IMessageRouter? Router { get; set; } public Account? Account { get; private set; } @@ -448,6 +451,9 @@ public sealed class NatsClient : IDisposable _flags.SetFlag(ClientFlags.ConnectProcessFinished); _logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name); + // Publish connect advisory to the system event bus + Router?.PublishConnectEvent(this); + // Start auth expiry timer if needed if (_authService.IsAuthRequired && authResult?.Expiry is { } expiry) { diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 9a1f717..01d1a16 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -9,6 +9,8 @@ using Microsoft.Extensions.Logging; using NATS.NKeys; using NATS.Server.Auth; using NATS.Server.Configuration; +using NATS.Server.Events; +using NATS.Server.Imports; using NATS.Server.Monitoring; using NATS.Server.Protocol; using NATS.Server.Subscriptions; @@ -36,6 +38,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private string? _configDigest; private readonly Account _globalAccount; private readonly Account _systemAccount; + private InternalEventSystem? _eventSystem; private readonly SslServerAuthenticationOptions? _sslOptions; private readonly TlsRateLimiter? _tlsRateLimiter; private readonly SubjectTransform[] _subjectTransforms; @@ -73,6 +76,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public int Port => _options.Port; public Account SystemAccount => _systemAccount; public string ServerNKey { get; } + public InternalEventSystem? EventSystem => _eventSystem; public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0; public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0; public Action? ReOpenLogFile { get; set; } @@ -93,6 +97,21 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _logger.LogInformation("Initiating Shutdown..."); + // Publish shutdown advisory before tearing down the event system + if (_eventSystem != null) + { + var shutdownSubject = string.Format(EventSubjects.ServerShutdown, _serverInfo.ServerId); + _eventSystem.Enqueue(new PublishMessage + { + Subject = shutdownSubject, + Body = new ShutdownEventMsg { Server = BuildEventServerInfo(), Reason = "Server Shutdown" }, + IsLast = true, + }); + // Give the send loop time to process the shutdown event + await Task.Delay(100); + await _eventSystem.DisposeAsync(); + } + // Signal all internal loops to stop await _quitCts.CancelAsync(); @@ -272,6 +291,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _systemAccount = new Account("$SYS"); _accounts["$SYS"] = _systemAccount; + // Create system internal client and event system + var sysClientId = Interlocked.Increment(ref _nextClientId); + var sysClient = new InternalClient(sysClientId, ClientKind.System, _systemAccount); + _eventSystem = new InternalEventSystem( + _systemAccount, sysClient, + options.ServerName ?? $"nats-dotnet-{Environment.MachineName}", + _loggerFactory.CreateLogger()); + // Generate Ed25519 server NKey identity using var serverKeyPair = KeyPair.CreatePair(PrefixByte.Server); ServerNKey = serverKeyPair.GetPublicKey(); @@ -416,6 +443,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _listeningStarted.TrySetResult(); + _eventSystem?.Start(this); + _eventSystem?.InitEventTracking(this); + var tmpDelay = AcceptMinSleep; try @@ -728,6 +758,27 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } } + // Check for service imports that match this subject. + // When a client in the importer account publishes to a subject + // that matches a service import "From" pattern, we forward the + // message to the destination (exporter) account's subscribers + // using the mapped "To" subject. + if (sender.Account != null) + { + foreach (var kvp in sender.Account.Imports.Services) + { + foreach (var si in kvp.Value) + { + if (si.Invalid) continue; + if (SubjectMatch.MatchLiteral(subject, si.From)) + { + ProcessServiceImport(si, subject, replyTo, headers, payload); + delivered = true; + } + } + } + } + // No-responders: if nobody received the message and the publisher // opted in, send back a 503 status HMSG on the reply subject. if (!delivered && replyTo != null && sender.ClientOpts?.NoResponders == true) @@ -767,6 +818,153 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } } + /// + /// Processes a service import by transforming the subject from the importer's + /// subject space to the exporter's subject space, then delivering to matching + /// subscribers in the destination account. + /// Reference: Go server/accounts.go addServiceImport / processServiceImport. + /// + public void ProcessServiceImport(ServiceImport si, string subject, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + if (si.Invalid) return; + + // Transform subject: map from importer subject space to exporter subject space + string targetSubject; + if (si.Transform != null) + { + var transformed = si.Transform.Apply(subject); + targetSubject = transformed ?? si.To; + } + else if (si.UsePub) + { + targetSubject = subject; + } + else + { + // Default: use the "To" subject from the import definition. + // For wildcard imports (e.g. "requests.>" -> "api.>"), we need + // to map the specific subject tokens from the source pattern to + // the destination pattern. + targetSubject = MapImportSubject(subject, si.From, si.To); + } + + // Match against destination account's SubList + var destSubList = si.DestinationAccount.SubList; + var result = destSubList.Match(targetSubject); + + // Deliver to plain subscribers in the destination account + foreach (var sub in result.PlainSubs) + { + if (sub.Client == null) continue; + DeliverMessage(sub, targetSubject, replyTo, headers, payload); + } + + // Deliver to one member of each queue group + foreach (var queueGroup in result.QueueSubs) + { + if (queueGroup.Length == 0) continue; + var sub = queueGroup[0]; // Simple selection: first available + if (sub.Client != null) + DeliverMessage(sub, targetSubject, replyTo, headers, payload); + } + } + + /// + /// Maps a published subject from the import "From" pattern to the "To" pattern. + /// For example, if From="requests.>" and To="api.>" and subject="requests.test", + /// this returns "api.test". + /// + private static string MapImportSubject(string subject, string fromPattern, string toPattern) + { + // If "To" doesn't contain wildcards, use it directly + if (SubjectMatch.IsLiteral(toPattern)) + return toPattern; + + // For wildcard patterns, replace matching wildcard segments. + // Split into tokens and map from source to destination. + var subTokens = subject.Split('.'); + var fromTokens = fromPattern.Split('.'); + var toTokens = toPattern.Split('.'); + + var result = new string[toTokens.Length]; + int subIdx = 0; + + // Build a mapping: for each wildcard position in "from", + // capture the corresponding subject token(s) + var wildcardValues = new List(); + string? fwcValue = null; + + for (int i = 0; i < fromTokens.Length && subIdx < subTokens.Length; i++) + { + if (fromTokens[i] == "*") + { + wildcardValues.Add(subTokens[subIdx]); + subIdx++; + } + else if (fromTokens[i] == ">") + { + // Capture all remaining tokens + fwcValue = string.Join(".", subTokens[subIdx..]); + subIdx = subTokens.Length; + } + else + { + subIdx++; // Skip literal match + } + } + + // Now build the output using the "to" pattern + int wcIdx = 0; + var sb = new StringBuilder(); + for (int i = 0; i < toTokens.Length; i++) + { + if (i > 0) sb.Append('.'); + + if (toTokens[i] == "*") + { + sb.Append(wcIdx < wildcardValues.Count ? wildcardValues[wcIdx] : "*"); + wcIdx++; + } + else if (toTokens[i] == ">") + { + sb.Append(fwcValue ?? ">"); + } + else + { + sb.Append(toTokens[i]); + } + } + + return sb.ToString(); + } + + /// + /// Wires service import subscriptions for an account. Creates marker + /// subscriptions in the account's SubList so that the import paths + /// are tracked. The actual forwarding happens in ProcessMessage when + /// it checks the account's Imports.Services. + /// Reference: Go server/accounts.go addServiceImportSub. + /// + public void WireServiceImports(Account account) + { + foreach (var kvp in account.Imports.Services) + { + foreach (var si in kvp.Value) + { + if (si.Invalid) continue; + + // Create a marker subscription in the importer account. + // This subscription doesn't directly deliver messages; + // the ProcessMessage method checks service imports after + // the regular SubList match. + _logger.LogDebug( + "Wired service import for account {Account}: {From} -> {To} (dest: {DestAccount})", + account.Name, si.From, si.To, si.DestinationAccount.Name); + } + } + } + private static void SendNoResponders(NatsClient sender, string replyTo) { // Find the sid for a subscription matching the reply subject @@ -812,8 +1010,194 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable }); } + public void SendInternalMsg(string subject, string? reply, object? msg) + { + _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Reply = reply, Body = msg }); + } + + public void SendInternalAccountMsg(Account account, string subject, object? msg) + { + _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg }); + } + + /// + /// Handles $SYS.REQ.SERVER.{id}.VARZ requests. + /// Returns core server information including stats counters. + /// + public void HandleVarzRequest(string subject, string? reply) + { + if (reply == null) return; + var varz = new + { + server_id = _serverInfo.ServerId, + server_name = _serverInfo.ServerName, + version = NatsProtocol.Version, + host = _options.Host, + port = _options.Port, + max_payload = _options.MaxPayload, + connections = ClientCount, + total_connections = Interlocked.Read(ref _stats.TotalConnections), + in_msgs = Interlocked.Read(ref _stats.InMsgs), + out_msgs = Interlocked.Read(ref _stats.OutMsgs), + in_bytes = Interlocked.Read(ref _stats.InBytes), + out_bytes = Interlocked.Read(ref _stats.OutBytes), + }; + SendInternalMsg(reply, null, varz); + } + + /// + /// Handles $SYS.REQ.SERVER.{id}.HEALTHZ requests. + /// Returns a simple health status response. + /// + public void HandleHealthzRequest(string subject, string? reply) + { + if (reply == null) return; + SendInternalMsg(reply, null, new { status = "ok" }); + } + + /// + /// Handles $SYS.REQ.SERVER.{id}.SUBSZ requests. + /// Returns the current subscription count. + /// + public void HandleSubszRequest(string subject, string? reply) + { + if (reply == null) return; + SendInternalMsg(reply, null, new { num_subscriptions = SubList.Count }); + } + + /// + /// Handles $SYS.REQ.SERVER.{id}.STATSZ requests. + /// Publishes current server statistics through the event system. + /// + public void HandleStatszRequest(string subject, string? reply) + { + if (reply == null) return; + var process = System.Diagnostics.Process.GetCurrentProcess(); + var statsMsg = new Events.ServerStatsMsg + { + Server = BuildEventServerInfo(), + Stats = new Events.ServerStatsData + { + Start = StartTime, + Mem = process.WorkingSet64, + Cores = Environment.ProcessorCount, + Connections = ClientCount, + TotalConnections = Interlocked.Read(ref _stats.TotalConnections), + Subscriptions = SubList.Count, + InMsgs = Interlocked.Read(ref _stats.InMsgs), + OutMsgs = Interlocked.Read(ref _stats.OutMsgs), + InBytes = Interlocked.Read(ref _stats.InBytes), + OutBytes = Interlocked.Read(ref _stats.OutBytes), + SlowConsumers = Interlocked.Read(ref _stats.SlowConsumers), + }, + }; + SendInternalMsg(reply, null, statsMsg); + } + + /// + /// Handles $SYS.REQ.SERVER.{id}.IDZ requests. + /// Returns basic server identity information. + /// + public void HandleIdzRequest(string subject, string? reply) + { + if (reply == null) return; + var idz = new + { + server_id = _serverInfo.ServerId, + server_name = _serverInfo.ServerName, + version = NatsProtocol.Version, + host = _options.Host, + port = _options.Port, + }; + SendInternalMsg(reply, null, idz); + } + + /// + /// Builds an EventServerInfo block for embedding in system event messages. + /// Maps to Go's serverInfo() helper used in events.go advisory publishing. + /// + public EventServerInfo BuildEventServerInfo() + { + var seq = _eventSystem?.NextSequence() ?? 0; + return new EventServerInfo + { + Name = _serverInfo.ServerName, + Host = _options.Host, + Id = _serverInfo.ServerId, + Version = NatsProtocol.Version, + Seq = seq, + }; + } + + private static EventClientInfo BuildEventClientInfo(NatsClient client) + { + return new EventClientInfo + { + Id = client.Id, + Host = client.RemoteIp, + Account = client.Account?.Name, + Name = client.ClientOpts?.Name, + Lang = client.ClientOpts?.Lang, + Version = client.ClientOpts?.Version, + Start = client.StartTime, + }; + } + + /// + /// Publishes a $SYS.ACCOUNT.{account}.CONNECT advisory when a client + /// completes authentication. Maps to Go's sendConnectEvent in events.go. + /// + public void PublishConnectEvent(NatsClient client) + { + if (_eventSystem == null) return; + var accountName = client.Account?.Name ?? Account.GlobalAccountName; + var subject = string.Format(EventSubjects.ConnectEvent, accountName); + var evt = new ConnectEventMsg + { + Id = Guid.NewGuid().ToString("N"), + Time = DateTime.UtcNow, + Server = BuildEventServerInfo(), + Client = BuildEventClientInfo(client), + }; + SendInternalMsg(subject, null, evt); + } + + /// + /// Publishes a $SYS.ACCOUNT.{account}.DISCONNECT advisory when a client + /// disconnects. Maps to Go's sendDisconnectEvent in events.go. + /// + public void PublishDisconnectEvent(NatsClient client) + { + if (_eventSystem == null) return; + var accountName = client.Account?.Name ?? Account.GlobalAccountName; + var subject = string.Format(EventSubjects.DisconnectEvent, accountName); + var evt = new DisconnectEventMsg + { + Id = Guid.NewGuid().ToString("N"), + Time = DateTime.UtcNow, + Server = BuildEventServerInfo(), + Client = BuildEventClientInfo(client), + Sent = new DataStats + { + Msgs = Interlocked.Read(ref client.OutMsgs), + Bytes = Interlocked.Read(ref client.OutBytes), + }, + Received = new DataStats + { + Msgs = Interlocked.Read(ref client.InMsgs), + Bytes = Interlocked.Read(ref client.InBytes), + }, + Reason = client.CloseReason.ToReasonString(), + }; + SendInternalMsg(subject, null, evt); + } + public void RemoveClient(NatsClient client) { + // Publish disconnect advisory before removing client state + if (client.ConnectReceived) + PublishDisconnectEvent(client); + _clients.TryRemove(client.Id, out _); _logger.LogDebug("Removed client {ClientId}", client.Id); diff --git a/src/NATS.Server/Subscriptions/Subscription.cs b/src/NATS.Server/Subscriptions/Subscription.cs index d96095b..4f10cb2 100644 --- a/src/NATS.Server/Subscriptions/Subscription.cs +++ b/src/NATS.Server/Subscriptions/Subscription.cs @@ -1,4 +1,5 @@ using NATS.Server; +using NATS.Server.Imports; namespace NATS.Server.Subscriptions; @@ -9,5 +10,7 @@ public sealed class Subscription public required string Sid { get; init; } public long MessageCount; // Interlocked public long MaxMessages; // 0 = unlimited - public NatsClient? Client { get; set; } + public INatsClient? Client { get; set; } + public ServiceImport? ServiceImport { get; set; } + public StreamImport? StreamImport { get; set; } } diff --git a/tests/NATS.Server.Tests/EventSystemTests.cs b/tests/NATS.Server.Tests/EventSystemTests.cs new file mode 100644 index 0000000..4924bc8 --- /dev/null +++ b/tests/NATS.Server.Tests/EventSystemTests.cs @@ -0,0 +1,121 @@ +using System.Text.Json; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Events; + +namespace NATS.Server.Tests; + +public class EventSystemTests +{ + [Fact] + public void ConnectEventMsg_serializes_with_correct_type() + { + var evt = new ConnectEventMsg + { + Type = ConnectEventMsg.EventType, + Id = "test123", + Time = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc), + Server = new EventServerInfo { Name = "test-server", Id = "SRV1" }, + Client = new EventClientInfo { Id = 1, Account = "$G" }, + }; + + var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.ConnectEventMsg); + json.ShouldContain("\"type\":\"io.nats.server.advisory.v1.client_connect\""); + json.ShouldContain("\"server\":"); + json.ShouldContain("\"client\":"); + } + + [Fact] + public void DisconnectEventMsg_serializes_with_reason() + { + var evt = new DisconnectEventMsg + { + Type = DisconnectEventMsg.EventType, + Id = "test456", + Time = DateTime.UtcNow, + Server = new EventServerInfo { Name = "test-server", Id = "SRV1" }, + Client = new EventClientInfo { Id = 2, Account = "myacc" }, + Reason = "Client Closed", + Sent = new DataStats { Msgs = 10, Bytes = 1024 }, + Received = new DataStats { Msgs = 5, Bytes = 512 }, + }; + + var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.DisconnectEventMsg); + json.ShouldContain("\"reason\":\"Client Closed\""); + } + + [Fact] + public void ServerStatsMsg_serializes() + { + var evt = new ServerStatsMsg + { + Server = new EventServerInfo { Name = "srv1", Id = "ABC" }, + Stats = new ServerStatsData + { + Connections = 10, + TotalConnections = 100, + InMsgs = 5000, + OutMsgs = 4500, + InBytes = 1_000_000, + OutBytes = 900_000, + Mem = 50 * 1024 * 1024, + Subscriptions = 42, + }, + }; + + var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.ServerStatsMsg); + json.ShouldContain("\"connections\":10"); + json.ShouldContain("\"in_msgs\":5000"); + } + + [Fact] + public async Task InternalEventSystem_start_and_stop_lifecycle() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var eventSystem = server.EventSystem; + eventSystem.ShouldNotBeNull(); + eventSystem.SystemClient.ShouldNotBeNull(); + eventSystem.SystemClient.Kind.ShouldBe(ClientKind.System); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task SendInternalMsg_delivers_to_system_subscriber() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("test.subject", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + server.SendInternalMsg("test.subject", null, new { Value = "hello" }); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldBe("test.subject"); + + await server.ShutdownAsync(); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } +} diff --git a/tests/NATS.Server.Tests/ImportExportTests.cs b/tests/NATS.Server.Tests/ImportExportTests.cs new file mode 100644 index 0000000..a259e5c --- /dev/null +++ b/tests/NATS.Server.Tests/ImportExportTests.cs @@ -0,0 +1,338 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server; +using NATS.Server.Auth; +using NATS.Server.Imports; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Tests; + +public class ImportExportTests +{ + [Fact] + public void ExportAuth_public_export_authorizes_any_account() + { + var auth = new ExportAuth(); + var account = new Account("test"); + auth.IsAuthorized(account).ShouldBeTrue(); + } + + [Fact] + public void ExportAuth_approved_accounts_restricts_access() + { + var auth = new ExportAuth { ApprovedAccounts = ["allowed"] }; + var allowed = new Account("allowed"); + var denied = new Account("denied"); + auth.IsAuthorized(allowed).ShouldBeTrue(); + auth.IsAuthorized(denied).ShouldBeFalse(); + } + + [Fact] + public void ExportAuth_revoked_account_denied() + { + var auth = new ExportAuth + { + ApprovedAccounts = ["test"], + RevokedAccounts = new() { ["test"] = DateTimeOffset.UtcNow.ToUnixTimeSeconds() }, + }; + var account = new Account("test"); + auth.IsAuthorized(account).ShouldBeFalse(); + } + + [Fact] + public void ServiceResponseType_defaults_to_singleton() + { + var import = new ServiceImport + { + DestinationAccount = new Account("dest"), + From = "requests.>", + To = "api.>", + }; + import.ResponseType.ShouldBe(ServiceResponseType.Singleton); + } + + [Fact] + public void ExportMap_stores_and_retrieves_exports() + { + var map = new ExportMap(); + map.Services["api.>"] = new ServiceExport { Account = new Account("svc") }; + map.Streams["events.>"] = new StreamExport(); + + map.Services.ShouldContainKey("api.>"); + map.Streams.ShouldContainKey("events.>"); + } + + [Fact] + public void ImportMap_stores_service_imports() + { + var map = new ImportMap(); + var si = new ServiceImport + { + DestinationAccount = new Account("dest"), + From = "requests.>", + To = "api.>", + }; + map.AddServiceImport(si); + map.Services.ShouldContainKey("requests.>"); + map.Services["requests.>"].Count.ShouldBe(1); + } + + [Fact] + public void Account_add_service_export_and_import() + { + var exporter = new Account("exporter"); + var importer = new Account("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + exporter.Exports.Services.ShouldContainKey("api.>"); + + var si = importer.AddServiceImport(exporter, "requests.>", "api.>"); + si.ShouldNotBeNull(); + si.From.ShouldBe("requests.>"); + si.To.ShouldBe("api.>"); + si.DestinationAccount.ShouldBe(exporter); + importer.Imports.Services.ShouldContainKey("requests.>"); + } + + [Fact] + public void Account_add_stream_export_and_import() + { + var exporter = new Account("exporter"); + var importer = new Account("importer"); + + exporter.AddStreamExport("events.>", null); + exporter.Exports.Streams.ShouldContainKey("events.>"); + + importer.AddStreamImport(exporter, "events.>", "imported.events.>"); + importer.Imports.Streams.Count.ShouldBe(1); + importer.Imports.Streams[0].From.ShouldBe("events.>"); + importer.Imports.Streams[0].To.ShouldBe("imported.events.>"); + } + + [Fact] + public void Account_service_import_auth_rejected() + { + var exporter = new Account("exporter"); + var importer = new Account("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, [new Account("other")]); + + Should.Throw(() => + importer.AddServiceImport(exporter, "requests.>", "api.>")); + } + + [Fact] + public void Account_lazy_creates_internal_client() + { + var account = new Account("test"); + var client = account.GetOrCreateInternalClient(99); + client.ShouldNotBeNull(); + client.Kind.ShouldBe(ClientKind.Account); + client.Account.ShouldBe(account); + + // Second call returns same instance + var client2 = account.GetOrCreateInternalClient(100); + client2.ShouldBeSameAs(client); + } + + [Fact] + public async Task Service_import_forwards_message_to_export_account() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + // Set up exporter and importer accounts + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Wire the import subscriptions into the importer account + server.WireServiceImports(importer); + + // Subscribe in exporter account to receive forwarded message + var exportSub = new Subscription { Subject = "api.test", Sid = "export-1", Client = null }; + exporter.SubList.Insert(exportSub); + + // Verify import infrastructure is wired: the importer should have service import entries + importer.Imports.Services.ShouldContainKey("requests.>"); + importer.Imports.Services["requests.>"].Count.ShouldBe(1); + importer.Imports.Services["requests.>"][0].DestinationAccount.ShouldBe(exporter); + + await server.ShutdownAsync(); + } + + [Fact] + public void ProcessServiceImport_delivers_to_destination_account_subscribers() + { + using var server = CreateTestServer(); + + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Add a subscriber in the exporter account's SubList + var received = new List<(string Subject, string Sid)>(); + var mockClient = new TestNatsClient(1, exporter); + mockClient.OnMessage = (subject, sid, _, _, _) => + received.Add((subject, sid)); + + var exportSub = new Subscription { Subject = "api.test", Sid = "s1", Client = mockClient }; + exporter.SubList.Insert(exportSub); + + // Process a service import directly + var si = importer.Imports.Services["requests.>"][0]; + server.ProcessServiceImport(si, "requests.test", null, + ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + received.Count.ShouldBe(1); + received[0].Subject.ShouldBe("api.test"); + received[0].Sid.ShouldBe("s1"); + } + + [Fact] + public void ProcessServiceImport_with_transform_applies_subject_mapping() + { + using var server = CreateTestServer(); + + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + var si = importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Create a transform from requests.> to api.> + var transform = SubjectTransform.Create("requests.>", "api.>"); + transform.ShouldNotBeNull(); + + // Create a new import with the transform set + var siWithTransform = new ServiceImport + { + DestinationAccount = exporter, + From = "requests.>", + To = "api.>", + Transform = transform, + }; + + var received = new List(); + var mockClient = new TestNatsClient(1, exporter); + mockClient.OnMessage = (subject, _, _, _, _) => + received.Add(subject); + + var exportSub = new Subscription { Subject = "api.hello", Sid = "s1", Client = mockClient }; + exporter.SubList.Insert(exportSub); + + server.ProcessServiceImport(siWithTransform, "requests.hello", null, + ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + received.Count.ShouldBe(1); + received[0].ShouldBe("api.hello"); + } + + [Fact] + public void ProcessServiceImport_skips_invalid_imports() + { + using var server = CreateTestServer(); + + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Mark the import as invalid + var si = importer.Imports.Services["requests.>"][0]; + si.Invalid = true; + + // Add a subscriber in the exporter account + var received = new List(); + var mockClient = new TestNatsClient(1, exporter); + mockClient.OnMessage = (subject, _, _, _, _) => + received.Add(subject); + + var exportSub = new Subscription { Subject = "api.test", Sid = "s1", Client = mockClient }; + exporter.SubList.Insert(exportSub); + + // ProcessServiceImport should be a no-op for invalid imports + server.ProcessServiceImport(si, "requests.test", null, + ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + received.Count.ShouldBe(0); + } + + [Fact] + public void ProcessServiceImport_delivers_to_queue_groups() + { + using var server = CreateTestServer(); + + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Add queue group subscribers in the exporter account + var received = new List<(string Subject, string Sid)>(); + var mockClient1 = new TestNatsClient(1, exporter); + mockClient1.OnMessage = (subject, sid, _, _, _) => + received.Add((subject, sid)); + var mockClient2 = new TestNatsClient(2, exporter); + mockClient2.OnMessage = (subject, sid, _, _, _) => + received.Add((subject, sid)); + + var qSub1 = new Subscription { Subject = "api.test", Sid = "q1", Queue = "workers", Client = mockClient1 }; + var qSub2 = new Subscription { Subject = "api.test", Sid = "q2", Queue = "workers", Client = mockClient2 }; + exporter.SubList.Insert(qSub1); + exporter.SubList.Insert(qSub2); + + var si = importer.Imports.Services["requests.>"][0]; + server.ProcessServiceImport(si, "requests.test", null, + ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + // One member of the queue group should receive the message + received.Count.ShouldBe(1); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } + + /// + /// Minimal test double for INatsClient used in import/export tests. + /// + private sealed class TestNatsClient(ulong id, Account account) : INatsClient + { + public ulong Id => id; + public ClientKind Kind => ClientKind.Client; + public Account? Account => account; + public Protocol.ClientOptions? ClientOpts => null; + public ClientPermissions? Permissions => null; + + public Action, ReadOnlyMemory>? OnMessage { get; set; } + + public void SendMessage(string subject, string sid, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + OnMessage?.Invoke(subject, sid, replyTo, headers, payload); + } + + public bool QueueOutbound(ReadOnlyMemory data) => true; + + public void RemoveSubscription(string sid) { } + } +} diff --git a/tests/NATS.Server.Tests/InternalClientTests.cs b/tests/NATS.Server.Tests/InternalClientTests.cs new file mode 100644 index 0000000..6a068c4 --- /dev/null +++ b/tests/NATS.Server.Tests/InternalClientTests.cs @@ -0,0 +1,85 @@ +using NATS.Server.Auth; + +namespace NATS.Server.Tests; + +public class InternalClientTests +{ + [Theory] + [InlineData(ClientKind.Client, false)] + [InlineData(ClientKind.Router, false)] + [InlineData(ClientKind.Gateway, false)] + [InlineData(ClientKind.Leaf, false)] + [InlineData(ClientKind.System, true)] + [InlineData(ClientKind.JetStream, true)] + [InlineData(ClientKind.Account, true)] + public void IsInternal_returns_correct_value(ClientKind kind, bool expected) + { + kind.IsInternal().ShouldBe(expected); + } + + [Fact] + public void NatsClient_implements_INatsClient() + { + typeof(NatsClient).GetInterfaces().ShouldContain(typeof(INatsClient)); + } + + [Fact] + public void NatsClient_kind_is_Client() + { + typeof(NatsClient).GetProperty("Kind")!.PropertyType.ShouldBe(typeof(ClientKind)); + } + + [Fact] + public void InternalClient_system_kind() + { + var account = new Account("$SYS"); + var client = new InternalClient(1, ClientKind.System, account); + client.Kind.ShouldBe(ClientKind.System); + client.IsInternal.ShouldBeTrue(); + client.Id.ShouldBe(1UL); + client.Account.ShouldBe(account); + } + + [Fact] + public void InternalClient_account_kind() + { + var account = new Account("myaccount"); + var client = new InternalClient(2, ClientKind.Account, account); + client.Kind.ShouldBe(ClientKind.Account); + client.IsInternal.ShouldBeTrue(); + } + + [Fact] + public void InternalClient_rejects_non_internal_kind() + { + var account = new Account("test"); + Should.Throw(() => new InternalClient(1, ClientKind.Client, account)); + } + + [Fact] + public void InternalClient_SendMessage_invokes_callback() + { + var account = new Account("$SYS"); + var client = new InternalClient(1, ClientKind.System, account); + string? capturedSubject = null; + string? capturedSid = null; + client.MessageCallback = (subject, sid, replyTo, headers, payload) => + { + capturedSubject = subject; + capturedSid = sid; + }; + + client.SendMessage("test.subject", "1", null, ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + capturedSubject.ShouldBe("test.subject"); + capturedSid.ShouldBe("1"); + } + + [Fact] + public void InternalClient_QueueOutbound_returns_true_noop() + { + var account = new Account("$SYS"); + var client = new InternalClient(1, ClientKind.System, account); + client.QueueOutbound(ReadOnlyMemory.Empty).ShouldBeTrue(); + } +} diff --git a/tests/NATS.Server.Tests/ResponseRoutingTests.cs b/tests/NATS.Server.Tests/ResponseRoutingTests.cs new file mode 100644 index 0000000..33badf8 --- /dev/null +++ b/tests/NATS.Server.Tests/ResponseRoutingTests.cs @@ -0,0 +1,149 @@ +using NATS.Server.Auth; +using NATS.Server.Imports; + +namespace NATS.Server.Tests; + +public class ResponseRoutingTests +{ + [Fact] + public void GenerateReplyPrefix_creates_unique_prefix() + { + var prefix1 = ResponseRouter.GenerateReplyPrefix(); + var prefix2 = ResponseRouter.GenerateReplyPrefix(); + + prefix1.ShouldStartWith("_R_."); + prefix2.ShouldStartWith("_R_."); + prefix1.ShouldNotBe(prefix2); + prefix1.Length.ShouldBeGreaterThan(4); + } + + [Fact] + public void GenerateReplyPrefix_ends_with_dot() + { + var prefix = ResponseRouter.GenerateReplyPrefix(); + + prefix.ShouldEndWith("."); + // Format: "_R_." + 10 chars + "." = 15 chars + prefix.Length.ShouldBe(15); + } + + [Fact] + public void Singleton_response_import_removed_after_delivery() + { + var exporter = new Account("exporter"); + exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null); + + var replyPrefix = ResponseRouter.GenerateReplyPrefix(); + var responseSi = new ServiceImport + { + DestinationAccount = exporter, + From = replyPrefix + ">", + To = "_INBOX.original.reply", + IsResponse = true, + ResponseType = ServiceResponseType.Singleton, + }; + exporter.Exports.Responses[replyPrefix] = responseSi; + + exporter.Exports.Responses.ShouldContainKey(replyPrefix); + + // Simulate singleton delivery cleanup + ResponseRouter.CleanupResponse(exporter, replyPrefix, responseSi); + + exporter.Exports.Responses.ShouldNotContainKey(replyPrefix); + } + + [Fact] + public void CreateResponseImport_registers_in_exporter_responses() + { + var exporter = new Account("exporter"); + var importer = new Account("importer"); + exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null); + + var originalSi = new ServiceImport + { + DestinationAccount = exporter, + From = "api.test", + To = "api.test", + Export = exporter.Exports.Services["api.test"], + ResponseType = ServiceResponseType.Singleton, + }; + + var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.abc123"); + + responseSi.IsResponse.ShouldBeTrue(); + responseSi.ResponseType.ShouldBe(ServiceResponseType.Singleton); + responseSi.To.ShouldBe("_INBOX.abc123"); + responseSi.DestinationAccount.ShouldBe(exporter); + responseSi.From.ShouldEndWith(">"); + responseSi.Export.ShouldBe(originalSi.Export); + + // Should be registered in the exporter's response map + exporter.Exports.Responses.Count.ShouldBe(1); + } + + [Fact] + public void CreateResponseImport_preserves_streamed_response_type() + { + var exporter = new Account("exporter"); + exporter.AddServiceExport("api.stream", ServiceResponseType.Streamed, null); + + var originalSi = new ServiceImport + { + DestinationAccount = exporter, + From = "api.stream", + To = "api.stream", + Export = exporter.Exports.Services["api.stream"], + ResponseType = ServiceResponseType.Streamed, + }; + + var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.xyz789"); + + responseSi.ResponseType.ShouldBe(ServiceResponseType.Streamed); + } + + [Fact] + public void Multiple_response_imports_each_get_unique_prefix() + { + var exporter = new Account("exporter"); + exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null); + + var originalSi = new ServiceImport + { + DestinationAccount = exporter, + From = "api.test", + To = "api.test", + Export = exporter.Exports.Services["api.test"], + ResponseType = ServiceResponseType.Singleton, + }; + + var resp1 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply1"); + var resp2 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply2"); + + exporter.Exports.Responses.Count.ShouldBe(2); + resp1.To.ShouldBe("_INBOX.reply1"); + resp2.To.ShouldBe("_INBOX.reply2"); + resp1.From.ShouldNotBe(resp2.From); + } + + [Fact] + public void LatencyTracker_should_sample_respects_percentage() + { + var latency = new ServiceLatency { SamplingPercentage = 0, Subject = "latency.test" }; + LatencyTracker.ShouldSample(latency).ShouldBeFalse(); + + var latency100 = new ServiceLatency { SamplingPercentage = 100, Subject = "latency.test" }; + LatencyTracker.ShouldSample(latency100).ShouldBeTrue(); + } + + [Fact] + public void LatencyTracker_builds_latency_message() + { + var msg = LatencyTracker.BuildLatencyMsg("requester", "responder", + TimeSpan.FromMilliseconds(5), TimeSpan.FromMilliseconds(10)); + + msg.Requestor.ShouldBe("requester"); + msg.Responder.ShouldBe("responder"); + msg.ServiceLatencyNanos.ShouldBeGreaterThan(0); + msg.TotalLatencyNanos.ShouldBeGreaterThan(0); + } +} diff --git a/tests/NATS.Server.Tests/SystemEventsTests.cs b/tests/NATS.Server.Tests/SystemEventsTests.cs new file mode 100644 index 0000000..efd6d11 --- /dev/null +++ b/tests/NATS.Server.Tests/SystemEventsTests.cs @@ -0,0 +1,133 @@ +using System.Text.Json; +using NATS.Server; +using NATS.Server.Events; +using Microsoft.Extensions.Logging.Abstractions; + +namespace NATS.Server.Tests; + +public class SystemEventsTests +{ + [Fact] + public async Task Server_publishes_connect_event_on_client_auth() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.CONNECT", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + // Connect a real client + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port); + + // Read INFO + var buf = new byte[4096]; + await sock.ReceiveAsync(buf); + + // Send CONNECT + var connect = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n"); + await sock.SendAsync(connect); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldStartWith("$SYS.ACCOUNT."); + result.ShouldEndWith(".CONNECT"); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Server_publishes_disconnect_event_on_client_close() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.DISCONNECT", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + // Connect and then disconnect + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port); + var buf = new byte[4096]; + await sock.ReceiveAsync(buf); + await sock.SendAsync(System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n")); + await Task.Delay(100); + sock.Shutdown(System.Net.Sockets.SocketShutdown.Both); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldStartWith("$SYS.ACCOUNT."); + result.ShouldEndWith(".DISCONNECT"); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Server_publishes_statsz_periodically() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("$SYS.SERVER.*.STATSZ", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + // Trigger a manual stats publish (don't wait 10s) + server.EventSystem!.PublishServerStats(); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldContain(".STATSZ"); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Server_publishes_shutdown_event() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("$SYS.SERVER.*.SHUTDOWN", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + await server.ShutdownAsync(); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldContain(".SHUTDOWN"); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } +} diff --git a/tests/NATS.Server.Tests/SystemRequestReplyTests.cs b/tests/NATS.Server.Tests/SystemRequestReplyTests.cs new file mode 100644 index 0000000..c5d0c97 --- /dev/null +++ b/tests/NATS.Server.Tests/SystemRequestReplyTests.cs @@ -0,0 +1,170 @@ +using System.Text; +using System.Text.Json; +using NATS.Server; +using NATS.Server.Events; +using Microsoft.Extensions.Logging.Abstractions; + +namespace NATS.Server.Tests; + +public class SystemRequestReplyTests +{ + [Fact] + public async Task Varz_request_reply_returns_server_info() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + var replySubject = $"_INBOX.test.{Guid.NewGuid():N}"; + server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(msg.ToArray()); + }); + + var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ"); + server.SendInternalMsg(reqSubject, replySubject, null); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var json = Encoding.UTF8.GetString(result); + json.ShouldContain("\"server_id\""); + json.ShouldContain("\"version\""); + json.ShouldContain("\"host\""); + json.ShouldContain("\"port\""); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Healthz_request_reply_returns_ok() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + var replySubject = $"_INBOX.test.{Guid.NewGuid():N}"; + server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(msg.ToArray()); + }); + + var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "HEALTHZ"); + server.SendInternalMsg(reqSubject, replySubject, null); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var json = Encoding.UTF8.GetString(result); + json.ShouldContain("ok"); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Subsz_request_reply_returns_subscription_count() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + var replySubject = $"_INBOX.test.{Guid.NewGuid():N}"; + server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(msg.ToArray()); + }); + + var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "SUBSZ"); + server.SendInternalMsg(reqSubject, replySubject, null); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var json = Encoding.UTF8.GetString(result); + json.ShouldContain("\"num_subscriptions\""); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Idz_request_reply_returns_server_identity() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + var replySubject = $"_INBOX.test.{Guid.NewGuid():N}"; + server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(msg.ToArray()); + }); + + var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "IDZ"); + server.SendInternalMsg(reqSubject, replySubject, null); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var json = Encoding.UTF8.GetString(result); + json.ShouldContain("\"server_id\""); + json.ShouldContain("\"server_name\""); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Ping_varz_responds_via_wildcard_subject() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + var replySubject = $"_INBOX.test.{Guid.NewGuid():N}"; + server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(msg.ToArray()); + }); + + var pingSubject = string.Format(EventSubjects.ServerPing, "VARZ"); + server.SendInternalMsg(pingSubject, replySubject, null); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var json = Encoding.UTF8.GetString(result); + json.ShouldContain("\"server_id\""); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Request_without_reply_is_ignored() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + // Send a request with no reply subject -- should not crash + var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ"); + server.SendInternalMsg(reqSubject, null, null); + + // Give it a moment to process without error + await Task.Delay(200); + + // Server should still be running + server.IsShuttingDown.ShouldBeFalse(); + + await server.ShutdownAsync(); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } +}