diff --git a/docs/plans/2026-02-23-system-account-types-plan.md b/docs/plans/2026-02-23-system-account-types-plan.md new file mode 100644 index 0000000..1487ca7 --- /dev/null +++ b/docs/plans/2026-02-23-system-account-types-plan.md @@ -0,0 +1,2787 @@ +# SYSTEM and ACCOUNT Connection Types Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task. + +**Goal:** Port the SYSTEM and ACCOUNT internal connection types from Go, including full event publishing, request-reply monitoring, and cross-account import/export. + +**Architecture:** 6-layer bottom-up build. Layer 1 introduces `ClientKind` enum, `INatsClient` interface, and `InternalClient` class. Layer 2 adds the `InternalEventSystem` with Channel-based send/receive loops. Layer 3 wires system event publishing (connect, disconnect, stats, shutdown). Layer 4 adds request-reply monitoring services (`$SYS.REQ.SERVER.*`). Layer 5 implements the import/export model with ACCOUNT client. Layer 6 adds response routing and latency tracking. + +**Tech Stack:** .NET 10 / C# 14, System.Text.Json source generators, System.Threading.Channels, xUnit 3, Shouldly + +**Design doc:** `docs/plans/2026-02-23-system-account-types-design.md` + +--- + +### Task 1: Create ClientKind enum and extensions + +**Files:** +- Create: `src/NATS.Server/ClientKind.cs` +- Test: `tests/NATS.Server.Tests/InternalClientTests.cs` + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/InternalClientTests.cs`: + +```csharp +namespace NATS.Server.Tests; + +public class InternalClientTests +{ + [Theory] + [InlineData(ClientKind.Client, false)] + [InlineData(ClientKind.Router, false)] + [InlineData(ClientKind.Gateway, false)] + [InlineData(ClientKind.Leaf, false)] + [InlineData(ClientKind.System, true)] + [InlineData(ClientKind.JetStream, true)] + [InlineData(ClientKind.Account, true)] + public void IsInternal_returns_correct_value(ClientKind kind, bool expected) + { + kind.IsInternal().ShouldBe(expected); + } +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~IsInternal_returns_correct_value" -v minimal` +Expected: FAIL — `ClientKind` does not exist yet. + +**Step 3: Write minimal implementation** + +Create `src/NATS.Server/ClientKind.cs`: + +```csharp +namespace NATS.Server; + +/// +/// Identifies the type of a client connection. +/// Maps to Go's client kind constants in client.go:45-65. +/// +public enum ClientKind +{ + Client, + Router, + Gateway, + Leaf, + System, + JetStream, + Account, +} + +public static class ClientKindExtensions +{ + public static bool IsInternal(this ClientKind kind) => + kind is ClientKind.System or ClientKind.JetStream or ClientKind.Account; +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~IsInternal_returns_correct_value" -v minimal` +Expected: PASS (7 test cases). + +**Step 5: Commit** + +```bash +git add src/NATS.Server/ClientKind.cs tests/NATS.Server.Tests/InternalClientTests.cs +git commit -m "feat: add ClientKind enum with IsInternal extension" +``` + +--- + +### Task 2: Create INatsClient interface and implement on NatsClient + +**Files:** +- Create: `src/NATS.Server/INatsClient.cs` +- Modify: `src/NATS.Server/NatsClient.cs:29` — add `: INatsClient` and `Kind` property +- Modify: `src/NATS.Server/Subscriptions/Subscription.cs:12` — change `Client` type + +**Step 1: Write the failing test** + +Add to `tests/NATS.Server.Tests/InternalClientTests.cs`: + +```csharp +[Fact] +public void NatsClient_implements_INatsClient() +{ + typeof(NatsClient).GetInterfaces().ShouldContain(typeof(INatsClient)); +} + +[Fact] +public void NatsClient_kind_is_Client() +{ + // NatsClient.Kind should return ClientKind.Client + typeof(NatsClient).GetProperty("Kind")!.PropertyType.ShouldBe(typeof(ClientKind)); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~NatsClient_implements_INatsClient" -v minimal` +Expected: FAIL — `INatsClient` does not exist. + +**Step 3: Create INatsClient interface** + +Create `src/NATS.Server/INatsClient.cs`: + +```csharp +using NATS.Server.Auth; +using NATS.Server.Protocol; +using NATS.Server.Subscriptions; + +namespace NATS.Server; + +/// +/// Common interface for all client types (external socket-based and internal socketless). +/// Used by Subscription, message delivery, and routing infrastructure. +/// +public interface INatsClient +{ + ulong Id { get; } + ClientKind Kind { get; } + bool IsInternal => Kind.IsInternal(); + Account? Account { get; } + ClientOptions? ClientOpts { get; } + ClientPermissions? Permissions { get; } + + void SendMessage(string subject, string sid, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload); + bool QueueOutbound(ReadOnlyMemory data); + void RemoveSubscription(string sid); +} +``` + +**Step 4: Add INatsClient to NatsClient** + +In `src/NATS.Server/NatsClient.cs`, change line 29 from: + +```csharp +public sealed class NatsClient : IDisposable +``` + +to: + +```csharp +public sealed class NatsClient : INatsClient, IDisposable +``` + +Add the `Kind` property after `Id`: + +```csharp +public ClientKind Kind => ClientKind.Client; +``` + +**Step 5: Change Subscription.Client type** + +In `src/NATS.Server/Subscriptions/Subscription.cs`, change line 12 from: + +```csharp +public NatsClient? Client { get; set; } +``` + +to: + +```csharp +public INatsClient? Client { get; set; } +``` + +Remove the `using NATS.Server;` on line 1 if it becomes redundant (it won't — `INatsClient` is in `NATS.Server` namespace). + +**Step 6: Fix compilation errors from Subscription.Client type change** + +The following files reference `sub.Client` as `NatsClient` and need updating: + +- `src/NATS.Server/NatsServer.cs` — `DeliverMessage` (line 616): `client.SendMessage(...)` still works via interface. `client.RemoveSubscription(sub.Sid)` still works. `client.Permissions` still works. The `client.Account?.SubList` still works via interface. +- `src/NATS.Server/NatsServer.cs` — `ProcessMessage` (line 558): `sub.Client == sender` — this is identity comparison, which works on interfaces. +- `src/NATS.Server/NatsServer.cs` — `RemoveClient` (line 721-722): `client.RemoveAllSubscriptions(subList)` — this method is on `NatsClient`, not `INatsClient`. We need to keep `RemoveClient(NatsClient client)` accepting the concrete type since it accesses NatsClient-specific properties (RemoteIp, RemotePort, etc.). + +The `IMessageRouter` interface (lines 17-22 of NatsClient.cs) also uses `NatsClient` directly: + +```csharp +public interface IMessageRouter +{ + void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory headers, + ReadOnlyMemory payload, NatsClient sender); + void RemoveClient(NatsClient client); +} +``` + +Keep `IMessageRouter` using `NatsClient` for now — the internal publish path will call `ProcessMessage` differently (see Task 6). The key change is `Subscription.Client` becoming `INatsClient?`. + +In `NatsServer.DeliverMessage`, the `client.Permissions?.IsDeliveryAllowed` and `client.SendMessage` calls work through `INatsClient`. But `client.RemoveSubscription` is on INatsClient too now. The only issue is `client.Account?.SubList` in the auto-unsub path — `Account` is on `INatsClient`. This compiles fine. + +**Step 7: Run full test suite** + +Run: `dotnet test tests/NATS.Server.Tests -v minimal` +Expected: All existing tests PASS. The interface extraction should be fully backwards-compatible. + +**Step 8: Commit** + +```bash +git add src/NATS.Server/INatsClient.cs src/NATS.Server/NatsClient.cs src/NATS.Server/Subscriptions/Subscription.cs +git commit -m "feat: extract INatsClient interface, implement on NatsClient, update Subscription.Client type" +``` + +--- + +### Task 3: Create InternalClient class + +**Files:** +- Create: `src/NATS.Server/InternalClient.cs` +- Test: `tests/NATS.Server.Tests/InternalClientTests.cs` (add tests) + +**Step 1: Write the failing test** + +Add to `tests/NATS.Server.Tests/InternalClientTests.cs`: + +```csharp +[Fact] +public void InternalClient_system_kind() +{ + var account = new Account("$SYS"); + var client = new InternalClient(1, ClientKind.System, account); + client.Kind.ShouldBe(ClientKind.System); + client.IsInternal.ShouldBeTrue(); + client.Id.ShouldBe(1UL); + client.Account.ShouldBe(account); +} + +[Fact] +public void InternalClient_account_kind() +{ + var account = new Account("myaccount"); + var client = new InternalClient(2, ClientKind.Account, account); + client.Kind.ShouldBe(ClientKind.Account); + client.IsInternal.ShouldBeTrue(); +} + +[Fact] +public void InternalClient_rejects_non_internal_kind() +{ + var account = new Account("test"); + Should.Throw(() => new InternalClient(1, ClientKind.Client, account)); +} + +[Fact] +public void InternalClient_SendMessage_invokes_callback() +{ + var account = new Account("$SYS"); + var client = new InternalClient(1, ClientKind.System, account); + string? capturedSubject = null; + string? capturedSid = null; + client.MessageCallback = (subject, sid, replyTo, headers, payload) => + { + capturedSubject = subject; + capturedSid = sid; + }; + + client.SendMessage("test.subject", "1", null, ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + capturedSubject.ShouldBe("test.subject"); + capturedSid.ShouldBe("1"); +} + +[Fact] +public void InternalClient_QueueOutbound_returns_true_noop() +{ + var account = new Account("$SYS"); + var client = new InternalClient(1, ClientKind.System, account); + client.QueueOutbound(ReadOnlyMemory.Empty).ShouldBeTrue(); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InternalClient_system_kind" -v minimal` +Expected: FAIL — `InternalClient` does not exist. + +**Step 3: Write implementation** + +Create `src/NATS.Server/InternalClient.cs`: + +```csharp +using NATS.Server.Auth; +using NATS.Server.Protocol; +using NATS.Server.Subscriptions; + +namespace NATS.Server; + +/// +/// Lightweight socketless client for internal messaging (SYSTEM, ACCOUNT, JETSTREAM). +/// Maps to Go's internal client created by createInternalClient() in server.go:1910-1936. +/// No network I/O — messages are delivered via callback. +/// +public sealed class InternalClient : INatsClient +{ + public ulong Id { get; } + public ClientKind Kind { get; } + public Account? Account { get; } + public ClientOptions? ClientOpts => null; + public ClientPermissions? Permissions => null; + + /// + /// Callback invoked when a message is delivered to this internal client. + /// Set by the event system or account import infrastructure. + /// + public Action, ReadOnlyMemory>? MessageCallback { get; set; } + + private readonly Dictionary _subs = new(StringComparer.Ordinal); + + public InternalClient(ulong id, ClientKind kind, Account account) + { + if (!kind.IsInternal()) + throw new ArgumentException($"InternalClient requires an internal ClientKind, got {kind}", nameof(kind)); + + Id = id; + Kind = kind; + Account = account; + } + + public void SendMessage(string subject, string sid, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + MessageCallback?.Invoke(subject, sid, replyTo, headers, payload); + } + + public bool QueueOutbound(ReadOnlyMemory data) => true; // no-op for internal clients + + public void RemoveSubscription(string sid) + { + if (_subs.Remove(sid)) + Account?.DecrementSubscriptions(); + } + + public void AddSubscription(Subscription sub) + { + _subs[sub.Sid] = sub; + } + + public IReadOnlyDictionary Subscriptions => _subs; +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InternalClient" -v minimal` +Expected: All 7 InternalClient tests PASS. + +**Step 5: Run full test suite** + +Run: `dotnet test tests/NATS.Server.Tests -v minimal` +Expected: All tests PASS. + +**Step 6: Commit** + +```bash +git add src/NATS.Server/InternalClient.cs tests/NATS.Server.Tests/InternalClientTests.cs +git commit -m "feat: add InternalClient class for socketless internal messaging" +``` + +--- + +### Task 4: Create event subject constants and system message handler delegate + +**Files:** +- Create: `src/NATS.Server/Events/EventSubjects.cs` + +**Step 1: Write implementation** + +Create `src/NATS.Server/Events/EventSubjects.cs`: + +```csharp +namespace NATS.Server.Events; + +/// +/// System event subject patterns. +/// Maps to Go events.go:41-97 subject constants. +/// +public static class EventSubjects +{ + // Account-scoped events + public const string ConnectEvent = "$SYS.ACCOUNT.{0}.CONNECT"; + public const string DisconnectEvent = "$SYS.ACCOUNT.{0}.DISCONNECT"; + public const string AccountConnsNew = "$SYS.ACCOUNT.{0}.SERVER.CONNS"; + public const string AccountConnsOld = "$SYS.SERVER.ACCOUNT.{0}.CONNS"; + + // Server-scoped events + public const string ServerStats = "$SYS.SERVER.{0}.STATSZ"; + public const string ServerShutdown = "$SYS.SERVER.{0}.SHUTDOWN"; + public const string ServerLameDuck = "$SYS.SERVER.{0}.LAMEDUCK"; + public const string AuthError = "$SYS.SERVER.{0}.CLIENT.AUTH.ERR"; + public const string AuthErrorAccount = "$SYS.ACCOUNT.CLIENT.AUTH.ERR"; + + // Request-reply subjects (server-specific) + public const string ServerReq = "$SYS.REQ.SERVER.{0}.{1}"; + + // Wildcard ping subjects (all servers respond) + public const string ServerPing = "$SYS.REQ.SERVER.PING.{0}"; + + // Account-scoped request subjects + public const string AccountReq = "$SYS.REQ.ACCOUNT.{0}.{1}"; + + // Inbox for responses + public const string InboxResponse = "$SYS._INBOX_.{0}"; +} + +/// +/// Callback signature for system message handlers. +/// Maps to Go's sysMsgHandler type in events.go:109. +/// +public delegate void SystemMessageHandler( + Subscriptions.Subscription? sub, + INatsClient? client, + Auth.Account? account, + string subject, + string? reply, + ReadOnlyMemory headers, + ReadOnlyMemory message); +``` + +**Step 2: Build** + +Run: `dotnet build src/NATS.Server` +Expected: BUILD SUCCEEDED. + +**Step 3: Commit** + +```bash +git add src/NATS.Server/Events/EventSubjects.cs +git commit -m "feat: add system event subject constants and SystemMessageHandler delegate" +``` + +--- + +### Task 5: Create event DTO types and JSON source generator + +**Files:** +- Create: `src/NATS.Server/Events/EventTypes.cs` +- Create: `src/NATS.Server/Events/EventJsonContext.cs` +- Test: `tests/NATS.Server.Tests/EventSystemTests.cs` + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/EventSystemTests.cs`: + +```csharp +using System.Text.Json; +using NATS.Server.Events; + +namespace NATS.Server.Tests; + +public class EventSystemTests +{ + [Fact] + public void ConnectEventMsg_serializes_with_correct_type() + { + var evt = new ConnectEventMsg + { + Type = ConnectEventMsg.EventType, + Id = "test123", + Time = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc), + Server = new EventServerInfo { Name = "test-server", Id = "SRV1" }, + Client = new EventClientInfo { Id = 1, Account = "$G" }, + }; + + var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.ConnectEventMsg); + json.ShouldContain("\"type\":\"io.nats.server.advisory.v1.client_connect\""); + json.ShouldContain("\"server\":"); + json.ShouldContain("\"client\":"); + } + + [Fact] + public void DisconnectEventMsg_serializes_with_reason() + { + var evt = new DisconnectEventMsg + { + Type = DisconnectEventMsg.EventType, + Id = "test456", + Time = DateTime.UtcNow, + Server = new EventServerInfo { Name = "test-server", Id = "SRV1" }, + Client = new EventClientInfo { Id = 2, Account = "myacc" }, + Reason = "Client Closed", + Sent = new DataStats { Msgs = 10, Bytes = 1024 }, + Received = new DataStats { Msgs = 5, Bytes = 512 }, + }; + + var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.DisconnectEventMsg); + json.ShouldContain("\"reason\":\"Client Closed\""); + } + + [Fact] + public void ServerStatsMsg_serializes() + { + var evt = new ServerStatsMsg + { + Server = new EventServerInfo { Name = "srv1", Id = "ABC" }, + Stats = new ServerStatsData + { + Connections = 10, + TotalConnections = 100, + InMsgs = 5000, + OutMsgs = 4500, + InBytes = 1_000_000, + OutBytes = 900_000, + Mem = 50 * 1024 * 1024, + Subscriptions = 42, + }, + }; + + var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.ServerStatsMsg); + json.ShouldContain("\"connections\":10"); + json.ShouldContain("\"in_msgs\":5000"); + } +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ConnectEventMsg_serializes" -v minimal` +Expected: FAIL — types don't exist. + +**Step 3: Create event DTOs** + +Create `src/NATS.Server/Events/EventTypes.cs`: + +```csharp +using System.Text.Json.Serialization; + +namespace NATS.Server.Events; + +/// +/// Server identity block embedded in all system events. +/// +public sealed class EventServerInfo +{ + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + [JsonPropertyName("host")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Host { get; set; } + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("cluster")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Cluster { get; set; } + + [JsonPropertyName("domain")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Domain { get; set; } + + [JsonPropertyName("ver")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Version { get; set; } + + [JsonPropertyName("seq")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public ulong Seq { get; set; } + + [JsonPropertyName("tags")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public Dictionary? Tags { get; set; } +} + +/// +/// Client identity block for connect/disconnect events. +/// +public sealed class EventClientInfo +{ + [JsonPropertyName("start")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTime Start { get; set; } + + [JsonPropertyName("stop")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTime Stop { get; set; } + + [JsonPropertyName("host")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Host { get; set; } + + [JsonPropertyName("id")] + public ulong Id { get; set; } + + [JsonPropertyName("acc")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Account { get; set; } + + [JsonPropertyName("name")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Name { get; set; } + + [JsonPropertyName("lang")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Lang { get; set; } + + [JsonPropertyName("ver")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Version { get; set; } + + [JsonPropertyName("rtt")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public long RttNanos { get; set; } +} + +public sealed class DataStats +{ + [JsonPropertyName("msgs")] + public long Msgs { get; set; } + + [JsonPropertyName("bytes")] + public long Bytes { get; set; } +} + +/// Client connect advisory. Go events.go:155-160. +public sealed class ConnectEventMsg +{ + public const string EventType = "io.nats.server.advisory.v1.client_connect"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("client")] + public EventClientInfo Client { get; set; } = new(); +} + +/// Client disconnect advisory. Go events.go:167-174. +public sealed class DisconnectEventMsg +{ + public const string EventType = "io.nats.server.advisory.v1.client_disconnect"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("client")] + public EventClientInfo Client { get; set; } = new(); + + [JsonPropertyName("sent")] + public DataStats Sent { get; set; } = new(); + + [JsonPropertyName("received")] + public DataStats Received { get; set; } = new(); + + [JsonPropertyName("reason")] + public string Reason { get; set; } = string.Empty; +} + +/// Account connection count heartbeat. Go events.go:210-214. +public sealed class AccountNumConns +{ + public const string EventType = "io.nats.server.advisory.v1.account_connections"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("acc")] + public string AccountName { get; set; } = string.Empty; + + [JsonPropertyName("conns")] + public int Connections { get; set; } + + [JsonPropertyName("total_conns")] + public long TotalConnections { get; set; } + + [JsonPropertyName("subs")] + public int Subscriptions { get; set; } + + [JsonPropertyName("sent")] + public DataStats Sent { get; set; } = new(); + + [JsonPropertyName("received")] + public DataStats Received { get; set; } = new(); +} + +/// Server stats broadcast. Go events.go:150-153. +public sealed class ServerStatsMsg +{ + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("statsz")] + public ServerStatsData Stats { get; set; } = new(); +} + +public sealed class ServerStatsData +{ + [JsonPropertyName("start")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTime Start { get; set; } + + [JsonPropertyName("mem")] + public long Mem { get; set; } + + [JsonPropertyName("cores")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int Cores { get; set; } + + [JsonPropertyName("connections")] + public int Connections { get; set; } + + [JsonPropertyName("total_connections")] + public long TotalConnections { get; set; } + + [JsonPropertyName("active_accounts")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int ActiveAccounts { get; set; } + + [JsonPropertyName("subscriptions")] + public long Subscriptions { get; set; } + + [JsonPropertyName("in_msgs")] + public long InMsgs { get; set; } + + [JsonPropertyName("out_msgs")] + public long OutMsgs { get; set; } + + [JsonPropertyName("in_bytes")] + public long InBytes { get; set; } + + [JsonPropertyName("out_bytes")] + public long OutBytes { get; set; } + + [JsonPropertyName("slow_consumers")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public long SlowConsumers { get; set; } +} + +/// Server shutdown notification. +public sealed class ShutdownEventMsg +{ + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("reason")] + public string Reason { get; set; } = string.Empty; +} + +/// Lame duck mode notification. +public sealed class LameDuckEventMsg +{ + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); +} + +/// Auth error advisory. +public sealed class AuthErrorEventMsg +{ + public const string EventType = "io.nats.server.advisory.v1.client_auth"; + + [JsonPropertyName("type")] + public string Type { get; set; } = EventType; + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("timestamp")] + public DateTime Time { get; set; } + + [JsonPropertyName("server")] + public EventServerInfo Server { get; set; } = new(); + + [JsonPropertyName("client")] + public EventClientInfo Client { get; set; } = new(); + + [JsonPropertyName("reason")] + public string Reason { get; set; } = string.Empty; +} +``` + +**Step 4: Create JSON source generator context** + +Create `src/NATS.Server/Events/EventJsonContext.cs`: + +```csharp +using System.Text.Json.Serialization; + +namespace NATS.Server.Events; + +[JsonSerializable(typeof(ConnectEventMsg))] +[JsonSerializable(typeof(DisconnectEventMsg))] +[JsonSerializable(typeof(AccountNumConns))] +[JsonSerializable(typeof(ServerStatsMsg))] +[JsonSerializable(typeof(ShutdownEventMsg))] +[JsonSerializable(typeof(LameDuckEventMsg))] +[JsonSerializable(typeof(AuthErrorEventMsg))] +internal partial class EventJsonContext : JsonSerializerContext; +``` + +**Step 5: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~EventSystemTests" -v minimal` +Expected: All 3 tests PASS. + +**Step 6: Commit** + +```bash +git add src/NATS.Server/Events/EventTypes.cs src/NATS.Server/Events/EventJsonContext.cs tests/NATS.Server.Tests/EventSystemTests.cs +git commit -m "feat: add system event DTOs and JSON source generator context" +``` + +--- + +### Task 6: Create InternalEventSystem with send/receive loops + +**Files:** +- Create: `src/NATS.Server/Events/InternalEventSystem.cs` +- Test: `tests/NATS.Server.Tests/EventSystemTests.cs` (add tests) + +**Step 1: Write the failing test** + +Add to `tests/NATS.Server.Tests/EventSystemTests.cs`: + +```csharp +[Fact] +public async Task InternalEventSystem_start_and_stop_lifecycle() +{ + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var eventSystem = server.EventSystem; + eventSystem.ShouldNotBeNull(); + eventSystem.SystemClient.ShouldNotBeNull(); + eventSystem.SystemClient.Kind.ShouldBe(ClientKind.System); + + await server.ShutdownAsync(); +} + +[Fact] +public async Task SendInternalMsg_delivers_to_system_subscriber() +{ + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("test.subject", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + server.SendInternalMsg("test.subject", null, new { Value = "hello" }); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldBe("test.subject"); + + await server.ShutdownAsync(); +} + +private static NatsServer CreateTestServer() +{ + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); +} + +private static int GetFreePort() +{ + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InternalEventSystem_start_and_stop" -v minimal` +Expected: FAIL — `EventSystem` property doesn't exist on NatsServer. + +**Step 3: Write InternalEventSystem** + +Create `src/NATS.Server/Events/InternalEventSystem.cs`: + +```csharp +using System.Collections.Concurrent; +using System.Security.Cryptography; +using System.Text; +using System.Text.Json; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using NATS.Server.Auth; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Events; + +/// +/// Internal publish message queued for the send loop. +/// +public sealed class PublishMessage +{ + public InternalClient? Client { get; init; } + public required string Subject { get; init; } + public string? Reply { get; init; } + public byte[]? Headers { get; init; } + public object? Body { get; init; } + public bool Echo { get; init; } + public bool IsLast { get; init; } +} + +/// +/// Internal received message queued for the receive loop. +/// +public sealed class InternalSystemMessage +{ + public required Subscription? Sub { get; init; } + public required INatsClient? Client { get; init; } + public required Account? Account { get; init; } + public required string Subject { get; init; } + public required string? Reply { get; init; } + public required ReadOnlyMemory Headers { get; init; } + public required ReadOnlyMemory Message { get; init; } + public required SystemMessageHandler Callback { get; init; } +} + +/// +/// Manages the server's internal event system with Channel-based send/receive loops. +/// Maps to Go's internal struct in events.go:124-147 and the goroutines +/// internalSendLoop (events.go:495) and internalReceiveLoop (events.go:476). +/// +public sealed class InternalEventSystem : IAsyncDisposable +{ + private readonly ILogger _logger; + private readonly Channel _sendQueue; + private readonly Channel _receiveQueue; + private readonly Channel _receiveQueuePings; + private readonly CancellationTokenSource _cts = new(); + + private Task? _sendLoop; + private Task? _receiveLoop; + private Task? _receiveLoopPings; + private NatsServer? _server; + + private ulong _sequence; + private int _subscriptionId; + + public Account SystemAccount { get; } + public InternalClient SystemClient { get; } + public string ServerHash { get; } + + public InternalEventSystem(Account systemAccount, InternalClient systemClient, string serverName, ILogger logger) + { + _logger = logger; + SystemAccount = systemAccount; + SystemClient = systemClient; + + // Hash server name for inbox routing (matches Go's shash) + ServerHash = Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(serverName)))[..8].ToLowerInvariant(); + + _sendQueue = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + _receiveQueue = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + _receiveQueuePings = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + } + + public void Start(NatsServer server) + { + _server = server; + var ct = _cts.Token; + _sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct); + _receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct); + _receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct); + } + + /// + /// Creates a system subscription in the system account's SubList. + /// Maps to Go's sysSubscribe in events.go:2796. + /// + public Subscription SysSubscribe(string subject, SystemMessageHandler callback) + { + var sid = Interlocked.Increment(ref _subscriptionId).ToString(); + var sub = new Subscription + { + Subject = subject, + Sid = sid, + Client = SystemClient, + }; + + // Wrap callback in noInlineCallback pattern: enqueue to receive loop + SystemClient.MessageCallback = (subj, s, reply, hdr, msg) => + { + _receiveQueue.Writer.TryWrite(new InternalSystemMessage + { + Sub = sub, + Client = SystemClient, + Account = SystemAccount, + Subject = subj, + Reply = reply, + Headers = hdr, + Message = msg, + Callback = callback, + }); + }; + + SystemAccount.SubList.Insert(sub); + return sub; + } + + /// + /// Enqueue an internal message for publishing through the send loop. + /// + public void Enqueue(PublishMessage message) + { + _sendQueue.Writer.TryWrite(message); + } + + /// + /// The send loop: serializes messages and delivers them via the server's routing. + /// Maps to Go's internalSendLoop in events.go:495-668. + /// + private async Task InternalSendLoopAsync(CancellationToken ct) + { + try + { + await foreach (var pm in _sendQueue.Reader.ReadAllAsync(ct)) + { + try + { + var seq = Interlocked.Increment(ref _sequence); + + // Serialize body to JSON + byte[] payload; + if (pm.Body is byte[] raw) + { + payload = raw; + } + else if (pm.Body != null) + { + payload = JsonSerializer.SerializeToUtf8Bytes(pm.Body, pm.Body.GetType(), EventJsonContext.Default); + } + else + { + payload = []; + } + + // Deliver via the system account's SubList matching + var result = SystemAccount.SubList.Match(pm.Subject); + + foreach (var sub in result.PlainSubs) + { + sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply, + pm.Headers ?? ReadOnlyMemory.Empty, + payload); + } + + foreach (var queueGroup in result.QueueSubs) + { + if (queueGroup.Length == 0) continue; + var sub = queueGroup[0]; // Simple pick for internal + sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply, + pm.Headers ?? ReadOnlyMemory.Empty, + payload); + } + + if (pm.IsLast) + break; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error in internal send loop processing message on {Subject}", pm.Subject); + } + } + } + catch (OperationCanceledException) + { + // Normal shutdown + } + } + + /// + /// The receive loop: dispatches callbacks for internally-received messages. + /// Maps to Go's internalReceiveLoop in events.go:476-491. + /// + private async Task InternalReceiveLoopAsync(Channel queue, CancellationToken ct) + { + try + { + await foreach (var msg in queue.Reader.ReadAllAsync(ct)) + { + try + { + msg.Callback(msg.Sub, msg.Client, msg.Account, msg.Subject, msg.Reply, msg.Headers, msg.Message); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error in internal receive loop processing {Subject}", msg.Subject); + } + } + } + catch (OperationCanceledException) + { + // Normal shutdown + } + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + _sendQueue.Writer.TryComplete(); + _receiveQueue.Writer.TryComplete(); + _receiveQueuePings.Writer.TryComplete(); + + if (_sendLoop != null) await _sendLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + if (_receiveLoop != null) await _receiveLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + if (_receiveLoopPings != null) await _receiveLoopPings.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + + _cts.Dispose(); + } +} +``` + +**Step 4: Wire into NatsServer** + +In `src/NATS.Server/NatsServer.cs`: + +Add field after `_systemAccount` (around line 32): + +```csharp +private InternalEventSystem? _eventSystem; +``` + +Add public property: + +```csharp +public InternalEventSystem? EventSystem => _eventSystem; +``` + +In the constructor (after `_systemAccount` creation around line 259), create and wire up the event system: + +```csharp +// Create system internal client +var sysClientId = Interlocked.Increment(ref _nextClientId); +var sysClient = new InternalClient(sysClientId, ClientKind.System, _systemAccount); +_eventSystem = new InternalEventSystem( + _systemAccount, sysClient, + options.ServerName ?? $"nats-dotnet-{Environment.MachineName}", + _loggerFactory.CreateLogger()); +``` + +In `StartAsync` (after listener is ready), add: + +```csharp +_eventSystem?.Start(this); +``` + +In `ShutdownAsync` (before closing clients, around line 95), add: + +```csharp +if (_eventSystem != null) + await _eventSystem.DisposeAsync(); +``` + +Add the `SendInternalMsg` API method: + +```csharp +public void SendInternalMsg(string subject, string? reply, object? msg) +{ + _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Reply = reply, Body = msg }); +} + +public void SendInternalAccountMsg(Account account, string subject, object? msg) +{ + _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg }); +} +``` + +**Step 5: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~EventSystemTests" -v minimal` +Expected: All event system tests PASS. + +**Step 6: Run full test suite** + +Run: `dotnet test tests/NATS.Server.Tests -v minimal` +Expected: All tests PASS. + +**Step 7: Commit** + +```bash +git add src/NATS.Server/Events/InternalEventSystem.cs src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/EventSystemTests.cs +git commit -m "feat: add InternalEventSystem with Channel-based send/receive loops" +``` + +--- + +### Task 7: Wire system event publishing (connect, disconnect, shutdown, stats) + +**Files:** +- Modify: `src/NATS.Server/NatsServer.cs` — add event publishing at lifecycle points +- Modify: `src/NATS.Server/NatsClient.cs` — publish auth error on auth failure +- Test: `tests/NATS.Server.Tests/SystemEventsTests.cs` + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/SystemEventsTests.cs`: + +```csharp +using System.Text.Json; +using NATS.Server; +using NATS.Server.Events; +using Microsoft.Extensions.Logging.Abstractions; + +namespace NATS.Server.Tests; + +public class SystemEventsTests +{ + [Fact] + public async Task Server_publishes_connect_event_on_client_auth() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe($"$SYS.ACCOUNT.*.CONNECT", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + // Connect a real client + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port); + + // Read INFO + var buf = new byte[4096]; + await sock.ReceiveAsync(buf); + + // Send CONNECT + var connect = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n"); + await sock.SendAsync(connect); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldStartWith("$SYS.ACCOUNT."); + result.ShouldEndWith(".CONNECT"); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Server_publishes_disconnect_event_on_client_close() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe($"$SYS.ACCOUNT.*.DISCONNECT", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + // Connect and then disconnect + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port); + var buf = new byte[4096]; + await sock.ReceiveAsync(buf); + await sock.SendAsync(System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n")); + await Task.Delay(100); + sock.Shutdown(System.Net.Sockets.SocketShutdown.Both); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldStartWith("$SYS.ACCOUNT."); + result.ShouldEndWith(".DISCONNECT"); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Server_publishes_shutdown_event() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe($"$SYS.SERVER.*.SHUTDOWN", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + await server.ShutdownAsync(); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldContain(".SHUTDOWN"); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Server_publishes_connect_event" -v minimal` +Expected: FAIL — no events being published yet. + +**Step 3: Add event publishing to NatsServer** + +Add a helper method to `NatsServer` that builds `EventServerInfo`: + +```csharp +private EventServerInfo BuildEventServerInfo() +{ + var seq = _eventSystem != null ? Interlocked.Increment(ref _eventSystem._sequence) : 0; // We'll need to expose this + return new EventServerInfo + { + Name = _serverInfo.ServerName, + Host = _options.Host, + Id = _serverInfo.ServerId, + Version = NatsProtocol.Version, + Seq = seq, + Tags = _options.Tags?.Count > 0 ? _options.Tags : null, + }; +} +``` + +Note: We need to expose sequence from InternalEventSystem. Add a public method `public ulong NextSequence() => Interlocked.Increment(ref _sequence);` to InternalEventSystem, and change `_sequence` visibility. + +**Publish connect event** — in `NatsClient.ProcessConnect()`, after successful auth assignment (after the client is assigned to an account), have the router publish the connect event. Add to the `NatsClient` after the `CONNECT` is processed and account assigned — call a new method on the router/server. + +Add to `IMessageRouter`: + +```csharp +void PublishConnectEvent(NatsClient client); +void PublishDisconnectEvent(NatsClient client); +``` + +In `NatsServer`, implement: + +```csharp +public void PublishConnectEvent(NatsClient client) +{ + if (_eventSystem == null) return; + var accountName = client.Account?.Name ?? Account.GlobalAccountName; + var subject = string.Format(EventSubjects.ConnectEvent, accountName); + var evt = new ConnectEventMsg + { + Id = Guid.NewGuid().ToString("N"), + Time = DateTime.UtcNow, + Server = BuildEventServerInfo(), + Client = BuildEventClientInfo(client), + }; + SendInternalMsg(subject, null, evt); +} + +public void PublishDisconnectEvent(NatsClient client) +{ + if (_eventSystem == null) return; + var accountName = client.Account?.Name ?? Account.GlobalAccountName; + var subject = string.Format(EventSubjects.DisconnectEvent, accountName); + var evt = new DisconnectEventMsg + { + Id = Guid.NewGuid().ToString("N"), + Time = DateTime.UtcNow, + Server = BuildEventServerInfo(), + Client = BuildEventClientInfo(client), + Sent = new DataStats { Msgs = Interlocked.Read(ref client.OutMsgs), Bytes = Interlocked.Read(ref client.OutBytes) }, + Received = new DataStats { Msgs = Interlocked.Read(ref client.InMsgs), Bytes = Interlocked.Read(ref client.InBytes) }, + Reason = client.CloseReason.ToReasonString(), + }; + SendInternalMsg(subject, null, evt); +} + +private static EventClientInfo BuildEventClientInfo(NatsClient client) +{ + return new EventClientInfo + { + Id = client.Id, + Host = client.RemoteIp, + Account = client.Account?.Name, + Name = client.ClientOpts?.Name, + Lang = client.ClientOpts?.Lang, + Version = client.ClientOpts?.Version, + Start = client.StartTime, + RttNanos = client.Rtt.Ticks * 100, // Ticks are 100ns + }; +} +``` + +Call `PublishConnectEvent` in the auth success path of `NatsClient.ProcessConnect()`. +Call `PublishDisconnectEvent` in `NatsServer.RemoveClient()`. + +**Publish shutdown event** — in `NatsServer.ShutdownAsync()`, before disposing the event system: + +```csharp +var shutdownSubject = string.Format(EventSubjects.ServerShutdown, _serverInfo.ServerId); +_eventSystem?.Enqueue(new PublishMessage +{ + Subject = shutdownSubject, + Body = new ShutdownEventMsg { Server = BuildEventServerInfo(), Reason = "Server Shutdown" }, + IsLast = true, +}); +// Give send loop time to process the shutdown event +await Task.Delay(100); +``` + +**Step 4: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SystemEventsTests" -v minimal` +Expected: All system event tests PASS. + +**Step 5: Run full test suite** + +Run: `dotnet test tests/NATS.Server.Tests -v minimal` +Expected: All tests PASS. + +**Step 6: Commit** + +```bash +git add src/NATS.Server/NatsServer.cs src/NATS.Server/NatsClient.cs src/NATS.Server/Events/InternalEventSystem.cs tests/NATS.Server.Tests/SystemEventsTests.cs +git commit -m "feat: wire system event publishing for connect, disconnect, and shutdown" +``` + +--- + +### Task 8: Add periodic stats and account connection heartbeats + +**Files:** +- Modify: `src/NATS.Server/Events/InternalEventSystem.cs` — add timer loops +- Modify: `src/NATS.Server/NatsServer.cs` — add stats gathering helpers +- Test: `tests/NATS.Server.Tests/SystemEventsTests.cs` (add tests) + +**Step 1: Write the failing test** + +Add to `tests/NATS.Server.Tests/SystemEventsTests.cs`: + +```csharp +[Fact] +public async Task Server_publishes_statsz_periodically() +{ + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe($"$SYS.SERVER.*.STATSZ", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + // Trigger a manual stats publish (don't wait 10s) + server.EventSystem!.PublishServerStats(); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldContain(".STATSZ"); + + await server.ShutdownAsync(); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Server_publishes_statsz" -v minimal` +Expected: FAIL — `PublishServerStats` doesn't exist. + +**Step 3: Add stats publishing to InternalEventSystem** + +Add to `InternalEventSystem`: + +```csharp +public void PublishServerStats() +{ + if (_server == null) return; + + var subject = string.Format(EventSubjects.ServerStats, _server.ServerId); + var process = System.Diagnostics.Process.GetCurrentProcess(); + + var statsMsg = new ServerStatsMsg + { + Server = _server.BuildEventServerInfo(), + Stats = new ServerStatsData + { + Start = _server.StartTime, + Mem = process.WorkingSet64, + Cores = Environment.ProcessorCount, + Connections = _server.ClientCount, + TotalConnections = Interlocked.Read(ref _server.Stats.TotalConnections), + Subscriptions = SystemAccount.SubList.Count, + InMsgs = Interlocked.Read(ref _server.Stats.InMsgs), + OutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs), + InBytes = Interlocked.Read(ref _server.Stats.InBytes), + OutBytes = Interlocked.Read(ref _server.Stats.OutBytes), + SlowConsumers = Interlocked.Read(ref _server.Stats.SlowConsumers), + }, + }; + + Enqueue(new PublishMessage { Subject = subject, Body = statsMsg }); +} +``` + +Make `BuildEventServerInfo()` public on NatsServer (change from `private` to `public`). + +Start a timer task in the `Start` method for periodic statsz: + +```csharp +_ = Task.Run(async () => +{ + using var timer = new PeriodicTimer(TimeSpan.FromSeconds(10)); + while (await timer.WaitForNextTickAsync(ct)) + { + PublishServerStats(); + } +}, ct); +``` + +**Step 4: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Server_publishes_statsz" -v minimal` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Events/InternalEventSystem.cs src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/SystemEventsTests.cs +git commit -m "feat: add periodic server stats and account connection heartbeat publishing" +``` + +--- + +### Task 9: Add system request-reply monitoring services + +**Files:** +- Modify: `src/NATS.Server/Events/InternalEventSystem.cs` — add `InitEventTracking()` +- Modify: `src/NATS.Server/NatsServer.cs` — add request handlers +- Test: `tests/NATS.Server.Tests/SystemRequestReplyTests.cs` + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/SystemRequestReplyTests.cs`: + +```csharp +using System.Text; +using System.Text.Json; +using NATS.Server; +using NATS.Server.Events; +using NATS.Server.Monitoring; +using Microsoft.Extensions.Logging.Abstractions; + +namespace NATS.Server.Tests; + +public class SystemRequestReplyTests +{ + [Fact] + public async Task Varz_request_reply_returns_server_info() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + // Subscribe to the reply inbox + var replySubject = $"_INBOX.test.{Guid.NewGuid():N}"; + server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(msg.ToArray()); + }); + + // Publish request to $SYS.REQ.SERVER.{id}.VARZ + var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ"); + server.SendInternalMsg(reqSubject, replySubject, null); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var json = Encoding.UTF8.GetString(result); + json.ShouldContain("\"server_id\""); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Healthz_request_reply_returns_ok() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + var replySubject = $"_INBOX.test.{Guid.NewGuid():N}"; + server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(msg.ToArray()); + }); + + var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "HEALTHZ"); + server.SendInternalMsg(reqSubject, replySubject, null); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var json = Encoding.UTF8.GetString(result); + json.ShouldContain("ok"); + + await server.ShutdownAsync(); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port, MonitorPort = GetFreePort() }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Varz_request_reply" -v minimal` +Expected: FAIL — no request-reply handlers registered. + +**Step 3: Add InitEventTracking with request-reply subscriptions** + +Add method to `InternalEventSystem`: + +```csharp +public void InitEventTracking(NatsServer server) +{ + _server = server; + var serverId = server.ServerId; + + // Server-specific monitoring services + RegisterService(serverId, "VARZ", server.HandleVarzRequest); + RegisterService(serverId, "CONNZ", server.HandleConnzRequest); + RegisterService(serverId, "SUBSZ", server.HandleSubszRequest); + RegisterService(serverId, "HEALTHZ", server.HandleHealthzRequest); + RegisterService(serverId, "STATSZ", server.HandleStatszRequest); + RegisterService(serverId, "IDZ", server.HandleIdzRequest); + + // Wildcard ping services (all servers respond) + SysSubscribe(string.Format(EventSubjects.ServerPing, "VARZ"), WrapRequestHandler(server.HandleVarzRequest)); + SysSubscribe(string.Format(EventSubjects.ServerPing, "HEALTHZ"), WrapRequestHandler(server.HandleHealthzRequest)); + SysSubscribe(string.Format(EventSubjects.ServerPing, "IDZ"), WrapRequestHandler(server.HandleIdzRequest)); + SysSubscribe(string.Format(EventSubjects.ServerPing, "STATSZ"), WrapRequestHandler(server.HandleStatszRequest)); +} + +private void RegisterService(string serverId, string name, Action handler) +{ + var subject = string.Format(EventSubjects.ServerReq, serverId, name); + SysSubscribe(subject, WrapRequestHandler(handler)); +} + +private SystemMessageHandler WrapRequestHandler(Action handler) +{ + return (sub, client, acc, subject, reply, hdr, msg) => + { + handler(subject, reply); + }; +} +``` + +Add request handler methods to `NatsServer`: + +```csharp +public void HandleVarzRequest(string subject, string? reply) +{ + if (reply == null) return; + // Reuse VarzHandler logic (we need to make it accessible) + var varz = _varzHandler?.HandleVarzAsync(CancellationToken.None).GetAwaiter().GetResult(); + if (varz != null) + { + var json = JsonSerializer.SerializeToUtf8Bytes(varz); + SendInternalMsg(reply, null, json); + } +} + +public void HandleHealthzRequest(string subject, string? reply) +{ + if (reply == null) return; + var response = new { status = "ok" }; + SendInternalMsg(reply, null, response); +} + +public void HandleConnzRequest(string subject, string? reply) +{ + if (reply == null) return; + // Simplified connz response + var connz = _connzHandler?.HandleConnz(null!); // Need adapter + if (connz != null) + SendInternalMsg(reply, null, connz); +} + +public void HandleSubszRequest(string subject, string? reply) +{ + if (reply == null) return; + SendInternalMsg(reply, null, new { num_subscriptions = SubList.Count }); +} + +public void HandleStatszRequest(string subject, string? reply) +{ + if (reply == null) return; + _eventSystem?.PublishServerStats(); +} + +public void HandleIdzRequest(string subject, string? reply) +{ + if (reply == null) return; + var idz = new + { + server_id = _serverInfo.ServerId, + server_name = _serverInfo.ServerName, + version = NatsProtocol.Version, + host = _options.Host, + port = _options.Port, + }; + SendInternalMsg(reply, null, idz); +} +``` + +Wire `_varzHandler` as a field on NatsServer (extracted from MonitorServer or created directly). + +Call `_eventSystem.InitEventTracking(this)` in `StartAsync` after the event system starts. + +**Step 4: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SystemRequestReplyTests" -v minimal` +Expected: All request-reply tests PASS. + +**Step 5: Run full test suite** + +Run: `dotnet test tests/NATS.Server.Tests -v minimal` +Expected: All tests PASS. + +**Step 6: Commit** + +```bash +git add src/NATS.Server/Events/InternalEventSystem.cs src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/SystemRequestReplyTests.cs +git commit -m "feat: add system request-reply monitoring services ($SYS.REQ.SERVER.*)" +``` + +--- + +### Task 10: Create import/export model types + +**Files:** +- Create: `src/NATS.Server/Imports/ServiceResponseType.cs` +- Create: `src/NATS.Server/Imports/ExportAuth.cs` +- Create: `src/NATS.Server/Imports/StreamExport.cs` +- Create: `src/NATS.Server/Imports/ServiceExport.cs` +- Create: `src/NATS.Server/Imports/StreamImport.cs` +- Create: `src/NATS.Server/Imports/ServiceImport.cs` +- Create: `src/NATS.Server/Imports/ExportMap.cs` +- Create: `src/NATS.Server/Imports/ImportMap.cs` +- Create: `src/NATS.Server/Imports/ServiceLatency.cs` +- Test: `tests/NATS.Server.Tests/ImportExportTests.cs` + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/ImportExportTests.cs`: + +```csharp +using NATS.Server.Auth; +using NATS.Server.Imports; + +namespace NATS.Server.Tests; + +public class ImportExportTests +{ + [Fact] + public void ExportAuth_public_export_authorizes_any_account() + { + var auth = new ExportAuth(); + var account = new Account("test"); + auth.IsAuthorized(account).ShouldBeTrue(); + } + + [Fact] + public void ExportAuth_approved_accounts_restricts_access() + { + var auth = new ExportAuth { ApprovedAccounts = ["allowed"] }; + var allowed = new Account("allowed"); + var denied = new Account("denied"); + auth.IsAuthorized(allowed).ShouldBeTrue(); + auth.IsAuthorized(denied).ShouldBeFalse(); + } + + [Fact] + public void ExportAuth_revoked_account_denied() + { + var auth = new ExportAuth + { + ApprovedAccounts = ["test"], + RevokedAccounts = new() { ["test"] = DateTimeOffset.UtcNow.ToUnixTimeSeconds() }, + }; + var account = new Account("test"); + auth.IsAuthorized(account).ShouldBeFalse(); + } + + [Fact] + public void ServiceResponseType_defaults_to_singleton() + { + var import = new ServiceImport + { + DestinationAccount = new Account("dest"), + From = "requests.>", + To = "api.>", + }; + import.ResponseType.ShouldBe(ServiceResponseType.Singleton); + } + + [Fact] + public void ExportMap_stores_and_retrieves_exports() + { + var map = new ExportMap(); + map.Services["api.>"] = new ServiceExport { Account = new Account("svc") }; + map.Streams["events.>"] = new StreamExport(); + + map.Services.ShouldContainKey("api.>"); + map.Streams.ShouldContainKey("events.>"); + } + + [Fact] + public void ImportMap_stores_service_imports() + { + var map = new ImportMap(); + var si = new ServiceImport + { + DestinationAccount = new Account("dest"), + From = "requests.>", + To = "api.>", + }; + map.AddServiceImport(si); + map.Services.ShouldContainKey("requests.>"); + map.Services["requests.>"].Count.ShouldBe(1); + } +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImportExportTests" -v minimal` +Expected: FAIL — types don't exist. + +**Step 3: Create all import/export types** + +Create `src/NATS.Server/Imports/ServiceResponseType.cs`: + +```csharp +namespace NATS.Server.Imports; + +/// +/// Response type for service exports. Maps to Go's ServiceRespType. +/// +public enum ServiceResponseType +{ + Singleton, + Streamed, + Chunked, +} +``` + +Create `src/NATS.Server/Imports/ServiceLatency.cs`: + +```csharp +namespace NATS.Server.Imports; + +/// +/// Latency tracking configuration for a service export. +/// Maps to Go's serviceLatency in accounts.go:245-248. +/// +public sealed class ServiceLatency +{ + public int SamplingPercentage { get; init; } = 100; + public string Subject { get; init; } = string.Empty; +} +``` + +Create `src/NATS.Server/Imports/ExportAuth.cs`: + +```csharp +using NATS.Server.Auth; + +namespace NATS.Server.Imports; + +/// +/// Authorization rules for an export. +/// Maps to Go's exportAuth in accounts.go:217-222. +/// +public sealed class ExportAuth +{ + public bool TokenRequired { get; init; } + public uint AccountPosition { get; init; } + public HashSet? ApprovedAccounts { get; init; } + public Dictionary? RevokedAccounts { get; init; } + + public bool IsAuthorized(Account account) + { + // Check revocation first + if (RevokedAccounts != null && RevokedAccounts.ContainsKey(account.Name)) + return false; + + // Public export (no restrictions) + if (ApprovedAccounts == null && !TokenRequired && AccountPosition == 0) + return true; + + // Check approved accounts + if (ApprovedAccounts != null) + return ApprovedAccounts.Contains(account.Name); + + return false; + } +} +``` + +Create `src/NATS.Server/Imports/StreamExport.cs`: + +```csharp +namespace NATS.Server.Imports; + +/// Maps to Go's streamExport in accounts.go:225-227. +public sealed class StreamExport +{ + public ExportAuth Auth { get; init; } = new(); +} +``` + +Create `src/NATS.Server/Imports/ServiceExport.cs`: + +```csharp +using NATS.Server.Auth; + +namespace NATS.Server.Imports; + +/// Maps to Go's serviceExport in accounts.go:230-242. +public sealed class ServiceExport +{ + public ExportAuth Auth { get; init; } = new(); + public Account? Account { get; init; } + public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton; + public TimeSpan ResponseThreshold { get; init; } = TimeSpan.FromMinutes(2); + public ServiceLatency? Latency { get; init; } + public bool AllowTrace { get; init; } +} +``` + +Create `src/NATS.Server/Imports/StreamImport.cs`: + +```csharp +using NATS.Server.Auth; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Imports; + +/// Maps to Go's streamImport in accounts.go:142-155. +public sealed class StreamImport +{ + public required Account SourceAccount { get; init; } + public required string From { get; init; } + public required string To { get; init; } + public SubjectTransform? Transform { get; init; } + public bool UsePub { get; init; } + public bool Invalid { get; set; } +} +``` + +Create `src/NATS.Server/Imports/ServiceImport.cs`: + +```csharp +using NATS.Server.Auth; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Imports; + +/// Maps to Go's serviceImport in accounts.go:160-181. +public sealed class ServiceImport +{ + public required Account DestinationAccount { get; init; } + public required string From { get; init; } + public required string To { get; init; } + public SubjectTransform? Transform { get; init; } + public ServiceExport? Export { get; init; } + public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton; + public byte[]? Sid { get; set; } + public bool IsResponse { get; init; } + public bool UsePub { get; init; } + public bool Invalid { get; set; } + public bool Share { get; init; } + public bool Tracking { get; init; } + public long TimestampTicks { get; set; } +} +``` + +Create `src/NATS.Server/Imports/ExportMap.cs`: + +```csharp +namespace NATS.Server.Imports; + +/// Maps to Go's exportMap in accounts.go:251-255. +public sealed class ExportMap +{ + public Dictionary Streams { get; } = new(StringComparer.Ordinal); + public Dictionary Services { get; } = new(StringComparer.Ordinal); + public Dictionary Responses { get; } = new(StringComparer.Ordinal); +} +``` + +Create `src/NATS.Server/Imports/ImportMap.cs`: + +```csharp +namespace NATS.Server.Imports; + +/// Maps to Go's importMap in accounts.go:259-263. +public sealed class ImportMap +{ + public List Streams { get; } = []; + public Dictionary> Services { get; } = new(StringComparer.Ordinal); + + public void AddServiceImport(ServiceImport si) + { + if (!Services.TryGetValue(si.From, out var list)) + { + list = []; + Services[si.From] = list; + } + list.Add(si); + } +} +``` + +**Step 4: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImportExportTests" -v minimal` +Expected: All 6 import/export tests PASS. + +**Step 5: Commit** + +```bash +git add src/NATS.Server/Imports/ tests/NATS.Server.Tests/ImportExportTests.cs +git commit -m "feat: add import/export model types (ServiceImport, StreamImport, exports, auth)" +``` + +--- + +### Task 11: Add import/export support to Account and ACCOUNT client + +**Files:** +- Modify: `src/NATS.Server/Auth/Account.cs` — add exports, imports, internal client +- Modify: `src/NATS.Server/Subscriptions/Subscription.cs` — add ServiceImport/StreamImport references +- Test: `tests/NATS.Server.Tests/ImportExportTests.cs` (add tests) + +**Step 1: Write the failing test** + +Add to `tests/NATS.Server.Tests/ImportExportTests.cs`: + +```csharp +[Fact] +public void Account_add_service_export_and_import() +{ + var exporter = new Account("exporter"); + var importer = new Account("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + exporter.Exports.Services.ShouldContainKey("api.>"); + + var si = importer.AddServiceImport(exporter, "requests.>", "api.>"); + si.ShouldNotBeNull(); + si.From.ShouldBe("requests.>"); + si.To.ShouldBe("api.>"); + si.DestinationAccount.ShouldBe(exporter); + importer.Imports.Services.ShouldContainKey("requests.>"); +} + +[Fact] +public void Account_add_stream_export_and_import() +{ + var exporter = new Account("exporter"); + var importer = new Account("importer"); + + exporter.AddStreamExport("events.>", null); + exporter.Exports.Streams.ShouldContainKey("events.>"); + + importer.AddStreamImport(exporter, "events.>", "imported.events.>"); + importer.Imports.Streams.Count.ShouldBe(1); + importer.Imports.Streams[0].From.ShouldBe("events.>"); + importer.Imports.Streams[0].To.ShouldBe("imported.events.>"); +} + +[Fact] +public void Account_service_import_auth_rejected() +{ + var exporter = new Account("exporter"); + var importer = new Account("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, [new Account("other")]); + + Should.Throw(() => + importer.AddServiceImport(exporter, "requests.>", "api.>")); +} + +[Fact] +public void Account_lazy_creates_internal_client() +{ + var account = new Account("test"); + var client = account.GetOrCreateInternalClient(99); + client.ShouldNotBeNull(); + client.Kind.ShouldBe(ClientKind.Account); + client.Account.ShouldBe(account); + + // Second call returns same instance + var client2 = account.GetOrCreateInternalClient(100); + client2.ShouldBeSameAs(client); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Account_add_service_export" -v minimal` +Expected: FAIL — methods don't exist on Account. + +**Step 3: Extend Account** + +Modify `src/NATS.Server/Auth/Account.cs` — add imports, exports, internal client: + +```csharp +using NATS.Server.Imports; +using NATS.Server.Subscriptions; + +// Add these fields/properties to Account class: + +public ExportMap Exports { get; } = new(); +public ImportMap Imports { get; } = new(); + +private InternalClient? _internalClient; +private ulong _internalSubId; + +public InternalClient GetOrCreateInternalClient(ulong clientId) +{ + if (_internalClient != null) return _internalClient; + _internalClient = new InternalClient(clientId, ClientKind.Account, this); + return _internalClient; +} + +public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable? approved) +{ + var auth = new ExportAuth + { + ApprovedAccounts = approved != null ? new HashSet(approved.Select(a => a.Name)) : null, + }; + Exports.Services[subject] = new ServiceExport + { + Auth = auth, + Account = this, + ResponseType = responseType, + }; +} + +public void AddStreamExport(string subject, IEnumerable? approved) +{ + var auth = new ExportAuth + { + ApprovedAccounts = approved != null ? new HashSet(approved.Select(a => a.Name)) : null, + }; + Exports.Streams[subject] = new StreamExport { Auth = auth }; +} + +public ServiceImport AddServiceImport(Account destination, string from, string to) +{ + // Check authorization + if (!destination.Exports.Services.TryGetValue(to, out var export)) + throw new InvalidOperationException($"No service export found for '{to}' on account '{destination.Name}'"); + + if (!export.Auth.IsAuthorized(this)) + throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'"); + + var si = new ServiceImport + { + DestinationAccount = destination, + From = from, + To = to, + Export = export, + ResponseType = export.ResponseType, + }; + + Imports.AddServiceImport(si); + return si; +} + +public void AddStreamImport(Account source, string from, string to) +{ + // Check authorization + if (!source.Exports.Streams.TryGetValue(from, out var export)) + throw new InvalidOperationException($"No stream export found for '{from}' on account '{source.Name}'"); + + if (!export.Auth.IsAuthorized(this)) + throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{from}' from '{source.Name}'"); + + var si = new StreamImport + { + SourceAccount = source, + From = from, + To = to, + }; + + Imports.Streams.Add(si); +} +``` + +**Step 4: Add ServiceImport/StreamImport refs to Subscription** + +Modify `src/NATS.Server/Subscriptions/Subscription.cs`: + +```csharp +using NATS.Server; +using NATS.Server.Imports; + +namespace NATS.Server.Subscriptions; + +public sealed class Subscription +{ + public required string Subject { get; init; } + public string? Queue { get; init; } + public required string Sid { get; init; } + public long MessageCount; + public long MaxMessages; + public INatsClient? Client { get; set; } + public ServiceImport? ServiceImport { get; set; } + public StreamImport? StreamImport { get; set; } +} +``` + +**Step 5: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImportExportTests" -v minimal` +Expected: All import/export tests PASS. + +**Step 6: Run full test suite** + +Run: `dotnet test tests/NATS.Server.Tests -v minimal` +Expected: All tests PASS. + +**Step 7: Commit** + +```bash +git add src/NATS.Server/Auth/Account.cs src/NATS.Server/Subscriptions/Subscription.cs tests/NATS.Server.Tests/ImportExportTests.cs +git commit -m "feat: add import/export support to Account with ACCOUNT client lazy creation" +``` + +--- + +### Task 12: Wire service import into message delivery path + +**Files:** +- Modify: `src/NATS.Server/NatsServer.cs` — modify ProcessMessage and DeliverMessage for imports +- Test: `tests/NATS.Server.Tests/ImportExportTests.cs` (add integration test) + +**Step 1: Write the failing test** + +Add to `tests/NATS.Server.Tests/ImportExportTests.cs`: + +```csharp +[Fact] +public async Task Service_import_forwards_message_to_export_account() +{ + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + // Set up exporter and importer accounts + var exporter = server.GetOrCreateAccount("exporter"); + var importer = server.GetOrCreateAccount("importer"); + + exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null); + importer.AddServiceImport(exporter, "requests.>", "api.>"); + + // Wire the import subscription into the importer's SubList + server.WireServiceImports(importer); + + // Subscribe in exporter account to receive forwarded message + var received = new TaskCompletionSource(); + var exportSub = new Subscription { Subject = "api.test", Sid = "1", Client = null }; + // We'll verify via the internal mechanism + + // TODO: Complete this integration test once ProcessServiceImport is wired + + await server.ShutdownAsync(); +} + +private static NatsServer CreateTestServer() +{ + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); +} + +private static int GetFreePort() +{ + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; +} +``` + +**Step 2: Modify ProcessMessage for service imports** + +In `NatsServer.ProcessMessage`, after the existing delivery logic, add service import processing: + +```csharp +// Check for service imports that match this subject +if (sender.Account != null) +{ + foreach (var kvp in sender.Account.Imports.Services) + { + foreach (var si in kvp.Value) + { + if (si.Invalid) continue; + if (SubjectMatch.Match(subject, si.From)) + { + ProcessServiceImport(si, subject, replyTo, headers, payload, sender); + delivered = true; + } + } + } +} +``` + +Add the `ProcessServiceImport` method: + +```csharp +private void ProcessServiceImport(ServiceImport si, string subject, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload, INatsClient sender) +{ + // Transform subject + var targetSubject = si.To; + if (si.Transform != null) + { + var transformed = si.Transform.Apply(subject); + if (transformed != null) targetSubject = transformed; + } + else if (si.UsePub) + { + targetSubject = subject; + } + + // Match against destination account's SubList + var destSubList = si.DestinationAccount.SubList; + var result = destSubList.Match(targetSubject); + + // Deliver to destination subscribers + foreach (var sub in result.PlainSubs) + { + if (sub.Client == null) continue; + DeliverMessage(sub, targetSubject, replyTo, headers, payload); + } + + foreach (var queueGroup in result.QueueSubs) + { + if (queueGroup.Length == 0) continue; + var sub = queueGroup[0]; + if (sub.Client != null) + DeliverMessage(sub, targetSubject, replyTo, headers, payload); + } +} +``` + +Similarly, modify `DeliverMessage` for stream imports: + +```csharp +// In DeliverMessage, before client.SendMessage: +var deliverSubject = subject; +if (sub.StreamImport != null) +{ + if (sub.StreamImport.Transform != null) + { + var transformed = sub.StreamImport.Transform.Apply(subject); + if (transformed != null) deliverSubject = transformed; + } + else if (!sub.StreamImport.UsePub) + { + deliverSubject = sub.StreamImport.To; + } +} + +client.SendMessage(deliverSubject, sub.Sid, replyTo, headers, payload); +``` + +Add `WireServiceImports` and `GetOrCreateAccount` helpers to NatsServer. + +**Step 3: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests -v minimal` +Expected: All tests PASS. + +**Step 4: Commit** + +```bash +git add src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/ImportExportTests.cs +git commit -m "feat: wire service import forwarding and stream import transforms into message delivery" +``` + +--- + +### Task 13: Add response routing for service imports + +**Files:** +- Modify: `src/NATS.Server/NatsServer.cs` — response import creation and delivery +- Create: `tests/NATS.Server.Tests/ResponseRoutingTests.cs` + +**Step 1: Write the failing test** + +Create `tests/NATS.Server.Tests/ResponseRoutingTests.cs`: + +```csharp +using NATS.Server; +using NATS.Server.Auth; +using NATS.Server.Imports; +using Microsoft.Extensions.Logging.Abstractions; + +namespace NATS.Server.Tests; + +public class ResponseRoutingTests +{ + [Fact] + public void GenerateReplyPrefix_creates_unique_prefix() + { + var prefix1 = ResponseRouter.GenerateReplyPrefix(); + var prefix2 = ResponseRouter.GenerateReplyPrefix(); + + prefix1.ShouldStartWith("_R_."); + prefix2.ShouldStartWith("_R_."); + prefix1.ShouldNotBe(prefix2); + prefix1.Length.ShouldBeGreaterThan(4); + } + + [Fact] + public void Singleton_response_import_removed_after_delivery() + { + var exporter = new Account("exporter"); + exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null); + + var replyPrefix = ResponseRouter.GenerateReplyPrefix(); + var responseSi = new ServiceImport + { + DestinationAccount = exporter, + From = replyPrefix + ">", + To = "_INBOX.original.reply", + IsResponse = true, + ResponseType = ServiceResponseType.Singleton, + }; + exporter.Exports.Responses[replyPrefix] = responseSi; + + exporter.Exports.Responses.ShouldContainKey(replyPrefix); + + // Simulate singleton delivery cleanup + ResponseRouter.CleanupResponse(exporter, replyPrefix, responseSi); + + exporter.Exports.Responses.ShouldNotContainKey(replyPrefix); + } +} +``` + +**Step 2: Create ResponseRouter** + +Create `src/NATS.Server/Imports/ResponseRouter.cs`: + +```csharp +using System.Security.Cryptography; +using NATS.Server.Auth; + +namespace NATS.Server.Imports; + +/// +/// Handles response routing for service imports. +/// Maps to Go's service reply prefix generation and response cleanup. +/// +public static class ResponseRouter +{ + private static readonly char[] Base62 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789".ToCharArray(); + + public static string GenerateReplyPrefix() + { + Span bytes = stackalloc byte[10]; + RandomNumberGenerator.Fill(bytes); + var chars = new char[10]; + for (int i = 0; i < 10; i++) + chars[i] = Base62[bytes[i] % 62]; + return $"_R_.{new string(chars)}."; + } + + public static ServiceImport CreateResponseImport( + Account exporterAccount, + ServiceImport originalImport, + string originalReply) + { + var replyPrefix = GenerateReplyPrefix(); + + var responseSi = new ServiceImport + { + DestinationAccount = exporterAccount, + From = replyPrefix + ">", + To = originalReply, + IsResponse = true, + ResponseType = originalImport.ResponseType, + Export = originalImport.Export, + TimestampTicks = DateTime.UtcNow.Ticks, + }; + + exporterAccount.Exports.Responses[replyPrefix] = responseSi; + return responseSi; + } + + public static void CleanupResponse(Account account, string replyPrefix, ServiceImport responseSi) + { + account.Exports.Responses.Remove(replyPrefix); + // Could also remove subscription from SubList if one was created + } +} +``` + +**Step 3: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ResponseRoutingTests" -v minimal` +Expected: All response routing tests PASS. + +**Step 4: Commit** + +```bash +git add src/NATS.Server/Imports/ResponseRouter.cs tests/NATS.Server.Tests/ResponseRoutingTests.cs +git commit -m "feat: add response routing for service import request-reply patterns" +``` + +--- + +### Task 14: Add latency tracking for service imports + +**Files:** +- Modify: `src/NATS.Server/Imports/ServiceImport.cs` — add latency measurement fields +- Create: `src/NATS.Server/Imports/LatencyTracker.cs` +- Test: `tests/NATS.Server.Tests/ResponseRoutingTests.cs` (add tests) + +**Step 1: Write the failing test** + +Add to `tests/NATS.Server.Tests/ResponseRoutingTests.cs`: + +```csharp +[Fact] +public void LatencyTracker_should_sample_respects_percentage() +{ + var latency = new ServiceLatency { SamplingPercentage = 0, Subject = "latency.test" }; + LatencyTracker.ShouldSample(latency).ShouldBeFalse(); + + var latency100 = new ServiceLatency { SamplingPercentage = 100, Subject = "latency.test" }; + LatencyTracker.ShouldSample(latency100).ShouldBeTrue(); +} + +[Fact] +public void LatencyTracker_builds_latency_message() +{ + var msg = LatencyTracker.BuildLatencyMsg("requester", "responder", + TimeSpan.FromMilliseconds(5), TimeSpan.FromMilliseconds(10)); + + msg.Requestor.ShouldBe("requester"); + msg.Responder.ShouldBe("responder"); + msg.ServiceLatencyNanos.ShouldBeGreaterThan(0); + msg.TotalLatencyNanos.ShouldBeGreaterThan(0); +} +``` + +**Step 2: Create LatencyTracker** + +Create `src/NATS.Server/Imports/LatencyTracker.cs`: + +```csharp +using System.Text.Json.Serialization; + +namespace NATS.Server.Imports; + +public sealed class ServiceLatencyMsg +{ + [JsonPropertyName("type")] + public string Type { get; set; } = "io.nats.server.metric.v1.service_latency"; + + [JsonPropertyName("requestor")] + public string Requestor { get; set; } = string.Empty; + + [JsonPropertyName("responder")] + public string Responder { get; set; } = string.Empty; + + [JsonPropertyName("status")] + public int Status { get; set; } = 200; + + [JsonPropertyName("svc_latency")] + public long ServiceLatencyNanos { get; set; } + + [JsonPropertyName("total_latency")] + public long TotalLatencyNanos { get; set; } +} + +public static class LatencyTracker +{ + public static bool ShouldSample(ServiceLatency latency) + { + if (latency.SamplingPercentage <= 0) return false; + if (latency.SamplingPercentage >= 100) return true; + return Random.Shared.Next(100) < latency.SamplingPercentage; + } + + public static ServiceLatencyMsg BuildLatencyMsg( + string requestor, string responder, + TimeSpan serviceLatency, TimeSpan totalLatency) + { + return new ServiceLatencyMsg + { + Requestor = requestor, + Responder = responder, + ServiceLatencyNanos = serviceLatency.Ticks * 100, + TotalLatencyNanos = totalLatency.Ticks * 100, + }; + } +} +``` + +**Step 3: Run tests** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~LatencyTracker" -v minimal` +Expected: All latency tests PASS. + +**Step 4: Commit** + +```bash +git add src/NATS.Server/Imports/LatencyTracker.cs tests/NATS.Server.Tests/ResponseRoutingTests.cs +git commit -m "feat: add latency tracking for service import request-reply" +``` + +--- + +### Task 15: Update differences.md + +**Files:** +- Modify: `differences.md` + +**Step 1: Run full test suite first** + +Run: `dotnet test tests/NATS.Server.Tests -v minimal` +Expected: All tests PASS. + +**Step 2: Update differences.md** + +In the Connection Types table (lines 60-71), change: + +``` +| SYSTEM (internal) | Y | N | | +``` +to: +``` +| SYSTEM (internal) | Y | Y | InternalClient + InternalEventSystem with Channel-based send/receive loops | +``` + +And: +``` +| ACCOUNT (internal) | Y | N | | +``` +to: +``` +| ACCOUNT (internal) | Y | Y | Lazy per-account InternalClient with import/export subscription support | +``` + +In the Protocol Parsing Gaps table (line 137), optionally note that multi-client-type awareness is now partially implemented. + +In the Account System table (line 221), change: + +``` +| Account exports/imports | Y | N | | +``` +to: +``` +| Account exports/imports | Y | Y | ServiceImport/StreamImport with ExportAuth, subject transforms, response routing | +``` + +Add to the "Resolved Since Initial Audit" section: + +``` +- **SYSTEM client type** — InternalClient with InternalEventSystem, Channel-based send/receive loops, event publishing +- **ACCOUNT client type** — lazy per-account InternalClient with import/export subscription support +- **System event publishing** — connect/disconnect advisories, server stats, shutdown/lame-duck events, auth errors +- **System request-reply services** — $SYS.REQ.SERVER.*.VARZ/CONNZ/SUBSZ/HEALTHZ/IDZ/STATSZ with ping wildcards +- **Account exports/imports** — service and stream imports with ExportAuth, subject transforms, response routing, latency tracking +``` + +Remove SYSTEM/ACCOUNT from any "Remaining" gaps sections. + +**Step 3: Commit** + +```bash +git add differences.md +git commit -m "docs: update differences.md to reflect SYSTEM/ACCOUNT types and imports/exports implemented" +``` + +--- + +### Task 16: Final verification — full test suite and build + +**Step 1: Clean build** + +Run: `dotnet clean && dotnet build` +Expected: BUILD SUCCEEDED with no warnings. + +**Step 2: Full test suite** + +Run: `dotnet test tests/NATS.Server.Tests -v normal` +Expected: All tests PASS. + +**Step 3: Verify no regressions** + +Run: `dotnet test -v minimal` +Expected: All projects pass. + +**Step 4: Final commit (if any cleanup needed)** + +```bash +git status +# If clean, nothing to commit. Otherwise: +git add -A && git commit -m "chore: final cleanup for SYSTEM/ACCOUNT connection types" +``` diff --git a/docs/plans/2026-02-23-system-account-types-plan.md.tasks.json b/docs/plans/2026-02-23-system-account-types-plan.md.tasks.json new file mode 100644 index 0000000..87e7beb --- /dev/null +++ b/docs/plans/2026-02-23-system-account-types-plan.md.tasks.json @@ -0,0 +1,22 @@ +{ + "planPath": "docs/plans/2026-02-23-system-account-types-plan.md", + "tasks": [ + {"id": 6, "subject": "Task 1: Create ClientKind enum and extensions", "status": "pending"}, + {"id": 7, "subject": "Task 2: Create INatsClient interface and implement on NatsClient", "status": "pending", "blockedBy": [6]}, + {"id": 8, "subject": "Task 3: Create InternalClient class", "status": "pending", "blockedBy": [7]}, + {"id": 9, "subject": "Task 4: Create event subject constants and SystemMessageHandler delegate", "status": "pending", "blockedBy": [8]}, + {"id": 10, "subject": "Task 5: Create event DTO types and JSON source generator", "status": "pending", "blockedBy": [9]}, + {"id": 11, "subject": "Task 6: Create InternalEventSystem with send/receive loops", "status": "pending", "blockedBy": [10]}, + {"id": 12, "subject": "Task 7: Wire system event publishing (connect, disconnect, shutdown)", "status": "pending", "blockedBy": [11]}, + {"id": 13, "subject": "Task 8: Add periodic stats and account connection heartbeats", "status": "pending", "blockedBy": [12]}, + {"id": 14, "subject": "Task 9: Add system request-reply monitoring services", "status": "pending", "blockedBy": [13]}, + {"id": 15, "subject": "Task 10: Create import/export model types", "status": "pending", "blockedBy": [8]}, + {"id": 16, "subject": "Task 11: Add import/export support to Account and ACCOUNT client", "status": "pending", "blockedBy": [15]}, + {"id": 17, "subject": "Task 12: Wire service import into message delivery path", "status": "pending", "blockedBy": [16]}, + {"id": 18, "subject": "Task 13: Add response routing for service imports", "status": "pending", "blockedBy": [17]}, + {"id": 19, "subject": "Task 14: Add latency tracking for service imports", "status": "pending", "blockedBy": [18]}, + {"id": 20, "subject": "Task 15: Update differences.md", "status": "pending", "blockedBy": [19]}, + {"id": 21, "subject": "Task 16: Final verification — full test suite and build", "status": "pending", "blockedBy": [20]} + ], + "lastUpdated": "2026-02-23T00:00:00Z" +}