natsdotnet/docs/plans/2026-02-22-harden-base-server-plan.md
Joseph Doherty 16b8f9e2e2 docs: add implementation plan for hardening base server
Four tasks: -ERR infrastructure, MaxConnections enforcement,
pedantic subject validation + max payload on PUB, PING keepalive.
Full TDD steps with exact code and commands.
2026-02-22 21:29:10 -05:00


Harden Base Server Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.

Goal: Add -ERR responses, MaxConnections enforcement, pedantic subject validation on PUB, max payload validation, and server-side PING keepalive to the base NATS server.

Architecture: Four incremental features building on the existing server. Task 1 (-ERR infrastructure) is a prerequisite for all others. Tasks 2-4 are independent of each other once Task 1 is done.

Tech Stack: .NET 10 / C# 14, xUnit 3, Shouldly, System.IO.Pipelines, System.Threading.PeriodicTimer


Task 1: -ERR Response Infrastructure

Files:

  • Modify: src/NATS.Server/Protocol/NatsProtocol.cs:5-22
  • Modify: src/NATS.Server/NatsClient.cs:24-298
  • Modify: tests/NATS.Server.Tests/ClientTests.cs

Step 1: Write the failing tests

Add to tests/NATS.Server.Tests/ClientTests.cs:

[Fact]
public async Task Client_SendErrAsync_writes_correct_wire_format()
{
    var runTask = _natsClient.RunAsync(_cts.Token);

    // Read INFO first
    var buf = new byte[4096];
    await _clientSocket.ReceiveAsync(buf, SocketFlags.None);

    // Trigger SendErrAsync
    await _natsClient.SendErrAsync("Invalid Subject");

    var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None);
    var response = Encoding.ASCII.GetString(buf, 0, n);

    response.ShouldBe("-ERR 'Invalid Subject'\r\n");

    await _cts.CancelAsync();
}

[Fact]
public async Task Client_SendErrAndCloseAsync_sends_error_then_disconnects()
{
    var runTask = _natsClient.RunAsync(_cts.Token);

    // Read INFO first
    var buf = new byte[4096];
    await _clientSocket.ReceiveAsync(buf, SocketFlags.None);

    // Trigger SendErrAndCloseAsync
    await _natsClient.SendErrAndCloseAsync("maximum connections exceeded");

    var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None);
    var response = Encoding.ASCII.GetString(buf, 0, n);

    response.ShouldBe("-ERR 'maximum connections exceeded'\r\n");

    // Connection should be closed — next read returns 0
    n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None);
    n.ShouldBe(0);
}

Step 2: Run tests to verify they fail

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Client_SendErr" -v normal
Expected: Build fails because SendErrAsync and SendErrAndCloseAsync don't exist yet.

Step 3: Add error message constants to NatsProtocol.cs

In src/NATS.Server/Protocol/NatsProtocol.cs, add after line 21 (after ErrPrefix):

// Standard error messages (matching Go server)
public const string ErrMaxConnectionsExceeded = "maximum connections exceeded";
public const string ErrStaleConnection = "Stale Connection";
public const string ErrMaxPayloadViolation = "Maximum Payload Violation";
public const string ErrInvalidPublishSubject = "Invalid Publish Subject";
public const string ErrInvalidSubject = "Invalid Subject";

Step 4: Add a linked CancellationTokenSource to NatsClient for self-close

In src/NATS.Server/NatsClient.cs, add a field after the _writeLock field (line 31):

private CancellationTokenSource? _clientCts;

Modify RunAsync (lines 59-85) so it creates a linked CancellationTokenSource and passes its token to everything it starts. Cancelling the linked source stops only this client, while cancelling the server's token still cancels it:

public async Task RunAsync(CancellationToken ct)
{
    _clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    var pipe = new Pipe();
    try
    {
        // Send INFO
        await SendInfoAsync(_clientCts.Token);

        // Start read pump and command processing in parallel
        var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token);
        var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token);

        await Task.WhenAny(fillTask, processTask);
    }
    catch (OperationCanceledException)
    {
        _logger.LogDebug("Client {ClientId} operation cancelled", Id);
    }
    catch (Exception ex)
    {
        _logger.LogDebug(ex, "Client {ClientId} connection error", Id);
    }
    finally
    {
        Router?.RemoveClient(this);
    }
}
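
The standalone sketch below shows why a linked source fits here. It is plain console code, not part of NatsClient, and the names serverCts, clientA, and clientB are illustrative. It demonstrates the two properties RunAsync relies on. First, cancelling one client's linked source leaves the server token and other clients untouched. Second, cancelling the server token cancels every linked source.

```csharp
using System;
using System.Threading;

// Stands in for the server-wide token passed to RunAsync(ct).
using var serverCts = new CancellationTokenSource();

// One linked source per client, as RunAsync creates with CreateLinkedTokenSource.
using var clientA = CancellationTokenSource.CreateLinkedTokenSource(serverCts.Token);
using var clientB = CancellationTokenSource.CreateLinkedTokenSource(serverCts.Token);

// Client A closes itself (what SendErrAndCloseAsync does): only A is affected.
clientA.Cancel();
Console.WriteLine($"A={clientA.IsCancellationRequested} B={clientB.IsCancellationRequested} server={serverCts.IsCancellationRequested}");
// prints: A=True B=False server=False

// Server shutdown propagates to every remaining linked source.
serverCts.Cancel();
Console.WriteLine($"B={clientB.IsCancellationRequested}");
// prints: B=True
```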

Step 5: Add SendErrAsync and SendErrAndCloseAsync to NatsClient

Add after WriteAsync (after line 283):

public async Task SendErrAsync(string message)
{
    var errLine = Encoding.ASCII.GetBytes($"-ERR '{message}'\r\n");
    try
    {
        await WriteAsync(errLine, _clientCts?.Token ?? CancellationToken.None);
    }
    catch (Exception ex)
    {
        _logger.LogDebug(ex, "Client {ClientId} failed to send -ERR", Id);
    }
}

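public async Task SendErrAndCloseAsync(string message)
{
    await SendErrAsync(message);

    // Stop the read/process loops via the linked token...
    _clientCts?.Cancel();

    // ...and shut the socket down so the peer observes EOF even when no caller
    // disposes this client right away (e.g. the ClientTests fixture).
    try
    {
        _socket.Shutdown(SocketShutdown.Both);
    }
    catch (Exception ex) when (ex is SocketException or ObjectDisposedException)
    {
        _logger.LogDebug(ex, "Client {ClientId} socket already closed", Id);
    }
}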
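
The bytes SendErrAsync writes can be checked without a socket. FormatErr below is a hypothetical standalone helper, not part of NatsClient. It builds the same interpolated string, using two of the constant values from Step 3:

```csharp
using System;
using System.Text;

// Hypothetical helper: same interpolation as SendErrAsync, -ERR '<message>'\r\n
static byte[] FormatErr(string message) =>
    Encoding.ASCII.GetBytes($"-ERR '{message}'\r\n");

var stale = FormatErr("Stale Connection");
var maxConn = FormatErr("maximum connections exceeded");

// The message is wrapped in single quotes and the line ends with CRLF.
Console.Write(Encoding.ASCII.GetString(stale));   // -ERR 'Stale Connection'
Console.Write(Encoding.ASCII.GetString(maxConn)); // -ERR 'maximum connections exceeded'
```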

Step 6: Dispose the linked CTS

Update Dispose() (lines 292-297):

public void Dispose()
{
    _clientCts?.Dispose();
    _stream.Dispose();
    _socket.Dispose();
    _writeLock.Dispose();
}

Step 7: Run tests to verify they pass

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Client_SendErr" -v normal
Expected: Both tests PASS.

Step 8: Run all existing tests to verify no regressions

Run: dotnet test tests/NATS.Server.Tests -v normal
Expected: All tests PASS.

Step 9: Commit

git add src/NATS.Server/Protocol/NatsProtocol.cs src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ClientTests.cs
git commit -m "feat: add -ERR response infrastructure with SendErrAsync and SendErrAndCloseAsync"

Task 2: MaxConnections Enforcement

Files:

  • Modify: src/NATS.Server/NatsServer.cs:42-75
  • Modify: tests/NATS.Server.Tests/ServerTests.cs

Step 1: Write the failing test

Add to tests/NATS.Server.Tests/ServerTests.cs. This test needs its own server with a low MaxConnections. Add a new test class at the end of the file:

public class MaxConnectionsTests : IAsyncLifetime
{
    private readonly NatsServer _server;
    private readonly int _port;
    private readonly CancellationTokenSource _cts = new();

    public MaxConnectionsTests()
    {
        _port = GetFreePort();
        _server = new NatsServer(new NatsOptions { Port = _port, MaxConnections = 2 }, NullLoggerFactory.Instance);
    }

    public async Task InitializeAsync()
    {
        _ = _server.StartAsync(_cts.Token);
        await _server.WaitForReadyAsync();
    }

    public async Task DisposeAsync()
    {
        await _cts.CancelAsync();
        _server.Dispose();
    }

    private static int GetFreePort()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)sock.LocalEndPoint!).Port;
    }

    [Fact]
    public async Task Server_rejects_connection_when_max_reached()
    {
        // Connect two clients (at limit)
        var client1 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client1.ConnectAsync(IPAddress.Loopback, _port);
        var buf = new byte[4096];
        var n = await client1.ReceiveAsync(buf, SocketFlags.None);
        Encoding.ASCII.GetString(buf, 0, n).ShouldStartWith("INFO ");

        var client2 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client2.ConnectAsync(IPAddress.Loopback, _port);
        n = await client2.ReceiveAsync(buf, SocketFlags.None);
        Encoding.ASCII.GetString(buf, 0, n).ShouldStartWith("INFO ");

        // Third client should be rejected
        var client3 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client3.ConnectAsync(IPAddress.Loopback, _port);

        n = await client3.ReceiveAsync(buf, SocketFlags.None);
        var response = Encoding.ASCII.GetString(buf, 0, n);
        response.ShouldContain("-ERR 'maximum connections exceeded'");

        // Connection should be closed
        n = await client3.ReceiveAsync(buf, SocketFlags.None);
        n.ShouldBe(0);

        client1.Dispose();
        client2.Dispose();
        client3.Dispose();
    }
}

Step 2: Run test to verify it fails

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Server_rejects_connection_when_max_reached" -v normal
Expected: FAIL, because the third client receives INFO instead of -ERR.

Step 3: Add MaxConnections check in the accept loop

In src/NATS.Server/NatsServer.cs, modify the accept loop (lines 56-69). Insert the check after AcceptAsync and before creating the NatsClient:

while (!ct.IsCancellationRequested)
{
    var socket = await _listener.AcceptAsync(ct);

    // Check MaxConnections before creating the client
    if (_options.MaxConnections > 0 && _clients.Count >= _options.MaxConnections)
    {
        _logger.LogWarning("Client connection rejected: maximum connections ({MaxConnections}) exceeded",
            _options.MaxConnections);
        try
        {
            var stream = new NetworkStream(socket, ownsSocket: false);
            var errBytes = Encoding.ASCII.GetBytes(
                $"-ERR '{NatsProtocol.ErrMaxConnectionsExceeded}'\r\n");
            await stream.WriteAsync(errBytes, ct);
            await stream.FlushAsync(ct);
            stream.Dispose();
        }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "Failed to send -ERR to rejected client");
        }
        finally
        {
            socket.Dispose();
        }
        continue;
    }

    var clientId = Interlocked.Increment(ref _nextClientId);

    _logger.LogDebug("Client {ClientId} connected from {RemoteEndpoint}", clientId, socket.RemoteEndPoint);

    var clientLogger = _loggerFactory.CreateLogger($"NATS.Server.NatsClient[{clientId}]");
    var client = new NatsClient(clientId, socket, _options, _serverInfo, clientLogger);
    client.Router = this;
    _clients[clientId] = client;

    _ = RunClientAsync(client, ct);
}

Add using System.Text; to the top of NatsServer.cs if not already present.
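
The admission rule is easiest to see in isolation. ShouldReject is a hypothetical helper, not in the plan, that restates the accept-loop condition. The comparison is >= rather than > because the just-accepted socket has not been added to _clients yet. A count equal to MaxConnections therefore already means the server is full. As written, the > 0 guard means a non-positive MaxConnections disables the check.

```csharp
using System;

// Hypothetical restatement of the accept-loop check. currentClients is the
// number of registered clients *before* the new socket is added.
static bool ShouldReject(int maxConnections, int currentClients) =>
    maxConnections > 0 && currentClients >= maxConnections;

// With MaxConnections = 2 (as in MaxConnectionsTests):
Console.WriteLine(ShouldReject(2, 0));    // False: first client accepted
Console.WriteLine(ShouldReject(2, 1));    // False: second client accepted
Console.WriteLine(ShouldReject(2, 2));    // True: third client gets -ERR
Console.WriteLine(ShouldReject(0, 1000)); // False: limit disabled by the > 0 guard
```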

Step 4: Run test to verify it passes

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Server_rejects_connection_when_max_reached" -v normal
Expected: PASS.

Step 5: Run all tests

Run: dotnet test tests/NATS.Server.Tests -v normal
Expected: All tests PASS.

Step 6: Commit

git add src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/ServerTests.cs
git commit -m "feat: enforce MaxConnections limit in accept loop"

Task 3: Subject Validation and Max Payload on PUB

Files:

  • Modify: src/NATS.Server/NatsClient.cs:213-228
  • Modify: tests/NATS.Server.Tests/ServerTests.cs

Step 1: Write the failing tests

Add to tests/NATS.Server.Tests/ServerTests.cs in the existing ServerTests class:

[Fact]
public async Task Server_pedantic_rejects_invalid_publish_subject()
{
    using var pub = await ConnectClientAsync();
    using var sub = await ConnectClientAsync();

    // Read INFO from both
    await ReadLineAsync(pub);
    await ReadLineAsync(sub);

    // Connect with pedantic mode ON
    await pub.SendAsync(Encoding.ASCII.GetBytes(
        "CONNECT {\"pedantic\":true}\r\nPING\r\n"));
    await ReadUntilAsync(pub, "PONG");

    // Subscribe on sub
    await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\nPING\r\n"));
    await ReadUntilAsync(sub, "PONG");

    // PUB with wildcard subject (invalid for publish)
    await pub.SendAsync(Encoding.ASCII.GetBytes("PUB foo.* 5\r\nHello\r\n"));

    // Publisher should get -ERR
    var errResponse = await ReadUntilAsync(pub, "-ERR", timeoutMs: 3000);
    errResponse.ShouldContain("-ERR 'Invalid Publish Subject'");
}

[Fact]
public async Task Server_nonpedantic_allows_wildcard_publish_subject()
{
    using var pub = await ConnectClientAsync();
    using var sub = await ConnectClientAsync();

    await ReadLineAsync(pub);
    await ReadLineAsync(sub);

    // Connect without pedantic mode (default)
    await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\nPING\r\n"));
    await ReadUntilAsync(sub, "PONG");

    await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo.* 5\r\nHello\r\n"));

    // Sub should still receive the message (no validation in non-pedantic mode)
    var msg = await ReadUntilAsync(sub, "Hello\r\n");
    msg.ShouldContain("MSG foo.* 1 5\r\nHello\r\n");
}

[Fact]
public async Task Server_rejects_max_payload_violation()
{
    // Create server with tiny max payload
    var port = GetFreePort();
    using var cts = new CancellationTokenSource();
    var server = new NatsServer(new NatsOptions { Port = port, MaxPayload = 10 }, NullLoggerFactory.Instance);
    _ = server.StartAsync(cts.Token);
    await server.WaitForReadyAsync();

    try
    {
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client.ConnectAsync(IPAddress.Loopback, port);

        var buf = new byte[4096];
        await client.ReceiveAsync(buf, SocketFlags.None); // INFO

        await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n"));

        // Send PUB with payload larger than MaxPayload (10 bytes)
        await client.SendAsync(Encoding.ASCII.GetBytes("PUB foo 20\r\n12345678901234567890\r\n"));

        var n = await client.ReceiveAsync(buf, SocketFlags.None);
        var response = Encoding.ASCII.GetString(buf, 0, n);
        response.ShouldContain("-ERR 'Maximum Payload Violation'");

        // Connection should be closed
        n = await client.ReceiveAsync(buf, SocketFlags.None);
        n.ShouldBe(0);

        client.Dispose();
    }
    finally
    {
        await cts.CancelAsync();
        server.Dispose();
    }
}

Step 2: Run tests to verify they fail

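Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~pedantic|FullyQualifiedName~max_payload" -v normal
Expected: Server_pedantic_rejects_invalid_publish_subject and Server_rejects_max_payload_violation FAIL because no validation exists yet. Server_nonpedantic_allows_wildcard_publish_subject already passes, since it exercises the existing unvalidated behavior.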

Step 3: Add validation to ProcessPub

In src/NATS.Server/NatsClient.cs, replace the ProcessPub method (lines 213-228) with an async ProcessPubAsync:

private async ValueTask ProcessPubAsync(ParsedCommand cmd)
{
    Interlocked.Increment(ref InMsgs);
    Interlocked.Add(ref InBytes, cmd.Payload.Length);

    // Max payload validation (always, hard close)
    if (cmd.Payload.Length > _options.MaxPayload)
    {
        _logger.LogWarning("Client {ClientId} exceeded max payload: {Size} > {MaxPayload}",
            Id, cmd.Payload.Length, _options.MaxPayload);
        await SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation);
        return;
    }

    // Pedantic mode: validate publish subject
    if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(cmd.Subject!))
    {
        _logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, cmd.Subject);
        await SendErrAsync(NatsProtocol.ErrInvalidPublishSubject);
        return;
    }

    ReadOnlyMemory<byte> headers = default;
    ReadOnlyMemory<byte> payload = cmd.Payload;

    if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0)
    {
        headers = cmd.Payload[..cmd.HeaderSize];
        payload = cmd.Payload[cmd.HeaderSize..];
    }

    Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
}
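
ProcessPubAsync relies on SubjectMatch.IsValidPublishSubject, which this plan assumes already exists. The hypothetical sketch below is a reference for reviewing the tests, not the project's implementation. It encodes the rule such a check typically enforces:

- The subject must be non-empty.
- Every dot-separated token must be non-empty and free of whitespace.
- No token may be exactly * or >, since wildcards are only meaningful in SUB.

As in NATS, only whole-token wildcards count, so foo*.bar is still a literal subject.

```csharp
using System;

// Hypothetical publish-subject check, for illustration only; the plan calls
// the existing SubjectMatch.IsValidPublishSubject instead.
static bool IsValidPublishSubjectSketch(string subject)
{
    if (string.IsNullOrEmpty(subject)) return false;
    foreach (var token in subject.Split('.'))
    {
        if (token.Length == 0) return false;             // "foo..bar", ".foo", "foo."
        if (token == "*" || token == ">") return false;  // whole-token wildcards
        foreach (var ch in token)
            if (char.IsWhiteSpace(ch)) return false;      // no spaces, tabs, CR/LF
    }
    return true;
}

foreach (var s in new[] { "foo.bar", "foo.*", "foo.>", "foo..bar", "foo*.bar" })
    Console.WriteLine($"{s} -> {IsValidPublishSubjectSketch(s)}");
```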

Because the method is now async (and renamed ProcessPubAsync), update the Pub/HPub case in DispatchCommandAsync (lines 160-163) to await it:

case CommandType.Pub:
case CommandType.HPub:
    await ProcessPubAsync(cmd);
    break;
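
The following is a design note, offered as an option rather than as part of the plan. The check in ProcessPubAsync runs only after the parser has buffered the entire payload, so an oversized PUB is still read into memory first. The declared size is already available in the control line (PUB <subject> [reply-to] <#bytes>), so it could be checked before the payload is read. ExceedsMaxPayload is a hypothetical helper that sketches this check for PUB only. HPUB's last argument is its total size, so the same idea would apply there.

```csharp
using System;

// Hypothetical pre-check on a PUB control line (without the trailing CRLF).
// The declared payload size is the last space-separated argument.
static bool ExceedsMaxPayload(string controlLine, int maxPayload)
{
    var parts = controlLine.Split(' ', StringSplitOptions.RemoveEmptyEntries);
    // PUB <subject> <#bytes>  or  PUB <subject> <reply-to> <#bytes>
    if (parts.Length is < 3 or > 4 || !parts[0].Equals("PUB", StringComparison.OrdinalIgnoreCase))
        throw new FormatException($"not a PUB control line: '{controlLine}'");
    if (!int.TryParse(parts[^1], out var size) || size < 0)
        throw new FormatException($"invalid payload size in '{controlLine}'");
    return size > maxPayload;
}

// Mirrors Server_rejects_max_payload_violation (MaxPayload = 10).
Console.WriteLine(ExceedsMaxPayload("PUB foo 20", maxPayload: 10));        // True
Console.WriteLine(ExceedsMaxPayload("PUB foo reply.1 5", maxPayload: 10)); // False
```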

Step 4: Run tests to verify they pass

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~pedantic|FullyQualifiedName~max_payload" -v normal
Expected: All three PASS.

Step 5: Run all tests

Run: dotnet test tests/NATS.Server.Tests -v normal
Expected: All tests PASS.

Step 6: Commit

git add src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ServerTests.cs
git commit -m "feat: add pedantic subject validation and max payload enforcement on PUB"

Task 4: Server-Side PING Keepalive

Files:

  • Modify: src/NATS.Server/NatsClient.cs
  • Modify: tests/NATS.Server.Tests/ServerTests.cs

Step 1: Write the failing tests

Add a new test class at the end of tests/NATS.Server.Tests/ServerTests.cs:

public class PingKeepaliveTests : IAsyncLifetime
{
    private readonly NatsServer _server;
    private readonly int _port;
    private readonly CancellationTokenSource _cts = new();

    public PingKeepaliveTests()
    {
        _port = GetFreePort();
        // Short intervals for testing: 500ms ping interval, 2 max pings out
        _server = new NatsServer(
            new NatsOptions
            {
                Port = _port,
                PingInterval = TimeSpan.FromMilliseconds(500),
                MaxPingsOut = 2,
            },
            NullLoggerFactory.Instance);
    }

    public async Task InitializeAsync()
    {
        _ = _server.StartAsync(_cts.Token);
        await _server.WaitForReadyAsync();
    }

    public async Task DisposeAsync()
    {
        await _cts.CancelAsync();
        _server.Dispose();
    }

    private static int GetFreePort()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)sock.LocalEndPoint!).Port;
    }

    private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
    {
        using var cts = new CancellationTokenSource(timeoutMs);
        var sb = new StringBuilder();
        var buf = new byte[4096];
        while (!sb.ToString().Contains(expected))
        {
            var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
            if (n == 0) break;
            sb.Append(Encoding.ASCII.GetString(buf, 0, n));
        }
        return sb.ToString();
    }

    [Fact]
    public async Task Server_sends_PING_after_inactivity()
    {
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client.ConnectAsync(IPAddress.Loopback, _port);

        // Read INFO
        var buf = new byte[4096];
        await client.ReceiveAsync(buf, SocketFlags.None);

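        // Send CONNECT (it also counts as inbound activity for the keepalive)
        await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n"));

        // Wait for the server's PING. The first tick (~500ms) is usually skipped
        // because CONNECT was seen less than one interval earlier, so expect it by ~1s.
        var response = await ReadUntilAsync(client, "PING", timeoutMs: 3000);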
        response.ShouldContain("PING");

        client.Dispose();
    }

    [Fact]
    public async Task Server_pong_resets_ping_counter()
    {
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client.ConnectAsync(IPAddress.Loopback, _port);

        var buf = new byte[4096];
        await client.ReceiveAsync(buf, SocketFlags.None); // INFO

        await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n"));

        // Wait for first PING
        var response = await ReadUntilAsync(client, "PING", timeoutMs: 3000);
        response.ShouldContain("PING");

        // Respond with PONG — this resets the counter
        await client.SendAsync(Encoding.ASCII.GetBytes("PONG\r\n"));

        // Wait for next PING (counter reset, so we should get another one)
        response = await ReadUntilAsync(client, "PING", timeoutMs: 3000);
        response.ShouldContain("PING");

        // Respond again to keep alive
        await client.SendAsync(Encoding.ASCII.GetBytes("PONG\r\n"));

        // Client should still be alive — send a PING and expect PONG back
        await client.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
        response = await ReadUntilAsync(client, "PONG", timeoutMs: 3000);
        response.ShouldContain("PONG");

        client.Dispose();
    }

    [Fact]
    public async Task Server_disconnects_stale_client()
    {
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client.ConnectAsync(IPAddress.Loopback, _port);

        var buf = new byte[4096];
        await client.ReceiveAsync(buf, SocketFlags.None); // INFO

        await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n"));

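        // Don't respond to PINGs; wait for the stale disconnect.
        // With a 500ms interval and MaxPingsOut=2 (timings approximate):
        // tick 1 (~500ms):  usually skipped, since CONNECT was seen less than one interval earlier
        // tick 2 (~1000ms): PING #1, pingsOut=1
        // tick 3 (~1500ms): PING #2, pingsOut=2
        // tick 4 (~2000ms): pingsOut=3 > MaxPingsOut → -ERR 'Stale Connection' + close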
        var sb = new StringBuilder();
        try
        {
            using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            while (true)
            {
                var n = await client.ReceiveAsync(buf, SocketFlags.None, timeout.Token);
                if (n == 0) break;
                sb.Append(Encoding.ASCII.GetString(buf, 0, n));
            }
        }
        catch (OperationCanceledException)
        {
            // Timeout is acceptable — check what we got
        }

        var allData = sb.ToString();
        allData.ShouldContain("-ERR 'Stale Connection'");

        client.Dispose();
    }
}

Step 2: Run tests to verify they fail

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PingKeepalive" -v normal
Expected: All three FAIL because the server never sends PING.

Step 3: Add ping state fields to NatsClient

In src/NATS.Server/NatsClient.cs, add after the stats fields (after line 44):

// PING keepalive state
private int _pingsOut;
private long _lastIn;

Step 4: Add RunPingTimerAsync method

Add after SendErrAndCloseAsync:

private async Task RunPingTimerAsync(CancellationToken ct)
{
    using var timer = new PeriodicTimer(_options.PingInterval);
    try
    {
        while (await timer.WaitForNextTickAsync(ct))
        {
            var elapsed = Environment.TickCount64 - Interlocked.Read(ref _lastIn);
            if (elapsed < (long)_options.PingInterval.TotalMilliseconds)
            {
                // Client was recently active, skip ping
                Interlocked.Exchange(ref _pingsOut, 0);
                continue;
            }

            var currentPingsOut = Interlocked.Increment(ref _pingsOut);
            if (currentPingsOut > _options.MaxPingsOut)
            {
                _logger.LogDebug("Client {ClientId} stale connection — closing", Id);
                await SendErrAndCloseAsync(NatsProtocol.ErrStaleConnection);
                return;
            }

            _logger.LogDebug("Client {ClientId} sending PING ({PingsOut}/{MaxPingsOut})",
                Id, currentPingsOut, _options.MaxPingsOut);
            try
            {
                await WriteAsync(NatsProtocol.PingBytes, ct);
            }
            catch (Exception ex)
            {
                _logger.LogDebug(ex, "Client {ClientId} failed to send PING", Id);
                return;
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Normal shutdown
    }
}
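
The per-tick decision in RunPingTimerAsync can be replayed without sockets or timers. SimulatePingTicks is a hypothetical model, not part of the plan. Each element of activeDuringInterval says whether the client sent anything (a command or a PONG) less than one PingInterval before that tick. This stands in for the elapsed < PingInterval test on _lastIn. With MaxPingsOut = 2, a silent client is closed on the third tick that finds it idle, which is the behavior Server_disconnects_stale_client expects.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of RunPingTimerAsync's per-tick logic.
// activeDuringInterval[i] == true: inbound traffic arrived less than one
// PingInterval before tick i (the "recently active" branch).
static List<string> SimulatePingTicks(bool[] activeDuringInterval, int maxPingsOut)
{
    var actions = new List<string>();
    var pingsOut = 0;
    foreach (var active in activeDuringInterval)
    {
        if (active)
        {
            pingsOut = 0;          // recent activity: skip the PING, reset the counter
            actions.Add("skip");
            continue;
        }

        pingsOut++;                 // mirrors Interlocked.Increment(ref _pingsOut)
        if (pingsOut > maxPingsOut)
        {
            actions.Add("stale");  // -ERR 'Stale Connection', then the loop returns
            break;
        }
        actions.Add($"PING {pingsOut}/{maxPingsOut}");
    }
    return actions;
}

// Silent client: only the CONNECT right after INFO counts as activity.
var silent = SimulatePingTicks(new[] { true, false, false, false, false }, maxPingsOut: 2);
Console.WriteLine(string.Join(", ", silent)); // skip, PING 1/2, PING 2/2, stale

// A client that answers every PING with PONG is never closed.
var responsive = SimulatePingTicks(new[] { false, true, false, true }, maxPingsOut: 2);
Console.WriteLine(string.Join(", ", responsive)); // PING 1/2, skip, PING 1/2, skip
```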

Step 5: Update PONG handling in DispatchCommandAsync

Replace the Pong case (lines 148-149):

case CommandType.Pong:
    Interlocked.Exchange(ref _pingsOut, 0);
    break;

Step 6: Update _lastIn on every parsed command

In ProcessCommandsAsync (lines 110-134), record the arrival time of every parsed command. Update _lastIn at the top of the inner while (_parser.TryParse(...)) loop, before the command is dispatched:

while (_parser.TryParse(ref buffer, out var cmd))
{
    Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
    await DispatchCommandAsync(cmd, ct);
}

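Step 7: Launch the ping timer in RunAsync

Ideally the ping timer would start only after CONNECT is received, but the simpler approach is to start it unconditionally in RunAsync alongside the other tasks. RunAsync seeds _lastIn when the connection starts, so the first tick comes one PingInterval later. Each parsed command (CONNECT included) refreshes _lastIn, so an active client is never pinged. A client that never sends CONNECT is pinged like any idle client and is closed as stale once it exceeds MaxPingsOut.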

Update RunAsync to launch the ping timer as a third concurrent task:

public async Task RunAsync(CancellationToken ct)
{
    _clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
    var pipe = new Pipe();
    try
    {
        // Send INFO
        await SendInfoAsync(_clientCts.Token);

        // Start read pump, command processing, and ping timer in parallel
        var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token);
        var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token);
        var pingTask = RunPingTimerAsync(_clientCts.Token);

        await Task.WhenAny(fillTask, processTask, pingTask);
    }
    catch (OperationCanceledException)
    {
        _logger.LogDebug("Client {ClientId} operation cancelled", Id);
    }
    catch (Exception ex)
    {
        _logger.LogDebug(ex, "Client {ClientId} connection error", Id);
    }
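    finally
    {
        // Task.WhenAny does not stop the loops that are still running (for example,
        // the ping timer after the peer disconnects), so cancel them explicitly.
        _clientCts.Cancel();
        Router?.RemoveClient(this);
    }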
}
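
Task.WhenAny only reports which loop finished first; it does not stop the others. In the standalone sketch below, Task.Delay calls stand in for FillPipeAsync, ProcessCommandsAsync, and RunPingTimerAsync. It shows that the remaining tasks keep running until the shared token is cancelled. Anything still running when RunAsync's WhenAny returns therefore continues until _clientCts (or the server token) is cancelled.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// One shared token, like _clientCts.Token in RunAsync.
using var cts = new CancellationTokenSource();

var fill = Task.Delay(50);                             // e.g. the peer disconnects quickly
var process = Task.Delay(Timeout.Infinite, cts.Token); // waits until cancelled
var ping = Task.Delay(Timeout.Infinite, cts.Token);    // waits until cancelled

var first = await Task.WhenAny(fill, process, ping);
Console.WriteLine(first == fill);                           // True
Console.WriteLine(process.IsCompleted || ping.IsCompleted); // False: still running

// Cancelling the shared token is what ends the remaining loops.
cts.Cancel();
try
{
    await Task.WhenAll(process, ping);
}
catch (OperationCanceledException)
{
    // Expected: both delays were cancelled.
}
Console.WriteLine(process.IsCanceled && ping.IsCanceled);   // True
```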

Step 8: Run tests to verify they pass

Run: dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PingKeepalive" -v normal
Expected: All three PASS.

Step 9: Run all tests to verify no regressions

Run: dotnet test tests/NATS.Server.Tests -v normal
Expected: All tests PASS.

Note: The existing integration tests (IntegrationTests.cs) use NATS.Client.Core, which automatically responds to server PINGs, so the new keepalive will not affect them. The existing raw-socket tests in ClientTests and ServerTests should not see unexpected PINGs either. They use the default NatsOptions, whose PingInterval is 2 minutes, far longer than any of those tests run.

Step 10: Commit

git add src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ServerTests.cs
git commit -m "feat: add server-side PING keepalive with stale connection detection"

Summary

| Task   | Description                      | Dependencies | Estimated steps |
|--------|----------------------------------|--------------|-----------------|
| Task 1 | -ERR infrastructure              | None         | 9               |
| Task 2 | MaxConnections enforcement       | Task 1       | 6               |
| Task 3 | Subject validation + max payload | Task 1       | 6               |
| Task 4 | PING keepalive                   | Task 1       | 10              |

Tasks 2, 3, and 4 are independent of each other and can be done in any order (or in parallel) after Task 1 is complete.