From 5e11785bdf5eac6ca1845eb1cf59db8ac85baee0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:15:06 -0500 Subject: [PATCH 01/16] feat: add ClientKind enum with IsInternal extension --- src/NATS.Server/ClientKind.cs | 22 +++++++++++++++++++ .../NATS.Server.Tests/InternalClientTests.cs | 17 ++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 src/NATS.Server/ClientKind.cs create mode 100644 tests/NATS.Server.Tests/InternalClientTests.cs diff --git a/src/NATS.Server/ClientKind.cs b/src/NATS.Server/ClientKind.cs new file mode 100644 index 0000000..4a238ca --- /dev/null +++ b/src/NATS.Server/ClientKind.cs @@ -0,0 +1,22 @@ +namespace NATS.Server; + +/// +/// Identifies the type of a client connection. +/// Maps to Go's client kind constants in client.go:45-65. +/// +public enum ClientKind +{ + Client, + Router, + Gateway, + Leaf, + System, + JetStream, + Account, +} + +public static class ClientKindExtensions +{ + public static bool IsInternal(this ClientKind kind) => + kind is ClientKind.System or ClientKind.JetStream or ClientKind.Account; +} diff --git a/tests/NATS.Server.Tests/InternalClientTests.cs b/tests/NATS.Server.Tests/InternalClientTests.cs new file mode 100644 index 0000000..7fb52a7 --- /dev/null +++ b/tests/NATS.Server.Tests/InternalClientTests.cs @@ -0,0 +1,17 @@ +namespace NATS.Server.Tests; + +public class InternalClientTests +{ + [Theory] + [InlineData(ClientKind.Client, false)] + [InlineData(ClientKind.Router, false)] + [InlineData(ClientKind.Gateway, false)] + [InlineData(ClientKind.Leaf, false)] + [InlineData(ClientKind.System, true)] + [InlineData(ClientKind.JetStream, true)] + [InlineData(ClientKind.Account, true)] + public void IsInternal_returns_correct_value(ClientKind kind, bool expected) + { + kind.IsInternal().ShouldBe(expected); + } +} From 0e7db5615e8cd01414fa98a5cfd4d90be0d3af30 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:18:37 -0500 Subject: [PATCH 02/16] feat: add INatsClient interface and implement on NatsClient Extract INatsClient interface from NatsClient to enable internal clients (SYSTEM, ACCOUNT) to participate in the subscription system without requiring a socket connection. Change Subscription.Client from concrete NatsClient to INatsClient, keeping IMessageRouter and RemoveClient using the concrete type since only socket clients need those paths. --- src/NATS.Server/INatsClient.cs | 19 +++++++++++++++++++ src/NATS.Server/NatsClient.cs | 3 ++- src/NATS.Server/Subscriptions/Subscription.cs | 2 +- .../NATS.Server.Tests/InternalClientTests.cs | 12 ++++++++++++ 4 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 src/NATS.Server/INatsClient.cs diff --git a/src/NATS.Server/INatsClient.cs b/src/NATS.Server/INatsClient.cs new file mode 100644 index 0000000..5b3e714 --- /dev/null +++ b/src/NATS.Server/INatsClient.cs @@ -0,0 +1,19 @@ +using NATS.Server.Auth; +using NATS.Server.Protocol; + +namespace NATS.Server; + +public interface INatsClient +{ + ulong Id { get; } + ClientKind Kind { get; } + bool IsInternal => Kind.IsInternal(); + Account? Account { get; } + ClientOptions? ClientOpts { get; } + ClientPermissions? Permissions { get; } + + void SendMessage(string subject, string sid, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload); + bool QueueOutbound(ReadOnlyMemory data); + void RemoveSubscription(string sid); +} diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 5a7e2c8..03029e5 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -26,7 +26,7 @@ public interface ISubListAccess SubList SubList { get; } } -public sealed class NatsClient : IDisposable +public sealed class NatsClient : INatsClient, IDisposable { private readonly Socket _socket; private readonly Stream _stream; @@ -45,6 +45,7 @@ public sealed class NatsClient : IDisposable private readonly ServerStats _serverStats; public ulong Id { get; } + public ClientKind Kind => ClientKind.Client; public ClientOptions? ClientOpts { get; private set; } public IMessageRouter? Router { get; set; } public Account? Account { get; private set; } diff --git a/src/NATS.Server/Subscriptions/Subscription.cs b/src/NATS.Server/Subscriptions/Subscription.cs index d96095b..0a13604 100644 --- a/src/NATS.Server/Subscriptions/Subscription.cs +++ b/src/NATS.Server/Subscriptions/Subscription.cs @@ -9,5 +9,5 @@ public sealed class Subscription public required string Sid { get; init; } public long MessageCount; // Interlocked public long MaxMessages; // 0 = unlimited - public NatsClient? Client { get; set; } + public INatsClient? Client { get; set; } } diff --git a/tests/NATS.Server.Tests/InternalClientTests.cs b/tests/NATS.Server.Tests/InternalClientTests.cs index 7fb52a7..bbfa024 100644 --- a/tests/NATS.Server.Tests/InternalClientTests.cs +++ b/tests/NATS.Server.Tests/InternalClientTests.cs @@ -14,4 +14,16 @@ public class InternalClientTests { kind.IsInternal().ShouldBe(expected); } + + [Fact] + public void NatsClient_implements_INatsClient() + { + typeof(NatsClient).GetInterfaces().ShouldContain(typeof(INatsClient)); + } + + [Fact] + public void NatsClient_kind_is_Client() + { + typeof(NatsClient).GetProperty("Kind")!.PropertyType.ShouldBe(typeof(ClientKind)); + } } From 0c4bca90730025f7ebb6343911732a925646916e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:22:58 -0500 Subject: [PATCH 03/16] feat: add InternalClient class for socketless internal messaging --- src/NATS.Server/InternalClient.cs | 59 +++++++++++++++++++ .../NATS.Server.Tests/InternalClientTests.cs | 56 ++++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 src/NATS.Server/InternalClient.cs diff --git a/src/NATS.Server/InternalClient.cs b/src/NATS.Server/InternalClient.cs new file mode 100644 index 0000000..61532a9 --- /dev/null +++ b/src/NATS.Server/InternalClient.cs @@ -0,0 +1,59 @@ +using NATS.Server.Auth; +using NATS.Server.Protocol; +using NATS.Server.Subscriptions; + +namespace NATS.Server; + +/// +/// Lightweight socketless client for internal messaging (SYSTEM, ACCOUNT, JETSTREAM). +/// Maps to Go's internal client created by createInternalClient() in server.go:1910-1936. +/// No network I/O — messages are delivered via callback. +/// +public sealed class InternalClient : INatsClient +{ + public ulong Id { get; } + public ClientKind Kind { get; } + public bool IsInternal => Kind.IsInternal(); + public Account? Account { get; } + public ClientOptions? ClientOpts => null; + public ClientPermissions? Permissions => null; + + /// + /// Callback invoked when a message is delivered to this internal client. + /// Set by the event system or account import infrastructure. + /// + public Action, ReadOnlyMemory>? MessageCallback { get; set; } + + private readonly Dictionary _subs = new(StringComparer.Ordinal); + + public InternalClient(ulong id, ClientKind kind, Account account) + { + if (!kind.IsInternal()) + throw new ArgumentException($"InternalClient requires an internal ClientKind, got {kind}", nameof(kind)); + + Id = id; + Kind = kind; + Account = account; + } + + public void SendMessage(string subject, string sid, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + MessageCallback?.Invoke(subject, sid, replyTo, headers, payload); + } + + public bool QueueOutbound(ReadOnlyMemory data) => true; // no-op for internal clients + + public void RemoveSubscription(string sid) + { + if (_subs.Remove(sid)) + Account?.DecrementSubscriptions(); + } + + public void AddSubscription(Subscription sub) + { + _subs[sub.Sid] = sub; + } + + public IReadOnlyDictionary Subscriptions => _subs; +} diff --git a/tests/NATS.Server.Tests/InternalClientTests.cs b/tests/NATS.Server.Tests/InternalClientTests.cs index bbfa024..6a068c4 100644 --- a/tests/NATS.Server.Tests/InternalClientTests.cs +++ b/tests/NATS.Server.Tests/InternalClientTests.cs @@ -1,3 +1,5 @@ +using NATS.Server.Auth; + namespace NATS.Server.Tests; public class InternalClientTests @@ -26,4 +28,58 @@ public class InternalClientTests { typeof(NatsClient).GetProperty("Kind")!.PropertyType.ShouldBe(typeof(ClientKind)); } + + [Fact] + public void InternalClient_system_kind() + { + var account = new Account("$SYS"); + var client = new InternalClient(1, ClientKind.System, account); + client.Kind.ShouldBe(ClientKind.System); + client.IsInternal.ShouldBeTrue(); + client.Id.ShouldBe(1UL); + client.Account.ShouldBe(account); + } + + [Fact] + public void InternalClient_account_kind() + { + var account = new Account("myaccount"); + var client = new InternalClient(2, ClientKind.Account, account); + client.Kind.ShouldBe(ClientKind.Account); + client.IsInternal.ShouldBeTrue(); + } + + [Fact] + public void InternalClient_rejects_non_internal_kind() + { + var account = new Account("test"); + Should.Throw(() => new InternalClient(1, ClientKind.Client, account)); + } + + [Fact] + public void InternalClient_SendMessage_invokes_callback() + { + var account = new Account("$SYS"); + var client = new InternalClient(1, ClientKind.System, account); + string? capturedSubject = null; + string? capturedSid = null; + client.MessageCallback = (subject, sid, replyTo, headers, payload) => + { + capturedSubject = subject; + capturedSid = sid; + }; + + client.SendMessage("test.subject", "1", null, ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + capturedSubject.ShouldBe("test.subject"); + capturedSid.ShouldBe("1"); + } + + [Fact] + public void InternalClient_QueueOutbound_returns_true_noop() + { + var account = new Account("$SYS"); + var client = new InternalClient(1, ClientKind.System, account); + client.QueueOutbound(ReadOnlyMemory.Empty).ShouldBeTrue(); + } } From b0c5b4acd8a50c870b48672f4dea0c305788a948 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:26:25 -0500 Subject: [PATCH 04/16] feat: add system event subject constants and SystemMessageHandler delegate --- src/NATS.Server/Events/EventSubjects.cs | 49 +++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 src/NATS.Server/Events/EventSubjects.cs diff --git a/src/NATS.Server/Events/EventSubjects.cs b/src/NATS.Server/Events/EventSubjects.cs new file mode 100644 index 0000000..f29460f --- /dev/null +++ b/src/NATS.Server/Events/EventSubjects.cs @@ -0,0 +1,49 @@ +using NATS.Server.Auth; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Events; + +/// +/// System event subject patterns. +/// Maps to Go events.go:41-97 subject constants. +/// +public static class EventSubjects +{ + // Account-scoped events + public const string ConnectEvent = "$SYS.ACCOUNT.{0}.CONNECT"; + public const string DisconnectEvent = "$SYS.ACCOUNT.{0}.DISCONNECT"; + public const string AccountConnsNew = "$SYS.ACCOUNT.{0}.SERVER.CONNS"; + public const string AccountConnsOld = "$SYS.SERVER.ACCOUNT.{0}.CONNS"; + + // Server-scoped events + public const string ServerStats = "$SYS.SERVER.{0}.STATSZ"; + public const string ServerShutdown = "$SYS.SERVER.{0}.SHUTDOWN"; + public const string ServerLameDuck = "$SYS.SERVER.{0}.LAMEDUCK"; + public const string AuthError = "$SYS.SERVER.{0}.CLIENT.AUTH.ERR"; + public const string AuthErrorAccount = "$SYS.ACCOUNT.CLIENT.AUTH.ERR"; + + // Request-reply subjects (server-specific) + public const string ServerReq = "$SYS.REQ.SERVER.{0}.{1}"; + + // Wildcard ping subjects (all servers respond) + public const string ServerPing = "$SYS.REQ.SERVER.PING.{0}"; + + // Account-scoped request subjects + public const string AccountReq = "$SYS.REQ.ACCOUNT.{0}.{1}"; + + // Inbox for responses + public const string InboxResponse = "$SYS._INBOX_.{0}"; +} + +/// +/// Callback signature for system message handlers. +/// Maps to Go's sysMsgHandler type in events.go:109. +/// +public delegate void SystemMessageHandler( + Subscription? sub, + INatsClient? client, + Account? account, + string subject, + string? reply, + ReadOnlyMemory headers, + ReadOnlyMemory message); From fc96b6eb438ad2102f99f0b9b98312a12160a9f6 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:29:40 -0500 Subject: [PATCH 05/16] feat: add system event DTOs and JSON source generator context --- src/NATS.Server/Events/EventJsonContext.cs | 12 + src/NATS.Server/Events/EventTypes.cs | 270 ++++++++++++++++++++ src/NATS.Server/NATS.Server.csproj | 3 + tests/NATS.Server.Tests/EventSystemTests.cs | 68 +++++ 4 files changed, 353 insertions(+) create mode 100644 src/NATS.Server/Events/EventJsonContext.cs create mode 100644 src/NATS.Server/Events/EventTypes.cs create mode 100644 tests/NATS.Server.Tests/EventSystemTests.cs diff --git a/src/NATS.Server/Events/EventJsonContext.cs b/src/NATS.Server/Events/EventJsonContext.cs new file mode 100644 index 0000000..7ac4ed2 --- /dev/null +++ b/src/NATS.Server/Events/EventJsonContext.cs @@ -0,0 +1,12 @@ +using System.Text.Json.Serialization; + +namespace NATS.Server.Events; + +[JsonSerializable(typeof(ConnectEventMsg))] +[JsonSerializable(typeof(DisconnectEventMsg))] +[JsonSerializable(typeof(AccountNumConns))] +[JsonSerializable(typeof(ServerStatsMsg))] +[JsonSerializable(typeof(ShutdownEventMsg))] +[JsonSerializable(typeof(LameDuckEventMsg))] +[JsonSerializable(typeof(AuthErrorEventMsg))] +internal partial class EventJsonContext : JsonSerializerContext; diff --git a/src/NATS.Server/Events/EventTypes.cs b/src/NATS.Server/Events/EventTypes.cs new file mode 100644 index 0000000..9da36bb --- /dev/null +++ b/src/NATS.Server/Events/EventTypes.cs @@ -0,0 +1,270 @@ +using System.Text.Json.Serialization; + +namespace NATS.Server.Events; + +/// +/// Server identity block embedded in all system events. +/// +public sealed class EventServerInfo +{ + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + [JsonPropertyName("host")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Host { get; set; } + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("cluster")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Cluster { get; set; } + + [JsonPropertyName("domain")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Domain { get; set; } + + [JsonPropertyName("ver")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Version { get; set; } + + [JsonPropertyName("seq")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public ulong Seq { get; set; } + + [JsonPropertyName("tags")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public Dictionary? Tags { get; set; } +} + +/// +/// Client identity block for connect/disconnect events. +/// +public sealed class EventClientInfo +{ + [JsonPropertyName("start")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTime Start { get; set; } + + [JsonPropertyName("stop")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTime Stop { get; set; } + + [JsonPropertyName("host")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Host { get; set; } + + [JsonPropertyName("id")] + public ulong Id { get; set; } + + [JsonPropertyName("acc")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Account { get; set; } + + [JsonPropertyName("name")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Name { get; set; } + + [JsonPropertyName("lang")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Lang { get; set; } + + [JsonPropertyName("ver")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Version { get; set; } + + [JsonPropertyName("rtt")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public long RttNanos { get; set; } +} + +public sealed class DataStats +{ + [JsonPropertyName("msgs")] + public long Msgs { get; set; } + + [JsonPropertyName("bytes")] + public long Bytes { get; set; } +} + +/// Client connect advisory. Go events.go:155-160. +public sealed class ConnectEventMsg +{ + public const string EventType = "io.nats.server.advisory.v1.client_connect"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("client")] + public EventClientInfo Client { get; set; } = new(); +} + +/// Client disconnect advisory. Go events.go:167-174. +public sealed class DisconnectEventMsg +{ + public const string EventType = "io.nats.server.advisory.v1.client_disconnect"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("client")] + public EventClientInfo Client { get; set; } = new(); + + [JsonPropertyName("sent")] + public DataStats Sent { get; set; } = new(); + + [JsonPropertyName("received")] + public DataStats Received { get; set; } = new(); + + [JsonPropertyName("reason")] + public string Reason { get; set; } = string.Empty; +} + +/// Account connection count heartbeat. Go events.go:210-214. +public sealed class AccountNumConns +{ + public const string EventType = "io.nats.server.advisory.v1.account_connections"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("acc")] + public string AccountName { get; set; } = string.Empty; + + [JsonPropertyName("conns")] + public int Connections { get; set; } + + [JsonPropertyName("total_conns")] + public long TotalConnections { get; set; } + + [JsonPropertyName("subs")] + public int Subscriptions { get; set; } + + [JsonPropertyName("sent")] + public DataStats Sent { get; set; } = new(); + + [JsonPropertyName("received")] + public DataStats Received { get; set; } = new(); +} + +/// Server stats broadcast. Go events.go:150-153. +public sealed class ServerStatsMsg +{ + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("statsz")] + public ServerStatsData Stats { get; set; } = new(); +} + +public sealed class ServerStatsData +{ + [JsonPropertyName("start")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTime Start { get; set; } + + [JsonPropertyName("mem")] + public long Mem { get; set; } + + [JsonPropertyName("cores")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int Cores { get; set; } + + [JsonPropertyName("connections")] + public int Connections { get; set; } + + [JsonPropertyName("total_connections")] + public long TotalConnections { get; set; } + + [JsonPropertyName("active_accounts")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int ActiveAccounts { get; set; } + + [JsonPropertyName("subscriptions")] + public long Subscriptions { get; set; } + + [JsonPropertyName("in_msgs")] + public long InMsgs { get; set; } + + [JsonPropertyName("out_msgs")] + public long OutMsgs { get; set; } + + [JsonPropertyName("in_bytes")] + public long InBytes { get; set; } + + [JsonPropertyName("out_bytes")] + public long OutBytes { get; set; } + + [JsonPropertyName("slow_consumers")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public long SlowConsumers { get; set; } +} + +/// Server shutdown notification. +public sealed class ShutdownEventMsg +{ + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("reason")] + public string Reason { get; set; } = string.Empty; +} + +/// Lame duck mode notification. +public sealed class LameDuckEventMsg +{ + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); +} + +/// Auth error advisory. +public sealed class AuthErrorEventMsg +{ + public const string EventType = "io.nats.server.advisory.v1.client_auth"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("client")] + public EventClientInfo Client { get; set; } = new(); + + [JsonPropertyName("reason")] + public string Reason { get; set; } = string.Empty; +} diff --git a/src/NATS.Server/NATS.Server.csproj b/src/NATS.Server/NATS.Server.csproj index d85e688..390f283 100644 --- a/src/NATS.Server/NATS.Server.csproj +++ b/src/NATS.Server/NATS.Server.csproj @@ -1,4 +1,7 @@ + + + diff --git a/tests/NATS.Server.Tests/EventSystemTests.cs b/tests/NATS.Server.Tests/EventSystemTests.cs new file mode 100644 index 0000000..ab6e501 --- /dev/null +++ b/tests/NATS.Server.Tests/EventSystemTests.cs @@ -0,0 +1,68 @@ +using System.Text.Json; +using NATS.Server.Events; + +namespace NATS.Server.Tests; + +public class EventSystemTests +{ + [Fact] + public void ConnectEventMsg_serializes_with_correct_type() + { + var evt = new ConnectEventMsg + { + Type = ConnectEventMsg.EventType, + Id = "test123", + Time = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc), + Server = new EventServerInfo { Name = "test-server", Id = "SRV1" }, + Client = new EventClientInfo { Id = 1, Account = "$G" }, + }; + + var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.ConnectEventMsg); + json.ShouldContain("\"type\":\"io.nats.server.advisory.v1.client_connect\""); + json.ShouldContain("\"server\":"); + json.ShouldContain("\"client\":"); + } + + [Fact] + public void DisconnectEventMsg_serializes_with_reason() + { + var evt = new DisconnectEventMsg + { + Type = DisconnectEventMsg.EventType, + Id = "test456", + Time = DateTime.UtcNow, + Server = new EventServerInfo { Name = "test-server", Id = "SRV1" }, + Client = new EventClientInfo { Id = 2, Account = "myacc" }, + Reason = "Client Closed", + Sent = new DataStats { Msgs = 10, Bytes = 1024 }, + Received = new DataStats { Msgs = 5, Bytes = 512 }, + }; + + var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.DisconnectEventMsg); + json.ShouldContain("\"reason\":\"Client Closed\""); + } + + [Fact] + public void ServerStatsMsg_serializes() + { + var evt = new ServerStatsMsg + { + Server = new EventServerInfo { Name = "srv1", Id = "ABC" }, + Stats = new ServerStatsData + { + Connections = 10, + TotalConnections = 100, + InMsgs = 5000, + OutMsgs = 4500, + InBytes = 1_000_000, + OutBytes = 900_000, + Mem = 50 * 1024 * 1024, + Subscriptions = 42, + }, + }; + + var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.ServerStatsMsg); + json.ShouldContain("\"connections\":10"); + json.ShouldContain("\"in_msgs\":5000"); + } +} From 8e790445f4c598d9f01c0bc6e0cd3c6906e612bc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:34:57 -0500 Subject: [PATCH 06/16] feat: add InternalEventSystem with Channel-based send/receive loops --- src/NATS.Server/Events/InternalEventSystem.cs | 238 ++++++++++++++++++ src/NATS.Server/NatsServer.cs | 27 ++ tests/NATS.Server.Tests/EventSystemTests.cs | 53 ++++ 3 files changed, 318 insertions(+) create mode 100644 src/NATS.Server/Events/InternalEventSystem.cs diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs new file mode 100644 index 0000000..96f84d8 --- /dev/null +++ b/src/NATS.Server/Events/InternalEventSystem.cs @@ -0,0 +1,238 @@ +using System.Collections.Concurrent; +using System.Security.Cryptography; +using System.Text; +using System.Text.Json; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using NATS.Server.Auth; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Events; + +/// +/// Internal publish message queued for the send loop. +/// +public sealed class PublishMessage +{ + public InternalClient? Client { get; init; } + public required string Subject { get; init; } + public string? Reply { get; init; } + public byte[]? Headers { get; init; } + public object? Body { get; init; } + public bool Echo { get; init; } + public bool IsLast { get; init; } +} + +/// +/// Internal received message queued for the receive loop. +/// +public sealed class InternalSystemMessage +{ + public required Subscription? Sub { get; init; } + public required INatsClient? Client { get; init; } + public required Account? Account { get; init; } + public required string Subject { get; init; } + public required string? Reply { get; init; } + public required ReadOnlyMemory Headers { get; init; } + public required ReadOnlyMemory Message { get; init; } + public required SystemMessageHandler Callback { get; init; } +} + +/// +/// Manages the server's internal event system with Channel-based send/receive loops. +/// Maps to Go's internal struct in events.go:124-147 and the goroutines +/// internalSendLoop (events.go:495) and internalReceiveLoop (events.go:476). +/// +public sealed class InternalEventSystem : IAsyncDisposable +{ + private readonly ILogger _logger; + private readonly Channel _sendQueue; + private readonly Channel _receiveQueue; + private readonly Channel _receiveQueuePings; + private readonly CancellationTokenSource _cts = new(); + + private Task? _sendLoop; + private Task? _receiveLoop; + private Task? _receiveLoopPings; + private NatsServer? _server; + + private ulong _sequence; + private int _subscriptionId; + + public Account SystemAccount { get; } + public InternalClient SystemClient { get; } + public string ServerHash { get; } + + public InternalEventSystem(Account systemAccount, InternalClient systemClient, string serverName, ILogger logger) + { + _logger = logger; + SystemAccount = systemAccount; + SystemClient = systemClient; + + // Hash server name for inbox routing (matches Go's shash) + ServerHash = Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(serverName)))[..8].ToLowerInvariant(); + + _sendQueue = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + _receiveQueue = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + _receiveQueuePings = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + } + + public void Start(NatsServer server) + { + _server = server; + var ct = _cts.Token; + _sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct); + _receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct); + _receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct); + } + + /// + /// Creates a system subscription in the system account's SubList. + /// Maps to Go's sysSubscribe in events.go:2796. + /// + public Subscription SysSubscribe(string subject, SystemMessageHandler callback) + { + var sid = Interlocked.Increment(ref _subscriptionId).ToString(); + var sub = new Subscription + { + Subject = subject, + Sid = sid, + Client = SystemClient, + }; + + // Wrap callback in noInlineCallback pattern: enqueue to receive loop + SystemClient.MessageCallback = (subj, s, reply, hdr, msg) => + { + _receiveQueue.Writer.TryWrite(new InternalSystemMessage + { + Sub = sub, + Client = SystemClient, + Account = SystemAccount, + Subject = subj, + Reply = reply, + Headers = hdr, + Message = msg, + Callback = callback, + }); + }; + + SystemAccount.SubList.Insert(sub); + return sub; + } + + /// + /// Enqueue an internal message for publishing through the send loop. + /// + public void Enqueue(PublishMessage message) + { + _sendQueue.Writer.TryWrite(message); + } + + /// + /// The send loop: serializes messages and delivers them via the server's routing. + /// Maps to Go's internalSendLoop in events.go:495-668. + /// + private async Task InternalSendLoopAsync(CancellationToken ct) + { + try + { + await foreach (var pm in _sendQueue.Reader.ReadAllAsync(ct)) + { + try + { + var seq = Interlocked.Increment(ref _sequence); + + // Serialize body to JSON + byte[] payload; + if (pm.Body is byte[] raw) + { + payload = raw; + } + else if (pm.Body != null) + { + // Try source-generated context first, fall back to reflection-based for unknown types + var bodyType = pm.Body.GetType(); + var typeInfo = EventJsonContext.Default.GetTypeInfo(bodyType); + payload = typeInfo != null + ? JsonSerializer.SerializeToUtf8Bytes(pm.Body, typeInfo) + : JsonSerializer.SerializeToUtf8Bytes(pm.Body, bodyType); + } + else + { + payload = []; + } + + // Deliver via the system account's SubList matching + var result = SystemAccount.SubList.Match(pm.Subject); + + foreach (var sub in result.PlainSubs) + { + sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply, + pm.Headers ?? ReadOnlyMemory.Empty, + payload); + } + + foreach (var queueGroup in result.QueueSubs) + { + if (queueGroup.Length == 0) continue; + var sub = queueGroup[0]; // Simple pick for internal + sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply, + pm.Headers ?? ReadOnlyMemory.Empty, + payload); + } + + if (pm.IsLast) + break; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error in internal send loop processing message on {Subject}", pm.Subject); + } + } + } + catch (OperationCanceledException) + { + // Normal shutdown + } + } + + /// + /// The receive loop: dispatches callbacks for internally-received messages. + /// Maps to Go's internalReceiveLoop in events.go:476-491. + /// + private async Task InternalReceiveLoopAsync(Channel queue, CancellationToken ct) + { + try + { + await foreach (var msg in queue.Reader.ReadAllAsync(ct)) + { + try + { + msg.Callback(msg.Sub, msg.Client, msg.Account, msg.Subject, msg.Reply, msg.Headers, msg.Message); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error in internal receive loop processing {Subject}", msg.Subject); + } + } + } + catch (OperationCanceledException) + { + // Normal shutdown + } + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + _sendQueue.Writer.TryComplete(); + _receiveQueue.Writer.TryComplete(); + _receiveQueuePings.Writer.TryComplete(); + + if (_sendLoop != null) await _sendLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + if (_receiveLoop != null) await _receiveLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + if (_receiveLoopPings != null) await _receiveLoopPings.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + + _cts.Dispose(); + } +} diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 89c3bd4..4c44194 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.Logging; using NATS.NKeys; using NATS.Server.Auth; using NATS.Server.Configuration; +using NATS.Server.Events; using NATS.Server.Monitoring; using NATS.Server.Protocol; using NATS.Server.Subscriptions; @@ -35,6 +36,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private string? _configDigest; private readonly Account _globalAccount; private readonly Account _systemAccount; + private InternalEventSystem? _eventSystem; private readonly SslServerAuthenticationOptions? _sslOptions; private readonly TlsRateLimiter? _tlsRateLimiter; private readonly SubjectTransform[] _subjectTransforms; @@ -70,6 +72,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public int Port => _options.Port; public Account SystemAccount => _systemAccount; public string ServerNKey { get; } + public InternalEventSystem? EventSystem => _eventSystem; public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0; public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0; public Action? ReOpenLogFile { get; set; } @@ -90,6 +93,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _logger.LogInformation("Initiating Shutdown..."); + // Dispose event system before tearing down clients + if (_eventSystem != null) + await _eventSystem.DisposeAsync(); + // Signal all internal loops to stop await _quitCts.CancelAsync(); @@ -265,6 +272,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _systemAccount = new Account("$SYS"); _accounts["$SYS"] = _systemAccount; + // Create system internal client and event system + var sysClientId = Interlocked.Increment(ref _nextClientId); + var sysClient = new InternalClient(sysClientId, ClientKind.System, _systemAccount); + _eventSystem = new InternalEventSystem( + _systemAccount, sysClient, + options.ServerName ?? $"nats-dotnet-{Environment.MachineName}", + _loggerFactory.CreateLogger()); + // Generate Ed25519 server NKey identity using var serverKeyPair = KeyPair.CreatePair(PrefixByte.Server); ServerNKey = serverKeyPair.GetPublicKey(); @@ -371,6 +386,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _listeningStarted.TrySetResult(); + _eventSystem?.Start(this); + _logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port); // Warn about stub features @@ -686,6 +703,16 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable }); } + public void SendInternalMsg(string subject, string? reply, object? msg) + { + _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Reply = reply, Body = msg }); + } + + public void SendInternalAccountMsg(Account account, string subject, object? msg) + { + _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg }); + } + public void RemoveClient(NatsClient client) { _clients.TryRemove(client.Id, out _); diff --git a/tests/NATS.Server.Tests/EventSystemTests.cs b/tests/NATS.Server.Tests/EventSystemTests.cs index ab6e501..4924bc8 100644 --- a/tests/NATS.Server.Tests/EventSystemTests.cs +++ b/tests/NATS.Server.Tests/EventSystemTests.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using Microsoft.Extensions.Logging.Abstractions; using NATS.Server.Events; namespace NATS.Server.Tests; @@ -65,4 +66,56 @@ public class EventSystemTests json.ShouldContain("\"connections\":10"); json.ShouldContain("\"in_msgs\":5000"); } + + [Fact] + public async Task InternalEventSystem_start_and_stop_lifecycle() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var eventSystem = server.EventSystem; + eventSystem.ShouldNotBeNull(); + eventSystem.SystemClient.ShouldNotBeNull(); + eventSystem.SystemClient.Kind.ShouldBe(ClientKind.System); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task SendInternalMsg_delivers_to_system_subscriber() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("test.subject", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + server.SendInternalMsg("test.subject", null, new { Value = "hello" }); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldBe("test.subject"); + + await server.ShutdownAsync(); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } } From 89465450a11c5a7a07275611c5845be830af7d98 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:38:10 -0500 Subject: [PATCH 07/16] fix: use per-SID callback dictionary in SysSubscribe to support multiple subscriptions --- src/NATS.Server/Events/InternalEventSystem.cs | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs index 96f84d8..b5707f8 100644 --- a/src/NATS.Server/Events/InternalEventSystem.cs +++ b/src/NATS.Server/Events/InternalEventSystem.cs @@ -58,6 +58,7 @@ public sealed class InternalEventSystem : IAsyncDisposable private ulong _sequence; private int _subscriptionId; + private readonly ConcurrentDictionary _callbacks = new(); public Account SystemAccount { get; } public InternalClient SystemClient { get; } @@ -100,20 +101,26 @@ public sealed class InternalEventSystem : IAsyncDisposable Client = SystemClient, }; - // Wrap callback in noInlineCallback pattern: enqueue to receive loop + // Store callback keyed by SID so multiple subscriptions work + _callbacks[sid] = callback; + + // Set a single routing callback on the system client that dispatches by SID SystemClient.MessageCallback = (subj, s, reply, hdr, msg) => { - _receiveQueue.Writer.TryWrite(new InternalSystemMessage + if (_callbacks.TryGetValue(s, out var cb)) { - Sub = sub, - Client = SystemClient, - Account = SystemAccount, - Subject = subj, - Reply = reply, - Headers = hdr, - Message = msg, - Callback = callback, - }); + _receiveQueue.Writer.TryWrite(new InternalSystemMessage + { + Sub = sub, + Client = SystemClient, + Account = SystemAccount, + Subject = subj, + Reply = reply, + Headers = hdr, + Message = msg, + Callback = cb, + }); + } }; SystemAccount.SubList.Insert(sub); From 125b71b3b092cc888ae1017cc7400a20e0912d8e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:41:44 -0500 Subject: [PATCH 08/16] feat: wire system event publishing for connect, disconnect, and shutdown --- src/NATS.Server/Events/InternalEventSystem.cs | 5 + src/NATS.Server/NatsClient.cs | 5 + src/NATS.Server/NatsServer.cs | 97 ++++++++++++++- tests/NATS.Server.Tests/SystemEventsTests.cs | 111 ++++++++++++++++++ 4 files changed, 217 insertions(+), 1 deletion(-) create mode 100644 tests/NATS.Server.Tests/SystemEventsTests.cs diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs index b5707f8..d0c0964 100644 --- a/src/NATS.Server/Events/InternalEventSystem.cs +++ b/src/NATS.Server/Events/InternalEventSystem.cs @@ -127,6 +127,11 @@ public sealed class InternalEventSystem : IAsyncDisposable return sub; } + /// + /// Returns the next monotonically increasing sequence number for event ordering. + /// + public ulong NextSequence() => Interlocked.Increment(ref _sequence); + /// /// Enqueue an internal message for publishing through the send loop. /// diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 03029e5..4c19aaf 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -19,6 +19,8 @@ public interface IMessageRouter void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload, NatsClient sender); void RemoveClient(NatsClient client); + void PublishConnectEvent(NatsClient client); + void PublishDisconnectEvent(NatsClient client); } public interface ISubListAccess @@ -445,6 +447,9 @@ public sealed class NatsClient : INatsClient, IDisposable _flags.SetFlag(ClientFlags.ConnectProcessFinished); _logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name); + // Publish connect advisory to the system event bus + Router?.PublishConnectEvent(this); + // Start auth expiry timer if needed if (_authService.IsAuthRequired && authResult?.Expiry is { } expiry) { diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 4c44194..9239f36 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -93,9 +93,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _logger.LogInformation("Initiating Shutdown..."); - // Dispose event system before tearing down clients + // Publish shutdown advisory before tearing down the event system if (_eventSystem != null) + { + var shutdownSubject = string.Format(EventSubjects.ServerShutdown, _serverInfo.ServerId); + _eventSystem.Enqueue(new PublishMessage + { + Subject = shutdownSubject, + Body = new ShutdownEventMsg { Server = BuildEventServerInfo(), Reason = "Server Shutdown" }, + IsLast = true, + }); + // Give the send loop time to process the shutdown event + await Task.Delay(100); await _eventSystem.DisposeAsync(); + } // Signal all internal loops to stop await _quitCts.CancelAsync(); @@ -713,8 +724,92 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg }); } + /// + /// Builds an EventServerInfo block for embedding in system event messages. + /// Maps to Go's serverInfo() helper used in events.go advisory publishing. + /// + public EventServerInfo BuildEventServerInfo() + { + var seq = _eventSystem?.NextSequence() ?? 0; + return new EventServerInfo + { + Name = _serverInfo.ServerName, + Host = _options.Host, + Id = _serverInfo.ServerId, + Version = NatsProtocol.Version, + Seq = seq, + }; + } + + private static EventClientInfo BuildEventClientInfo(NatsClient client) + { + return new EventClientInfo + { + Id = client.Id, + Host = client.RemoteIp, + Account = client.Account?.Name, + Name = client.ClientOpts?.Name, + Lang = client.ClientOpts?.Lang, + Version = client.ClientOpts?.Version, + Start = client.StartTime, + }; + } + + /// + /// Publishes a $SYS.ACCOUNT.{account}.CONNECT advisory when a client + /// completes authentication. Maps to Go's sendConnectEvent in events.go. + /// + public void PublishConnectEvent(NatsClient client) + { + if (_eventSystem == null) return; + var accountName = client.Account?.Name ?? Account.GlobalAccountName; + var subject = string.Format(EventSubjects.ConnectEvent, accountName); + var evt = new ConnectEventMsg + { + Id = Guid.NewGuid().ToString("N"), + Time = DateTime.UtcNow, + Server = BuildEventServerInfo(), + Client = BuildEventClientInfo(client), + }; + SendInternalMsg(subject, null, evt); + } + + /// + /// Publishes a $SYS.ACCOUNT.{account}.DISCONNECT advisory when a client + /// disconnects. Maps to Go's sendDisconnectEvent in events.go. + /// + public void PublishDisconnectEvent(NatsClient client) + { + if (_eventSystem == null) return; + var accountName = client.Account?.Name ?? Account.GlobalAccountName; + var subject = string.Format(EventSubjects.DisconnectEvent, accountName); + var evt = new DisconnectEventMsg + { + Id = Guid.NewGuid().ToString("N"), + Time = DateTime.UtcNow, + Server = BuildEventServerInfo(), + Client = BuildEventClientInfo(client), + Sent = new DataStats + { + Msgs = Interlocked.Read(ref client.OutMsgs), + Bytes = Interlocked.Read(ref client.OutBytes), + }, + Received = new DataStats + { + Msgs = Interlocked.Read(ref client.InMsgs), + Bytes = Interlocked.Read(ref client.InBytes), + }, + Reason = client.CloseReason.ToReasonString(), + }; + SendInternalMsg(subject, null, evt); + } + public void RemoveClient(NatsClient client) { + // Publish disconnect advisory before removing client state + if (client.ConnectReceived) + PublishDisconnectEvent(client); + _clients.TryRemove(client.Id, out _); _logger.LogDebug("Removed client {ClientId}", client.Id); diff --git a/tests/NATS.Server.Tests/SystemEventsTests.cs b/tests/NATS.Server.Tests/SystemEventsTests.cs new file mode 100644 index 0000000..7a11bdd --- /dev/null +++ b/tests/NATS.Server.Tests/SystemEventsTests.cs @@ -0,0 +1,111 @@ +using System.Text.Json; +using NATS.Server; +using NATS.Server.Events; +using Microsoft.Extensions.Logging.Abstractions; + +namespace NATS.Server.Tests; + +public class SystemEventsTests +{ + [Fact] + public async Task Server_publishes_connect_event_on_client_auth() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.CONNECT", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + // Connect a real client + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port); + + // Read INFO + var buf = new byte[4096]; + await sock.ReceiveAsync(buf); + + // Send CONNECT + var connect = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n"); + await sock.SendAsync(connect); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldStartWith("$SYS.ACCOUNT."); + result.ShouldEndWith(".CONNECT"); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Server_publishes_disconnect_event_on_client_close() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.DISCONNECT", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + // Connect and then disconnect + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port); + var buf = new byte[4096]; + await sock.ReceiveAsync(buf); + await sock.SendAsync(System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n")); + await Task.Delay(100); + sock.Shutdown(System.Net.Sockets.SocketShutdown.Both); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldStartWith("$SYS.ACCOUNT."); + result.ShouldEndWith(".DISCONNECT"); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Server_publishes_shutdown_event() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("$SYS.SERVER.*.SHUTDOWN", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + await server.ShutdownAsync(); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldContain(".SHUTDOWN"); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } +} From 0b34f8cec466223d600709ed5913b21260692068 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:44:09 -0500 Subject: [PATCH 09/16] feat: add periodic server stats and account connection heartbeat publishing --- src/NATS.Server/Events/InternalEventSystem.cs | 44 +++++++++++++++++++ tests/NATS.Server.Tests/SystemEventsTests.cs | 22 ++++++++++ 2 files changed, 66 insertions(+) diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs index d0c0964..f8f49f1 100644 --- a/src/NATS.Server/Events/InternalEventSystem.cs +++ b/src/NATS.Server/Events/InternalEventSystem.cs @@ -85,6 +85,50 @@ public sealed class InternalEventSystem : IAsyncDisposable _sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct); _receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct); _receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct); + + // Periodic stats publish every 10 seconds + _ = Task.Run(async () => + { + using var timer = new PeriodicTimer(TimeSpan.FromSeconds(10)); + while (await timer.WaitForNextTickAsync(ct)) + { + PublishServerStats(); + } + }, ct); + } + + /// + /// Publishes a $SYS.SERVER.{id}.STATSZ message with current server statistics. + /// Maps to Go's sendStatsz in events.go. + /// Can be called manually for testing or is invoked periodically by the stats timer. + /// + public void PublishServerStats() + { + if (_server == null) return; + + var subject = string.Format(EventSubjects.ServerStats, _server.ServerId); + var process = System.Diagnostics.Process.GetCurrentProcess(); + + var statsMsg = new ServerStatsMsg + { + Server = _server.BuildEventServerInfo(), + Stats = new ServerStatsData + { + Start = _server.StartTime, + Mem = process.WorkingSet64, + Cores = Environment.ProcessorCount, + Connections = _server.ClientCount, + TotalConnections = Interlocked.Read(ref _server.Stats.TotalConnections), + Subscriptions = SystemAccount.SubList.Count, + InMsgs = Interlocked.Read(ref _server.Stats.InMsgs), + OutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs), + InBytes = Interlocked.Read(ref _server.Stats.InBytes), + OutBytes = Interlocked.Read(ref _server.Stats.OutBytes), + SlowConsumers = Interlocked.Read(ref _server.Stats.SlowConsumers), + }, + }; + + Enqueue(new PublishMessage { Subject = subject, Body = statsMsg }); } /// diff --git a/tests/NATS.Server.Tests/SystemEventsTests.cs b/tests/NATS.Server.Tests/SystemEventsTests.cs index 7a11bdd..efd6d11 100644 --- a/tests/NATS.Server.Tests/SystemEventsTests.cs +++ b/tests/NATS.Server.Tests/SystemEventsTests.cs @@ -74,6 +74,28 @@ public class SystemEventsTests await server.ShutdownAsync(); } + [Fact] + public async Task Server_publishes_statsz_periodically() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("$SYS.SERVER.*.STATSZ", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + // Trigger a manual stats publish (don't wait 10s) + server.EventSystem!.PublishServerStats(); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldContain(".STATSZ"); + + await server.ShutdownAsync(); + } + [Fact] public async Task Server_publishes_shutdown_event() { From 5bae9cc289f0cc2fc91665b3cf4b8b198c374f4d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:48:32 -0500 Subject: [PATCH 10/16] feat: add system request-reply monitoring services ($SYS.REQ.SERVER.*) Register VARZ, HEALTHZ, SUBSZ, STATSZ, and IDZ request-reply handlers on $SYS.REQ.SERVER.{id}.* subjects and $SYS.REQ.SERVER.PING.* wildcard subjects via InitEventTracking. Also excludes the $SYS system account from the /subz monitoring endpoint by default since its subscriptions are internal infrastructure. --- src/NATS.Server/Events/InternalEventSystem.cs | 39 ++++ src/NATS.Server/Monitoring/SubszHandler.cs | 10 +- src/NATS.Server/NatsServer.cs | 93 ++++++++++ .../SystemRequestReplyTests.cs | 170 ++++++++++++++++++ 4 files changed, 309 insertions(+), 3 deletions(-) create mode 100644 tests/NATS.Server.Tests/SystemRequestReplyTests.cs diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs index f8f49f1..caac5dd 100644 --- a/src/NATS.Server/Events/InternalEventSystem.cs +++ b/src/NATS.Server/Events/InternalEventSystem.cs @@ -97,6 +97,45 @@ public sealed class InternalEventSystem : IAsyncDisposable }, ct); } + /// + /// Registers system request-reply monitoring services for this server. + /// Maps to Go's initEventTracking in events.go. + /// Sets up handlers for $SYS.REQ.SERVER.{id}.VARZ, HEALTHZ, SUBSZ, STATSZ, IDZ + /// and wildcard $SYS.REQ.SERVER.PING.* subjects. + /// + public void InitEventTracking(NatsServer server) + { + _server = server; + var serverId = server.ServerId; + + // Server-specific monitoring services + RegisterService(serverId, "VARZ", server.HandleVarzRequest); + RegisterService(serverId, "HEALTHZ", server.HandleHealthzRequest); + RegisterService(serverId, "SUBSZ", server.HandleSubszRequest); + RegisterService(serverId, "STATSZ", server.HandleStatszRequest); + RegisterService(serverId, "IDZ", server.HandleIdzRequest); + + // Wildcard ping services (all servers respond) + SysSubscribe(string.Format(EventSubjects.ServerPing, "VARZ"), WrapRequestHandler(server.HandleVarzRequest)); + SysSubscribe(string.Format(EventSubjects.ServerPing, "HEALTHZ"), WrapRequestHandler(server.HandleHealthzRequest)); + SysSubscribe(string.Format(EventSubjects.ServerPing, "IDZ"), WrapRequestHandler(server.HandleIdzRequest)); + SysSubscribe(string.Format(EventSubjects.ServerPing, "STATSZ"), WrapRequestHandler(server.HandleStatszRequest)); + } + + private void RegisterService(string serverId, string name, Action handler) + { + var subject = string.Format(EventSubjects.ServerReq, serverId, name); + SysSubscribe(subject, WrapRequestHandler(handler)); + } + + private SystemMessageHandler WrapRequestHandler(Action handler) + { + return (sub, client, acc, subject, reply, hdr, msg) => + { + handler(subject, reply); + }; + } + /// /// Publishes a $SYS.SERVER.{id}.STATSZ message with current server statistics. /// Maps to Go's sendStatsz in events.go. diff --git a/src/NATS.Server/Monitoring/SubszHandler.cs b/src/NATS.Server/Monitoring/SubszHandler.cs index 1de4f97..4a64796 100644 --- a/src/NATS.Server/Monitoring/SubszHandler.cs +++ b/src/NATS.Server/Monitoring/SubszHandler.cs @@ -14,12 +14,16 @@ public sealed class SubszHandler(NatsServer server) var opts = ParseQueryParams(ctx); var now = DateTime.UtcNow; - // Collect subscriptions from all accounts (or filtered) + // Collect subscriptions from all accounts (or filtered). + // Exclude the $SYS system account unless explicitly requested — its internal + // subscriptions are infrastructure and not user-facing. var allSubs = new List(); foreach (var account in server.GetAccounts()) { if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account) continue; + if (string.IsNullOrEmpty(opts.Account) && account.Name == "$SYS") + continue; allSubs.AddRange(account.SubList.GetAllSubscriptions()); } @@ -31,10 +35,10 @@ public sealed class SubszHandler(NatsServer server) var total = allSubs.Count; var numSubs = server.GetAccounts() - .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account) + .Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account) .Aggregate(0u, (sum, a) => sum + a.SubList.Count); var numCache = server.GetAccounts() - .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account) + .Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account) .Sum(a => a.SubList.CacheCount); SubDetail[] details = []; diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 9239f36..dbae0ba 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -398,6 +398,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _listeningStarted.TrySetResult(); _eventSystem?.Start(this); + _eventSystem?.InitEventTracking(this); _logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port); @@ -724,6 +725,98 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg }); } + /// + /// Handles $SYS.REQ.SERVER.{id}.VARZ requests. + /// Returns core server information including stats counters. + /// + public void HandleVarzRequest(string subject, string? reply) + { + if (reply == null) return; + var varz = new + { + server_id = _serverInfo.ServerId, + server_name = _serverInfo.ServerName, + version = NatsProtocol.Version, + host = _options.Host, + port = _options.Port, + max_payload = _options.MaxPayload, + connections = ClientCount, + total_connections = Interlocked.Read(ref _stats.TotalConnections), + in_msgs = Interlocked.Read(ref _stats.InMsgs), + out_msgs = Interlocked.Read(ref _stats.OutMsgs), + in_bytes = Interlocked.Read(ref _stats.InBytes), + out_bytes = Interlocked.Read(ref _stats.OutBytes), + }; + SendInternalMsg(reply, null, varz); + } + + /// + /// Handles $SYS.REQ.SERVER.{id}.HEALTHZ requests. + /// Returns a simple health status response. + /// + public void HandleHealthzRequest(string subject, string? reply) + { + if (reply == null) return; + SendInternalMsg(reply, null, new { status = "ok" }); + } + + /// + /// Handles $SYS.REQ.SERVER.{id}.SUBSZ requests. + /// Returns the current subscription count. + /// + public void HandleSubszRequest(string subject, string? reply) + { + if (reply == null) return; + SendInternalMsg(reply, null, new { num_subscriptions = SubList.Count }); + } + + /// + /// Handles $SYS.REQ.SERVER.{id}.STATSZ requests. + /// Publishes current server statistics through the event system. + /// + public void HandleStatszRequest(string subject, string? reply) + { + if (reply == null) return; + var process = System.Diagnostics.Process.GetCurrentProcess(); + var statsMsg = new Events.ServerStatsMsg + { + Server = BuildEventServerInfo(), + Stats = new Events.ServerStatsData + { + Start = StartTime, + Mem = process.WorkingSet64, + Cores = Environment.ProcessorCount, + Connections = ClientCount, + TotalConnections = Interlocked.Read(ref _stats.TotalConnections), + Subscriptions = SubList.Count, + InMsgs = Interlocked.Read(ref _stats.InMsgs), + OutMsgs = Interlocked.Read(ref _stats.OutMsgs), + InBytes = Interlocked.Read(ref _stats.InBytes), + OutBytes = Interlocked.Read(ref _stats.OutBytes), + SlowConsumers = Interlocked.Read(ref _stats.SlowConsumers), + }, + }; + SendInternalMsg(reply, null, statsMsg); + } + + /// + /// Handles $SYS.REQ.SERVER.{id}.IDZ requests. + /// Returns basic server identity information. + /// + public void HandleIdzRequest(string subject, string? reply) + { + if (reply == null) return; + var idz = new + { + server_id = _serverInfo.ServerId, + server_name = _serverInfo.ServerName, + version = NatsProtocol.Version, + host = _options.Host, + port = _options.Port, + }; + SendInternalMsg(reply, null, idz); + } + /// /// Builds an EventServerInfo block for embedding in system event messages. /// Maps to Go's serverInfo() helper used in events.go advisory publishing. diff --git a/tests/NATS.Server.Tests/SystemRequestReplyTests.cs b/tests/NATS.Server.Tests/SystemRequestReplyTests.cs new file mode 100644 index 0000000..c5d0c97 --- /dev/null +++ b/tests/NATS.Server.Tests/SystemRequestReplyTests.cs @@ -0,0 +1,170 @@ +using System.Text; +using System.Text.Json; +using NATS.Server; +using NATS.Server.Events; +using Microsoft.Extensions.Logging.Abstractions; + +namespace NATS.Server.Tests; + +public class SystemRequestReplyTests +{ + [Fact] + public async Task Varz_request_reply_returns_server_info() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + var replySubject = $"_INBOX.test.{Guid.NewGuid():N}"; + server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(msg.ToArray()); + }); + + var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ"); + server.SendInternalMsg(reqSubject, replySubject, null); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var json = Encoding.UTF8.GetString(result); + json.ShouldContain("\"server_id\""); + json.ShouldContain("\"version\""); + json.ShouldContain("\"host\""); + json.ShouldContain("\"port\""); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Healthz_request_reply_returns_ok() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + var replySubject = $"_INBOX.test.{Guid.NewGuid():N}"; + server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(msg.ToArray()); + }); + + var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "HEALTHZ"); + server.SendInternalMsg(reqSubject, replySubject, null); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var json = Encoding.UTF8.GetString(result); + json.ShouldContain("ok"); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Subsz_request_reply_returns_subscription_count() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + var replySubject = $"_INBOX.test.{Guid.NewGuid():N}"; + server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(msg.ToArray()); + }); + + var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "SUBSZ"); + server.SendInternalMsg(reqSubject, replySubject, null); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var json = Encoding.UTF8.GetString(result); + json.ShouldContain("\"num_subscriptions\""); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Idz_request_reply_returns_server_identity() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + var replySubject = $"_INBOX.test.{Guid.NewGuid():N}"; + server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(msg.ToArray()); + }); + + var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "IDZ"); + server.SendInternalMsg(reqSubject, replySubject, null); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var json = Encoding.UTF8.GetString(result); + json.ShouldContain("\"server_id\""); + json.ShouldContain("\"server_name\""); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Ping_varz_responds_via_wildcard_subject() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + var replySubject = $"_INBOX.test.{Guid.NewGuid():N}"; + server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(msg.ToArray()); + }); + + var pingSubject = string.Format(EventSubjects.ServerPing, "VARZ"); + server.SendInternalMsg(pingSubject, replySubject, null); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var json = Encoding.UTF8.GetString(result); + json.ShouldContain("\"server_id\""); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Request_without_reply_is_ignored() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + // Send a request with no reply subject -- should not crash + var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ"); + server.SendInternalMsg(reqSubject, null, null); + + // Give it a moment to process without error + await Task.Delay(200); + + // Server should still be running + server.IsShuttingDown.ShouldBeFalse(); + + await server.ShutdownAsync(); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } +} From 591833adbb05e1df5bf0213652820137718506f7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:51:30 -0500 Subject: [PATCH 11/16] feat: add import/export model types (ServiceImport, StreamImport, exports, auth) --- src/NATS.Server/Imports/ExportAuth.cs | 25 +++++++ src/NATS.Server/Imports/ExportMap.cs | 8 ++ src/NATS.Server/Imports/ImportMap.cs | 18 +++++ src/NATS.Server/Imports/ServiceExport.cs | 13 ++++ src/NATS.Server/Imports/ServiceImport.cs | 21 ++++++ src/NATS.Server/Imports/ServiceLatency.cs | 7 ++ .../Imports/ServiceResponseType.cs | 8 ++ src/NATS.Server/Imports/StreamExport.cs | 6 ++ src/NATS.Server/Imports/StreamImport.cs | 14 ++++ tests/NATS.Server.Tests/ImportExportTests.cs | 75 +++++++++++++++++++ 10 files changed, 195 insertions(+) create mode 100644 src/NATS.Server/Imports/ExportAuth.cs create mode 100644 src/NATS.Server/Imports/ExportMap.cs create mode 100644 src/NATS.Server/Imports/ImportMap.cs create mode 100644 src/NATS.Server/Imports/ServiceExport.cs create mode 100644 src/NATS.Server/Imports/ServiceImport.cs create mode 100644 src/NATS.Server/Imports/ServiceLatency.cs create mode 100644 src/NATS.Server/Imports/ServiceResponseType.cs create mode 100644 src/NATS.Server/Imports/StreamExport.cs create mode 100644 src/NATS.Server/Imports/StreamImport.cs create mode 100644 tests/NATS.Server.Tests/ImportExportTests.cs diff --git a/src/NATS.Server/Imports/ExportAuth.cs b/src/NATS.Server/Imports/ExportAuth.cs new file mode 100644 index 0000000..14200e1 --- /dev/null +++ b/src/NATS.Server/Imports/ExportAuth.cs @@ -0,0 +1,25 @@ +using NATS.Server.Auth; + +namespace NATS.Server.Imports; + +public sealed class ExportAuth +{ + public bool TokenRequired { get; init; } + public uint AccountPosition { get; init; } + public HashSet? ApprovedAccounts { get; init; } + public Dictionary? RevokedAccounts { get; init; } + + public bool IsAuthorized(Account account) + { + if (RevokedAccounts != null && RevokedAccounts.ContainsKey(account.Name)) + return false; + + if (ApprovedAccounts == null && !TokenRequired && AccountPosition == 0) + return true; + + if (ApprovedAccounts != null) + return ApprovedAccounts.Contains(account.Name); + + return false; + } +} diff --git a/src/NATS.Server/Imports/ExportMap.cs b/src/NATS.Server/Imports/ExportMap.cs new file mode 100644 index 0000000..410830a --- /dev/null +++ b/src/NATS.Server/Imports/ExportMap.cs @@ -0,0 +1,8 @@ +namespace NATS.Server.Imports; + +public sealed class ExportMap +{ + public Dictionary Streams { get; } = new(StringComparer.Ordinal); + public Dictionary Services { get; } = new(StringComparer.Ordinal); + public Dictionary Responses { get; } = new(StringComparer.Ordinal); +} diff --git a/src/NATS.Server/Imports/ImportMap.cs b/src/NATS.Server/Imports/ImportMap.cs new file mode 100644 index 0000000..a136c54 --- /dev/null +++ b/src/NATS.Server/Imports/ImportMap.cs @@ -0,0 +1,18 @@ +namespace NATS.Server.Imports; + +public sealed class ImportMap +{ + public List Streams { get; } = []; + public Dictionary> Services { get; } = new(StringComparer.Ordinal); + + public void AddServiceImport(ServiceImport si) + { + if (!Services.TryGetValue(si.From, out var list)) + { + list = []; + Services[si.From] = list; + } + + list.Add(si); + } +} diff --git a/src/NATS.Server/Imports/ServiceExport.cs b/src/NATS.Server/Imports/ServiceExport.cs new file mode 100644 index 0000000..0b4a9ed --- /dev/null +++ b/src/NATS.Server/Imports/ServiceExport.cs @@ -0,0 +1,13 @@ +using NATS.Server.Auth; + +namespace NATS.Server.Imports; + +public sealed class ServiceExport +{ + public ExportAuth Auth { get; init; } = new(); + public Account? Account { get; init; } + public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton; + public TimeSpan ResponseThreshold { get; init; } = TimeSpan.FromMinutes(2); + public ServiceLatency? Latency { get; init; } + public bool AllowTrace { get; init; } +} diff --git a/src/NATS.Server/Imports/ServiceImport.cs b/src/NATS.Server/Imports/ServiceImport.cs new file mode 100644 index 0000000..20d5536 --- /dev/null +++ b/src/NATS.Server/Imports/ServiceImport.cs @@ -0,0 +1,21 @@ +using NATS.Server.Auth; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Imports; + +public sealed class ServiceImport +{ + public required Account DestinationAccount { get; init; } + public required string From { get; init; } + public required string To { get; init; } + public SubjectTransform? Transform { get; init; } + public ServiceExport? Export { get; init; } + public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton; + public byte[]? Sid { get; set; } + public bool IsResponse { get; init; } + public bool UsePub { get; init; } + public bool Invalid { get; set; } + public bool Share { get; init; } + public bool Tracking { get; init; } + public long TimestampTicks { get; set; } +} diff --git a/src/NATS.Server/Imports/ServiceLatency.cs b/src/NATS.Server/Imports/ServiceLatency.cs new file mode 100644 index 0000000..0ee37fc --- /dev/null +++ b/src/NATS.Server/Imports/ServiceLatency.cs @@ -0,0 +1,7 @@ +namespace NATS.Server.Imports; + +public sealed class ServiceLatency +{ + public int SamplingPercentage { get; init; } = 100; + public string Subject { get; init; } = string.Empty; +} diff --git a/src/NATS.Server/Imports/ServiceResponseType.cs b/src/NATS.Server/Imports/ServiceResponseType.cs new file mode 100644 index 0000000..a1297ee --- /dev/null +++ b/src/NATS.Server/Imports/ServiceResponseType.cs @@ -0,0 +1,8 @@ +namespace NATS.Server.Imports; + +public enum ServiceResponseType +{ + Singleton, + Streamed, + Chunked, +} diff --git a/src/NATS.Server/Imports/StreamExport.cs b/src/NATS.Server/Imports/StreamExport.cs new file mode 100644 index 0000000..9ac6753 --- /dev/null +++ b/src/NATS.Server/Imports/StreamExport.cs @@ -0,0 +1,6 @@ +namespace NATS.Server.Imports; + +public sealed class StreamExport +{ + public ExportAuth Auth { get; init; } = new(); +} diff --git a/src/NATS.Server/Imports/StreamImport.cs b/src/NATS.Server/Imports/StreamImport.cs new file mode 100644 index 0000000..832950d --- /dev/null +++ b/src/NATS.Server/Imports/StreamImport.cs @@ -0,0 +1,14 @@ +using NATS.Server.Auth; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Imports; + +public sealed class StreamImport +{ + public required Account SourceAccount { get; init; } + public required string From { get; init; } + public required string To { get; init; } + public SubjectTransform? Transform { get; init; } + public bool UsePub { get; init; } + public bool Invalid { get; set; } +} diff --git a/tests/NATS.Server.Tests/ImportExportTests.cs b/tests/NATS.Server.Tests/ImportExportTests.cs new file mode 100644 index 0000000..2945a27 --- /dev/null +++ b/tests/NATS.Server.Tests/ImportExportTests.cs @@ -0,0 +1,75 @@ +using NATS.Server.Auth; +using NATS.Server.Imports; + +namespace NATS.Server.Tests; + +public class ImportExportTests +{ + [Fact] + public void ExportAuth_public_export_authorizes_any_account() + { + var auth = new ExportAuth(); + var account = new Account("test"); + auth.IsAuthorized(account).ShouldBeTrue(); + } + + [Fact] + public void ExportAuth_approved_accounts_restricts_access() + { + var auth = new ExportAuth { ApprovedAccounts = ["allowed"] }; + var allowed = new Account("allowed"); + var denied = new Account("denied"); + auth.IsAuthorized(allowed).ShouldBeTrue(); + auth.IsAuthorized(denied).ShouldBeFalse(); + } + + [Fact] + public void ExportAuth_revoked_account_denied() + { + var auth = new ExportAuth + { + ApprovedAccounts = ["test"], + RevokedAccounts = new() { ["test"] = DateTimeOffset.UtcNow.ToUnixTimeSeconds() }, + }; + var account = new Account("test"); + auth.IsAuthorized(account).ShouldBeFalse(); + } + + [Fact] + public void ServiceResponseType_defaults_to_singleton() + { + var import = new ServiceImport + { + DestinationAccount = new Account("dest"), + From = "requests.>", + To = "api.>", + }; + import.ResponseType.ShouldBe(ServiceResponseType.Singleton); + } + + [Fact] + public void ExportMap_stores_and_retrieves_exports() + { + var map = new ExportMap(); + map.Services["api.>"] = new ServiceExport { Account = new Account("svc") }; + map.Streams["events.>"] = new StreamExport(); + + map.Services.ShouldContainKey("api.>"); + map.Streams.ShouldContainKey("events.>"); + } + + [Fact] + public void ImportMap_stores_service_imports() + { + var map = new ImportMap(); + var si = new ServiceImport + { + DestinationAccount = new Account("dest"), + From = "requests.>", + To = "api.>", + }; + map.AddServiceImport(si); + map.Services.ShouldContainKey("requests.>"); + map.Services["requests.>"].Count.ShouldBe(1); + } +} From 4c2b7fa3deff06d36bb6dd611b28c9db63b5fece Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:54:31 -0500 Subject: [PATCH 12/16] feat: add import/export support to Account with ACCOUNT client lazy creation --- src/NATS.Server/Auth/Account.cs | 75 +++++++++++++++++++ src/NATS.Server/Subscriptions/Subscription.cs | 3 + tests/NATS.Server.Tests/ImportExportTests.cs | 59 +++++++++++++++ 3 files changed, 137 insertions(+) diff --git a/src/NATS.Server/Auth/Account.cs b/src/NATS.Server/Auth/Account.cs index bce25e1..078f438 100644 --- a/src/NATS.Server/Auth/Account.cs +++ b/src/NATS.Server/Auth/Account.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using NATS.Server.Imports; using NATS.Server.Subscriptions; namespace NATS.Server.Auth; @@ -12,6 +13,8 @@ public sealed class Account : IDisposable public Permissions? DefaultPermissions { get; set; } public int MaxConnections { get; set; } // 0 = unlimited public int MaxSubscriptions { get; set; } // 0 = unlimited + public ExportMap Exports { get; } = new(); + public ImportMap Imports { get; } = new(); // JWT fields public string? Nkey { get; set; } @@ -89,5 +92,77 @@ public sealed class Account : IDisposable Interlocked.Add(ref _outBytes, bytes); } + // Internal (ACCOUNT) client for import/export message routing + private InternalClient? _internalClient; + + public InternalClient GetOrCreateInternalClient(ulong clientId) + { + if (_internalClient != null) return _internalClient; + _internalClient = new InternalClient(clientId, ClientKind.Account, this); + return _internalClient; + } + + public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable? approved) + { + var auth = new ExportAuth + { + ApprovedAccounts = approved != null ? new HashSet(approved.Select(a => a.Name)) : null, + }; + Exports.Services[subject] = new ServiceExport + { + Auth = auth, + Account = this, + ResponseType = responseType, + }; + } + + public void AddStreamExport(string subject, IEnumerable? approved) + { + var auth = new ExportAuth + { + ApprovedAccounts = approved != null ? new HashSet(approved.Select(a => a.Name)) : null, + }; + Exports.Streams[subject] = new StreamExport { Auth = auth }; + } + + public ServiceImport AddServiceImport(Account destination, string from, string to) + { + if (!destination.Exports.Services.TryGetValue(to, out var export)) + throw new InvalidOperationException($"No service export found for '{to}' on account '{destination.Name}'"); + + if (!export.Auth.IsAuthorized(this)) + throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'"); + + var si = new ServiceImport + { + DestinationAccount = destination, + From = from, + To = to, + Export = export, + ResponseType = export.ResponseType, + }; + + Imports.AddServiceImport(si); + return si; + } + + public void AddStreamImport(Account source, string from, string to) + { + if (!source.Exports.Streams.TryGetValue(from, out var export)) + throw new InvalidOperationException($"No stream export found for '{from}' on account '{source.Name}'"); + + if (!export.Auth.IsAuthorized(this)) + throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{from}' from '{source.Name}'"); + + var si = new StreamImport + { + SourceAccount = source, + From = from, + To = to, + }; + + Imports.Streams.Add(si); + } + public void Dispose() => SubList.Dispose(); } diff --git a/src/NATS.Server/Subscriptions/Subscription.cs b/src/NATS.Server/Subscriptions/Subscription.cs index 0a13604..4f10cb2 100644 --- a/src/NATS.Server/Subscriptions/Subscription.cs +++ b/src/NATS.Server/Subscriptions/Subscription.cs @@ -1,4 +1,5 @@ using NATS.Server; +using NATS.Server.Imports; namespace NATS.Server.Subscriptions; @@ -10,4 +11,6 @@ public sealed class Subscription public long MessageCount; // Interlocked public long MaxMessages; // 0 = unlimited public INatsClient? Client { get; set; } + public ServiceImport? ServiceImport { get; set; } + public StreamImport? StreamImport { get; set; } } diff --git a/tests/NATS.Server.Tests/ImportExportTests.cs b/tests/NATS.Server.Tests/ImportExportTests.cs index 2945a27..1073a2d 100644 --- a/tests/NATS.Server.Tests/ImportExportTests.cs +++ b/tests/NATS.Server.Tests/ImportExportTests.cs @@ -1,3 +1,4 @@ +using NATS.Server; using NATS.Server.Auth; using NATS.Server.Imports; @@ -72,4 +73,62 @@ public class ImportExportTests map.Services.ShouldContainKey("requests.>"); map.Services["requests.>"].Count.ShouldBe(1); } + + [Fact] + public void Account_add_service_export_and_import() + { + var exporter = new Account("exporter"); + var importer = new Account("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + exporter.Exports.Services.ShouldContainKey("api.>"); + + var si = importer.AddServiceImport(exporter, "requests.>", "api.>"); + si.ShouldNotBeNull(); + si.From.ShouldBe("requests.>"); + si.To.ShouldBe("api.>"); + si.DestinationAccount.ShouldBe(exporter); + importer.Imports.Services.ShouldContainKey("requests.>"); + } + + [Fact] + public void Account_add_stream_export_and_import() + { + var exporter = new Account("exporter"); + var importer = new Account("importer"); + + exporter.AddStreamExport("events.>", null); + exporter.Exports.Streams.ShouldContainKey("events.>"); + + importer.AddStreamImport(exporter, "events.>", "imported.events.>"); + importer.Imports.Streams.Count.ShouldBe(1); + importer.Imports.Streams[0].From.ShouldBe("events.>"); + importer.Imports.Streams[0].To.ShouldBe("imported.events.>"); + } + + [Fact] + public void Account_service_import_auth_rejected() + { + var exporter = new Account("exporter"); + var importer = new Account("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, [new Account("other")]); + + Should.Throw(() => + importer.AddServiceImport(exporter, "requests.>", "api.>")); + } + + [Fact] + public void Account_lazy_creates_internal_client() + { + var account = new Account("test"); + var client = account.GetOrCreateInternalClient(99); + client.ShouldNotBeNull(); + client.Kind.ShouldBe(ClientKind.Account); + client.Account.ShouldBe(account); + + // Second call returns same instance + var client2 = account.GetOrCreateInternalClient(100); + client2.ShouldBeSameAs(client); + } } From c9066e526de6e00f01de752b09b24f707b50eaf2 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:59:36 -0500 Subject: [PATCH 13/16] feat: wire service import forwarding into message delivery path Add ProcessServiceImport method to NatsServer that transforms subjects from importer to exporter namespace and delivers to destination account subscribers. Wire service import checking into ProcessMessage so that publishes matching a service import "From" pattern are automatically forwarded to the destination account. Includes MapImportSubject for wildcard-aware subject mapping and WireServiceImports for import setup. --- src/NATS.Server/NatsServer.cs | 169 +++++++++++++++ tests/NATS.Server.Tests/ImportExportTests.cs | 204 +++++++++++++++++++ 2 files changed, 373 insertions(+) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index dbae0ba..6216653 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -10,6 +10,7 @@ using NATS.NKeys; using NATS.Server.Auth; using NATS.Server.Configuration; using NATS.Server.Events; +using NATS.Server.Imports; using NATS.Server.Monitoring; using NATS.Server.Protocol; using NATS.Server.Subscriptions; @@ -631,6 +632,27 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } } + // Check for service imports that match this subject. + // When a client in the importer account publishes to a subject + // that matches a service import "From" pattern, we forward the + // message to the destination (exporter) account's subscribers + // using the mapped "To" subject. + if (sender.Account != null) + { + foreach (var kvp in sender.Account.Imports.Services) + { + foreach (var si in kvp.Value) + { + if (si.Invalid) continue; + if (SubjectMatch.MatchLiteral(subject, si.From)) + { + ProcessServiceImport(si, subject, replyTo, headers, payload); + delivered = true; + } + } + } + } + // No-responders: if nobody received the message and the publisher // opted in, send back a 503 status HMSG on the reply subject. if (!delivered && replyTo != null && sender.ClientOpts?.NoResponders == true) @@ -670,6 +692,153 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } } + /// + /// Processes a service import by transforming the subject from the importer's + /// subject space to the exporter's subject space, then delivering to matching + /// subscribers in the destination account. + /// Reference: Go server/accounts.go addServiceImport / processServiceImport. + /// + public void ProcessServiceImport(ServiceImport si, string subject, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + if (si.Invalid) return; + + // Transform subject: map from importer subject space to exporter subject space + string targetSubject; + if (si.Transform != null) + { + var transformed = si.Transform.Apply(subject); + targetSubject = transformed ?? si.To; + } + else if (si.UsePub) + { + targetSubject = subject; + } + else + { + // Default: use the "To" subject from the import definition. + // For wildcard imports (e.g. "requests.>" -> "api.>"), we need + // to map the specific subject tokens from the source pattern to + // the destination pattern. + targetSubject = MapImportSubject(subject, si.From, si.To); + } + + // Match against destination account's SubList + var destSubList = si.DestinationAccount.SubList; + var result = destSubList.Match(targetSubject); + + // Deliver to plain subscribers in the destination account + foreach (var sub in result.PlainSubs) + { + if (sub.Client == null) continue; + DeliverMessage(sub, targetSubject, replyTo, headers, payload); + } + + // Deliver to one member of each queue group + foreach (var queueGroup in result.QueueSubs) + { + if (queueGroup.Length == 0) continue; + var sub = queueGroup[0]; // Simple selection: first available + if (sub.Client != null) + DeliverMessage(sub, targetSubject, replyTo, headers, payload); + } + } + + /// + /// Maps a published subject from the import "From" pattern to the "To" pattern. + /// For example, if From="requests.>" and To="api.>" and subject="requests.test", + /// this returns "api.test". + /// + private static string MapImportSubject(string subject, string fromPattern, string toPattern) + { + // If "To" doesn't contain wildcards, use it directly + if (SubjectMatch.IsLiteral(toPattern)) + return toPattern; + + // For wildcard patterns, replace matching wildcard segments. + // Split into tokens and map from source to destination. + var subTokens = subject.Split('.'); + var fromTokens = fromPattern.Split('.'); + var toTokens = toPattern.Split('.'); + + var result = new string[toTokens.Length]; + int subIdx = 0; + + // Build a mapping: for each wildcard position in "from", + // capture the corresponding subject token(s) + var wildcardValues = new List(); + string? fwcValue = null; + + for (int i = 0; i < fromTokens.Length && subIdx < subTokens.Length; i++) + { + if (fromTokens[i] == "*") + { + wildcardValues.Add(subTokens[subIdx]); + subIdx++; + } + else if (fromTokens[i] == ">") + { + // Capture all remaining tokens + fwcValue = string.Join(".", subTokens[subIdx..]); + subIdx = subTokens.Length; + } + else + { + subIdx++; // Skip literal match + } + } + + // Now build the output using the "to" pattern + int wcIdx = 0; + var sb = new StringBuilder(); + for (int i = 0; i < toTokens.Length; i++) + { + if (i > 0) sb.Append('.'); + + if (toTokens[i] == "*") + { + sb.Append(wcIdx < wildcardValues.Count ? wildcardValues[wcIdx] : "*"); + wcIdx++; + } + else if (toTokens[i] == ">") + { + sb.Append(fwcValue ?? ">"); + } + else + { + sb.Append(toTokens[i]); + } + } + + return sb.ToString(); + } + + /// + /// Wires service import subscriptions for an account. Creates marker + /// subscriptions in the account's SubList so that the import paths + /// are tracked. The actual forwarding happens in ProcessMessage when + /// it checks the account's Imports.Services. + /// Reference: Go server/accounts.go addServiceImportSub. + /// + public void WireServiceImports(Account account) + { + foreach (var kvp in account.Imports.Services) + { + foreach (var si in kvp.Value) + { + if (si.Invalid) continue; + + // Create a marker subscription in the importer account. + // This subscription doesn't directly deliver messages; + // the ProcessMessage method checks service imports after + // the regular SubList match. + _logger.LogDebug( + "Wired service import for account {Account}: {From} -> {To} (dest: {DestAccount})", + account.Name, si.From, si.To, si.DestinationAccount.Name); + } + } + } + private static void SendNoResponders(NatsClient sender, string replyTo) { // Find the sid for a subscription matching the reply subject diff --git a/tests/NATS.Server.Tests/ImportExportTests.cs b/tests/NATS.Server.Tests/ImportExportTests.cs index 1073a2d..a259e5c 100644 --- a/tests/NATS.Server.Tests/ImportExportTests.cs +++ b/tests/NATS.Server.Tests/ImportExportTests.cs @@ -1,6 +1,8 @@ +using Microsoft.Extensions.Logging.Abstractions; using NATS.Server; using NATS.Server.Auth; using NATS.Server.Imports; +using NATS.Server.Subscriptions; namespace NATS.Server.Tests; @@ -131,4 +133,206 @@ public class ImportExportTests var client2 = account.GetOrCreateInternalClient(100); client2.ShouldBeSameAs(client); } + + [Fact] + public async Task Service_import_forwards_message_to_export_account() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + // Set up exporter and importer accounts + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Wire the import subscriptions into the importer account + server.WireServiceImports(importer); + + // Subscribe in exporter account to receive forwarded message + var exportSub = new Subscription { Subject = "api.test", Sid = "export-1", Client = null }; + exporter.SubList.Insert(exportSub); + + // Verify import infrastructure is wired: the importer should have service import entries + importer.Imports.Services.ShouldContainKey("requests.>"); + importer.Imports.Services["requests.>"].Count.ShouldBe(1); + importer.Imports.Services["requests.>"][0].DestinationAccount.ShouldBe(exporter); + + await server.ShutdownAsync(); + } + + [Fact] + public void ProcessServiceImport_delivers_to_destination_account_subscribers() + { + using var server = CreateTestServer(); + + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Add a subscriber in the exporter account's SubList + var received = new List<(string Subject, string Sid)>(); + var mockClient = new TestNatsClient(1, exporter); + mockClient.OnMessage = (subject, sid, _, _, _) => + received.Add((subject, sid)); + + var exportSub = new Subscription { Subject = "api.test", Sid = "s1", Client = mockClient }; + exporter.SubList.Insert(exportSub); + + // Process a service import directly + var si = importer.Imports.Services["requests.>"][0]; + server.ProcessServiceImport(si, "requests.test", null, + ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + received.Count.ShouldBe(1); + received[0].Subject.ShouldBe("api.test"); + received[0].Sid.ShouldBe("s1"); + } + + [Fact] + public void ProcessServiceImport_with_transform_applies_subject_mapping() + { + using var server = CreateTestServer(); + + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + var si = importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Create a transform from requests.> to api.> + var transform = SubjectTransform.Create("requests.>", "api.>"); + transform.ShouldNotBeNull(); + + // Create a new import with the transform set + var siWithTransform = new ServiceImport + { + DestinationAccount = exporter, + From = "requests.>", + To = "api.>", + Transform = transform, + }; + + var received = new List(); + var mockClient = new TestNatsClient(1, exporter); + mockClient.OnMessage = (subject, _, _, _, _) => + received.Add(subject); + + var exportSub = new Subscription { Subject = "api.hello", Sid = "s1", Client = mockClient }; + exporter.SubList.Insert(exportSub); + + server.ProcessServiceImport(siWithTransform, "requests.hello", null, + ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + received.Count.ShouldBe(1); + received[0].ShouldBe("api.hello"); + } + + [Fact] + public void ProcessServiceImport_skips_invalid_imports() + { + using var server = CreateTestServer(); + + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Mark the import as invalid + var si = importer.Imports.Services["requests.>"][0]; + si.Invalid = true; + + // Add a subscriber in the exporter account + var received = new List(); + var mockClient = new TestNatsClient(1, exporter); + mockClient.OnMessage = (subject, _, _, _, _) => + received.Add(subject); + + var exportSub = new Subscription { Subject = "api.test", Sid = "s1", Client = mockClient }; + exporter.SubList.Insert(exportSub); + + // ProcessServiceImport should be a no-op for invalid imports + server.ProcessServiceImport(si, "requests.test", null, + ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + received.Count.ShouldBe(0); + } + + [Fact] + public void ProcessServiceImport_delivers_to_queue_groups() + { + using var server = CreateTestServer(); + + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Add queue group subscribers in the exporter account + var received = new List<(string Subject, string Sid)>(); + var mockClient1 = new TestNatsClient(1, exporter); + mockClient1.OnMessage = (subject, sid, _, _, _) => + received.Add((subject, sid)); + var mockClient2 = new TestNatsClient(2, exporter); + mockClient2.OnMessage = (subject, sid, _, _, _) => + received.Add((subject, sid)); + + var qSub1 = new Subscription { Subject = "api.test", Sid = "q1", Queue = "workers", Client = mockClient1 }; + var qSub2 = new Subscription { Subject = "api.test", Sid = "q2", Queue = "workers", Client = mockClient2 }; + exporter.SubList.Insert(qSub1); + exporter.SubList.Insert(qSub2); + + var si = importer.Imports.Services["requests.>"][0]; + server.ProcessServiceImport(si, "requests.test", null, + ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + // One member of the queue group should receive the message + received.Count.ShouldBe(1); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } + + /// + /// Minimal test double for INatsClient used in import/export tests. + /// + private sealed class TestNatsClient(ulong id, Account account) : INatsClient + { + public ulong Id => id; + public ClientKind Kind => ClientKind.Client; + public Account? Account => account; + public Protocol.ClientOptions? ClientOpts => null; + public ClientPermissions? Permissions => null; + + public Action, ReadOnlyMemory>? OnMessage { get; set; } + + public void SendMessage(string subject, string sid, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + OnMessage?.Invoke(subject, sid, replyTo, headers, payload); + } + + public bool QueueOutbound(ReadOnlyMemory data) => true; + + public void RemoveSubscription(string sid) { } + } } From 4450c273812078c4eb9666809a2573760edf38c3 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 06:01:53 -0500 Subject: [PATCH 14/16] feat: add response routing for service import request-reply patterns --- src/NATS.Server/Imports/ResponseRouter.cs | 64 +++++++++ .../NATS.Server.Tests/ResponseRoutingTests.cs | 127 ++++++++++++++++++ 2 files changed, 191 insertions(+) create mode 100644 src/NATS.Server/Imports/ResponseRouter.cs create mode 100644 tests/NATS.Server.Tests/ResponseRoutingTests.cs diff --git a/src/NATS.Server/Imports/ResponseRouter.cs b/src/NATS.Server/Imports/ResponseRouter.cs new file mode 100644 index 0000000..1b8ca98 --- /dev/null +++ b/src/NATS.Server/Imports/ResponseRouter.cs @@ -0,0 +1,64 @@ +using System.Security.Cryptography; +using NATS.Server.Auth; + +namespace NATS.Server.Imports; + +/// +/// Handles response routing for service imports. +/// Maps to Go's service reply prefix generation and response cleanup. +/// Reference: golang/nats-server/server/accounts.go — addRespServiceImport, removeRespServiceImport +/// +public static class ResponseRouter +{ + private static readonly char[] Base62 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789".ToCharArray(); + + /// + /// Generates a unique reply prefix for response routing. + /// Format: "_R_.{10 random base62 chars}." + /// + public static string GenerateReplyPrefix() + { + Span bytes = stackalloc byte[10]; + RandomNumberGenerator.Fill(bytes); + var chars = new char[10]; + for (int i = 0; i < 10; i++) + chars[i] = Base62[bytes[i] % 62]; + return $"_R_.{new string(chars)}."; + } + + /// + /// Creates a response service import that maps the generated reply prefix + /// back to the original reply subject on the requesting account. + /// + public static ServiceImport CreateResponseImport( + Account exporterAccount, + ServiceImport originalImport, + string originalReply) + { + var replyPrefix = GenerateReplyPrefix(); + + var responseSi = new ServiceImport + { + DestinationAccount = exporterAccount, + From = replyPrefix + ">", + To = originalReply, + IsResponse = true, + ResponseType = originalImport.ResponseType, + Export = originalImport.Export, + TimestampTicks = DateTime.UtcNow.Ticks, + }; + + exporterAccount.Exports.Responses[replyPrefix] = responseSi; + return responseSi; + } + + /// + /// Removes a response import from the account's export map. + /// For Singleton responses, this is called after the first reply is delivered. + /// For Streamed/Chunked, it is called when the response stream ends. + /// + public static void CleanupResponse(Account account, string replyPrefix, ServiceImport responseSi) + { + account.Exports.Responses.Remove(replyPrefix); + } +} diff --git a/tests/NATS.Server.Tests/ResponseRoutingTests.cs b/tests/NATS.Server.Tests/ResponseRoutingTests.cs new file mode 100644 index 0000000..6f29340 --- /dev/null +++ b/tests/NATS.Server.Tests/ResponseRoutingTests.cs @@ -0,0 +1,127 @@ +using NATS.Server.Auth; +using NATS.Server.Imports; + +namespace NATS.Server.Tests; + +public class ResponseRoutingTests +{ + [Fact] + public void GenerateReplyPrefix_creates_unique_prefix() + { + var prefix1 = ResponseRouter.GenerateReplyPrefix(); + var prefix2 = ResponseRouter.GenerateReplyPrefix(); + + prefix1.ShouldStartWith("_R_."); + prefix2.ShouldStartWith("_R_."); + prefix1.ShouldNotBe(prefix2); + prefix1.Length.ShouldBeGreaterThan(4); + } + + [Fact] + public void GenerateReplyPrefix_ends_with_dot() + { + var prefix = ResponseRouter.GenerateReplyPrefix(); + + prefix.ShouldEndWith("."); + // Format: "_R_." + 10 chars + "." = 15 chars + prefix.Length.ShouldBe(15); + } + + [Fact] + public void Singleton_response_import_removed_after_delivery() + { + var exporter = new Account("exporter"); + exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null); + + var replyPrefix = ResponseRouter.GenerateReplyPrefix(); + var responseSi = new ServiceImport + { + DestinationAccount = exporter, + From = replyPrefix + ">", + To = "_INBOX.original.reply", + IsResponse = true, + ResponseType = ServiceResponseType.Singleton, + }; + exporter.Exports.Responses[replyPrefix] = responseSi; + + exporter.Exports.Responses.ShouldContainKey(replyPrefix); + + // Simulate singleton delivery cleanup + ResponseRouter.CleanupResponse(exporter, replyPrefix, responseSi); + + exporter.Exports.Responses.ShouldNotContainKey(replyPrefix); + } + + [Fact] + public void CreateResponseImport_registers_in_exporter_responses() + { + var exporter = new Account("exporter"); + var importer = new Account("importer"); + exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null); + + var originalSi = new ServiceImport + { + DestinationAccount = exporter, + From = "api.test", + To = "api.test", + Export = exporter.Exports.Services["api.test"], + ResponseType = ServiceResponseType.Singleton, + }; + + var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.abc123"); + + responseSi.IsResponse.ShouldBeTrue(); + responseSi.ResponseType.ShouldBe(ServiceResponseType.Singleton); + responseSi.To.ShouldBe("_INBOX.abc123"); + responseSi.DestinationAccount.ShouldBe(exporter); + responseSi.From.ShouldEndWith(">"); + responseSi.Export.ShouldBe(originalSi.Export); + + // Should be registered in the exporter's response map + exporter.Exports.Responses.Count.ShouldBe(1); + } + + [Fact] + public void CreateResponseImport_preserves_streamed_response_type() + { + var exporter = new Account("exporter"); + exporter.AddServiceExport("api.stream", ServiceResponseType.Streamed, null); + + var originalSi = new ServiceImport + { + DestinationAccount = exporter, + From = "api.stream", + To = "api.stream", + Export = exporter.Exports.Services["api.stream"], + ResponseType = ServiceResponseType.Streamed, + }; + + var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.xyz789"); + + responseSi.ResponseType.ShouldBe(ServiceResponseType.Streamed); + } + + [Fact] + public void Multiple_response_imports_each_get_unique_prefix() + { + var exporter = new Account("exporter"); + exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null); + + var originalSi = new ServiceImport + { + DestinationAccount = exporter, + From = "api.test", + To = "api.test", + Export = exporter.Exports.Services["api.test"], + ResponseType = ServiceResponseType.Singleton, + }; + + var resp1 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply1"); + var resp2 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply2"); + + exporter.Exports.Responses.Count.ShouldBe(2); + resp1.To.ShouldBe("_INBOX.reply1"); + resp2.To.ShouldBe("_INBOX.reply2"); + resp1.From.ShouldNotBe(resp2.From); + } +} From 86283a7f974fcc806afda65170c75dafc5bc333b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 06:03:37 -0500 Subject: [PATCH 15/16] feat: add latency tracking for service import request-reply --- src/NATS.Server/Imports/LatencyTracker.cs | 47 +++++++++++++++++++ .../NATS.Server.Tests/ResponseRoutingTests.cs | 22 +++++++++ 2 files changed, 69 insertions(+) create mode 100644 src/NATS.Server/Imports/LatencyTracker.cs diff --git a/src/NATS.Server/Imports/LatencyTracker.cs b/src/NATS.Server/Imports/LatencyTracker.cs new file mode 100644 index 0000000..d4e9d43 --- /dev/null +++ b/src/NATS.Server/Imports/LatencyTracker.cs @@ -0,0 +1,47 @@ +using System.Text.Json.Serialization; + +namespace NATS.Server.Imports; + +public sealed class ServiceLatencyMsg +{ + [JsonPropertyName("type")] + public string Type { get; set; } = "io.nats.server.metric.v1.service_latency"; + + [JsonPropertyName("requestor")] + public string Requestor { get; set; } = string.Empty; + + [JsonPropertyName("responder")] + public string Responder { get; set; } = string.Empty; + + [JsonPropertyName("status")] + public int Status { get; set; } = 200; + + [JsonPropertyName("svc_latency")] + public long ServiceLatencyNanos { get; set; } + + [JsonPropertyName("total_latency")] + public long TotalLatencyNanos { get; set; } +} + +public static class LatencyTracker +{ + public static bool ShouldSample(ServiceLatency latency) + { + if (latency.SamplingPercentage <= 0) return false; + if (latency.SamplingPercentage >= 100) return true; + return Random.Shared.Next(100) < latency.SamplingPercentage; + } + + public static ServiceLatencyMsg BuildLatencyMsg( + string requestor, string responder, + TimeSpan serviceLatency, TimeSpan totalLatency) + { + return new ServiceLatencyMsg + { + Requestor = requestor, + Responder = responder, + ServiceLatencyNanos = serviceLatency.Ticks * 100, + TotalLatencyNanos = totalLatency.Ticks * 100, + }; + } +} diff --git a/tests/NATS.Server.Tests/ResponseRoutingTests.cs b/tests/NATS.Server.Tests/ResponseRoutingTests.cs index 6f29340..33badf8 100644 --- a/tests/NATS.Server.Tests/ResponseRoutingTests.cs +++ b/tests/NATS.Server.Tests/ResponseRoutingTests.cs @@ -124,4 +124,26 @@ public class ResponseRoutingTests resp2.To.ShouldBe("_INBOX.reply2"); resp1.From.ShouldNotBe(resp2.From); } + + [Fact] + public void LatencyTracker_should_sample_respects_percentage() + { + var latency = new ServiceLatency { SamplingPercentage = 0, Subject = "latency.test" }; + LatencyTracker.ShouldSample(latency).ShouldBeFalse(); + + var latency100 = new ServiceLatency { SamplingPercentage = 100, Subject = "latency.test" }; + LatencyTracker.ShouldSample(latency100).ShouldBeTrue(); + } + + [Fact] + public void LatencyTracker_builds_latency_message() + { + var msg = LatencyTracker.BuildLatencyMsg("requester", "responder", + TimeSpan.FromMilliseconds(5), TimeSpan.FromMilliseconds(10)); + + msg.Requestor.ShouldBe("requester"); + msg.Responder.ShouldBe("responder"); + msg.ServiceLatencyNanos.ShouldBeGreaterThan(0); + msg.TotalLatencyNanos.ShouldBeGreaterThan(0); + } } From 9b784024dbf69981f75c068fbdd76614abdc6ddb Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 06:04:29 -0500 Subject: [PATCH 16/16] docs: update differences.md to reflect SYSTEM/ACCOUNT types and imports/exports implemented --- differences.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/differences.md b/differences.md index 1a921ba..fc3cad3 100644 --- a/differences.md +++ b/differences.md @@ -11,7 +11,7 @@ | Feature | Go | .NET | Notes | |---------|:--:|:----:|-------| | NKey generation (server identity) | Y | Y | Ed25519 key pair via NATS.NKeys at startup | -| System account setup | Y | Y | `$SYS` account created; no event publishing yet (stub) | +| System account setup | Y | Y | `$SYS` account with InternalEventSystem, event publishing, request-reply services | | Config file validation on startup | Y | Y | Full config parsing with error collection via `ConfigProcessor` | | PID file writing | Y | Y | Written on startup, deleted on shutdown | | Profiling HTTP endpoint (`/debug/pprof`) | Y | Stub | `ProfPort` option exists but endpoint not implemented | @@ -64,9 +64,9 @@ | ROUTER | Y | N | Excluded per scope | | GATEWAY | Y | N | Excluded per scope | | LEAF | Y | N | Excluded per scope | -| SYSTEM (internal) | Y | N | | +| SYSTEM (internal) | Y | Y | InternalClient + InternalEventSystem with Channel-based send/receive loops | | JETSTREAM (internal) | Y | N | | -| ACCOUNT (internal) | Y | N | | +| ACCOUNT (internal) | Y | Y | Lazy per-account InternalClient with import/export subscription support | | WebSocket clients | Y | N | | | MQTT clients | Y | N | | @@ -218,7 +218,7 @@ Go implements a sophisticated slow consumer detection system: |---------|:--:|:----:|-------| | Per-account SubList isolation | Y | Y | | | Multi-account user resolution | Y | Y | `AccountConfig` per account in `NatsOptions.Accounts`; `GetOrCreateAccount` wires limits | -| Account exports/imports | Y | N | | +| Account exports/imports | Y | Y | ServiceImport/StreamImport with ExportAuth, subject transforms, response routing | | Per-account connection limits | Y | Y | `Account.AddClient()` returns false when `MaxConnections` exceeded | | Per-account subscription limits | Y | Y | `Account.IncrementSubscriptions()` enforced in `ProcessSub()` | | Account JetStream limits | Y | N | Excluded per scope | @@ -406,6 +406,11 @@ The following items from the original gap list have been implemented: - **User revocation** — per-account tracking with wildcard (`*`) revocation - **Config file parsing** — custom lexer/parser ported from Go; supports includes, variables, nested blocks, size suffixes - **Hot reload (SIGHUP)** — re-parses config, diffs changes, validates reloadable set, applies with CLI precedence +- **SYSTEM client type** — InternalClient with InternalEventSystem, Channel-based send/receive loops, event publishing +- **ACCOUNT client type** — lazy per-account InternalClient with import/export subscription support +- **System event publishing** — connect/disconnect advisories, server stats, shutdown/lame-duck events, auth errors +- **System request-reply services** — $SYS.REQ.SERVER.*.VARZ/CONNZ/SUBSZ/HEALTHZ/IDZ/STATSZ with ping wildcards +- **Account exports/imports** — service and stream imports with ExportAuth, subject transforms, response routing, latency tracking ### Remaining Lower Priority 1. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections