Files
natsdotnet/docs/plans/2026-02-23-system-account-types-plan.md
Joseph Doherty 4b3890f046 docs: add implementation plan for SYSTEM/ACCOUNT connection types
16 tasks across 6 layers: ClientKind + INatsClient + InternalClient,
event infrastructure, event publishing, request-reply services,
import/export model, and response routing with latency tracking.
2026-02-23 05:12:02 -05:00

85 KiB

SYSTEM and ACCOUNT Connection Types Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.

Goal: Port the SYSTEM and ACCOUNT internal connection types from Go, including full event publishing, request-reply monitoring, and cross-account import/export.

Architecture: 6-layer bottom-up build. Layer 1 introduces ClientKind enum, INatsClient interface, and InternalClient class. Layer 2 adds the InternalEventSystem with Channel-based send/receive loops. Layer 3 wires system event publishing (connect, disconnect, stats, shutdown). Layer 4 adds request-reply monitoring services ($SYS.REQ.SERVER.*). Layer 5 implements the import/export model with ACCOUNT client. Layer 6 adds response routing and latency tracking.

Tech Stack: .NET 10 / C# 14, System.Text.Json source generators, System.Threading.Channels, xUnit 3, Shouldly

Design doc: docs/plans/2026-02-23-system-account-types-design.md


Task 1: Create ClientKind enum and extensions

Files:

  • Create: src/NATS.Server/ClientKind.cs
  • Test: tests/NATS.Server.Tests/InternalClientTests.cs

Step 1: Write the failing test

Create tests/NATS.Server.Tests/InternalClientTests.cs:

namespace NATS.Server.Tests;

public class InternalClientTests
{
    [Theory]
    [InlineData(ClientKind.Client, false)]
    [InlineData(ClientKind.Router, false)]
    [InlineData(ClientKind.Gateway, false)]
    [InlineData(ClientKind.Leaf, false)]
    [InlineData(ClientKind.System, true)]
    [InlineData(ClientKind.JetStream, true)]
    [InlineData(ClientKind.Account, true)]
    public void IsInternal_returns_correct_value(ClientKind kind, bool expected)
    {
        kind.IsInternal().ShouldBe(expected);
    }
}

Step 2: Run test to verify it fails

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~IsInternal_returns_correct_value" -v minimal Expected: FAIL — ClientKind does not exist yet.

Step 3: Write minimal implementation

Create src/NATS.Server/ClientKind.cs:

namespace NATS.Server;

/// <summary>
/// Identifies the type of a client connection.
/// Maps to Go's client kind constants in client.go:45-65.
/// </summary>
public enum ClientKind
{
    Client,
    Router,
    Gateway,
    Leaf,
    System,
    JetStream,
    Account,
}

public static class ClientKindExtensions
{
    public static bool IsInternal(this ClientKind kind) =>
        kind is ClientKind.System or ClientKind.JetStream or ClientKind.Account;
}

Step 4: Run test to verify it passes

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~IsInternal_returns_correct_value" -v minimal Expected: PASS (7 test cases).

Step 5: Commit

git add src/NATS.Server/ClientKind.cs tests/NATS.Server.Tests/InternalClientTests.cs
git commit -m "feat: add ClientKind enum with IsInternal extension"

Task 2: Create INatsClient interface and implement on NatsClient

Files:

  • Create: src/NATS.Server/INatsClient.cs
  • Modify: src/NATS.Server/NatsClient.cs:29 — add : INatsClient and Kind property
  • Modify: src/NATS.Server/Subscriptions/Subscription.cs:12 — change Client type

Step 1: Write the failing test

Add to tests/NATS.Server.Tests/InternalClientTests.cs:

[Fact]
public void NatsClient_implements_INatsClient()
{
    typeof(NatsClient).GetInterfaces().ShouldContain(typeof(INatsClient));
}

[Fact]
public void NatsClient_kind_is_Client()
{
    // NatsClient.Kind should return ClientKind.Client
    typeof(NatsClient).GetProperty("Kind")!.PropertyType.ShouldBe(typeof(ClientKind));
}

Step 2: Run test to verify it fails

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~NatsClient_implements_INatsClient" -v minimal Expected: FAIL — INatsClient does not exist.

Step 3: Create INatsClient interface

Create src/NATS.Server/INatsClient.cs:

using NATS.Server.Auth;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;

namespace NATS.Server;

/// <summary>
/// Common interface for all client types (external socket-based and internal socketless).
/// Used by Subscription, message delivery, and routing infrastructure.
/// </summary>
public interface INatsClient
{
    ulong Id { get; }
    ClientKind Kind { get; }
    bool IsInternal => Kind.IsInternal();
    Account? Account { get; }
    ClientOptions? ClientOpts { get; }
    ClientPermissions? Permissions { get; }

    void SendMessage(string subject, string sid, string? replyTo,
        ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload);
    bool QueueOutbound(ReadOnlyMemory<byte> data);
    void RemoveSubscription(string sid);
}

Step 4: Add INatsClient to NatsClient

In src/NATS.Server/NatsClient.cs, change line 29 from:

public sealed class NatsClient : IDisposable

to:

public sealed class NatsClient : INatsClient, IDisposable

Add the Kind property after Id:

public ClientKind Kind => ClientKind.Client;

Step 5: Change Subscription.Client type

In src/NATS.Server/Subscriptions/Subscription.cs, change line 12 from:

public NatsClient? Client { get; set; }

to:

public INatsClient? Client { get; set; }

Remove the using NATS.Server; on line 1 if it becomes redundant (it won't — INatsClient is in NATS.Server namespace).

Step 6: Fix compilation errors from Subscription.Client type change

The following files reference sub.Client as NatsClient and need updating:

  • src/NATS.Server/NatsServer.csDeliverMessage (line 616): client.SendMessage(...) still works via interface. client.RemoveSubscription(sub.Sid) still works. client.Permissions still works. The client.Account?.SubList still works via interface.
  • src/NATS.Server/NatsServer.csProcessMessage (line 558): sub.Client == sender — this is identity comparison, which works on interfaces.
  • src/NATS.Server/NatsServer.csRemoveClient (line 721-722): client.RemoveAllSubscriptions(subList) — this method is on NatsClient, not INatsClient. We need to keep RemoveClient(NatsClient client) accepting the concrete type since it accesses NatsClient-specific properties (RemoteIp, RemotePort, etc.).

The IMessageRouter interface (lines 17-22 of NatsClient.cs) also uses NatsClient directly:

public interface IMessageRouter
{
    void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
        ReadOnlyMemory<byte> payload, NatsClient sender);
    void RemoveClient(NatsClient client);
}

Keep IMessageRouter using NatsClient for now — the internal publish path will call ProcessMessage differently (see Task 6). The key change is Subscription.Client becoming INatsClient?.

In NatsServer.DeliverMessage, the client.Permissions?.IsDeliveryAllowed and client.SendMessage calls work through INatsClient. But client.RemoveSubscription is on INatsClient too now. The only issue is client.Account?.SubList in the auto-unsub path — Account is on INatsClient. This compiles fine.

Step 7: Run full test suite

Run: dotnet test tests/NATS.Server.Tests -v minimal Expected: All existing tests PASS. The interface extraction should be fully backwards-compatible.

Step 8: Commit

git add src/NATS.Server/INatsClient.cs src/NATS.Server/NatsClient.cs src/NATS.Server/Subscriptions/Subscription.cs
git commit -m "feat: extract INatsClient interface, implement on NatsClient, update Subscription.Client type"

Task 3: Create InternalClient class

Files:

  • Create: src/NATS.Server/InternalClient.cs
  • Test: tests/NATS.Server.Tests/InternalClientTests.cs (add tests)

Step 1: Write the failing test

Add to tests/NATS.Server.Tests/InternalClientTests.cs:

[Fact]
public void InternalClient_system_kind()
{
    var account = new Account("$SYS");
    var client = new InternalClient(1, ClientKind.System, account);
    client.Kind.ShouldBe(ClientKind.System);
    client.IsInternal.ShouldBeTrue();
    client.Id.ShouldBe(1UL);
    client.Account.ShouldBe(account);
}

[Fact]
public void InternalClient_account_kind()
{
    var account = new Account("myaccount");
    var client = new InternalClient(2, ClientKind.Account, account);
    client.Kind.ShouldBe(ClientKind.Account);
    client.IsInternal.ShouldBeTrue();
}

[Fact]
public void InternalClient_rejects_non_internal_kind()
{
    var account = new Account("test");
    Should.Throw<ArgumentException>(() => new InternalClient(1, ClientKind.Client, account));
}

[Fact]
public void InternalClient_SendMessage_invokes_callback()
{
    var account = new Account("$SYS");
    var client = new InternalClient(1, ClientKind.System, account);
    string? capturedSubject = null;
    string? capturedSid = null;
    client.MessageCallback = (subject, sid, replyTo, headers, payload) =>
    {
        capturedSubject = subject;
        capturedSid = sid;
    };

    client.SendMessage("test.subject", "1", null, ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);

    capturedSubject.ShouldBe("test.subject");
    capturedSid.ShouldBe("1");
}

[Fact]
public void InternalClient_QueueOutbound_returns_true_noop()
{
    var account = new Account("$SYS");
    var client = new InternalClient(1, ClientKind.System, account);
    client.QueueOutbound(ReadOnlyMemory<byte>.Empty).ShouldBeTrue();
}

Step 2: Run test to verify it fails

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InternalClient_system_kind" -v minimal Expected: FAIL — InternalClient does not exist.

Step 3: Write implementation

Create src/NATS.Server/InternalClient.cs:

using NATS.Server.Auth;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;

namespace NATS.Server;

/// <summary>
/// Lightweight socketless client for internal messaging (SYSTEM, ACCOUNT, JETSTREAM).
/// Maps to Go's internal client created by createInternalClient() in server.go:1910-1936.
/// No network I/O — messages are delivered via callback.
/// </summary>
public sealed class InternalClient : INatsClient
{
    public ulong Id { get; }
    public ClientKind Kind { get; }
    public Account? Account { get; }
    public ClientOptions? ClientOpts => null;
    public ClientPermissions? Permissions => null;

    /// <summary>
    /// Callback invoked when a message is delivered to this internal client.
    /// Set by the event system or account import infrastructure.
    /// </summary>
    public Action<string, string, string?, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>>? MessageCallback { get; set; }

    private readonly Dictionary<string, Subscription> _subs = new(StringComparer.Ordinal);

    public InternalClient(ulong id, ClientKind kind, Account account)
    {
        if (!kind.IsInternal())
            throw new ArgumentException($"InternalClient requires an internal ClientKind, got {kind}", nameof(kind));

        Id = id;
        Kind = kind;
        Account = account;
    }

    public void SendMessage(string subject, string sid, string? replyTo,
        ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
    {
        MessageCallback?.Invoke(subject, sid, replyTo, headers, payload);
    }

    public bool QueueOutbound(ReadOnlyMemory<byte> data) => true; // no-op for internal clients

    public void RemoveSubscription(string sid)
    {
        if (_subs.Remove(sid))
            Account?.DecrementSubscriptions();
    }

    public void AddSubscription(Subscription sub)
    {
        _subs[sub.Sid] = sub;
    }

    public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
}

Step 4: Run test to verify it passes

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InternalClient" -v minimal Expected: All 7 InternalClient tests PASS.

Step 5: Run full test suite

Run: dotnet test tests/NATS.Server.Tests -v minimal Expected: All tests PASS.

Step 6: Commit

git add src/NATS.Server/InternalClient.cs tests/NATS.Server.Tests/InternalClientTests.cs
git commit -m "feat: add InternalClient class for socketless internal messaging"

Task 4: Create event subject constants and system message handler delegate

Files:

  • Create: src/NATS.Server/Events/EventSubjects.cs

Step 1: Write implementation

Create src/NATS.Server/Events/EventSubjects.cs:

namespace NATS.Server.Events;

/// <summary>
/// System event subject patterns.
/// Maps to Go events.go:41-97 subject constants.
/// </summary>
public static class EventSubjects
{
    // Account-scoped events
    public const string ConnectEvent = "$SYS.ACCOUNT.{0}.CONNECT";
    public const string DisconnectEvent = "$SYS.ACCOUNT.{0}.DISCONNECT";
    public const string AccountConnsNew = "$SYS.ACCOUNT.{0}.SERVER.CONNS";
    public const string AccountConnsOld = "$SYS.SERVER.ACCOUNT.{0}.CONNS";

    // Server-scoped events
    public const string ServerStats = "$SYS.SERVER.{0}.STATSZ";
    public const string ServerShutdown = "$SYS.SERVER.{0}.SHUTDOWN";
    public const string ServerLameDuck = "$SYS.SERVER.{0}.LAMEDUCK";
    public const string AuthError = "$SYS.SERVER.{0}.CLIENT.AUTH.ERR";
    public const string AuthErrorAccount = "$SYS.ACCOUNT.CLIENT.AUTH.ERR";

    // Request-reply subjects (server-specific)
    public const string ServerReq = "$SYS.REQ.SERVER.{0}.{1}";

    // Wildcard ping subjects (all servers respond)
    public const string ServerPing = "$SYS.REQ.SERVER.PING.{0}";

    // Account-scoped request subjects
    public const string AccountReq = "$SYS.REQ.ACCOUNT.{0}.{1}";

    // Inbox for responses
    public const string InboxResponse = "$SYS._INBOX_.{0}";
}

/// <summary>
/// Callback signature for system message handlers.
/// Maps to Go's sysMsgHandler type in events.go:109.
/// </summary>
public delegate void SystemMessageHandler(
    Subscriptions.Subscription? sub,
    INatsClient? client,
    Auth.Account? account,
    string subject,
    string? reply,
    ReadOnlyMemory<byte> headers,
    ReadOnlyMemory<byte> message);

Step 2: Build

Run: dotnet build src/NATS.Server Expected: BUILD SUCCEEDED.

Step 3: Commit

git add src/NATS.Server/Events/EventSubjects.cs
git commit -m "feat: add system event subject constants and SystemMessageHandler delegate"

Task 5: Create event DTO types and JSON source generator

Files:

  • Create: src/NATS.Server/Events/EventTypes.cs
  • Create: src/NATS.Server/Events/EventJsonContext.cs
  • Test: tests/NATS.Server.Tests/EventSystemTests.cs

Step 1: Write the failing test

Create tests/NATS.Server.Tests/EventSystemTests.cs:

using System.Text.Json;
using NATS.Server.Events;

namespace NATS.Server.Tests;

public class EventSystemTests
{
    [Fact]
    public void ConnectEventMsg_serializes_with_correct_type()
    {
        var evt = new ConnectEventMsg
        {
            Type = ConnectEventMsg.EventType,
            Id = "test123",
            Time = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc),
            Server = new EventServerInfo { Name = "test-server", Id = "SRV1" },
            Client = new EventClientInfo { Id = 1, Account = "$G" },
        };

        var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.ConnectEventMsg);
        json.ShouldContain("\"type\":\"io.nats.server.advisory.v1.client_connect\"");
        json.ShouldContain("\"server\":");
        json.ShouldContain("\"client\":");
    }

    [Fact]
    public void DisconnectEventMsg_serializes_with_reason()
    {
        var evt = new DisconnectEventMsg
        {
            Type = DisconnectEventMsg.EventType,
            Id = "test456",
            Time = DateTime.UtcNow,
            Server = new EventServerInfo { Name = "test-server", Id = "SRV1" },
            Client = new EventClientInfo { Id = 2, Account = "myacc" },
            Reason = "Client Closed",
            Sent = new DataStats { Msgs = 10, Bytes = 1024 },
            Received = new DataStats { Msgs = 5, Bytes = 512 },
        };

        var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.DisconnectEventMsg);
        json.ShouldContain("\"reason\":\"Client Closed\"");
    }

    [Fact]
    public void ServerStatsMsg_serializes()
    {
        var evt = new ServerStatsMsg
        {
            Server = new EventServerInfo { Name = "srv1", Id = "ABC" },
            Stats = new ServerStatsData
            {
                Connections = 10,
                TotalConnections = 100,
                InMsgs = 5000,
                OutMsgs = 4500,
                InBytes = 1_000_000,
                OutBytes = 900_000,
                Mem = 50 * 1024 * 1024,
                Subscriptions = 42,
            },
        };

        var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.ServerStatsMsg);
        json.ShouldContain("\"connections\":10");
        json.ShouldContain("\"in_msgs\":5000");
    }
}

Step 2: Run test to verify it fails

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ConnectEventMsg_serializes" -v minimal Expected: FAIL — types don't exist.

Step 3: Create event DTOs

Create src/NATS.Server/Events/EventTypes.cs:

using System.Text.Json.Serialization;

namespace NATS.Server.Events;

/// <summary>
/// Server identity block embedded in all system events.
/// </summary>
public sealed class EventServerInfo
{
    [JsonPropertyName("name")]
    public string Name { get; set; } = string.Empty;

    [JsonPropertyName("host")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Host { get; set; }

    [JsonPropertyName("id")]
    public string Id { get; set; } = string.Empty;

    [JsonPropertyName("cluster")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Cluster { get; set; }

    [JsonPropertyName("domain")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Domain { get; set; }

    [JsonPropertyName("ver")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Version { get; set; }

    [JsonPropertyName("seq")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public ulong Seq { get; set; }

    [JsonPropertyName("tags")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public Dictionary<string, string>? Tags { get; set; }
}

/// <summary>
/// Client identity block for connect/disconnect events.
/// </summary>
public sealed class EventClientInfo
{
    [JsonPropertyName("start")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public DateTime Start { get; set; }

    [JsonPropertyName("stop")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public DateTime Stop { get; set; }

    [JsonPropertyName("host")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Host { get; set; }

    [JsonPropertyName("id")]
    public ulong Id { get; set; }

    [JsonPropertyName("acc")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Account { get; set; }

    [JsonPropertyName("name")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Name { get; set; }

    [JsonPropertyName("lang")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Lang { get; set; }

    [JsonPropertyName("ver")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Version { get; set; }

    [JsonPropertyName("rtt")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public long RttNanos { get; set; }
}

public sealed class DataStats
{
    [JsonPropertyName("msgs")]
    public long Msgs { get; set; }

    [JsonPropertyName("bytes")]
    public long Bytes { get; set; }
}

/// <summary>Client connect advisory. Go events.go:155-160.</summary>
public sealed class ConnectEventMsg
{
    public const string EventType = "io.nats.server.advisory.v1.client_connect";

    [JsonPropertyName("type")]
    public string Type { get; set; } = EventType;

    [JsonPropertyName("id")]
    public string Id { get; set; } = string.Empty;

    [JsonPropertyName("timestamp")]
    public DateTime Time { get; set; }

    [JsonPropertyName("server")]
    public EventServerInfo Server { get; set; } = new();

    [JsonPropertyName("client")]
    public EventClientInfo Client { get; set; } = new();
}

/// <summary>Client disconnect advisory. Go events.go:167-174.</summary>
public sealed class DisconnectEventMsg
{
    public const string EventType = "io.nats.server.advisory.v1.client_disconnect";

    [JsonPropertyName("type")]
    public string Type { get; set; } = EventType;

    [JsonPropertyName("id")]
    public string Id { get; set; } = string.Empty;

    [JsonPropertyName("timestamp")]
    public DateTime Time { get; set; }

    [JsonPropertyName("server")]
    public EventServerInfo Server { get; set; } = new();

    [JsonPropertyName("client")]
    public EventClientInfo Client { get; set; } = new();

    [JsonPropertyName("sent")]
    public DataStats Sent { get; set; } = new();

    [JsonPropertyName("received")]
    public DataStats Received { get; set; } = new();

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;
}

/// <summary>Account connection count heartbeat. Go events.go:210-214.</summary>
public sealed class AccountNumConns
{
    public const string EventType = "io.nats.server.advisory.v1.account_connections";

    [JsonPropertyName("type")]
    public string Type { get; set; } = EventType;

    [JsonPropertyName("id")]
    public string Id { get; set; } = string.Empty;

    [JsonPropertyName("timestamp")]
    public DateTime Time { get; set; }

    [JsonPropertyName("server")]
    public EventServerInfo Server { get; set; } = new();

    [JsonPropertyName("acc")]
    public string AccountName { get; set; } = string.Empty;

    [JsonPropertyName("conns")]
    public int Connections { get; set; }

    [JsonPropertyName("total_conns")]
    public long TotalConnections { get; set; }

    [JsonPropertyName("subs")]
    public int Subscriptions { get; set; }

    [JsonPropertyName("sent")]
    public DataStats Sent { get; set; } = new();

    [JsonPropertyName("received")]
    public DataStats Received { get; set; } = new();
}

/// <summary>Server stats broadcast. Go events.go:150-153.</summary>
public sealed class ServerStatsMsg
{
    [JsonPropertyName("server")]
    public EventServerInfo Server { get; set; } = new();

    [JsonPropertyName("statsz")]
    public ServerStatsData Stats { get; set; } = new();
}

public sealed class ServerStatsData
{
    [JsonPropertyName("start")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public DateTime Start { get; set; }

    [JsonPropertyName("mem")]
    public long Mem { get; set; }

    [JsonPropertyName("cores")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public int Cores { get; set; }

    [JsonPropertyName("connections")]
    public int Connections { get; set; }

    [JsonPropertyName("total_connections")]
    public long TotalConnections { get; set; }

    [JsonPropertyName("active_accounts")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public int ActiveAccounts { get; set; }

    [JsonPropertyName("subscriptions")]
    public long Subscriptions { get; set; }

    [JsonPropertyName("in_msgs")]
    public long InMsgs { get; set; }

    [JsonPropertyName("out_msgs")]
    public long OutMsgs { get; set; }

    [JsonPropertyName("in_bytes")]
    public long InBytes { get; set; }

    [JsonPropertyName("out_bytes")]
    public long OutBytes { get; set; }

    [JsonPropertyName("slow_consumers")]
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
    public long SlowConsumers { get; set; }
}

/// <summary>Server shutdown notification.</summary>
public sealed class ShutdownEventMsg
{
    [JsonPropertyName("server")]
    public EventServerInfo Server { get; set; } = new();

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;
}

/// <summary>Lame duck mode notification.</summary>
public sealed class LameDuckEventMsg
{
    [JsonPropertyName("server")]
    public EventServerInfo Server { get; set; } = new();
}

/// <summary>Auth error advisory.</summary>
public sealed class AuthErrorEventMsg
{
    public const string EventType = "io.nats.server.advisory.v1.client_auth";

    [JsonPropertyName("type")]
    public string Type { get; set; } = EventType;

    [JsonPropertyName("id")]
    public string Id { get; set; } = string.Empty;

    [JsonPropertyName("timestamp")]
    public DateTime Time { get; set; }

    [JsonPropertyName("server")]
    public EventServerInfo Server { get; set; } = new();

    [JsonPropertyName("client")]
    public EventClientInfo Client { get; set; } = new();

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;
}

Step 4: Create JSON source generator context

Create src/NATS.Server/Events/EventJsonContext.cs:

using System.Text.Json.Serialization;

namespace NATS.Server.Events;

[JsonSerializable(typeof(ConnectEventMsg))]
[JsonSerializable(typeof(DisconnectEventMsg))]
[JsonSerializable(typeof(AccountNumConns))]
[JsonSerializable(typeof(ServerStatsMsg))]
[JsonSerializable(typeof(ShutdownEventMsg))]
[JsonSerializable(typeof(LameDuckEventMsg))]
[JsonSerializable(typeof(AuthErrorEventMsg))]
internal partial class EventJsonContext : JsonSerializerContext;

Step 5: Run tests

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~EventSystemTests" -v minimal Expected: All 3 tests PASS.

Step 6: Commit

git add src/NATS.Server/Events/EventTypes.cs src/NATS.Server/Events/EventJsonContext.cs tests/NATS.Server.Tests/EventSystemTests.cs
git commit -m "feat: add system event DTOs and JSON source generator context"

Task 6: Create InternalEventSystem with send/receive loops

Files:

  • Create: src/NATS.Server/Events/InternalEventSystem.cs
  • Test: tests/NATS.Server.Tests/EventSystemTests.cs (add tests)

Step 1: Write the failing test

Add to tests/NATS.Server.Tests/EventSystemTests.cs:

[Fact]
public async Task InternalEventSystem_start_and_stop_lifecycle()
{
    using var server = CreateTestServer();
    _ = server.StartAsync(CancellationToken.None);
    await server.WaitForReadyAsync();

    var eventSystem = server.EventSystem;
    eventSystem.ShouldNotBeNull();
    eventSystem.SystemClient.ShouldNotBeNull();
    eventSystem.SystemClient.Kind.ShouldBe(ClientKind.System);

    await server.ShutdownAsync();
}

[Fact]
public async Task SendInternalMsg_delivers_to_system_subscriber()
{
    using var server = CreateTestServer();
    _ = server.StartAsync(CancellationToken.None);
    await server.WaitForReadyAsync();

    var received = new TaskCompletionSource<string>();
    server.EventSystem!.SysSubscribe("test.subject", (sub, client, acc, subject, reply, hdr, msg) =>
    {
        received.TrySetResult(subject);
    });

    server.SendInternalMsg("test.subject", null, new { Value = "hello" });

    var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
    result.ShouldBe("test.subject");

    await server.ShutdownAsync();
}

private static NatsServer CreateTestServer()
{
    var port = GetFreePort();
    return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
}

private static int GetFreePort()
{
    using var sock = new System.Net.Sockets.Socket(
        System.Net.Sockets.AddressFamily.InterNetwork,
        System.Net.Sockets.SocketType.Stream,
        System.Net.Sockets.ProtocolType.Tcp);
    sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
    return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
}

Step 2: Run test to verify it fails

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~InternalEventSystem_start_and_stop" -v minimal Expected: FAIL — EventSystem property doesn't exist on NatsServer.

Step 3: Write InternalEventSystem

Create src/NATS.Server/Events/InternalEventSystem.cs:

using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Server.Auth;
using NATS.Server.Subscriptions;

namespace NATS.Server.Events;

/// <summary>
/// Internal publish message queued for the send loop.
/// </summary>
public sealed class PublishMessage
{
    public InternalClient? Client { get; init; }
    public required string Subject { get; init; }
    public string? Reply { get; init; }
    public byte[]? Headers { get; init; }
    public object? Body { get; init; }
    public bool Echo { get; init; }
    public bool IsLast { get; init; }
}

/// <summary>
/// Internal received message queued for the receive loop.
/// </summary>
public sealed class InternalSystemMessage
{
    public required Subscription? Sub { get; init; }
    public required INatsClient? Client { get; init; }
    public required Account? Account { get; init; }
    public required string Subject { get; init; }
    public required string? Reply { get; init; }
    public required ReadOnlyMemory<byte> Headers { get; init; }
    public required ReadOnlyMemory<byte> Message { get; init; }
    public required SystemMessageHandler Callback { get; init; }
}

/// <summary>
/// Manages the server's internal event system with Channel-based send/receive loops.
/// Maps to Go's internal struct in events.go:124-147 and the goroutines
/// internalSendLoop (events.go:495) and internalReceiveLoop (events.go:476).
/// </summary>
public sealed class InternalEventSystem : IAsyncDisposable
{
    private readonly ILogger _logger;
    private readonly Channel<PublishMessage> _sendQueue;
    private readonly Channel<InternalSystemMessage> _receiveQueue;
    private readonly Channel<InternalSystemMessage> _receiveQueuePings;
    private readonly CancellationTokenSource _cts = new();

    private Task? _sendLoop;
    private Task? _receiveLoop;
    private Task? _receiveLoopPings;
    private NatsServer? _server;

    private ulong _sequence;
    private int _subscriptionId;

    public Account SystemAccount { get; }
    public InternalClient SystemClient { get; }
    public string ServerHash { get; }

    public InternalEventSystem(Account systemAccount, InternalClient systemClient, string serverName, ILogger logger)
    {
        _logger = logger;
        SystemAccount = systemAccount;
        SystemClient = systemClient;

        // Hash server name for inbox routing (matches Go's shash)
        ServerHash = Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(serverName)))[..8].ToLowerInvariant();

        _sendQueue = Channel.CreateUnbounded<PublishMessage>(new UnboundedChannelOptions { SingleReader = true });
        _receiveQueue = Channel.CreateUnbounded<InternalSystemMessage>(new UnboundedChannelOptions { SingleReader = true });
        _receiveQueuePings = Channel.CreateUnbounded<InternalSystemMessage>(new UnboundedChannelOptions { SingleReader = true });
    }

    public void Start(NatsServer server)
    {
        _server = server;
        var ct = _cts.Token;
        _sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct);
        _receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct);
        _receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct);
    }

    /// <summary>
    /// Creates a system subscription in the system account's SubList.
    /// Maps to Go's sysSubscribe in events.go:2796.
    /// </summary>
    public Subscription SysSubscribe(string subject, SystemMessageHandler callback)
    {
        var sid = Interlocked.Increment(ref _subscriptionId).ToString();
        var sub = new Subscription
        {
            Subject = subject,
            Sid = sid,
            Client = SystemClient,
        };

        // Wrap callback in noInlineCallback pattern: enqueue to receive loop
        SystemClient.MessageCallback = (subj, s, reply, hdr, msg) =>
        {
            _receiveQueue.Writer.TryWrite(new InternalSystemMessage
            {
                Sub = sub,
                Client = SystemClient,
                Account = SystemAccount,
                Subject = subj,
                Reply = reply,
                Headers = hdr,
                Message = msg,
                Callback = callback,
            });
        };

        SystemAccount.SubList.Insert(sub);
        return sub;
    }

    /// <summary>
    /// Enqueue an internal message for publishing through the send loop.
    /// </summary>
    public void Enqueue(PublishMessage message)
    {
        _sendQueue.Writer.TryWrite(message);
    }

    /// <summary>
    /// The send loop: serializes messages and delivers them via the server's routing.
    /// Maps to Go's internalSendLoop in events.go:495-668.
    /// </summary>
    private async Task InternalSendLoopAsync(CancellationToken ct)
    {
        try
        {
            await foreach (var pm in _sendQueue.Reader.ReadAllAsync(ct))
            {
                try
                {
                    var seq = Interlocked.Increment(ref _sequence);

                    // Serialize body to JSON
                    byte[] payload;
                    if (pm.Body is byte[] raw)
                    {
                        payload = raw;
                    }
                    else if (pm.Body != null)
                    {
                        payload = JsonSerializer.SerializeToUtf8Bytes(pm.Body, pm.Body.GetType(), EventJsonContext.Default);
                    }
                    else
                    {
                        payload = [];
                    }

                    // Deliver via the system account's SubList matching
                    var result = SystemAccount.SubList.Match(pm.Subject);

                    foreach (var sub in result.PlainSubs)
                    {
                        sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply,
                            pm.Headers ?? ReadOnlyMemory<byte>.Empty,
                            payload);
                    }

                    foreach (var queueGroup in result.QueueSubs)
                    {
                        if (queueGroup.Length == 0) continue;
                        var sub = queueGroup[0]; // Simple pick for internal
                        sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply,
                            pm.Headers ?? ReadOnlyMemory<byte>.Empty,
                            payload);
                    }

                    if (pm.IsLast)
                        break;
                }
                catch (Exception ex)
                {
                    _logger.LogWarning(ex, "Error in internal send loop processing message on {Subject}", pm.Subject);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown
        }
    }

    /// <summary>
    /// The receive loop: dispatches callbacks for internally-received messages.
    /// Maps to Go's internalReceiveLoop in events.go:476-491.
    /// </summary>
    private async Task InternalReceiveLoopAsync(Channel<InternalSystemMessage> queue, CancellationToken ct)
    {
        try
        {
            await foreach (var msg in queue.Reader.ReadAllAsync(ct))
            {
                try
                {
                    msg.Callback(msg.Sub, msg.Client, msg.Account, msg.Subject, msg.Reply, msg.Headers, msg.Message);
                }
                catch (Exception ex)
                {
                    _logger.LogWarning(ex, "Error in internal receive loop processing {Subject}", msg.Subject);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown
        }
    }

    public async ValueTask DisposeAsync()
    {
        await _cts.CancelAsync();
        _sendQueue.Writer.TryComplete();
        _receiveQueue.Writer.TryComplete();
        _receiveQueuePings.Writer.TryComplete();

        if (_sendLoop != null) await _sendLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
        if (_receiveLoop != null) await _receiveLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
        if (_receiveLoopPings != null) await _receiveLoopPings.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);

        _cts.Dispose();
    }
}

Step 4: Wire into NatsServer

In src/NATS.Server/NatsServer.cs:

Add field after _systemAccount (around line 32):

private InternalEventSystem? _eventSystem;

Add public property:

public InternalEventSystem? EventSystem => _eventSystem;

In the constructor (after _systemAccount creation around line 259), create and wire up the event system:

// Create system internal client
var sysClientId = Interlocked.Increment(ref _nextClientId);
var sysClient = new InternalClient(sysClientId, ClientKind.System, _systemAccount);
_eventSystem = new InternalEventSystem(
    _systemAccount, sysClient,
    options.ServerName ?? $"nats-dotnet-{Environment.MachineName}",
    _loggerFactory.CreateLogger<InternalEventSystem>());

In StartAsync (after listener is ready), add:

_eventSystem?.Start(this);

In ShutdownAsync (before closing clients, around line 95), add:

if (_eventSystem != null)
    await _eventSystem.DisposeAsync();

Add the SendInternalMsg API method:

public void SendInternalMsg(string subject, string? reply, object? msg)
{
    _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Reply = reply, Body = msg });
}

public void SendInternalAccountMsg(Account account, string subject, object? msg)
{
    _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg });
}

Step 5: Run tests

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~EventSystemTests" -v minimal Expected: All event system tests PASS.

Step 6: Run full test suite

Run: dotnet test tests/NATS.Server.Tests -v minimal Expected: All tests PASS.

Step 7: Commit

git add src/NATS.Server/Events/InternalEventSystem.cs src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/EventSystemTests.cs
git commit -m "feat: add InternalEventSystem with Channel-based send/receive loops"

Task 7: Wire system event publishing (connect, disconnect, shutdown, stats)

Files:

  • Modify: src/NATS.Server/NatsServer.cs — add event publishing at lifecycle points
  • Modify: src/NATS.Server/NatsClient.cs — publish auth error on auth failure
  • Test: tests/NATS.Server.Tests/SystemEventsTests.cs

Step 1: Write the failing test

Create tests/NATS.Server.Tests/SystemEventsTests.cs:

using System.Text.Json;
using NATS.Server;
using NATS.Server.Events;
using Microsoft.Extensions.Logging.Abstractions;

namespace NATS.Server.Tests;

public class SystemEventsTests
{
    [Fact]
    public async Task Server_publishes_connect_event_on_client_auth()
    {
        using var server = CreateTestServer();
        _ = server.StartAsync(CancellationToken.None);
        await server.WaitForReadyAsync();

        var received = new TaskCompletionSource<string>();
        server.EventSystem!.SysSubscribe($"$SYS.ACCOUNT.*.CONNECT", (sub, client, acc, subject, reply, hdr, msg) =>
        {
            received.TrySetResult(subject);
        });

        // Connect a real client
        using var sock = new System.Net.Sockets.Socket(
            System.Net.Sockets.AddressFamily.InterNetwork,
            System.Net.Sockets.SocketType.Stream,
            System.Net.Sockets.ProtocolType.Tcp);
        await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port);

        // Read INFO
        var buf = new byte[4096];
        await sock.ReceiveAsync(buf);

        // Send CONNECT
        var connect = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n");
        await sock.SendAsync(connect);

        var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
        result.ShouldStartWith("$SYS.ACCOUNT.");
        result.ShouldEndWith(".CONNECT");

        await server.ShutdownAsync();
    }

    [Fact]
    public async Task Server_publishes_disconnect_event_on_client_close()
    {
        using var server = CreateTestServer();
        _ = server.StartAsync(CancellationToken.None);
        await server.WaitForReadyAsync();

        var received = new TaskCompletionSource<string>();
        server.EventSystem!.SysSubscribe($"$SYS.ACCOUNT.*.DISCONNECT", (sub, client, acc, subject, reply, hdr, msg) =>
        {
            received.TrySetResult(subject);
        });

        // Connect and then disconnect
        using var sock = new System.Net.Sockets.Socket(
            System.Net.Sockets.AddressFamily.InterNetwork,
            System.Net.Sockets.SocketType.Stream,
            System.Net.Sockets.ProtocolType.Tcp);
        await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port);
        var buf = new byte[4096];
        await sock.ReceiveAsync(buf);
        await sock.SendAsync(System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n"));
        await Task.Delay(100);
        sock.Shutdown(System.Net.Sockets.SocketShutdown.Both);

        var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
        result.ShouldStartWith("$SYS.ACCOUNT.");
        result.ShouldEndWith(".DISCONNECT");

        await server.ShutdownAsync();
    }

    [Fact]
    public async Task Server_publishes_shutdown_event()
    {
        using var server = CreateTestServer();
        _ = server.StartAsync(CancellationToken.None);
        await server.WaitForReadyAsync();

        var received = new TaskCompletionSource<string>();
        server.EventSystem!.SysSubscribe($"$SYS.SERVER.*.SHUTDOWN", (sub, client, acc, subject, reply, hdr, msg) =>
        {
            received.TrySetResult(subject);
        });

        await server.ShutdownAsync();

        var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
        result.ShouldContain(".SHUTDOWN");
    }

    private static NatsServer CreateTestServer()
    {
        var port = GetFreePort();
        return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
    }

    private static int GetFreePort()
    {
        using var sock = new System.Net.Sockets.Socket(
            System.Net.Sockets.AddressFamily.InterNetwork,
            System.Net.Sockets.SocketType.Stream,
            System.Net.Sockets.ProtocolType.Tcp);
        sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
        return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
    }
}

Step 2: Run test to verify it fails

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Server_publishes_connect_event" -v minimal Expected: FAIL — no events being published yet.

Step 3: Add event publishing to NatsServer

Add a helper method to NatsServer that builds EventServerInfo:

private EventServerInfo BuildEventServerInfo()
{
    var seq = _eventSystem != null ? Interlocked.Increment(ref _eventSystem._sequence) : 0; // We'll need to expose this
    return new EventServerInfo
    {
        Name = _serverInfo.ServerName,
        Host = _options.Host,
        Id = _serverInfo.ServerId,
        Version = NatsProtocol.Version,
        Seq = seq,
        Tags = _options.Tags?.Count > 0 ? _options.Tags : null,
    };
}

Note: We need to expose sequence from InternalEventSystem. Add a public method public ulong NextSequence() => Interlocked.Increment(ref _sequence); to InternalEventSystem, and change _sequence visibility.

Publish connect event — in NatsClient.ProcessConnect(), after successful auth assignment (after the client is assigned to an account), have the router publish the connect event. Add to the NatsClient after the CONNECT is processed and account assigned — call a new method on the router/server.

Add to IMessageRouter:

void PublishConnectEvent(NatsClient client);
void PublishDisconnectEvent(NatsClient client);

In NatsServer, implement:

public void PublishConnectEvent(NatsClient client)
{
    if (_eventSystem == null) return;
    var accountName = client.Account?.Name ?? Account.GlobalAccountName;
    var subject = string.Format(EventSubjects.ConnectEvent, accountName);
    var evt = new ConnectEventMsg
    {
        Id = Guid.NewGuid().ToString("N"),
        Time = DateTime.UtcNow,
        Server = BuildEventServerInfo(),
        Client = BuildEventClientInfo(client),
    };
    SendInternalMsg(subject, null, evt);
}

public void PublishDisconnectEvent(NatsClient client)
{
    if (_eventSystem == null) return;
    var accountName = client.Account?.Name ?? Account.GlobalAccountName;
    var subject = string.Format(EventSubjects.DisconnectEvent, accountName);
    var evt = new DisconnectEventMsg
    {
        Id = Guid.NewGuid().ToString("N"),
        Time = DateTime.UtcNow,
        Server = BuildEventServerInfo(),
        Client = BuildEventClientInfo(client),
        Sent = new DataStats { Msgs = Interlocked.Read(ref client.OutMsgs), Bytes = Interlocked.Read(ref client.OutBytes) },
        Received = new DataStats { Msgs = Interlocked.Read(ref client.InMsgs), Bytes = Interlocked.Read(ref client.InBytes) },
        Reason = client.CloseReason.ToReasonString(),
    };
    SendInternalMsg(subject, null, evt);
}

private static EventClientInfo BuildEventClientInfo(NatsClient client)
{
    return new EventClientInfo
    {
        Id = client.Id,
        Host = client.RemoteIp,
        Account = client.Account?.Name,
        Name = client.ClientOpts?.Name,
        Lang = client.ClientOpts?.Lang,
        Version = client.ClientOpts?.Version,
        Start = client.StartTime,
        RttNanos = client.Rtt.Ticks * 100, // Ticks are 100ns
    };
}

Call PublishConnectEvent in the auth success path of NatsClient.ProcessConnect(). Call PublishDisconnectEvent in NatsServer.RemoveClient().

Publish shutdown event — in NatsServer.ShutdownAsync(), before disposing the event system:

var shutdownSubject = string.Format(EventSubjects.ServerShutdown, _serverInfo.ServerId);
_eventSystem?.Enqueue(new PublishMessage
{
    Subject = shutdownSubject,
    Body = new ShutdownEventMsg { Server = BuildEventServerInfo(), Reason = "Server Shutdown" },
    IsLast = true,
});
// Give send loop time to process the shutdown event
await Task.Delay(100);

Step 4: Run tests

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SystemEventsTests" -v minimal Expected: All system event tests PASS.

Step 5: Run full test suite

Run: dotnet test tests/NATS.Server.Tests -v minimal Expected: All tests PASS.

Step 6: Commit

git add src/NATS.Server/NatsServer.cs src/NATS.Server/NatsClient.cs src/NATS.Server/Events/InternalEventSystem.cs tests/NATS.Server.Tests/SystemEventsTests.cs
git commit -m "feat: wire system event publishing for connect, disconnect, and shutdown"

Task 8: Add periodic stats and account connection heartbeats

Files:

  • Modify: src/NATS.Server/Events/InternalEventSystem.cs — add timer loops
  • Modify: src/NATS.Server/NatsServer.cs — add stats gathering helpers
  • Test: tests/NATS.Server.Tests/SystemEventsTests.cs (add tests)

Step 1: Write the failing test

Add to tests/NATS.Server.Tests/SystemEventsTests.cs:

[Fact]
public async Task Server_publishes_statsz_periodically()
{
    using var server = CreateTestServer();
    _ = server.StartAsync(CancellationToken.None);
    await server.WaitForReadyAsync();

    var received = new TaskCompletionSource<string>();
    server.EventSystem!.SysSubscribe($"$SYS.SERVER.*.STATSZ", (sub, client, acc, subject, reply, hdr, msg) =>
    {
        received.TrySetResult(subject);
    });

    // Trigger a manual stats publish (don't wait 10s)
    server.EventSystem!.PublishServerStats();

    var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
    result.ShouldContain(".STATSZ");

    await server.ShutdownAsync();
}

Step 2: Run test to verify it fails

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Server_publishes_statsz" -v minimal Expected: FAIL — PublishServerStats doesn't exist.

Step 3: Add stats publishing to InternalEventSystem

Add to InternalEventSystem:

public void PublishServerStats()
{
    if (_server == null) return;

    var subject = string.Format(EventSubjects.ServerStats, _server.ServerId);
    var process = System.Diagnostics.Process.GetCurrentProcess();

    var statsMsg = new ServerStatsMsg
    {
        Server = _server.BuildEventServerInfo(),
        Stats = new ServerStatsData
        {
            Start = _server.StartTime,
            Mem = process.WorkingSet64,
            Cores = Environment.ProcessorCount,
            Connections = _server.ClientCount,
            TotalConnections = Interlocked.Read(ref _server.Stats.TotalConnections),
            Subscriptions = SystemAccount.SubList.Count,
            InMsgs = Interlocked.Read(ref _server.Stats.InMsgs),
            OutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs),
            InBytes = Interlocked.Read(ref _server.Stats.InBytes),
            OutBytes = Interlocked.Read(ref _server.Stats.OutBytes),
            SlowConsumers = Interlocked.Read(ref _server.Stats.SlowConsumers),
        },
    };

    Enqueue(new PublishMessage { Subject = subject, Body = statsMsg });
}

Make BuildEventServerInfo() public on NatsServer (change from private to public).

Start a timer task in the Start method for periodic statsz:

_ = Task.Run(async () =>
{
    using var timer = new PeriodicTimer(TimeSpan.FromSeconds(10));
    while (await timer.WaitForNextTickAsync(ct))
    {
        PublishServerStats();
    }
}, ct);

Step 4: Run tests

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Server_publishes_statsz" -v minimal Expected: PASS.

Step 5: Commit

git add src/NATS.Server/Events/InternalEventSystem.cs src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/SystemEventsTests.cs
git commit -m "feat: add periodic server stats and account connection heartbeat publishing"

Task 9: Add system request-reply monitoring services

Files:

  • Modify: src/NATS.Server/Events/InternalEventSystem.cs — add InitEventTracking()
  • Modify: src/NATS.Server/NatsServer.cs — add request handlers
  • Test: tests/NATS.Server.Tests/SystemRequestReplyTests.cs

Step 1: Write the failing test

Create tests/NATS.Server.Tests/SystemRequestReplyTests.cs:

using System.Text;
using System.Text.Json;
using NATS.Server;
using NATS.Server.Events;
using NATS.Server.Monitoring;
using Microsoft.Extensions.Logging.Abstractions;

namespace NATS.Server.Tests;

public class SystemRequestReplyTests
{
    [Fact]
    public async Task Varz_request_reply_returns_server_info()
    {
        using var server = CreateTestServer();
        _ = server.StartAsync(CancellationToken.None);
        await server.WaitForReadyAsync();

        var received = new TaskCompletionSource<byte[]>();
        // Subscribe to the reply inbox
        var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
        server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
        {
            received.TrySetResult(msg.ToArray());
        });

        // Publish request to $SYS.REQ.SERVER.{id}.VARZ
        var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
        server.SendInternalMsg(reqSubject, replySubject, null);

        var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
        var json = Encoding.UTF8.GetString(result);
        json.ShouldContain("\"server_id\"");

        await server.ShutdownAsync();
    }

    [Fact]
    public async Task Healthz_request_reply_returns_ok()
    {
        using var server = CreateTestServer();
        _ = server.StartAsync(CancellationToken.None);
        await server.WaitForReadyAsync();

        var received = new TaskCompletionSource<byte[]>();
        var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
        server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
        {
            received.TrySetResult(msg.ToArray());
        });

        var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "HEALTHZ");
        server.SendInternalMsg(reqSubject, replySubject, null);

        var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
        var json = Encoding.UTF8.GetString(result);
        json.ShouldContain("ok");

        await server.ShutdownAsync();
    }

    private static NatsServer CreateTestServer()
    {
        var port = GetFreePort();
        return new NatsServer(new NatsOptions { Port = port, MonitorPort = GetFreePort() }, NullLoggerFactory.Instance);
    }

    private static int GetFreePort()
    {
        using var sock = new System.Net.Sockets.Socket(
            System.Net.Sockets.AddressFamily.InterNetwork,
            System.Net.Sockets.SocketType.Stream,
            System.Net.Sockets.ProtocolType.Tcp);
        sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
        return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
    }
}

Step 2: Run test to verify it fails

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Varz_request_reply" -v minimal Expected: FAIL — no request-reply handlers registered.

Step 3: Add InitEventTracking with request-reply subscriptions

Add method to InternalEventSystem:

public void InitEventTracking(NatsServer server)
{
    _server = server;
    var serverId = server.ServerId;

    // Server-specific monitoring services
    RegisterService(serverId, "VARZ", server.HandleVarzRequest);
    RegisterService(serverId, "CONNZ", server.HandleConnzRequest);
    RegisterService(serverId, "SUBSZ", server.HandleSubszRequest);
    RegisterService(serverId, "HEALTHZ", server.HandleHealthzRequest);
    RegisterService(serverId, "STATSZ", server.HandleStatszRequest);
    RegisterService(serverId, "IDZ", server.HandleIdzRequest);

    // Wildcard ping services (all servers respond)
    SysSubscribe(string.Format(EventSubjects.ServerPing, "VARZ"), WrapRequestHandler(server.HandleVarzRequest));
    SysSubscribe(string.Format(EventSubjects.ServerPing, "HEALTHZ"), WrapRequestHandler(server.HandleHealthzRequest));
    SysSubscribe(string.Format(EventSubjects.ServerPing, "IDZ"), WrapRequestHandler(server.HandleIdzRequest));
    SysSubscribe(string.Format(EventSubjects.ServerPing, "STATSZ"), WrapRequestHandler(server.HandleStatszRequest));
}

private void RegisterService(string serverId, string name, Action<string, string?> handler)
{
    var subject = string.Format(EventSubjects.ServerReq, serverId, name);
    SysSubscribe(subject, WrapRequestHandler(handler));
}

private SystemMessageHandler WrapRequestHandler(Action<string, string?> handler)
{
    return (sub, client, acc, subject, reply, hdr, msg) =>
    {
        handler(subject, reply);
    };
}

Add request handler methods to NatsServer:

public void HandleVarzRequest(string subject, string? reply)
{
    if (reply == null) return;
    // Reuse VarzHandler logic (we need to make it accessible)
    var varz = _varzHandler?.HandleVarzAsync(CancellationToken.None).GetAwaiter().GetResult();
    if (varz != null)
    {
        var json = JsonSerializer.SerializeToUtf8Bytes(varz);
        SendInternalMsg(reply, null, json);
    }
}

public void HandleHealthzRequest(string subject, string? reply)
{
    if (reply == null) return;
    var response = new { status = "ok" };
    SendInternalMsg(reply, null, response);
}

public void HandleConnzRequest(string subject, string? reply)
{
    if (reply == null) return;
    // Simplified connz response
    var connz = _connzHandler?.HandleConnz(null!); // Need adapter
    if (connz != null)
        SendInternalMsg(reply, null, connz);
}

public void HandleSubszRequest(string subject, string? reply)
{
    if (reply == null) return;
    SendInternalMsg(reply, null, new { num_subscriptions = SubList.Count });
}

public void HandleStatszRequest(string subject, string? reply)
{
    if (reply == null) return;
    _eventSystem?.PublishServerStats();
}

public void HandleIdzRequest(string subject, string? reply)
{
    if (reply == null) return;
    var idz = new
    {
        server_id = _serverInfo.ServerId,
        server_name = _serverInfo.ServerName,
        version = NatsProtocol.Version,
        host = _options.Host,
        port = _options.Port,
    };
    SendInternalMsg(reply, null, idz);
}

Wire _varzHandler as a field on NatsServer (extracted from MonitorServer or created directly).

Call _eventSystem.InitEventTracking(this) in StartAsync after the event system starts.

Step 4: Run tests

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SystemRequestReplyTests" -v minimal Expected: All request-reply tests PASS.

Step 5: Run full test suite

Run: dotnet test tests/NATS.Server.Tests -v minimal Expected: All tests PASS.

Step 6: Commit

git add src/NATS.Server/Events/InternalEventSystem.cs src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/SystemRequestReplyTests.cs
git commit -m "feat: add system request-reply monitoring services ($SYS.REQ.SERVER.*)"

Task 10: Create import/export model types

Files:

  • Create: src/NATS.Server/Imports/ServiceResponseType.cs
  • Create: src/NATS.Server/Imports/ExportAuth.cs
  • Create: src/NATS.Server/Imports/StreamExport.cs
  • Create: src/NATS.Server/Imports/ServiceExport.cs
  • Create: src/NATS.Server/Imports/StreamImport.cs
  • Create: src/NATS.Server/Imports/ServiceImport.cs
  • Create: src/NATS.Server/Imports/ExportMap.cs
  • Create: src/NATS.Server/Imports/ImportMap.cs
  • Create: src/NATS.Server/Imports/ServiceLatency.cs
  • Test: tests/NATS.Server.Tests/ImportExportTests.cs

Step 1: Write the failing test

Create tests/NATS.Server.Tests/ImportExportTests.cs:

using NATS.Server.Auth;
using NATS.Server.Imports;

namespace NATS.Server.Tests;

public class ImportExportTests
{
    [Fact]
    public void ExportAuth_public_export_authorizes_any_account()
    {
        var auth = new ExportAuth();
        var account = new Account("test");
        auth.IsAuthorized(account).ShouldBeTrue();
    }

    [Fact]
    public void ExportAuth_approved_accounts_restricts_access()
    {
        var auth = new ExportAuth { ApprovedAccounts = ["allowed"] };
        var allowed = new Account("allowed");
        var denied = new Account("denied");
        auth.IsAuthorized(allowed).ShouldBeTrue();
        auth.IsAuthorized(denied).ShouldBeFalse();
    }

    [Fact]
    public void ExportAuth_revoked_account_denied()
    {
        var auth = new ExportAuth
        {
            ApprovedAccounts = ["test"],
            RevokedAccounts = new() { ["test"] = DateTimeOffset.UtcNow.ToUnixTimeSeconds() },
        };
        var account = new Account("test");
        auth.IsAuthorized(account).ShouldBeFalse();
    }

    [Fact]
    public void ServiceResponseType_defaults_to_singleton()
    {
        var import = new ServiceImport
        {
            DestinationAccount = new Account("dest"),
            From = "requests.>",
            To = "api.>",
        };
        import.ResponseType.ShouldBe(ServiceResponseType.Singleton);
    }

    [Fact]
    public void ExportMap_stores_and_retrieves_exports()
    {
        var map = new ExportMap();
        map.Services["api.>"] = new ServiceExport { Account = new Account("svc") };
        map.Streams["events.>"] = new StreamExport();

        map.Services.ShouldContainKey("api.>");
        map.Streams.ShouldContainKey("events.>");
    }

    [Fact]
    public void ImportMap_stores_service_imports()
    {
        var map = new ImportMap();
        var si = new ServiceImport
        {
            DestinationAccount = new Account("dest"),
            From = "requests.>",
            To = "api.>",
        };
        map.AddServiceImport(si);
        map.Services.ShouldContainKey("requests.>");
        map.Services["requests.>"].Count.ShouldBe(1);
    }
}

Step 2: Run test to verify it fails

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImportExportTests" -v minimal Expected: FAIL — types don't exist.

Step 3: Create all import/export types

Create src/NATS.Server/Imports/ServiceResponseType.cs:

namespace NATS.Server.Imports;

/// <summary>
/// Response type for service exports. Maps to Go's ServiceRespType.
/// </summary>
public enum ServiceResponseType
{
    Singleton,
    Streamed,
    Chunked,
}

Create src/NATS.Server/Imports/ServiceLatency.cs:

namespace NATS.Server.Imports;

/// <summary>
/// Latency tracking configuration for a service export.
/// Maps to Go's serviceLatency in accounts.go:245-248.
/// </summary>
public sealed class ServiceLatency
{
    public int SamplingPercentage { get; init; } = 100;
    public string Subject { get; init; } = string.Empty;
}

Create src/NATS.Server/Imports/ExportAuth.cs:

using NATS.Server.Auth;

namespace NATS.Server.Imports;

/// <summary>
/// Authorization rules for an export.
/// Maps to Go's exportAuth in accounts.go:217-222.
/// </summary>
public sealed class ExportAuth
{
    public bool TokenRequired { get; init; }
    public uint AccountPosition { get; init; }
    public HashSet<string>? ApprovedAccounts { get; init; }
    public Dictionary<string, long>? RevokedAccounts { get; init; }

    public bool IsAuthorized(Account account)
    {
        // Check revocation first
        if (RevokedAccounts != null && RevokedAccounts.ContainsKey(account.Name))
            return false;

        // Public export (no restrictions)
        if (ApprovedAccounts == null && !TokenRequired && AccountPosition == 0)
            return true;

        // Check approved accounts
        if (ApprovedAccounts != null)
            return ApprovedAccounts.Contains(account.Name);

        return false;
    }
}

Create src/NATS.Server/Imports/StreamExport.cs:

namespace NATS.Server.Imports;

/// <summary>Maps to Go's streamExport in accounts.go:225-227.</summary>
public sealed class StreamExport
{
    public ExportAuth Auth { get; init; } = new();
}

Create src/NATS.Server/Imports/ServiceExport.cs:

using NATS.Server.Auth;

namespace NATS.Server.Imports;

/// <summary>Maps to Go's serviceExport in accounts.go:230-242.</summary>
public sealed class ServiceExport
{
    public ExportAuth Auth { get; init; } = new();
    public Account? Account { get; init; }
    public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
    public TimeSpan ResponseThreshold { get; init; } = TimeSpan.FromMinutes(2);
    public ServiceLatency? Latency { get; init; }
    public bool AllowTrace { get; init; }
}

Create src/NATS.Server/Imports/StreamImport.cs:

using NATS.Server.Auth;
using NATS.Server.Subscriptions;

namespace NATS.Server.Imports;

/// <summary>Maps to Go's streamImport in accounts.go:142-155.</summary>
public sealed class StreamImport
{
    public required Account SourceAccount { get; init; }
    public required string From { get; init; }
    public required string To { get; init; }
    public SubjectTransform? Transform { get; init; }
    public bool UsePub { get; init; }
    public bool Invalid { get; set; }
}

Create src/NATS.Server/Imports/ServiceImport.cs:

using NATS.Server.Auth;
using NATS.Server.Subscriptions;

namespace NATS.Server.Imports;

/// <summary>Maps to Go's serviceImport in accounts.go:160-181.</summary>
public sealed class ServiceImport
{
    public required Account DestinationAccount { get; init; }
    public required string From { get; init; }
    public required string To { get; init; }
    public SubjectTransform? Transform { get; init; }
    public ServiceExport? Export { get; init; }
    public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
    public byte[]? Sid { get; set; }
    public bool IsResponse { get; init; }
    public bool UsePub { get; init; }
    public bool Invalid { get; set; }
    public bool Share { get; init; }
    public bool Tracking { get; init; }
    public long TimestampTicks { get; set; }
}

Create src/NATS.Server/Imports/ExportMap.cs:

namespace NATS.Server.Imports;

/// <summary>Maps to Go's exportMap in accounts.go:251-255.</summary>
public sealed class ExportMap
{
    public Dictionary<string, StreamExport> Streams { get; } = new(StringComparer.Ordinal);
    public Dictionary<string, ServiceExport> Services { get; } = new(StringComparer.Ordinal);
    public Dictionary<string, ServiceImport> Responses { get; } = new(StringComparer.Ordinal);
}

Create src/NATS.Server/Imports/ImportMap.cs:

namespace NATS.Server.Imports;

/// <summary>Maps to Go's importMap in accounts.go:259-263.</summary>
public sealed class ImportMap
{
    public List<StreamImport> Streams { get; } = [];
    public Dictionary<string, List<ServiceImport>> Services { get; } = new(StringComparer.Ordinal);

    public void AddServiceImport(ServiceImport si)
    {
        if (!Services.TryGetValue(si.From, out var list))
        {
            list = [];
            Services[si.From] = list;
        }
        list.Add(si);
    }
}

Step 4: Run tests

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImportExportTests" -v minimal Expected: All 6 import/export tests PASS.

Step 5: Commit

git add src/NATS.Server/Imports/ tests/NATS.Server.Tests/ImportExportTests.cs
git commit -m "feat: add import/export model types (ServiceImport, StreamImport, exports, auth)"

Task 11: Add import/export support to Account and ACCOUNT client

Files:

  • Modify: src/NATS.Server/Auth/Account.cs — add exports, imports, internal client
  • Modify: src/NATS.Server/Subscriptions/Subscription.cs — add ServiceImport/StreamImport references
  • Test: tests/NATS.Server.Tests/ImportExportTests.cs (add tests)

Step 1: Write the failing test

Add to tests/NATS.Server.Tests/ImportExportTests.cs:

[Fact]
public void Account_add_service_export_and_import()
{
    var exporter = new Account("exporter");
    var importer = new Account("importer");

    exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
    exporter.Exports.Services.ShouldContainKey("api.>");

    var si = importer.AddServiceImport(exporter, "requests.>", "api.>");
    si.ShouldNotBeNull();
    si.From.ShouldBe("requests.>");
    si.To.ShouldBe("api.>");
    si.DestinationAccount.ShouldBe(exporter);
    importer.Imports.Services.ShouldContainKey("requests.>");
}

[Fact]
public void Account_add_stream_export_and_import()
{
    var exporter = new Account("exporter");
    var importer = new Account("importer");

    exporter.AddStreamExport("events.>", null);
    exporter.Exports.Streams.ShouldContainKey("events.>");

    importer.AddStreamImport(exporter, "events.>", "imported.events.>");
    importer.Imports.Streams.Count.ShouldBe(1);
    importer.Imports.Streams[0].From.ShouldBe("events.>");
    importer.Imports.Streams[0].To.ShouldBe("imported.events.>");
}

[Fact]
public void Account_service_import_auth_rejected()
{
    var exporter = new Account("exporter");
    var importer = new Account("importer");

    exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, [new Account("other")]);

    Should.Throw<UnauthorizedAccessException>(() =>
        importer.AddServiceImport(exporter, "requests.>", "api.>"));
}

[Fact]
public void Account_lazy_creates_internal_client()
{
    var account = new Account("test");
    var client = account.GetOrCreateInternalClient(99);
    client.ShouldNotBeNull();
    client.Kind.ShouldBe(ClientKind.Account);
    client.Account.ShouldBe(account);

    // Second call returns same instance
    var client2 = account.GetOrCreateInternalClient(100);
    client2.ShouldBeSameAs(client);
}

Step 2: Run test to verify it fails

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Account_add_service_export" -v minimal Expected: FAIL — methods don't exist on Account.

Step 3: Extend Account

Modify src/NATS.Server/Auth/Account.cs — add imports, exports, internal client:

using NATS.Server.Imports;
using NATS.Server.Subscriptions;

// Add these fields/properties to Account class:

public ExportMap Exports { get; } = new();
public ImportMap Imports { get; } = new();

private InternalClient? _internalClient;
private ulong _internalSubId;

public InternalClient GetOrCreateInternalClient(ulong clientId)
{
    if (_internalClient != null) return _internalClient;
    _internalClient = new InternalClient(clientId, ClientKind.Account, this);
    return _internalClient;
}

public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable<Account>? approved)
{
    var auth = new ExportAuth
    {
        ApprovedAccounts = approved != null ? new HashSet<string>(approved.Select(a => a.Name)) : null,
    };
    Exports.Services[subject] = new ServiceExport
    {
        Auth = auth,
        Account = this,
        ResponseType = responseType,
    };
}

public void AddStreamExport(string subject, IEnumerable<Account>? approved)
{
    var auth = new ExportAuth
    {
        ApprovedAccounts = approved != null ? new HashSet<string>(approved.Select(a => a.Name)) : null,
    };
    Exports.Streams[subject] = new StreamExport { Auth = auth };
}

public ServiceImport AddServiceImport(Account destination, string from, string to)
{
    // Check authorization
    if (!destination.Exports.Services.TryGetValue(to, out var export))
        throw new InvalidOperationException($"No service export found for '{to}' on account '{destination.Name}'");

    if (!export.Auth.IsAuthorized(this))
        throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'");

    var si = new ServiceImport
    {
        DestinationAccount = destination,
        From = from,
        To = to,
        Export = export,
        ResponseType = export.ResponseType,
    };

    Imports.AddServiceImport(si);
    return si;
}

public void AddStreamImport(Account source, string from, string to)
{
    // Check authorization
    if (!source.Exports.Streams.TryGetValue(from, out var export))
        throw new InvalidOperationException($"No stream export found for '{from}' on account '{source.Name}'");

    if (!export.Auth.IsAuthorized(this))
        throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{from}' from '{source.Name}'");

    var si = new StreamImport
    {
        SourceAccount = source,
        From = from,
        To = to,
    };

    Imports.Streams.Add(si);
}

Step 4: Add ServiceImport/StreamImport refs to Subscription

Modify src/NATS.Server/Subscriptions/Subscription.cs:

using NATS.Server;
using NATS.Server.Imports;

namespace NATS.Server.Subscriptions;

public sealed class Subscription
{
    public required string Subject { get; init; }
    public string? Queue { get; init; }
    public required string Sid { get; init; }
    public long MessageCount;
    public long MaxMessages;
    public INatsClient? Client { get; set; }
    public ServiceImport? ServiceImport { get; set; }
    public StreamImport? StreamImport { get; set; }
}

Step 5: Run tests

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ImportExportTests" -v minimal Expected: All import/export tests PASS.

Step 6: Run full test suite

Run: dotnet test tests/NATS.Server.Tests -v minimal Expected: All tests PASS.

Step 7: Commit

git add src/NATS.Server/Auth/Account.cs src/NATS.Server/Subscriptions/Subscription.cs tests/NATS.Server.Tests/ImportExportTests.cs
git commit -m "feat: add import/export support to Account with ACCOUNT client lazy creation"

Task 12: Wire service import into message delivery path

Files:

  • Modify: src/NATS.Server/NatsServer.cs — modify ProcessMessage and DeliverMessage for imports
  • Test: tests/NATS.Server.Tests/ImportExportTests.cs (add integration test)

Step 1: Write the failing test

Add to tests/NATS.Server.Tests/ImportExportTests.cs:

[Fact]
public async Task Service_import_forwards_message_to_export_account()
{
    using var server = CreateTestServer();
    _ = server.StartAsync(CancellationToken.None);
    await server.WaitForReadyAsync();

    // Set up exporter and importer accounts
    var exporter = server.GetOrCreateAccount("exporter");
    var importer = server.GetOrCreateAccount("importer");

    exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
    importer.AddServiceImport(exporter, "requests.>", "api.>");

    // Wire the import subscription into the importer's SubList
    server.WireServiceImports(importer);

    // Subscribe in exporter account to receive forwarded message
    var received = new TaskCompletionSource<string>();
    var exportSub = new Subscription { Subject = "api.test", Sid = "1", Client = null };
    // We'll verify via the internal mechanism

    // TODO: Complete this integration test once ProcessServiceImport is wired

    await server.ShutdownAsync();
}

private static NatsServer CreateTestServer()
{
    var port = GetFreePort();
    return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
}

private static int GetFreePort()
{
    using var sock = new System.Net.Sockets.Socket(
        System.Net.Sockets.AddressFamily.InterNetwork,
        System.Net.Sockets.SocketType.Stream,
        System.Net.Sockets.ProtocolType.Tcp);
    sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
    return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
}

Step 2: Modify ProcessMessage for service imports

In NatsServer.ProcessMessage, after the existing delivery logic, add service import processing:

// Check for service imports that match this subject
if (sender.Account != null)
{
    foreach (var kvp in sender.Account.Imports.Services)
    {
        foreach (var si in kvp.Value)
        {
            if (si.Invalid) continue;
            if (SubjectMatch.Match(subject, si.From))
            {
                ProcessServiceImport(si, subject, replyTo, headers, payload, sender);
                delivered = true;
            }
        }
    }
}

Add the ProcessServiceImport method:

private void ProcessServiceImport(ServiceImport si, string subject, string? replyTo,
    ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, INatsClient sender)
{
    // Transform subject
    var targetSubject = si.To;
    if (si.Transform != null)
    {
        var transformed = si.Transform.Apply(subject);
        if (transformed != null) targetSubject = transformed;
    }
    else if (si.UsePub)
    {
        targetSubject = subject;
    }

    // Match against destination account's SubList
    var destSubList = si.DestinationAccount.SubList;
    var result = destSubList.Match(targetSubject);

    // Deliver to destination subscribers
    foreach (var sub in result.PlainSubs)
    {
        if (sub.Client == null) continue;
        DeliverMessage(sub, targetSubject, replyTo, headers, payload);
    }

    foreach (var queueGroup in result.QueueSubs)
    {
        if (queueGroup.Length == 0) continue;
        var sub = queueGroup[0];
        if (sub.Client != null)
            DeliverMessage(sub, targetSubject, replyTo, headers, payload);
    }
}

Similarly, modify DeliverMessage for stream imports:

// In DeliverMessage, before client.SendMessage:
var deliverSubject = subject;
if (sub.StreamImport != null)
{
    if (sub.StreamImport.Transform != null)
    {
        var transformed = sub.StreamImport.Transform.Apply(subject);
        if (transformed != null) deliverSubject = transformed;
    }
    else if (!sub.StreamImport.UsePub)
    {
        deliverSubject = sub.StreamImport.To;
    }
}

client.SendMessage(deliverSubject, sub.Sid, replyTo, headers, payload);

Add WireServiceImports and GetOrCreateAccount helpers to NatsServer.

Step 3: Run tests

Run: dotnet test tests/NATS.Server.Tests -v minimal Expected: All tests PASS.

Step 4: Commit

git add src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/ImportExportTests.cs
git commit -m "feat: wire service import forwarding and stream import transforms into message delivery"

Task 13: Add response routing for service imports

Files:

  • Modify: src/NATS.Server/NatsServer.cs — response import creation and delivery
  • Create: tests/NATS.Server.Tests/ResponseRoutingTests.cs

Step 1: Write the failing test

Create tests/NATS.Server.Tests/ResponseRoutingTests.cs:

using NATS.Server;
using NATS.Server.Auth;
using NATS.Server.Imports;
using Microsoft.Extensions.Logging.Abstractions;

namespace NATS.Server.Tests;

public class ResponseRoutingTests
{
    [Fact]
    public void GenerateReplyPrefix_creates_unique_prefix()
    {
        var prefix1 = ResponseRouter.GenerateReplyPrefix();
        var prefix2 = ResponseRouter.GenerateReplyPrefix();

        prefix1.ShouldStartWith("_R_.");
        prefix2.ShouldStartWith("_R_.");
        prefix1.ShouldNotBe(prefix2);
        prefix1.Length.ShouldBeGreaterThan(4);
    }

    [Fact]
    public void Singleton_response_import_removed_after_delivery()
    {
        var exporter = new Account("exporter");
        exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);

        var replyPrefix = ResponseRouter.GenerateReplyPrefix();
        var responseSi = new ServiceImport
        {
            DestinationAccount = exporter,
            From = replyPrefix + ">",
            To = "_INBOX.original.reply",
            IsResponse = true,
            ResponseType = ServiceResponseType.Singleton,
        };
        exporter.Exports.Responses[replyPrefix] = responseSi;

        exporter.Exports.Responses.ShouldContainKey(replyPrefix);

        // Simulate singleton delivery cleanup
        ResponseRouter.CleanupResponse(exporter, replyPrefix, responseSi);

        exporter.Exports.Responses.ShouldNotContainKey(replyPrefix);
    }
}

Step 2: Create ResponseRouter

Create src/NATS.Server/Imports/ResponseRouter.cs:

using System.Security.Cryptography;
using NATS.Server.Auth;

namespace NATS.Server.Imports;

/// <summary>
/// Handles response routing for service imports.
/// Maps to Go's service reply prefix generation and response cleanup.
/// </summary>
public static class ResponseRouter
{
    private static readonly char[] Base62 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789".ToCharArray();

    public static string GenerateReplyPrefix()
    {
        Span<byte> bytes = stackalloc byte[10];
        RandomNumberGenerator.Fill(bytes);
        var chars = new char[10];
        for (int i = 0; i < 10; i++)
            chars[i] = Base62[bytes[i] % 62];
        return $"_R_.{new string(chars)}.";
    }

    public static ServiceImport CreateResponseImport(
        Account exporterAccount,
        ServiceImport originalImport,
        string originalReply)
    {
        var replyPrefix = GenerateReplyPrefix();

        var responseSi = new ServiceImport
        {
            DestinationAccount = exporterAccount,
            From = replyPrefix + ">",
            To = originalReply,
            IsResponse = true,
            ResponseType = originalImport.ResponseType,
            Export = originalImport.Export,
            TimestampTicks = DateTime.UtcNow.Ticks,
        };

        exporterAccount.Exports.Responses[replyPrefix] = responseSi;
        return responseSi;
    }

    public static void CleanupResponse(Account account, string replyPrefix, ServiceImport responseSi)
    {
        account.Exports.Responses.Remove(replyPrefix);
        // Could also remove subscription from SubList if one was created
    }
}

Step 3: Run tests

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~ResponseRoutingTests" -v minimal Expected: All response routing tests PASS.

Step 4: Commit

git add src/NATS.Server/Imports/ResponseRouter.cs tests/NATS.Server.Tests/ResponseRoutingTests.cs
git commit -m "feat: add response routing for service import request-reply patterns"

Task 14: Add latency tracking for service imports

Files:

  • Modify: src/NATS.Server/Imports/ServiceImport.cs — add latency measurement fields
  • Create: src/NATS.Server/Imports/LatencyTracker.cs
  • Test: tests/NATS.Server.Tests/ResponseRoutingTests.cs (add tests)

Step 1: Write the failing test

Add to tests/NATS.Server.Tests/ResponseRoutingTests.cs:

[Fact]
public void LatencyTracker_should_sample_respects_percentage()
{
    var latency = new ServiceLatency { SamplingPercentage = 0, Subject = "latency.test" };
    LatencyTracker.ShouldSample(latency).ShouldBeFalse();

    var latency100 = new ServiceLatency { SamplingPercentage = 100, Subject = "latency.test" };
    LatencyTracker.ShouldSample(latency100).ShouldBeTrue();
}

[Fact]
public void LatencyTracker_builds_latency_message()
{
    var msg = LatencyTracker.BuildLatencyMsg("requester", "responder",
        TimeSpan.FromMilliseconds(5), TimeSpan.FromMilliseconds(10));

    msg.Requestor.ShouldBe("requester");
    msg.Responder.ShouldBe("responder");
    msg.ServiceLatencyNanos.ShouldBeGreaterThan(0);
    msg.TotalLatencyNanos.ShouldBeGreaterThan(0);
}

Step 2: Create LatencyTracker

Create src/NATS.Server/Imports/LatencyTracker.cs:

using System.Text.Json.Serialization;

namespace NATS.Server.Imports;

public sealed class ServiceLatencyMsg
{
    [JsonPropertyName("type")]
    public string Type { get; set; } = "io.nats.server.metric.v1.service_latency";

    [JsonPropertyName("requestor")]
    public string Requestor { get; set; } = string.Empty;

    [JsonPropertyName("responder")]
    public string Responder { get; set; } = string.Empty;

    [JsonPropertyName("status")]
    public int Status { get; set; } = 200;

    [JsonPropertyName("svc_latency")]
    public long ServiceLatencyNanos { get; set; }

    [JsonPropertyName("total_latency")]
    public long TotalLatencyNanos { get; set; }
}

public static class LatencyTracker
{
    public static bool ShouldSample(ServiceLatency latency)
    {
        if (latency.SamplingPercentage <= 0) return false;
        if (latency.SamplingPercentage >= 100) return true;
        return Random.Shared.Next(100) < latency.SamplingPercentage;
    }

    public static ServiceLatencyMsg BuildLatencyMsg(
        string requestor, string responder,
        TimeSpan serviceLatency, TimeSpan totalLatency)
    {
        return new ServiceLatencyMsg
        {
            Requestor = requestor,
            Responder = responder,
            ServiceLatencyNanos = serviceLatency.Ticks * 100,
            TotalLatencyNanos = totalLatency.Ticks * 100,
        };
    }
}

Step 3: Run tests

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~LatencyTracker" -v minimal Expected: All latency tests PASS.

Step 4: Commit

git add src/NATS.Server/Imports/LatencyTracker.cs tests/NATS.Server.Tests/ResponseRoutingTests.cs
git commit -m "feat: add latency tracking for service import request-reply"

Task 15: Update differences.md

Files:

  • Modify: differences.md

Step 1: Run full test suite first

Run: dotnet test tests/NATS.Server.Tests -v minimal Expected: All tests PASS.

Step 2: Update differences.md

In the Connection Types table (lines 60-71), change:

| SYSTEM (internal) | Y | N | |

to:

| SYSTEM (internal) | Y | Y | InternalClient + InternalEventSystem with Channel-based send/receive loops |

And:

| ACCOUNT (internal) | Y | N | |

to:

| ACCOUNT (internal) | Y | Y | Lazy per-account InternalClient with import/export subscription support |

In the Protocol Parsing Gaps table (line 137), optionally note that multi-client-type awareness is now partially implemented.

In the Account System table (line 221), change:

| Account exports/imports | Y | N | |

to:

| Account exports/imports | Y | Y | ServiceImport/StreamImport with ExportAuth, subject transforms, response routing |

Add to the "Resolved Since Initial Audit" section:

- **SYSTEM client type** — InternalClient with InternalEventSystem, Channel-based send/receive loops, event publishing
- **ACCOUNT client type** — lazy per-account InternalClient with import/export subscription support
- **System event publishing** — connect/disconnect advisories, server stats, shutdown/lame-duck events, auth errors
- **System request-reply services** — $SYS.REQ.SERVER.*.VARZ/CONNZ/SUBSZ/HEALTHZ/IDZ/STATSZ with ping wildcards
- **Account exports/imports** — service and stream imports with ExportAuth, subject transforms, response routing, latency tracking

Remove SYSTEM/ACCOUNT from any "Remaining" gaps sections.

Step 3: Commit

git add differences.md
git commit -m "docs: update differences.md to reflect SYSTEM/ACCOUNT types and imports/exports implemented"

Task 16: Final verification — full test suite and build

Step 1: Clean build

Run: dotnet clean && dotnet build Expected: BUILD SUCCEEDED with no warnings.

Step 2: Full test suite

Run: dotnet test tests/NATS.Server.Tests -v normal Expected: All tests PASS.

Step 3: Verify no regressions

Run: dotnet test -v minimal Expected: All projects pass.

Step 4: Final commit (if any cleanup needed)

git status
# If clean, nothing to commit. Otherwise:
git add -A && git commit -m "chore: final cleanup for SYSTEM/ACCOUNT connection types"