From 16b8f9e2e2b1ca2bd262ec1c947dd43d77ace9dc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 21:29:10 -0500 Subject: [PATCH] docs: add implementation plan for hardening base server Four tasks: -ERR infrastructure, MaxConnections enforcement, pedantic subject validation + max payload on PUB, PING keepalive. Full TDD steps with exact code and commands. --- .../2026-02-22-harden-base-server-plan.md | 817 ++++++++++++++++++ ...2-22-harden-base-server-plan.md.tasks.json | 10 + 2 files changed, 827 insertions(+) create mode 100644 docs/plans/2026-02-22-harden-base-server-plan.md create mode 100644 docs/plans/2026-02-22-harden-base-server-plan.md.tasks.json diff --git a/docs/plans/2026-02-22-harden-base-server-plan.md b/docs/plans/2026-02-22-harden-base-server-plan.md new file mode 100644 index 0000000..0e7d40d --- /dev/null +++ b/docs/plans/2026-02-22-harden-base-server-plan.md @@ -0,0 +1,817 @@ +# Harden Base Server Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task. + +**Goal:** Add -ERR responses, MaxConnections enforcement, pedantic subject validation on PUB, max payload validation, and server-side PING keepalive to the base NATS server. + +**Architecture:** Four incremental features building on the existing server. Task 1 (-ERR infrastructure) is a prerequisite for all others. Tasks 2-4 are independent of each other once Task 1 is done. + +**Tech Stack:** .NET 10 / C# 14, xUnit 3, Shouldly, System.IO.Pipelines, System.Threading.PeriodicTimer + +--- + +### Task 1: -ERR Response Infrastructure + +**Files:** +- Modify: `src/NATS.Server/Protocol/NatsProtocol.cs:5-22` +- Modify: `src/NATS.Server/NatsClient.cs:24-298` +- Modify: `tests/NATS.Server.Tests/ClientTests.cs` + +**Step 1: Write the failing tests** + +Add to `tests/NATS.Server.Tests/ClientTests.cs`: + +```csharp +[Fact] +public async Task Client_SendErrAsync_writes_correct_wire_format() +{ + var runTask = _natsClient.RunAsync(_cts.Token); + + // Read INFO first + var buf = new byte[4096]; + await _clientSocket.ReceiveAsync(buf, SocketFlags.None); + + // Trigger SendErrAsync + await _natsClient.SendErrAsync("Invalid Subject"); + + var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None); + var response = Encoding.ASCII.GetString(buf, 0, n); + + response.ShouldBe("-ERR 'Invalid Subject'\r\n"); + + await _cts.CancelAsync(); +} + +[Fact] +public async Task Client_SendErrAndCloseAsync_sends_error_then_disconnects() +{ + var runTask = _natsClient.RunAsync(_cts.Token); + + // Read INFO first + var buf = new byte[4096]; + await _clientSocket.ReceiveAsync(buf, SocketFlags.None); + + // Trigger SendErrAndCloseAsync + await _natsClient.SendErrAndCloseAsync("maximum connections exceeded"); + + var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None); + var response = Encoding.ASCII.GetString(buf, 0, n); + + response.ShouldBe("-ERR 'maximum connections exceeded'\r\n"); + + // Connection should be closed — next read returns 0 + n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None); + n.ShouldBe(0); +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Client_SendErr" -v normal` +Expected: Build fails — `SendErrAsync` and `SendErrAndCloseAsync` don't exist. + +**Step 3: Add error message constants to NatsProtocol.cs** + +In `src/NATS.Server/Protocol/NatsProtocol.cs`, add after line 21 (after `ErrPrefix`): + +```csharp +// Standard error messages (matching Go server) +public const string ErrMaxConnectionsExceeded = "maximum connections exceeded"; +public const string ErrStaleConnection = "Stale Connection"; +public const string ErrMaxPayloadViolation = "Maximum Payload Violation"; +public const string ErrInvalidPublishSubject = "Invalid Publish Subject"; +public const string ErrInvalidSubject = "Invalid Subject"; +``` + +**Step 4: Add a linked CancellationTokenSource to NatsClient for self-close** + +In `src/NATS.Server/NatsClient.cs`, add a field after the `_writeLock` field (line 31): + +```csharp +private CancellationTokenSource? _clientCts; +``` + +Modify `RunAsync` (line 59-85) so it creates a linked CTS and uses it internally. The linked CTS lets the client cancel itself while still respecting server shutdown: + +```csharp +public async Task RunAsync(CancellationToken ct) +{ + _clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + var pipe = new Pipe(); + try + { + // Send INFO + await SendInfoAsync(_clientCts.Token); + + // Start read pump and command processing in parallel + var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token); + var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token); + + await Task.WhenAny(fillTask, processTask); + } + catch (OperationCanceledException) + { + _logger.LogDebug("Client {ClientId} operation cancelled", Id); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Client {ClientId} connection error", Id); + } + finally + { + Router?.RemoveClient(this); + } +} +``` + +**Step 5: Add SendErrAsync and SendErrAndCloseAsync to NatsClient** + +Add after `WriteAsync` (after line 283): + +```csharp +public async Task SendErrAsync(string message) +{ + var errLine = Encoding.ASCII.GetBytes($"-ERR '{message}'\r\n"); + try + { + await WriteAsync(errLine, _clientCts?.Token ?? CancellationToken.None); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Client {ClientId} failed to send -ERR", Id); + } +} + +public async Task SendErrAndCloseAsync(string message) +{ + await SendErrAsync(message); + _clientCts?.Cancel(); +} +``` + +**Step 6: Dispose the linked CTS** + +Update `Dispose()` (line 292-297): + +```csharp +public void Dispose() +{ + _clientCts?.Dispose(); + _stream.Dispose(); + _socket.Dispose(); + _writeLock.Dispose(); +} +``` + +**Step 7: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Client_SendErr" -v normal` +Expected: Both tests PASS. + +**Step 8: Run all existing tests to verify no regressions** + +Run: `dotnet test tests/NATS.Server.Tests -v normal` +Expected: All tests PASS. + +**Step 9: Commit** + +```bash +git add src/NATS.Server/Protocol/NatsProtocol.cs src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ClientTests.cs +git commit -m "feat: add -ERR response infrastructure with SendErrAsync and SendErrAndCloseAsync" +``` + +--- + +### Task 2: MaxConnections Enforcement + +**Files:** +- Modify: `src/NATS.Server/NatsServer.cs:42-75` +- Modify: `tests/NATS.Server.Tests/ServerTests.cs` + +**Step 1: Write the failing test** + +Add to `tests/NATS.Server.Tests/ServerTests.cs`. This test needs its own server with a low MaxConnections. Add a new test class at the end of the file: + +```csharp +public class MaxConnectionsTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _port; + private readonly CancellationTokenSource _cts = new(); + + public MaxConnectionsTests() + { + _port = GetFreePort(); + _server = new NatsServer(new NatsOptions { Port = _port, MaxConnections = 2 }, NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + } + + public async Task DisposeAsync() + { + await _cts.CancelAsync(); + _server.Dispose(); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + [Fact] + public async Task Server_rejects_connection_when_max_reached() + { + // Connect two clients (at limit) + var client1 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client1.ConnectAsync(IPAddress.Loopback, _port); + var buf = new byte[4096]; + var n = await client1.ReceiveAsync(buf, SocketFlags.None); + Encoding.ASCII.GetString(buf, 0, n).ShouldStartWith("INFO "); + + var client2 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client2.ConnectAsync(IPAddress.Loopback, _port); + n = await client2.ReceiveAsync(buf, SocketFlags.None); + Encoding.ASCII.GetString(buf, 0, n).ShouldStartWith("INFO "); + + // Third client should be rejected + var client3 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client3.ConnectAsync(IPAddress.Loopback, _port); + + n = await client3.ReceiveAsync(buf, SocketFlags.None); + var response = Encoding.ASCII.GetString(buf, 0, n); + response.ShouldContain("-ERR 'maximum connections exceeded'"); + + // Connection should be closed + n = await client3.ReceiveAsync(buf, SocketFlags.None); + n.ShouldBe(0); + + client1.Dispose(); + client2.Dispose(); + client3.Dispose(); + } +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Server_rejects_connection_when_max_reached" -v normal` +Expected: FAIL — third client gets INFO instead of -ERR. + +**Step 3: Add MaxConnections check in the accept loop** + +In `src/NATS.Server/NatsServer.cs`, modify the accept loop (lines 56-69). Insert the check after `AcceptAsync` and before creating the `NatsClient`: + +```csharp +while (!ct.IsCancellationRequested) +{ + var socket = await _listener.AcceptAsync(ct); + + // Check MaxConnections before creating the client + if (_options.MaxConnections > 0 && _clients.Count >= _options.MaxConnections) + { + _logger.LogWarning("Client connection rejected: maximum connections ({MaxConnections}) exceeded", + _options.MaxConnections); + try + { + var stream = new NetworkStream(socket, ownsSocket: false); + var errBytes = Encoding.ASCII.GetBytes( + $"-ERR '{NatsProtocol.ErrMaxConnectionsExceeded}'\r\n"); + await stream.WriteAsync(errBytes, ct); + await stream.FlushAsync(ct); + stream.Dispose(); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Failed to send -ERR to rejected client"); + } + finally + { + socket.Dispose(); + } + continue; + } + + var clientId = Interlocked.Increment(ref _nextClientId); + + _logger.LogDebug("Client {ClientId} connected from {RemoteEndpoint}", clientId, socket.RemoteEndPoint); + + var clientLogger = _loggerFactory.CreateLogger($"NATS.Server.NatsClient[{clientId}]"); + var client = new NatsClient(clientId, socket, _options, _serverInfo, clientLogger); + client.Router = this; + _clients[clientId] = client; + + _ = RunClientAsync(client, ct); +} +``` + +Add `using System.Text;` to the top of `NatsServer.cs` if not already present. + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Server_rejects_connection_when_max_reached" -v normal` +Expected: PASS. + +**Step 5: Run all tests** + +Run: `dotnet test tests/NATS.Server.Tests -v normal` +Expected: All tests PASS. + +**Step 6: Commit** + +```bash +git add src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/ServerTests.cs +git commit -m "feat: enforce MaxConnections limit in accept loop" +``` + +--- + +### Task 3: Subject Validation and Max Payload on PUB + +**Files:** +- Modify: `src/NATS.Server/NatsClient.cs:213-228` +- Modify: `tests/NATS.Server.Tests/ServerTests.cs` + +**Step 1: Write the failing tests** + +Add to `tests/NATS.Server.Tests/ServerTests.cs` in the existing `ServerTests` class: + +```csharp +[Fact] +public async Task Server_pedantic_rejects_invalid_publish_subject() +{ + using var pub = await ConnectClientAsync(); + using var sub = await ConnectClientAsync(); + + // Read INFO from both + await ReadLineAsync(pub); + await ReadLineAsync(sub); + + // Connect with pedantic mode ON + await pub.SendAsync(Encoding.ASCII.GetBytes( + "CONNECT {\"pedantic\":true}\r\nPING\r\n")); + var pong = await ReadUntilAsync(pub, "PONG"); + + // Subscribe on sub + await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\nPING\r\n")); + await ReadUntilAsync(sub, "PONG"); + + // PUB with wildcard subject (invalid for publish) + await pub.SendAsync(Encoding.ASCII.GetBytes("PUB foo.* 5\r\nHello\r\n")); + + // Publisher should get -ERR + var errResponse = await ReadUntilAsync(pub, "-ERR", timeoutMs: 3000); + errResponse.ShouldContain("-ERR 'Invalid Publish Subject'"); +} + +[Fact] +public async Task Server_nonpedantic_allows_wildcard_publish_subject() +{ + using var pub = await ConnectClientAsync(); + using var sub = await ConnectClientAsync(); + + await ReadLineAsync(pub); + await ReadLineAsync(sub); + + // Connect without pedantic mode (default) + await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\nPING\r\n")); + await ReadUntilAsync(sub, "PONG"); + + await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo.* 5\r\nHello\r\n")); + + // Sub should still receive the message (no validation in non-pedantic mode) + var msg = await ReadUntilAsync(sub, "Hello\r\n"); + msg.ShouldContain("MSG foo.* 1 5\r\nHello\r\n"); +} + +[Fact] +public async Task Server_rejects_max_payload_violation() +{ + // Create server with tiny max payload + var port = GetFreePort(); + using var cts = new CancellationTokenSource(); + var server = new NatsServer(new NatsOptions { Port = port, MaxPayload = 10 }, NullLoggerFactory.Instance); + _ = server.StartAsync(cts.Token); + await server.WaitForReadyAsync(); + + try + { + var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client.ConnectAsync(IPAddress.Loopback, port); + + var buf = new byte[4096]; + await client.ReceiveAsync(buf, SocketFlags.None); // INFO + + await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n")); + + // Send PUB with payload larger than MaxPayload (10 bytes) + await client.SendAsync(Encoding.ASCII.GetBytes("PUB foo 20\r\n12345678901234567890\r\n")); + + var n = await client.ReceiveAsync(buf, SocketFlags.None); + var response = Encoding.ASCII.GetString(buf, 0, n); + response.ShouldContain("-ERR 'Maximum Payload Violation'"); + + // Connection should be closed + n = await client.ReceiveAsync(buf, SocketFlags.None); + n.ShouldBe(0); + + client.Dispose(); + } + finally + { + await cts.CancelAsync(); + server.Dispose(); + } +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~pedantic|FullyQualifiedName~max_payload" -v normal` +Expected: FAIL — no validation exists. + +**Step 3: Add validation to ProcessPub** + +In `src/NATS.Server/NatsClient.cs`, replace the `ProcessPub` method (lines 213-228): + +```csharp +private async ValueTask ProcessPubAsync(ParsedCommand cmd) +{ + Interlocked.Increment(ref InMsgs); + Interlocked.Add(ref InBytes, cmd.Payload.Length); + + // Max payload validation (always, hard close) + if (cmd.Payload.Length > _options.MaxPayload) + { + _logger.LogWarning("Client {ClientId} exceeded max payload: {Size} > {MaxPayload}", + Id, cmd.Payload.Length, _options.MaxPayload); + await SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation); + return; + } + + // Pedantic mode: validate publish subject + if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(cmd.Subject!)) + { + _logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, cmd.Subject); + await SendErrAsync(NatsProtocol.ErrInvalidPublishSubject); + return; + } + + ReadOnlyMemory headers = default; + ReadOnlyMemory payload = cmd.Payload; + + if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0) + { + headers = cmd.Payload[..cmd.HeaderSize]; + payload = cmd.Payload[cmd.HeaderSize..]; + } + + Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this); +} +``` + +Since `ProcessPub` is now async, update `DispatchCommandAsync` (line 160-163) to await it: + +```csharp +case CommandType.Pub: +case CommandType.HPub: + await ProcessPubAsync(cmd); + break; +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~pedantic|FullyQualifiedName~max_payload" -v normal` +Expected: All three PASS. + +**Step 5: Run all tests** + +Run: `dotnet test tests/NATS.Server.Tests -v normal` +Expected: All tests PASS. + +**Step 6: Commit** + +```bash +git add src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ServerTests.cs +git commit -m "feat: add pedantic subject validation and max payload enforcement on PUB" +``` + +--- + +### Task 4: Server-Side PING Keepalive + +**Files:** +- Modify: `src/NATS.Server/NatsClient.cs` +- Modify: `tests/NATS.Server.Tests/ServerTests.cs` + +**Step 1: Write the failing tests** + +Add a new test class at the end of `tests/NATS.Server.Tests/ServerTests.cs`: + +```csharp +public class PingKeepaliveTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _port; + private readonly CancellationTokenSource _cts = new(); + + public PingKeepaliveTests() + { + _port = GetFreePort(); + // Short intervals for testing: 500ms ping interval, 2 max pings out + _server = new NatsServer( + new NatsOptions + { + Port = _port, + PingInterval = TimeSpan.FromMilliseconds(500), + MaxPingsOut = 2, + }, + NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + } + + public async Task DisposeAsync() + { + await _cts.CancelAsync(); + _server.Dispose(); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + private static async Task ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000) + { + using var cts = new CancellationTokenSource(timeoutMs); + var sb = new StringBuilder(); + var buf = new byte[4096]; + while (!sb.ToString().Contains(expected)) + { + var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token); + if (n == 0) break; + sb.Append(Encoding.ASCII.GetString(buf, 0, n)); + } + return sb.ToString(); + } + + [Fact] + public async Task Server_sends_PING_after_inactivity() + { + var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client.ConnectAsync(IPAddress.Loopback, _port); + + // Read INFO + var buf = new byte[4096]; + await client.ReceiveAsync(buf, SocketFlags.None); + + // Send CONNECT to start keepalive + await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n")); + + // Wait for server to send PING (should come within ~500ms) + var response = await ReadUntilAsync(client, "PING", timeoutMs: 3000); + response.ShouldContain("PING"); + + client.Dispose(); + } + + [Fact] + public async Task Server_pong_resets_ping_counter() + { + var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client.ConnectAsync(IPAddress.Loopback, _port); + + var buf = new byte[4096]; + await client.ReceiveAsync(buf, SocketFlags.None); // INFO + + await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n")); + + // Wait for first PING + var response = await ReadUntilAsync(client, "PING", timeoutMs: 3000); + response.ShouldContain("PING"); + + // Respond with PONG — this resets the counter + await client.SendAsync(Encoding.ASCII.GetBytes("PONG\r\n")); + + // Wait for next PING (counter reset, so we should get another one) + response = await ReadUntilAsync(client, "PING", timeoutMs: 3000); + response.ShouldContain("PING"); + + // Respond again to keep alive + await client.SendAsync(Encoding.ASCII.GetBytes("PONG\r\n")); + + // Client should still be alive — send a PING and expect PONG back + await client.SendAsync(Encoding.ASCII.GetBytes("PING\r\n")); + response = await ReadUntilAsync(client, "PONG", timeoutMs: 3000); + response.ShouldContain("PONG"); + + client.Dispose(); + } + + [Fact] + public async Task Server_disconnects_stale_client() + { + var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await client.ConnectAsync(IPAddress.Loopback, _port); + + var buf = new byte[4096]; + await client.ReceiveAsync(buf, SocketFlags.None); // INFO + + await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n")); + + // Don't respond to PINGs — wait for stale disconnect + // With 500ms interval and MaxPingsOut=2: + // t=500ms: PING #1, pingsOut=1 + // t=1000ms: PING #2, pingsOut=2 + // t=1500ms: pingsOut+1 > MaxPingsOut → -ERR 'Stale Connection' + close + var sb = new StringBuilder(); + try + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (true) + { + var n = await client.ReceiveAsync(buf, SocketFlags.None, timeout.Token); + if (n == 0) break; + sb.Append(Encoding.ASCII.GetString(buf, 0, n)); + } + } + catch (OperationCanceledException) + { + // Timeout is acceptable — check what we got + } + + var allData = sb.ToString(); + allData.ShouldContain("-ERR 'Stale Connection'"); + + client.Dispose(); + } +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PingKeepalive" -v normal` +Expected: FAIL — server never sends PING. + +**Step 3: Add ping state fields to NatsClient** + +In `src/NATS.Server/NatsClient.cs`, add after the stats fields (after line 44): + +```csharp +// PING keepalive state +private int _pingsOut; +private long _lastIn; +``` + +**Step 4: Add RunPingTimerAsync method** + +Add after `SendErrAndCloseAsync`: + +```csharp +private async Task RunPingTimerAsync(CancellationToken ct) +{ + using var timer = new PeriodicTimer(_options.PingInterval); + try + { + while (await timer.WaitForNextTickAsync(ct)) + { + var elapsed = Environment.TickCount64 - Interlocked.Read(ref _lastIn); + if (elapsed < (long)_options.PingInterval.TotalMilliseconds) + { + // Client was recently active, skip ping + Interlocked.Exchange(ref _pingsOut, 0); + continue; + } + + var currentPingsOut = Interlocked.Increment(ref _pingsOut); + if (currentPingsOut > _options.MaxPingsOut) + { + _logger.LogDebug("Client {ClientId} stale connection — closing", Id); + await SendErrAndCloseAsync(NatsProtocol.ErrStaleConnection); + return; + } + + _logger.LogDebug("Client {ClientId} sending PING ({PingsOut}/{MaxPingsOut})", + Id, currentPingsOut, _options.MaxPingsOut); + try + { + await WriteAsync(NatsProtocol.PingBytes, ct); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Client {ClientId} failed to send PING", Id); + return; + } + } + } + catch (OperationCanceledException) + { + // Normal shutdown + } +} +``` + +**Step 5: Update PONG handling in DispatchCommandAsync** + +Replace the `Pong` case (lines 148-149): + +```csharp +case CommandType.Pong: + Interlocked.Exchange(ref _pingsOut, 0); + break; +``` + +**Step 6: Update _lastIn on every parsed command** + +In `ProcessCommandsAsync` (line 110-134), add `_lastIn` update inside the inner loop, right after `TryParse` succeeds (inside `while (_parser.TryParse(...))`): + +```csharp +while (_parser.TryParse(ref buffer, out var cmd)) +{ + Interlocked.Exchange(ref _lastIn, Environment.TickCount64); + await DispatchCommandAsync(cmd, ct); +} +``` + +**Step 7: Launch ping timer in RunAsync after CONNECT** + +The ping timer must start after CONNECT is received. The cleanest approach: start it unconditionally in `RunAsync` alongside the other tasks. Even if CONNECT hasn't been received yet, the timer won't fire until `PingInterval` has elapsed, and `_lastIn` will be updated by command processing. + +Update `RunAsync` to launch the ping timer as a third concurrent task: + +```csharp +public async Task RunAsync(CancellationToken ct) +{ + _clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + Interlocked.Exchange(ref _lastIn, Environment.TickCount64); + var pipe = new Pipe(); + try + { + // Send INFO + await SendInfoAsync(_clientCts.Token); + + // Start read pump, command processing, and ping timer in parallel + var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token); + var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token); + var pingTask = RunPingTimerAsync(_clientCts.Token); + + await Task.WhenAny(fillTask, processTask, pingTask); + } + catch (OperationCanceledException) + { + _logger.LogDebug("Client {ClientId} operation cancelled", Id); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Client {ClientId} connection error", Id); + } + finally + { + Router?.RemoveClient(this); + } +} +``` + +**Step 8: Run tests to verify they pass** + +Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PingKeepalive" -v normal` +Expected: All three PASS. + +**Step 9: Run all tests to verify no regressions** + +Run: `dotnet test tests/NATS.Server.Tests -v normal` +Expected: All tests PASS. + +Note: The existing integration tests (`IntegrationTests.cs`) use `NATS.Client.Core` which automatically responds to server PINGs, so they won't be affected by the new keepalive feature. However, if existing `ClientTests` or `ServerTests` that use raw sockets hang because of unexpected PINGs, the fix is that those tests use the default `NatsOptions` which has `PingInterval = 2 minutes` — far longer than any test runs. This should not be an issue. + +**Step 10: Commit** + +```bash +git add src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ServerTests.cs +git commit -m "feat: add server-side PING keepalive with stale connection detection" +``` + +--- + +## Summary + +| Task | Description | Dependencies | Estimated Steps | +|------|-------------|--------------|----------------| +| Task 1 | -ERR infrastructure | None | 9 steps | +| Task 2 | MaxConnections enforcement | Task 1 | 6 steps | +| Task 3 | Subject validation + max payload | Task 1 | 6 steps | +| Task 4 | PING keepalive | Task 1 | 10 steps | + +Tasks 2, 3, and 4 are independent of each other and can be done in any order (or in parallel) after Task 1 is complete. diff --git a/docs/plans/2026-02-22-harden-base-server-plan.md.tasks.json b/docs/plans/2026-02-22-harden-base-server-plan.md.tasks.json new file mode 100644 index 0000000..2d26a89 --- /dev/null +++ b/docs/plans/2026-02-22-harden-base-server-plan.md.tasks.json @@ -0,0 +1,10 @@ +{ + "planPath": "docs/plans/2026-02-22-harden-base-server-plan.md", + "tasks": [ + {"id": 5, "subject": "Task 1: -ERR Response Infrastructure", "status": "pending"}, + {"id": 6, "subject": "Task 2: MaxConnections Enforcement", "status": "pending", "blockedBy": [5]}, + {"id": 7, "subject": "Task 3: Subject Validation and Max Payload on PUB", "status": "pending", "blockedBy": [5]}, + {"id": 8, "subject": "Task 4: Server-Side PING Keepalive", "status": "pending", "blockedBy": [5]} + ], + "lastUpdated": "2026-02-22T00:00:00Z" +}