Compare commits
9 Commits
5dee4f5fa6...1813250a9e
| Author | SHA1 | Date | |
|---|---|---|---|
| | 1813250a9e | | |
| | b744913296 | | |
| | d14d73a7d0 | | |
| | 9d0d5064ac | | |
| | 0c12b0f6e3 | | |
| | 19e8c65f6d | | |
| | 8ee5a7f97b | | |
| | 16b8f9e2e2 | | |
| | b8acca19dd | | |
3
.gitignore
vendored
@@ -161,6 +161,9 @@ FodyWeavers.xsd
## Go reference implementation
golang/

## Git worktrees
.worktrees/

## OS
.DS_Store
Thumbs.db

141
docs/plans/2026-02-22-harden-base-server-design.md
Normal file
@@ -0,0 +1,141 @@
# Harden Base Server: Design Document

**Date:** 2026-02-22
**Status:** Approved
**Scope:** Small. Fills gaps in the base single-node pub/sub server.

## Overview

The base NATS server port handles PUB/SUB, wildcards, queue groups, and integration with real NATS clients. However, it lacks several hardening features present in the Go reference implementation. This design covers four areas:

1. `-ERR` response infrastructure
2. MaxConnections enforcement
3. Subject validation on PUB (pedantic mode) + max payload validation
4. Server-side PING keepalive with stale connection detection

## 1. -ERR Response Infrastructure

**Go reference:** `client.go:93,2608-2617`

### Wire Format

All errors use: `-ERR '{message}'\r\n` (single quotes around the message).
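
As a rough illustration of the wire format above, the sketch below builds the error line as ASCII bytes. The `ErrWireFormat` class and its `Encode` method are hypothetical names used only for this example; the real helper in `NatsClient.cs` may be shaped differently.

```csharp
using System;
using System.Text;

// Hypothetical helper, for illustration only (not part of the server code).
public static class ErrWireFormat
{
    // Builds the protocol line: -ERR '<message>'\r\n
    public static byte[] Encode(string message) =>
        Encoding.ASCII.GetBytes($"-ERR '{message}'\r\n");
}
```

For example, `ErrWireFormat.Encode("Invalid Subject")` yields the bytes of `-ERR 'Invalid Subject'` followed by CR LF.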

### Constants (NatsProtocol.cs)

Add standard error message constants matching the Go server:

| Constant | Wire message | Connection closed? |
|---|---|---|
| `MaxConnectionsExceeded` | `maximum connections exceeded` | Yes |
| `StaleConnection` | `Stale Connection` | Yes |
| `MaxPayloadViolation` | `Maximum Payload Violation` | Yes |
| `InvalidPublishSubject` | `Invalid Publish Subject` | No |
| `InvalidSubject` | `Invalid Subject` | No |

### Client Methods (NatsClient.cs)

- `SendErrAsync(string message)`: writes `-ERR '{message}'\r\n` using the existing write-lock pattern. Connection stays open.
- `SendErrAndCloseAsync(string message)`: sends the `-ERR`, then triggers client shutdown via cancellation token.

## 2. MaxConnections Enforcement

**Go reference:** `server.go:3378-3384`, `client.go:2428-2431`

### Design

In `NatsServer.StartAsync`, after `AcceptAsync` returns a new TCP connection:

1. Check `_clients.Count >= _options.MaxConnections`
2. If exceeded: write `-ERR 'maximum connections exceeded'\r\n` directly on the raw `NetworkStream`, then close the accepted connection. No `NatsClient` is created.
3. Log at Warning level: `"Client connection rejected: maximum connections ({MaxConnections}) exceeded"`

The check uses `ConcurrentDictionary.Count`, which is safe for this purpose. A slight race is acceptable because Go has the same pattern (the check happens under a lock, but the lock is released before the reject write).

Default `MaxConnections` remains `65536`.

## 3. Subject Validation on PUB

**Go reference:** `client.go:2869-2871`

### Pedantic Mode Subject Validation

In `ProcessPub` and `ProcessHPub`, after parsing the command:

1. If `ClientOptions.Pedantic == true`, call `SubjectMatch.IsValidPublishSubject(subject)`
2. If invalid: `SendErrAsync("Invalid Publish Subject")`. The connection stays open and the message is dropped (not routed).
3. Log at Debug level

This matches Go which only validates publish subjects in pedantic mode.
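
The body of `SubjectMatch.IsValidPublishSubject` is not shown in this document. The sketch below is one plausible reading of the rule, under the assumption that a publish subject must be non-empty, made of non-empty dot-separated tokens without whitespace, and free of standalone `*` or `>` wildcard tokens. The `PublishSubjectSketch` class is a hypothetical stand-in, not the project's implementation.

```csharp
using System;

// Hypothetical stand-in for SubjectMatch.IsValidPublishSubject (assumed rules, see lead-in).
public static class PublishSubjectSketch
{
    public static bool IsValidPublishSubject(string subject)
    {
        if (string.IsNullOrEmpty(subject)) return false;

        foreach (var token in subject.Split('.'))
        {
            // Empty tokens come from inputs such as "foo..bar", ".foo", or "foo."
            if (token.Length == 0) return false;

            // Standalone wildcard tokens are only meaningful for subscriptions
            if (token == "*" || token == ">") return false;

            // Whitespace would break the space-delimited protocol line
            foreach (var c in token)
            {
                if (c == ' ' || c == '\t' || c == '\r' || c == '\n') return false;
            }
        }
        return true;
    }
}
```

Under these assumptions `foo.bar` is accepted, while `foo.*`, `foo.>`, and `foo..bar` are rejected, which lines up with the pedantic test case that publishes to `foo.*`.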

### Max Payload Validation (Always)

In `ProcessPub` and `ProcessHPub`:

1. If payload size > `_options.MaxPayload`, call `SendErrAndCloseAsync("Maximum Payload Violation")`
2. This is a hard close, matching Go behavior

## 4. Server-Side PING Keepalive

**Go reference:** `client.go:5537-5654,2577-2584,2680-2682`

### New Client State

- `_pingsOut` (`int`): count of unanswered server-initiated PINGs (via `Interlocked`)
- `_lastIn` (`long`): `Environment.TickCount64` timestamp of last inbound data

### Timer Implementation

Use `PeriodicTimer` in a dedicated `RunPingTimerAsync` method, launched alongside `FillPipeAsync` and `ProcessCommandsAsync` in `RunAsync`. This avoids fire-and-forget async from sync timer callbacks.

### Lifecycle

1. **Start:** After CONNECT handshake completes, launch `RunPingTimerAsync` with period = `PingInterval`
2. **Each tick:**
   - If `TickCount64 - _lastIn < PingInterval.TotalMilliseconds`: client was recently active, reset `_pingsOut = 0`, skip
   - Else if `_pingsOut + 1 > MaxPingsOut`: send `-ERR 'Stale Connection'` and close
   - Else: increment `_pingsOut`, send `PING\r\n`
3. **PONG received:** In `DispatchCommandAsync` for `CommandType.Pong`, set `_pingsOut = 0`
4. **Cleanup:** Cancel and dispose the timer in client shutdown
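
The per-tick rules in the lifecycle above can be sketched as a small pure function, which makes the three branches easy to unit test without a real timer. `PingTickAction` and `PingTickSketch` are hypothetical names for this illustration; the planned `RunPingTimerAsync` applies the same rules inline with `Interlocked` operations.

```csharp
using System;

// Hypothetical names, for illustration only.
public enum PingTickAction { Skip, SendPing, CloseStale }

public static class PingTickSketch
{
    // Decides what one timer tick should do, updating pingsOut in place.
    public static PingTickAction Decide(
        long nowMs, long lastInMs, long pingIntervalMs, ref int pingsOut, int maxPingsOut)
    {
        // Recent inbound data: the client is alive, so reset the counter and skip
        if (nowMs - lastInMs < pingIntervalMs)
        {
            pingsOut = 0;
            return PingTickAction.Skip;
        }

        // One more PING would exceed the limit: treat the connection as stale
        if (pingsOut + 1 > maxPingsOut)
        {
            return PingTickAction.CloseStale;
        }

        // Otherwise record the outstanding PING and send it
        pingsOut++;
        return PingTickAction.SendPing;
    }
}
```

With `maxPingsOut = 2` and a silent client, successive ticks return `SendPing`, `SendPing`, then `CloseStale`.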

### Updating `_lastIn`

Set `_lastIn = Environment.TickCount64` in `ProcessCommandsAsync` each time a complete command is parsed from the pipe.

### Defaults

- `PingInterval = 2 minutes`
- `MaxPingsOut = 2`
- Stale detection: a non-responding client is disconnected after ~6 minutes (3 intervals)
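
The "~6 minutes" figure follows from the tick rules: a silent client receives `MaxPingsOut` PINGs on consecutive ticks and is closed on the next one, so the delay is roughly `(MaxPingsOut + 1) × PingInterval`. A minimal sketch of that arithmetic, with `StaleTimeoutSketch` as a hypothetical helper name:

```csharp
using System;

// Hypothetical helper, for illustration only.
public static class StaleTimeoutSketch
{
    // Approximate time until a silent client is closed:
    // MaxPingsOut ticks that each send a PING, plus one tick that closes.
    public static TimeSpan TimeToDisconnect(TimeSpan pingInterval, int maxPingsOut) =>
        pingInterval * (maxPingsOut + 1);
}
```

With the defaults, `TimeToDisconnect(TimeSpan.FromMinutes(2), 2)` gives 6 minutes. The real delay can differ by up to about one interval, since timer ticks are not aligned with the client's last inbound data.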

## Testing Strategy

### -ERR Infrastructure
- Unit test: `SendErrAsync` writes correct wire format
- Unit test: `SendErrAndCloseAsync` sends error then disconnects

### MaxConnections
- Integration test: connect N clients up to max, verify N+1 receives `-ERR` and is disconnected
- Verify existing clients are unaffected

### Subject Validation
- Test with pedantic client: PUB to `foo.*` gets `-ERR`, connection stays open
- Test with non-pedantic client: PUB to `foo.*` is accepted (no validation)
- Test max payload: PUB exceeding limit gets `-ERR` and connection closes

### PING Keepalive
- Test with short intervals: verify server sends PING after inactivity
- Test PONG response resets the counter
- Test stale detection: non-responding client is disconnected
- Test active client: recent data suppresses PING

## Files Modified

| File | Changes |
|---|---|
| `NatsProtocol.cs` | Error message constants |
| `NatsClient.cs` | `SendErrAsync`, `SendErrAndCloseAsync`, ping timer, `_lastIn` tracking, pedantic validation, max payload check |
| `NatsServer.cs` | MaxConnections check in accept loop |
| `ClientTests.cs` | -ERR format tests, max payload tests |
| `ServerTests.cs` | MaxConnections test, PING keepalive tests |
| `IntegrationTests.cs` | End-to-end stale connection test |
817
docs/plans/2026-02-22-harden-base-server-plan.md
Normal file
@@ -0,0 +1,817 @@
# Harden Base Server Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.

**Goal:** Add -ERR responses, MaxConnections enforcement, pedantic subject validation on PUB, max payload validation, and server-side PING keepalive to the base NATS server.

**Architecture:** Four incremental features building on the existing server. Task 1 (-ERR infrastructure) is a prerequisite for all others. Tasks 2-4 are independent of each other once Task 1 is done.

**Tech Stack:** .NET 10 / C# 14, xUnit 3, Shouldly, System.IO.Pipelines, System.Threading.PeriodicTimer

---

### Task 1: -ERR Response Infrastructure

**Files:**
- Modify: `src/NATS.Server/Protocol/NatsProtocol.cs:5-22`
- Modify: `src/NATS.Server/NatsClient.cs:24-298`
- Modify: `tests/NATS.Server.Tests/ClientTests.cs`

**Step 1: Write the failing tests**

Add to `tests/NATS.Server.Tests/ClientTests.cs`:

```csharp
[Fact]
public async Task Client_SendErrAsync_writes_correct_wire_format()
{
    var runTask = _natsClient.RunAsync(_cts.Token);

    // Read INFO first
    var buf = new byte[4096];
    await _clientSocket.ReceiveAsync(buf, SocketFlags.None);

    // Trigger SendErrAsync
    await _natsClient.SendErrAsync("Invalid Subject");

    var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None);
    var response = Encoding.ASCII.GetString(buf, 0, n);

    response.ShouldBe("-ERR 'Invalid Subject'\r\n");

    await _cts.CancelAsync();
}

[Fact]
public async Task Client_SendErrAndCloseAsync_sends_error_then_disconnects()
{
    var runTask = _natsClient.RunAsync(_cts.Token);

    // Read INFO first
    var buf = new byte[4096];
    await _clientSocket.ReceiveAsync(buf, SocketFlags.None);

    // Trigger SendErrAndCloseAsync
    await _natsClient.SendErrAndCloseAsync("maximum connections exceeded");

    var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None);
    var response = Encoding.ASCII.GetString(buf, 0, n);

    response.ShouldBe("-ERR 'maximum connections exceeded'\r\n");

    // Connection should be closed: the next read returns 0
    n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None);
    n.ShouldBe(0);
}
```

**Step 2: Run tests to verify they fail**

Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Client_SendErr" -v normal`
Expected: Build fails because `SendErrAsync` and `SendErrAndCloseAsync` don't exist yet.

**Step 3: Add error message constants to NatsProtocol.cs**

In `src/NATS.Server/Protocol/NatsProtocol.cs`, add after line 21 (after `ErrPrefix`):

```csharp
// Standard error messages (matching Go server)
public const string ErrMaxConnectionsExceeded = "maximum connections exceeded";
public const string ErrStaleConnection = "Stale Connection";
public const string ErrMaxPayloadViolation = "Maximum Payload Violation";
public const string ErrInvalidPublishSubject = "Invalid Publish Subject";
public const string ErrInvalidSubject = "Invalid Subject";
```

**Step 4: Add a linked CancellationTokenSource to NatsClient for self-close**

In `src/NATS.Server/NatsClient.cs`, add a field after the `_writeLock` field (line 31):

```csharp
private CancellationTokenSource? _clientCts;
```

Modify `RunAsync` (lines 59-85) so it creates a linked CTS and uses it internally. The linked CTS lets the client cancel itself while still respecting server shutdown:

```csharp
public async Task RunAsync(CancellationToken ct)
{
    _clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    var pipe = new Pipe();
    try
    {
        // Send INFO
        await SendInfoAsync(_clientCts.Token);

        // Start read pump and command processing in parallel
        var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token);
        var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token);

        await Task.WhenAny(fillTask, processTask);
    }
    catch (OperationCanceledException)
    {
        _logger.LogDebug("Client {ClientId} operation cancelled", Id);
    }
    catch (Exception ex)
    {
        _logger.LogDebug(ex, "Client {ClientId} connection error", Id);
    }
    finally
    {
        Router?.RemoveClient(this);
    }
}
```

**Step 5: Add SendErrAsync and SendErrAndCloseAsync to NatsClient**

Add after `WriteAsync` (after line 283):

```csharp
public async Task SendErrAsync(string message)
{
    var errLine = Encoding.ASCII.GetBytes($"-ERR '{message}'\r\n");
    try
    {
        await WriteAsync(errLine, _clientCts?.Token ?? CancellationToken.None);
    }
    catch (Exception ex)
    {
        _logger.LogDebug(ex, "Client {ClientId} failed to send -ERR", Id);
    }
}

public async Task SendErrAndCloseAsync(string message)
{
    await SendErrAsync(message);
    _clientCts?.Cancel();
}
```

**Step 6: Dispose the linked CTS**

Update `Dispose()` (lines 292-297):

```csharp
public void Dispose()
{
    _clientCts?.Dispose();
    _stream.Dispose();
    _socket.Dispose();
    _writeLock.Dispose();
}
```

**Step 7: Run tests to verify they pass**

Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Client_SendErr" -v normal`
Expected: Both tests PASS.

**Step 8: Run all existing tests to verify no regressions**

Run: `dotnet test tests/NATS.Server.Tests -v normal`
Expected: All tests PASS.

**Step 9: Commit**

```bash
git add src/NATS.Server/Protocol/NatsProtocol.cs src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ClientTests.cs
git commit -m "feat: add -ERR response infrastructure with SendErrAsync and SendErrAndCloseAsync"
```

---

### Task 2: MaxConnections Enforcement

**Files:**
- Modify: `src/NATS.Server/NatsServer.cs:42-75`
- Modify: `tests/NATS.Server.Tests/ServerTests.cs`

**Step 1: Write the failing test**

Add to `tests/NATS.Server.Tests/ServerTests.cs`. This test needs its own server with a low MaxConnections. Add a new test class at the end of the file:

```csharp
public class MaxConnectionsTests : IAsyncLifetime
{
    private readonly NatsServer _server;
    private readonly int _port;
    private readonly CancellationTokenSource _cts = new();

    public MaxConnectionsTests()
    {
        _port = GetFreePort();
        _server = new NatsServer(new NatsOptions { Port = _port, MaxConnections = 2 }, NullLoggerFactory.Instance);
    }

    // xUnit v3: IAsyncLifetime methods return ValueTask rather than Task
    public async ValueTask InitializeAsync()
    {
        _ = _server.StartAsync(_cts.Token);
        await _server.WaitForReadyAsync();
    }

    public async ValueTask DisposeAsync()
    {
        await _cts.CancelAsync();
        _server.Dispose();
    }

    private static int GetFreePort()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)sock.LocalEndPoint!).Port;
    }

    [Fact]
    public async Task Server_rejects_connection_when_max_reached()
    {
        // Connect two clients (at limit)
        var client1 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client1.ConnectAsync(IPAddress.Loopback, _port);
        var buf = new byte[4096];
        var n = await client1.ReceiveAsync(buf, SocketFlags.None);
        Encoding.ASCII.GetString(buf, 0, n).ShouldStartWith("INFO ");

        var client2 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client2.ConnectAsync(IPAddress.Loopback, _port);
        n = await client2.ReceiveAsync(buf, SocketFlags.None);
        Encoding.ASCII.GetString(buf, 0, n).ShouldStartWith("INFO ");

        // Third client should be rejected
        var client3 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client3.ConnectAsync(IPAddress.Loopback, _port);

        n = await client3.ReceiveAsync(buf, SocketFlags.None);
        var response = Encoding.ASCII.GetString(buf, 0, n);
        response.ShouldContain("-ERR 'maximum connections exceeded'");

        // Connection should be closed
        n = await client3.ReceiveAsync(buf, SocketFlags.None);
        n.ShouldBe(0);

        client1.Dispose();
        client2.Dispose();
        client3.Dispose();
    }
}
```

**Step 2: Run test to verify it fails**

Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Server_rejects_connection_when_max_reached" -v normal`
Expected: FAIL. The third client gets INFO instead of -ERR.

**Step 3: Add MaxConnections check in the accept loop**

In `src/NATS.Server/NatsServer.cs`, modify the accept loop (lines 56-69). Insert the check after `AcceptAsync` and before creating the `NatsClient`:

```csharp
while (!ct.IsCancellationRequested)
{
    var socket = await _listener.AcceptAsync(ct);

    // Check MaxConnections before creating the client
    if (_options.MaxConnections > 0 && _clients.Count >= _options.MaxConnections)
    {
        _logger.LogWarning("Client connection rejected: maximum connections ({MaxConnections}) exceeded",
            _options.MaxConnections);
        try
        {
            var stream = new NetworkStream(socket, ownsSocket: false);
            var errBytes = Encoding.ASCII.GetBytes(
                $"-ERR '{NatsProtocol.ErrMaxConnectionsExceeded}'\r\n");
            await stream.WriteAsync(errBytes, ct);
            await stream.FlushAsync(ct);
            stream.Dispose();
        }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "Failed to send -ERR to rejected client");
        }
        finally
        {
            socket.Dispose();
        }
        continue;
    }

    var clientId = Interlocked.Increment(ref _nextClientId);

    _logger.LogDebug("Client {ClientId} connected from {RemoteEndpoint}", clientId, socket.RemoteEndPoint);

    var clientLogger = _loggerFactory.CreateLogger($"NATS.Server.NatsClient[{clientId}]");
    var client = new NatsClient(clientId, socket, _options, _serverInfo, clientLogger);
    client.Router = this;
    _clients[clientId] = client;

    _ = RunClientAsync(client, ct);
}
```

Add `using System.Text;` to the top of `NatsServer.cs` if not already present.

**Step 4: Run test to verify it passes**

Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~Server_rejects_connection_when_max_reached" -v normal`
Expected: PASS.

**Step 5: Run all tests**

Run: `dotnet test tests/NATS.Server.Tests -v normal`
Expected: All tests PASS.

**Step 6: Commit**

```bash
git add src/NATS.Server/NatsServer.cs tests/NATS.Server.Tests/ServerTests.cs
git commit -m "feat: enforce MaxConnections limit in accept loop"
```

---

### Task 3: Subject Validation and Max Payload on PUB

**Files:**
- Modify: `src/NATS.Server/NatsClient.cs:213-228`
- Modify: `tests/NATS.Server.Tests/ServerTests.cs`

**Step 1: Write the failing tests**

Add to `tests/NATS.Server.Tests/ServerTests.cs` in the existing `ServerTests` class:

```csharp
[Fact]
public async Task Server_pedantic_rejects_invalid_publish_subject()
{
    using var pub = await ConnectClientAsync();
    using var sub = await ConnectClientAsync();

    // Read INFO from both
    await ReadLineAsync(pub);
    await ReadLineAsync(sub);

    // Connect with pedantic mode ON
    await pub.SendAsync(Encoding.ASCII.GetBytes(
        "CONNECT {\"pedantic\":true}\r\nPING\r\n"));
    var pong = await ReadUntilAsync(pub, "PONG");

    // Subscribe on sub
    await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\nPING\r\n"));
    await ReadUntilAsync(sub, "PONG");

    // PUB with wildcard subject (invalid for publish)
    await pub.SendAsync(Encoding.ASCII.GetBytes("PUB foo.* 5\r\nHello\r\n"));

    // Publisher should get -ERR
    var errResponse = await ReadUntilAsync(pub, "-ERR", timeoutMs: 3000);
    errResponse.ShouldContain("-ERR 'Invalid Publish Subject'");
}

[Fact]
public async Task Server_nonpedantic_allows_wildcard_publish_subject()
{
    using var pub = await ConnectClientAsync();
    using var sub = await ConnectClientAsync();

    await ReadLineAsync(pub);
    await ReadLineAsync(sub);

    // Connect without pedantic mode (default)
    await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\nPING\r\n"));
    await ReadUntilAsync(sub, "PONG");

    await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo.* 5\r\nHello\r\n"));

    // Sub should still receive the message (no validation in non-pedantic mode)
    var msg = await ReadUntilAsync(sub, "Hello\r\n");
    msg.ShouldContain("MSG foo.* 1 5\r\nHello\r\n");
}

[Fact]
public async Task Server_rejects_max_payload_violation()
{
    // Create server with tiny max payload
    var port = GetFreePort();
    using var cts = new CancellationTokenSource();
    var server = new NatsServer(new NatsOptions { Port = port, MaxPayload = 10 }, NullLoggerFactory.Instance);
    _ = server.StartAsync(cts.Token);
    await server.WaitForReadyAsync();

    try
    {
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client.ConnectAsync(IPAddress.Loopback, port);

        var buf = new byte[4096];
        await client.ReceiveAsync(buf, SocketFlags.None); // INFO

        await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n"));

        // Send PUB with payload larger than MaxPayload (10 bytes)
        await client.SendAsync(Encoding.ASCII.GetBytes("PUB foo 20\r\n12345678901234567890\r\n"));

        var n = await client.ReceiveAsync(buf, SocketFlags.None);
        var response = Encoding.ASCII.GetString(buf, 0, n);
        response.ShouldContain("-ERR 'Maximum Payload Violation'");

        // Connection should be closed
        n = await client.ReceiveAsync(buf, SocketFlags.None);
        n.ShouldBe(0);

        client.Dispose();
    }
    finally
    {
        await cts.CancelAsync();
        server.Dispose();
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~pedantic|FullyQualifiedName~max_payload" -v normal`
Expected: The pedantic and max payload tests FAIL because no validation exists yet. The non-pedantic test exercises behavior that needs no validation, so it may already pass.

**Step 3: Add validation to ProcessPub**

In `src/NATS.Server/NatsClient.cs`, replace the `ProcessPub` method (lines 213-228) with an async `ProcessPubAsync`:

```csharp
private async ValueTask ProcessPubAsync(ParsedCommand cmd)
{
    Interlocked.Increment(ref InMsgs);
    Interlocked.Add(ref InBytes, cmd.Payload.Length);

    // Max payload validation (always, hard close)
    if (cmd.Payload.Length > _options.MaxPayload)
    {
        _logger.LogWarning("Client {ClientId} exceeded max payload: {Size} > {MaxPayload}",
            Id, cmd.Payload.Length, _options.MaxPayload);
        await SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation);
        return;
    }

    // Pedantic mode: validate publish subject
    if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(cmd.Subject!))
    {
        _logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, cmd.Subject);
        await SendErrAsync(NatsProtocol.ErrInvalidPublishSubject);
        return;
    }

    ReadOnlyMemory<byte> headers = default;
    ReadOnlyMemory<byte> payload = cmd.Payload;

    if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0)
    {
        headers = cmd.Payload[..cmd.HeaderSize];
        payload = cmd.Payload[cmd.HeaderSize..];
    }

    Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
}
```

Since `ProcessPub` is now the async `ProcessPubAsync`, update `DispatchCommandAsync` (lines 160-163) to await it:

```csharp
case CommandType.Pub:
case CommandType.HPub:
    await ProcessPubAsync(cmd);
    break;
```

**Step 4: Run tests to verify they pass**

Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~pedantic|FullyQualifiedName~max_payload" -v normal`
Expected: All three PASS.

**Step 5: Run all tests**

Run: `dotnet test tests/NATS.Server.Tests -v normal`
Expected: All tests PASS.

**Step 6: Commit**

```bash
git add src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ServerTests.cs
git commit -m "feat: add pedantic subject validation and max payload enforcement on PUB"
```

---

### Task 4: Server-Side PING Keepalive

**Files:**
- Modify: `src/NATS.Server/NatsClient.cs`
- Modify: `tests/NATS.Server.Tests/ServerTests.cs`

**Step 1: Write the failing tests**

Add a new test class at the end of `tests/NATS.Server.Tests/ServerTests.cs`:

```csharp
public class PingKeepaliveTests : IAsyncLifetime
{
    private readonly NatsServer _server;
    private readonly int _port;
    private readonly CancellationTokenSource _cts = new();

    public PingKeepaliveTests()
    {
        _port = GetFreePort();
        // Short intervals for testing: 500ms ping interval, 2 max pings out
        _server = new NatsServer(
            new NatsOptions
            {
                Port = _port,
                PingInterval = TimeSpan.FromMilliseconds(500),
                MaxPingsOut = 2,
            },
            NullLoggerFactory.Instance);
    }

    // xUnit v3: IAsyncLifetime methods return ValueTask rather than Task
    public async ValueTask InitializeAsync()
    {
        _ = _server.StartAsync(_cts.Token);
        await _server.WaitForReadyAsync();
    }

    public async ValueTask DisposeAsync()
    {
        await _cts.CancelAsync();
        _server.Dispose();
    }

    private static int GetFreePort()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)sock.LocalEndPoint!).Port;
    }

    private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
    {
        using var cts = new CancellationTokenSource(timeoutMs);
        var sb = new StringBuilder();
        var buf = new byte[4096];
        while (!sb.ToString().Contains(expected))
        {
            var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
            if (n == 0) break;
            sb.Append(Encoding.ASCII.GetString(buf, 0, n));
        }
        return sb.ToString();
    }

    [Fact]
    public async Task Server_sends_PING_after_inactivity()
    {
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client.ConnectAsync(IPAddress.Loopback, _port);

        // Read INFO
        var buf = new byte[4096];
        await client.ReceiveAsync(buf, SocketFlags.None);

        // Send CONNECT to start keepalive
        await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n"));

        // Wait for server to send PING (expected within a few 500ms intervals)
        var response = await ReadUntilAsync(client, "PING", timeoutMs: 3000);
        response.ShouldContain("PING");

        client.Dispose();
    }

    [Fact]
    public async Task Server_pong_resets_ping_counter()
    {
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client.ConnectAsync(IPAddress.Loopback, _port);

        var buf = new byte[4096];
        await client.ReceiveAsync(buf, SocketFlags.None); // INFO

        await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n"));

        // Wait for first PING
        var response = await ReadUntilAsync(client, "PING", timeoutMs: 3000);
        response.ShouldContain("PING");

        // Respond with PONG; this resets the counter
        await client.SendAsync(Encoding.ASCII.GetBytes("PONG\r\n"));

        // Wait for next PING (counter reset, so we should get another one)
        response = await ReadUntilAsync(client, "PING", timeoutMs: 3000);
        response.ShouldContain("PING");

        // Respond again to keep alive
        await client.SendAsync(Encoding.ASCII.GetBytes("PONG\r\n"));

        // Client should still be alive: send a PING and expect PONG back
        await client.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
        response = await ReadUntilAsync(client, "PONG", timeoutMs: 3000);
        response.ShouldContain("PONG");

        client.Dispose();
    }

    [Fact]
    public async Task Server_disconnects_stale_client()
    {
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client.ConnectAsync(IPAddress.Loopback, _port);

        var buf = new byte[4096];
        await client.ReceiveAsync(buf, SocketFlags.None); // INFO

        await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n"));

        // Don't respond to PINGs; wait for the stale disconnect.
        // Approximate timeline with 500ms interval and MaxPingsOut=2
        // (the first tick may be skipped if CONNECT arrived within the last interval):
        // t=500ms: PING #1, pingsOut=1
        // t=1000ms: PING #2, pingsOut=2
        // t=1500ms: pingsOut+1 > MaxPingsOut → -ERR 'Stale Connection' + close
        var sb = new StringBuilder();
        try
        {
            using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            while (true)
            {
                var n = await client.ReceiveAsync(buf, SocketFlags.None, timeout.Token);
                if (n == 0) break;
                sb.Append(Encoding.ASCII.GetString(buf, 0, n));
            }
        }
        catch (OperationCanceledException)
        {
            // Timeout is acceptable; check what we got
        }

        var allData = sb.ToString();
        allData.ShouldContain("-ERR 'Stale Connection'");

        client.Dispose();
    }
}
```

**Step 2: Run tests to verify they fail**

Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PingKeepalive" -v normal`
Expected: FAIL. The server never sends PING.

**Step 3: Add ping state fields to NatsClient**

In `src/NATS.Server/NatsClient.cs`, add after the stats fields (after line 44):

```csharp
// PING keepalive state
private int _pingsOut;
private long _lastIn;
```
|
||||
|
||||
**Step 4: Add RunPingTimerAsync method**
|
||||
|
||||
Add after `SendErrAndCloseAsync`:
|
||||
|
||||
```csharp
|
||||
private async Task RunPingTimerAsync(CancellationToken ct)
|
||||
{
|
||||
using var timer = new PeriodicTimer(_options.PingInterval);
|
||||
try
|
||||
{
|
||||
while (await timer.WaitForNextTickAsync(ct))
|
||||
{
|
||||
var elapsed = Environment.TickCount64 - Interlocked.Read(ref _lastIn);
|
||||
if (elapsed < (long)_options.PingInterval.TotalMilliseconds)
|
||||
{
|
||||
// Client was recently active, skip ping
|
||||
Interlocked.Exchange(ref _pingsOut, 0);
|
||||
continue;
|
||||
}
|
||||
|
||||
var currentPingsOut = Interlocked.Increment(ref _pingsOut);
|
||||
if (currentPingsOut > _options.MaxPingsOut)
|
||||
{
|
||||
_logger.LogDebug("Client {ClientId} stale connection — closing", Id);
|
||||
await SendErrAndCloseAsync(NatsProtocol.ErrStaleConnection);
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.LogDebug("Client {ClientId} sending PING ({PingsOut}/{MaxPingsOut})",
|
||||
Id, currentPingsOut, _options.MaxPingsOut);
|
||||
try
|
||||
{
|
||||
await WriteAsync(NatsProtocol.PingBytes, ct);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Client {ClientId} failed to send PING", Id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Normal shutdown
|
||||
}
|
||||
}
|
||||
```
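
To sanity-check the timeline used in the stale-connection test, the per-tick decision from Step 4 can be replayed without any sockets. This is a minimal sketch: `SimulateSilentClient` is a hypothetical helper, not part of the server, and it assumes the client stays silent for the whole run so the inactivity check never skips a tick.

```csharp
using System;

// Hypothetical helper: replays the decision made on each timer tick for a
// client that never sends anything (no PONG, no other traffic).
// Returns the tick number on which the connection is declared stale.
static int SimulateSilentClient(int maxPingsOut)
{
    var pingsOut = 0;
    for (var tick = 1; ; tick++)
    {
        // Mirrors Step 4: increment first, then compare against the limit.
        var current = ++pingsOut;
        if (current > maxPingsOut)
            return tick; // -ERR 'Stale Connection' would be sent on this tick
        // Otherwise a PING is written and the loop waits for the next tick.
    }
}

Console.WriteLine(SimulateSilentClient(2)); // 3
```

With `MaxPingsOut = 2` the third tick closes the connection, which corresponds to t=1500ms at a 500ms interval. In a real run the first tick may be skipped if CONNECT arrived less than one interval earlier, so the test only asserts that the error eventually arrives.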
|
||||
|
||||
**Step 5: Update PONG handling in DispatchCommandAsync**
|
||||
|
||||
Replace the `Pong` case (lines 148-149):
|
||||
|
||||
```csharp
|
||||
case CommandType.Pong:
|
||||
Interlocked.Exchange(ref _pingsOut, 0);
|
||||
break;
|
||||
```
|
||||
|
||||
**Step 6: Update _lastIn on every parsed command**
|
||||
|
||||
In `ProcessCommandsAsync` (lines 110-134), update `_lastIn` inside the inner loop, right after `TryParse` succeeds (inside `while (_parser.TryParse(...))`):
|
||||
|
||||
```csharp
|
||||
while (_parser.TryParse(ref buffer, out var cmd))
|
||||
{
|
||||
Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
|
||||
await DispatchCommandAsync(cmd, ct);
|
||||
}
|
||||
```
|
||||
|
||||
**Step 7: Launch ping timer in RunAsync after CONNECT**
|
||||
|
||||
Ideally the ping timer would start only after CONNECT is received. The simplest approach is to start it unconditionally in `RunAsync` alongside the other tasks: the timer cannot fire until `PingInterval` has elapsed, and `_lastIn` is refreshed by every parsed command, so a recently active client is skipped rather than pinged.
|
||||
|
||||
Update `RunAsync` to launch the ping timer as a third concurrent task:
|
||||
|
||||
```csharp
|
||||
public async Task RunAsync(CancellationToken ct)
|
||||
{
|
||||
_clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
|
||||
var pipe = new Pipe();
|
||||
try
|
||||
{
|
||||
// Send INFO
|
||||
await SendInfoAsync(_clientCts.Token);
|
||||
|
||||
// Start read pump, command processing, and ping timer in parallel
|
||||
var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token);
|
||||
var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token);
|
||||
var pingTask = RunPingTimerAsync(_clientCts.Token);
|
||||
|
||||
await Task.WhenAny(fillTask, processTask, pingTask);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
_logger.LogDebug("Client {ClientId} operation cancelled", Id);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Client {ClientId} connection error", Id);
|
||||
}
|
||||
finally
|
||||
{
|
||||
Router?.RemoveClient(this);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Step 8: Run tests to verify they pass**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~PingKeepalive" -v normal`
|
||||
Expected: All three PASS.
|
||||
|
||||
**Step 9: Run all tests to verify no regressions**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests -v normal`
|
||||
Expected: All tests PASS.
|
||||
|
||||
Note: The existing integration tests (`IntegrationTests.cs`) use `NATS.Client.Core`, which automatically responds to server PINGs, so the new keepalive does not affect them. The raw-socket tests in `ClientTests` and `ServerTests` never answer PINGs, but they run with the default `NatsOptions`, whose `PingInterval` of 2 minutes is far longer than any test run, so they should never receive a PING.
|
||||
|
||||
**Step 10: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/NatsClient.cs tests/NATS.Server.Tests/ServerTests.cs
|
||||
git commit -m "feat: add server-side PING keepalive with stale connection detection"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Summary
|
||||
|
||||
| Task | Description | Dependencies | Estimated Steps |
|------|-------------|--------------|-----------------|
| Task 1 | -ERR infrastructure | None | 9 steps |
| Task 2 | MaxConnections enforcement | Task 1 | 6 steps |
| Task 3 | Subject validation + max payload | Task 1 | 6 steps |
| Task 4 | PING keepalive | Task 1 | 10 steps |
|
||||
|
||||
Tasks 2, 3, and 4 are independent of each other and can be done in any order (or in parallel) after Task 1 is complete.
|
||||
10
docs/plans/2026-02-22-harden-base-server-plan.md.tasks.json
Normal file
@@ -0,0 +1,10 @@
|
||||
{
|
||||
"planPath": "docs/plans/2026-02-22-harden-base-server-plan.md",
|
||||
"tasks": [
|
||||
{"id": 5, "subject": "Task 1: -ERR Response Infrastructure", "status": "pending"},
|
||||
{"id": 6, "subject": "Task 2: MaxConnections Enforcement", "status": "pending", "blockedBy": [5]},
|
||||
{"id": 7, "subject": "Task 3: Subject Validation and Max Payload on PUB", "status": "pending", "blockedBy": [5]},
|
||||
{"id": 8, "subject": "Task 4: Server-Side PING Keepalive", "status": "pending", "blockedBy": [5]}
|
||||
],
|
||||
"lastUpdated": "2026-02-22T00:00:00Z"
|
||||
}
|
||||
243
docs/plans/2026-02-22-monitoring-tls-design.md
Normal file
@@ -0,0 +1,243 @@
|
||||
# Monitoring HTTP & TLS Support Design
|
||||
|
||||
**Date:** 2026-02-22
|
||||
**Scope:** Port monitoring endpoints (`/varz`, `/connz`) and full TLS support from Go NATS server
|
||||
**Go Reference:** `golang/nats-server/server/monitor.go`, `server.go` (TLS), `client.go` (TLS), `opts.go`
|
||||
|
||||
## Overview
|
||||
|
||||
Two features ported from Go NATS:
|
||||
|
||||
1. **Monitoring HTTP** — Kestrel Minimal API embedded in `NatsServer`, serving `/varz`, `/connz`, `/healthz` and stub endpoints. Exact Go JSON schema for tooling compatibility.
|
||||
2. **TLS Support** — `SslStream` wrapping with four modes: no TLS, TLS required, TLS-first, and mixed TLS/plaintext. Certificate pinning, client cert verification, rate limiting.
|
||||
|
||||
## 1. Server-Level Stats Aggregation
|
||||
|
||||
New `ServerStats` class with atomic counters, replacing the need to sum across all clients on each `/varz` request.
|
||||
|
||||
### ServerStats Fields
|
||||
|
||||
```csharp
|
||||
// src/NATS.Server/ServerStats.cs
|
||||
public sealed class ServerStats
|
||||
{
|
||||
public long InMsgs;
|
||||
public long OutMsgs;
|
||||
public long InBytes;
|
||||
public long OutBytes;
|
||||
public long TotalConnections;
|
||||
public long SlowConsumers;
|
||||
public long StaleConnections;
|
||||
public long Stalls;
|
||||
public long SlowConsumerClients;
|
||||
public long SlowConsumerRoutes;
|
||||
public long SlowConsumerLeafs;
|
||||
public long SlowConsumerGateways;
|
||||
public readonly ConcurrentDictionary<string, long> HttpReqStats = new();
|
||||
}
|
||||
```
|
||||
|
||||
### Integration Points
|
||||
|
||||
- `NatsServer` owns a `ServerStats` instance, passes it to each `NatsClient`
|
||||
- `NatsClient.ProcessPub` increments server-level `InMsgs`/`InBytes` alongside client-level counters
|
||||
- `NatsClient.SendMessageAsync` increments server-level `OutMsgs`/`OutBytes`
|
||||
- Accept loop increments `TotalConnections`
|
||||
- `NatsServer.StartTime` field added (set once at startup)
|
||||
|
||||
## 2. Monitoring HTTP Endpoints
|
||||
|
||||
### HTTP Stack
|
||||
|
||||
Kestrel Minimal APIs via `FrameworkReference` to `Microsoft.AspNetCore.App`. No NuGet packages needed.
|
||||
|
||||
### Endpoints
|
||||
|
||||
| Path | Handler | Description |
|------|---------|-------------|
| `/` | `HandleRoot` | Links to all endpoints |
| `/varz` | `HandleVarz` | Server stats and config |
| `/connz` | `HandleConnz` | Connection info (paginated) |
| `/healthz` | `HandleHealthz` | Health check (200 OK) |
| `/routez` | stub | Returns `{}` |
| `/gatewayz` | stub | Returns `{}` |
| `/leafz` | stub | Returns `{}` |
| `/subz` | stub | Returns `{}` |
| `/accountz` | stub | Returns `{}` |
| `/jsz` | stub | Returns `{}` |
|
||||
|
||||
All paths support optional base path prefix via `MonitorBasePath` config.
|
||||
|
||||
### Configuration
|
||||
|
||||
```csharp
|
||||
// Added to NatsOptions
|
||||
public int MonitorPort { get; set; } // 0 = disabled, CLI: -m
|
||||
public string MonitorHost { get; set; } = "0.0.0.0";
|
||||
public string? MonitorBasePath { get; set; }
|
||||
public int MonitorHttpsPort { get; set; } // 0 = disabled
|
||||
```
|
||||
|
||||
### Varz Model
|
||||
|
||||
Exact Go JSON field names. All fields from Go's `Varz` struct including nested config structs (`ClusterOptsVarz`, `GatewayOptsVarz`, `LeafNodeOptsVarz`, `MqttOptsVarz`, `WebsocketOptsVarz`, `JetStreamVarz`). Nested structs return defaults/zeros until those subsystems are ported.
|
||||
|
||||
Key field categories: identification, network config, security/limits, timing/lifecycle, runtime metrics (mem, CPU, cores), connection stats, message stats, health counters, subsystem configs, HTTP request stats.
|
||||
|
||||
### Connz Model
|
||||
|
||||
Paginated connection list with query parameter support:
|
||||
|
||||
- `sort` — sort field (cid, bytes_to, msgs_to, etc.)
|
||||
- `subs` / `subs=detail` — include subscription lists
|
||||
- `offset` / `limit` — pagination (default limit 1024)
|
||||
- `state` — filter open/closed/all
|
||||
- `auth` — include usernames
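
The `offset` / `limit` behaviour can be sketched as below. This is an illustration under assumptions, not the planned `ConnzHandler`: `Paginate` is a hypothetical helper, and treating a non-positive `limit` as "use the default of 1024" is an assumption about how missing values are handled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: applies offset/limit to an already sorted snapshot
// and reports the total alongside the page.
static (IReadOnlyList<T> Page, int Total) Paginate<T>(IReadOnlyList<T> sorted, int offset, int limit)
{
    const int DefaultLimit = 1024;         // default limit named in the list above
    if (limit <= 0) limit = DefaultLimit;  // assumption: unset or invalid means default
    if (offset < 0) offset = 0;
    var page = sorted.Skip(offset).Take(limit).ToList();
    return (page, sorted.Count);
}

var cids = new List<ulong> { 1, 2, 3, 4, 5 };
var (page, total) = Paginate(cids, offset: 0, limit: 2);
Console.WriteLine($"{page.Count} of {total}"); // 2 of 5
```

Returning the total together with the page lets the response report how many connections exist even when only a slice is serialized.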
|
||||
|
||||
`ConnInfo` includes all Go fields: cid, kind, ip, port, start, last_activity, rtt, uptime, idle, pending, msg/byte stats, subscription count, client name/lang/version, TLS version/cipher, account.
|
||||
|
||||
### Concurrency
|
||||
|
||||
- `HandleVarz` acquires a `SemaphoreSlim(1,1)` to serialize JSON building (matches Go's `varzMu`)
|
||||
- `HandleConnz` snapshots `_clients.Values.ToArray()` to avoid holding the dictionary during serialization
|
||||
- CPU percentage sampled via `Process.TotalProcessorTime` delta, cached for 1 second
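
The CPU sampling bullet can be sketched as follows. `CpuSampler` is a hypothetical class, not the planned `VarzHandler` code. It reports CPU time as a percentage of wall time without normalizing by core count; whether to normalize is left open here.

```csharp
using System;
using System.Diagnostics;

// Hypothetical sampler: CPU% = delta(TotalProcessorTime) / delta(wall time) * 100,
// recomputed at most once per second and served from cache in between.
public sealed class CpuSampler
{
    private static readonly TimeSpan CacheFor = TimeSpan.FromSeconds(1);
    private readonly object _gate = new();
    private readonly Stopwatch _wall = Stopwatch.StartNew();
    private TimeSpan _lastCpu;
    private double _cached;

    public CpuSampler()
    {
        using var proc = Process.GetCurrentProcess();
        _lastCpu = proc.TotalProcessorTime;
    }

    public double Sample()
    {
        lock (_gate)
        {
            var wall = _wall.Elapsed;
            if (wall < CacheFor) return _cached; // still fresh, skip the process query

            using var proc = Process.GetCurrentProcess();
            var cpu = proc.TotalProcessorTime;
            _cached = (cpu - _lastCpu).TotalMilliseconds / wall.TotalMilliseconds * 100.0;
            _lastCpu = cpu;
            _wall.Restart();
            return _cached;
        }
    }
}
```

A single instance would be shared by all `/varz` requests, so rapid polling reuses the cached value instead of querying the process each time.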
|
||||
|
||||
### NatsClient Additions for ConnInfo
|
||||
|
||||
```csharp
|
||||
public DateTime StartTime { get; } // set in constructor
|
||||
public DateTime LastActivity; // updated on every command dispatch
|
||||
public string? RemoteIp { get; } // from socket.RemoteEndPoint
|
||||
public int RemotePort { get; } // from socket.RemoteEndPoint
|
||||
```
|
||||
|
||||
## 3. TLS Support
|
||||
|
||||
### Configuration
|
||||
|
||||
```csharp
|
||||
// Added to NatsOptions
|
||||
public string? TlsCert { get; set; }
|
||||
public string? TlsKey { get; set; }
|
||||
public string? TlsCaCert { get; set; }
|
||||
public bool TlsVerify { get; set; }
|
||||
public bool TlsMap { get; set; }
|
||||
public double TlsTimeout { get; set; } = 2.0;
|
||||
public bool TlsHandshakeFirst { get; set; }
|
||||
public TimeSpan TlsHandshakeFirstFallback { get; set; } = TimeSpan.FromMilliseconds(50);
|
||||
public bool AllowNonTls { get; set; }
|
||||
public long TlsRateLimit { get; set; }
|
||||
public HashSet<string>? TlsPinnedCerts { get; set; }
|
||||
public SslProtocols TlsMinVersion { get; set; } = SslProtocols.Tls12;
|
||||
```
|
||||
|
||||
CLI args: `--tls`, `--tlscert`, `--tlskey`, `--tlscacert`, `--tlsverify`
|
||||
|
||||
### INFO Message Changes
|
||||
|
||||
Three new fields on `ServerInfo`: `tls_required`, `tls_verify`, `tls_available`.
|
||||
|
||||
- `tls_required = (TlsConfig != null && !AllowNonTls)`
|
||||
- `tls_verify = (TlsConfig != null && TlsVerify)`
|
||||
- `tls_available = (TlsConfig != null && AllowNonTls)`
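
The three rules above reduce to a small pure function. A minimal sketch, assuming a hypothetical helper `TlsInfoFlags` in which `tlsConfigured` stands in for `TlsConfig != null`:

```csharp
using System;

// Hypothetical helper mirroring the three INFO rules listed above.
static (bool Required, bool Verify, bool Available) TlsInfoFlags(
    bool tlsConfigured, bool allowNonTls, bool tlsVerify) =>
    (tlsConfigured && !allowNonTls,   // tls_required
     tlsConfigured && tlsVerify,      // tls_verify
     tlsConfigured && allowNonTls);   // tls_available

Console.WriteLine(TlsInfoFlags(tlsConfigured: true, allowNonTls: true, tlsVerify: false));
// (False, False, True)
```

Note that `tls_required` and `tls_available` are mutually exclusive by construction: a TLS-enabled server advertises exactly one of them.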
|
||||
|
||||
### Four TLS Modes
|
||||
|
||||
**Mode 1: No TLS** — current behavior, unchanged.
|
||||
|
||||
**Mode 2: TLS Required** — send INFO with `tls_required=true`, client initiates TLS, server detects 0x16 byte, performs `SslStream` handshake, validates pinned certs, continues protocol over encrypted stream.
|
||||
|
||||
**Mode 3: TLS First** — do NOT send INFO, wait up to 50ms for data. If 0x16 byte arrives: TLS handshake then send INFO over encrypted stream. If timeout or non-TLS byte: fallback to Mode 2 flow.
|
||||
|
||||
**Mode 4: Mixed** — send INFO with `tls_available=true`, peek first byte. 0x16 → TLS handshake. Other → continue plaintext.
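
The first-byte decision shared by Modes 2 and 4 can be sketched as below. `NextStep` and its string results are hypothetical names used only for illustration. The value `0x16` is the TLS handshake record type that starts a ClientHello.

```csharp
using System;

// Hypothetical decision function. `firstByte` is the peeked byte, or -1 when
// the peer closed the connection before sending anything.
static string NextStep(int firstByte, bool allowNonTls)
{
    if (firstByte < 0) return "close";
    if (firstByte == 0x16) return "tls-handshake"; // TLS handshake record
    return allowNonTls ? "plaintext" : "reject";   // Mode 4 vs. Mode 2
}

Console.WriteLine(NextStep(0x16, allowNonTls: false)); // tls-handshake
Console.WriteLine(NextStep('C', allowNonTls: true));   // plaintext
Console.WriteLine(NextStep('C', allowNonTls: false));  // reject
```

A plaintext client typically starts with `CONNECT`, so its first byte is printable ASCII and can never be mistaken for `0x16`.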
|
||||
|
||||
### Key Components
|
||||
|
||||
**`TlsHelper`** — static class for cert loading (`X509Certificate2` from PEM/PFX), CA cert loading, building `SslServerAuthenticationOptions`, pinned cert validation (SHA256 of SubjectPublicKeyInfo).
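
The pinned-cert check (SHA256 of SubjectPublicKeyInfo) might look like the sketch below. `SpkiSha256Hex` is a hypothetical helper, and comparing lowercase hex strings is an assumption about how pins are stored. The self-signed certificate is generated in memory the same way the TLS tests describe.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;

// Hypothetical helper: hex-encoded SHA-256 of the certificate's SubjectPublicKeyInfo.
static string SpkiSha256Hex(X509Certificate2 cert) =>
    Convert.ToHexString(SHA256.HashData(cert.PublicKey.ExportSubjectPublicKeyInfo()))
        .ToLowerInvariant();

// Throwaway self-signed certificate, for demonstration only.
using var rsa = RSA.Create(2048);
var req = new CertificateRequest("CN=pin-demo", rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
using var cert = req.CreateSelfSigned(DateTimeOffset.UtcNow.AddDays(-1), DateTimeOffset.UtcNow.AddDays(1));

var pinned = new HashSet<string>(StringComparer.OrdinalIgnoreCase) { SpkiSha256Hex(cert) };
Console.WriteLine(pinned.Contains(SpkiSha256Hex(cert))); // True
```

Hashing the public key rather than the whole certificate means a pin survives certificate renewal as long as the key pair is reused.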
|
||||
|
||||
**`TlsConnectionWrapper`** — per-connection negotiation state machine. Takes socket + options, returns `(Stream stream, bool infoAlreadySent)`. Handles peek logic, timeout, handshake, cert validation.
|
||||
|
||||
**`PeekableStream`** — wraps `NetworkStream`, buffers peeked bytes, replays them on first `ReadAsync`. Required so `SslStream.AuthenticateAsServerAsync` sees the full TLS ClientHello including the peeked byte.
|
||||
|
||||
**`TlsRateLimiter`** — token-bucket rate limiter. Refills `TlsRateLimit` tokens per second. `WaitAsync` blocks if no tokens. Only applies to TLS handshakes, not plain connections.
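
One way to get the described behaviour is a semaphore that a timer tops up once per second. This is a sketch under assumptions, not the planned `TlsRateLimiter`: `TokenBucketSketch` is a hypothetical name, and the one-second refill granularity is a simplification.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical token bucket: `perSecond` tokens are available immediately,
// and the bucket is refilled to that level once per second.
public sealed class TokenBucketSketch : IDisposable
{
    private readonly SemaphoreSlim _tokens;
    private readonly Timer _refill;
    private readonly int _perSecond;

    public TokenBucketSketch(int perSecond)
    {
        _perSecond = perSecond;
        _tokens = new SemaphoreSlim(perSecond, perSecond);
        _refill = new Timer(_ => Refill(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
    }

    private void Refill()
    {
        // Only this method releases, so the count can only have dropped since it was read.
        var missing = _perSecond - _tokens.CurrentCount;
        if (missing > 0) _tokens.Release(missing);
    }

    // Completes immediately while tokens remain, otherwise waits for the next refill.
    public Task WaitAsync(CancellationToken ct) => _tokens.WaitAsync(ct);

    public void Dispose()
    {
        _refill.Dispose();
        _tokens.Dispose();
    }
}
```

The accept loop would call `WaitAsync` before starting a handshake, so excess connections are delayed rather than refused. A production version would also need to guard the timer callback against running after disposal.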
|
||||
|
||||
**`TlsConnectionState`** — post-handshake record: `TlsVersion`, `CipherSuite`, `PeerCert`. Stored on `NatsClient` for `/connz` reporting.
|
||||
|
||||
### NatsClient Changes
|
||||
|
||||
Constructor takes `Stream` instead of building `NetworkStream` internally. TLS negotiation happens before `NatsClient` is constructed. `NatsClient` receives the already-negotiated stream and `TlsConnectionState`.
|
||||
|
||||
### Accept Loop Changes
|
||||
|
||||
```
|
||||
Accept socket
|
||||
→ Increment TotalConnections
|
||||
→ Rate limit check (if TLS configured)
|
||||
→ TlsConnectionWrapper.NegotiateAsync (returns stream + infoAlreadySent)
|
||||
→ Extract TlsConnectionState from SslStream if applicable
|
||||
→ Construct NatsClient with stream + tlsState
|
||||
→ client.InfoAlreadySent flag set if TLS-first sent INFO during negotiation
|
||||
→ RunClientAsync
|
||||
```
|
||||
|
||||
## 4. File Layout
|
||||
|
||||
```
|
||||
src/NATS.Server/
|
||||
ServerStats.cs
|
||||
Monitoring/
|
||||
MonitorServer.cs # Kestrel host, route registration
|
||||
Varz.cs # Varz + nested config structs
|
||||
Connz.cs # Connz, ConnInfo, ConnzOptions, SubDetail
|
||||
VarzHandler.cs # Snapshot logic, CPU/mem sampling
|
||||
ConnzHandler.cs # Query param parsing, sort, pagination
|
||||
Tls/
|
||||
TlsHelper.cs # Cert loading, auth options builder
|
||||
TlsConnectionWrapper.cs # Per-connection TLS negotiation
|
||||
TlsConnectionState.cs # Post-handshake state record
|
||||
TlsRateLimiter.cs # Token-bucket rate limiter
|
||||
PeekableStream.cs # Buffered-peek stream wrapper
|
||||
```
|
||||
|
||||
### Package Dependencies
|
||||
|
||||
- `FrameworkReference` to `Microsoft.AspNetCore.App` in `NATS.Server.csproj` (for Kestrel)
|
||||
- No new NuGet packages — `SslStream`, `X509Certificate2`, `SslServerAuthenticationOptions` all in `System.Net.Security`
|
||||
- Tests use `HttpClient` (built-in) and `CertificateRequest` (built-in) for self-signed test certs
|
||||
|
||||
## 5. Testing Strategy
|
||||
|
||||
### Monitoring Tests (`MonitorTests.cs`)
|
||||
|
||||
- `/varz` returns correct server identity, config limits, zero stats on fresh server
|
||||
- After pub/sub traffic: message/byte counters are accurate
|
||||
- `/connz` pagination: `?limit=2&offset=0` with 5 clients returns 2, total=5
|
||||
- `/connz?sort=bytes_to` ordering
|
||||
- `/connz?subs=true` includes subscription subjects
|
||||
- `/healthz` returns 200
|
||||
- HTTP request stats tracked in `/varz` response
|
||||
|
||||
### TLS Tests (`TlsTests.cs`)
|
||||
|
||||
Self-signed certs generated in-memory via `CertificateRequest` + `RSA.Create()`.
|
||||
|
||||
- Basic TLS: server cert, client connects with SslStream, pub/sub works
|
||||
- TLS Required: plaintext client rejected
|
||||
- TLS Verify: valid client cert succeeds, wrong cert fails
|
||||
- Mixed mode: TLS and plaintext clients coexist
|
||||
- TLS First: immediate TLS handshake without reading INFO first
|
||||
- TLS First fallback: slow client gets INFO sent, normal negotiation
|
||||
- Certificate pinning: matching cert accepted, non-matching rejected
|
||||
- Rate limiting: rapid connections throttled
|
||||
- TLS timeout: incomplete handshake closed after configured timeout
|
||||
- Integration: NATS.Client.Core NuGet client works over TLS
|
||||
- Monitoring: `/connz` shows `tls_version` and `tls_cipher_suite`
|
||||
|
||||
## 6. Error Handling
|
||||
|
||||
- **TLS handshake failures** are non-fatal: log warning, close socket, increment counter
|
||||
- **Mixed mode byte detection**: 0x16 → TLS, printable ASCII → plain, connection close → clean disconnect
|
||||
- **Rate limiter**: holds TCP connection open until token available (not rejected)
|
||||
- **Monitoring concurrency**: `varzMu` semaphore serializes `/varz`, client snapshot for `/connz`
|
||||
- **CPU sampling**: cached 1 second to avoid overhead on rapid polls
|
||||
- **Graceful shutdown**: `MonitorServer.DisposeAsync()` stops Kestrel, rate limiter disposes timer, in-flight handshakes cancelled via CancellationToken
|
||||
2661
docs/plans/2026-02-22-monitoring-tls-plan.md
Normal file
File diff suppressed because it is too large
18
docs/plans/2026-02-22-monitoring-tls-plan.md.tasks.json
Normal file
@@ -0,0 +1,18 @@
|
||||
{
|
||||
"planPath": "docs/plans/2026-02-22-monitoring-tls-plan.md",
|
||||
"tasks": [
|
||||
{"id": 0, "nativeId": 6, "subject": "Task 0: Project setup — csproj and configuration options", "status": "pending"},
|
||||
{"id": 1, "nativeId": 7, "subject": "Task 1: ServerStats and NatsClient metadata", "status": "pending", "blockedBy": [0]},
|
||||
{"id": 2, "nativeId": 8, "subject": "Task 2: Refactor NatsClient to accept Stream", "status": "pending", "blockedBy": [1]},
|
||||
{"id": 3, "nativeId": 9, "subject": "Task 3: Monitoring JSON models (Varz, Connz, nested stubs)", "status": "pending", "blockedBy": [2]},
|
||||
{"id": 4, "nativeId": 10, "subject": "Task 4: MonitorServer with /healthz and /varz endpoints", "status": "pending", "blockedBy": [3]},
|
||||
{"id": 5, "nativeId": 11, "subject": "Task 5: ConnzHandler and /connz endpoint", "status": "pending", "blockedBy": [4]},
|
||||
{"id": 6, "nativeId": 12, "subject": "Task 6: Wire monitoring CLI args into Host", "status": "pending", "blockedBy": [0]},
|
||||
{"id": 7, "nativeId": 13, "subject": "Task 7: TLS helpers — TlsHelper, PeekableStream, TlsRateLimiter", "status": "pending", "blockedBy": [2]},
|
||||
{"id": 8, "nativeId": 14, "subject": "Task 8: TlsConnectionWrapper — 4-mode negotiation", "status": "pending", "blockedBy": [7]},
|
||||
{"id": 9, "nativeId": 15, "subject": "Task 9: Wire TLS into NatsServer accept loop", "status": "pending", "blockedBy": [8]},
|
||||
{"id": 10, "nativeId": 16, "subject": "Task 10: TLS CLI args in Host", "status": "pending", "blockedBy": [9]},
|
||||
{"id": 11, "nativeId": 17, "subject": "Task 11: Full integration tests — TLS modes, mixed mode, monitoring + TLS", "status": "pending", "blockedBy": [9, 5]}
|
||||
],
|
||||
"lastUpdated": "2026-02-22T00:00:00Z"
|
||||
}
|
||||
@@ -29,6 +29,7 @@ public sealed class NatsClient : IDisposable
|
||||
private readonly ServerInfo _serverInfo;
|
||||
private readonly NatsParser _parser;
|
||||
private readonly SemaphoreSlim _writeLock = new(1, 1);
|
||||
private CancellationTokenSource? _clientCts;
|
||||
private readonly Dictionary<string, Subscription> _subs = new();
|
||||
private readonly ILogger _logger;
|
||||
|
||||
@@ -43,6 +44,10 @@ public sealed class NatsClient : IDisposable
|
||||
public long InBytes;
|
||||
public long OutBytes;
|
||||
|
||||
// PING keepalive state
|
||||
private int _pingsOut;
|
||||
private long _lastIn;
|
||||
|
||||
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
|
||||
|
||||
public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo, ILogger logger)
|
||||
@@ -58,17 +63,20 @@ public sealed class NatsClient : IDisposable
|
||||
|
||||
     public async Task RunAsync(CancellationToken ct)
     {
+        _clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
+        Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
         var pipe = new Pipe();
         try
         {
             // Send INFO
-            await SendInfoAsync(ct);
+            await SendInfoAsync(_clientCts.Token);
 
-            // Start read pump and command processing in parallel
-            var fillTask = FillPipeAsync(pipe.Writer, ct);
-            var processTask = ProcessCommandsAsync(pipe.Reader, ct);
+            // Start read pump, command processing, and ping timer in parallel
+            var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token);
+            var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token);
+            var pingTask = RunPingTimerAsync(_clientCts.Token);
 
-            await Task.WhenAny(fillTask, processTask);
+            await Task.WhenAny(fillTask, processTask, pingTask);
         }
         catch (OperationCanceledException)
         {
|
||||
@@ -80,6 +88,9 @@ public sealed class NatsClient : IDisposable
|
||||
}
|
||||
finally
|
||||
{
|
||||
try { _socket.Shutdown(SocketShutdown.Both); }
|
||||
catch (SocketException) { }
|
||||
catch (ObjectDisposedException) { }
|
||||
Router?.RemoveClient(this);
|
||||
}
|
||||
}
|
||||
@@ -118,6 +129,7 @@ public sealed class NatsClient : IDisposable
|
||||
|
||||
while (_parser.TryParse(ref buffer, out var cmd))
|
||||
{
|
||||
Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
|
||||
await DispatchCommandAsync(cmd, ct);
|
||||
}
|
||||
|
||||
@@ -146,7 +158,7 @@ public sealed class NatsClient : IDisposable
|
||||
                 break;
 
             case CommandType.Pong:
-                // Update RTT tracking (placeholder)
+                Interlocked.Exchange(ref _pingsOut, 0);
                 break;
 
             case CommandType.Sub:
||||
@@ -159,7 +171,7 @@ public sealed class NatsClient : IDisposable
|
||||
|
||||
             case CommandType.Pub:
             case CommandType.HPub:
-                ProcessPub(cmd);
+                await ProcessPubAsync(cmd);
                 break;
         }
     }
|
||||
@@ -210,11 +222,28 @@ public sealed class NatsClient : IDisposable
|
||||
         sl.SubList.Remove(sub);
     }
 
-    private void ProcessPub(ParsedCommand cmd)
+    private async ValueTask ProcessPubAsync(ParsedCommand cmd)
     {
         Interlocked.Increment(ref InMsgs);
         Interlocked.Add(ref InBytes, cmd.Payload.Length);
 
+        // Max payload validation (always, hard close)
+        if (cmd.Payload.Length > _options.MaxPayload)
+        {
+            _logger.LogWarning("Client {ClientId} exceeded max payload: {Size} > {MaxPayload}",
+                Id, cmd.Payload.Length, _options.MaxPayload);
+            await SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation);
+            return;
+        }
+
+        // Pedantic mode: validate publish subject
+        if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(cmd.Subject!))
+        {
+            _logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, cmd.Subject);
+            await SendErrAsync(NatsProtocol.ErrInvalidPublishSubject);
+            return;
+        }
+
         ReadOnlyMemory<byte> headers = default;
         ReadOnlyMemory<byte> payload = cmd.Payload;
|
||||
|
||||
@@ -282,6 +311,78 @@ public sealed class NatsClient : IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
public async Task SendErrAsync(string message)
|
||||
{
|
||||
var errLine = Encoding.ASCII.GetBytes($"-ERR '{message}'\r\n");
|
||||
try
|
||||
{
|
||||
await WriteAsync(errLine, _clientCts?.Token ?? CancellationToken.None);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Expected during shutdown
|
||||
}
|
||||
catch (IOException ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Client {ClientId} failed to send -ERR", Id);
|
||||
}
|
||||
catch (ObjectDisposedException ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Client {ClientId} failed to send -ERR (disposed)", Id);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task SendErrAndCloseAsync(string message)
|
||||
{
|
||||
await SendErrAsync(message);
|
||||
if (_clientCts is { } cts)
|
||||
await cts.CancelAsync();
|
||||
else
|
||||
_socket.Close();
|
||||
}
|
||||
|
||||
private async Task RunPingTimerAsync(CancellationToken ct)
|
||||
{
|
||||
using var timer = new PeriodicTimer(_options.PingInterval);
|
||||
try
|
||||
{
|
||||
while (await timer.WaitForNextTickAsync(ct))
|
||||
{
|
||||
var elapsed = Environment.TickCount64 - Interlocked.Read(ref _lastIn);
|
||||
if (elapsed < (long)_options.PingInterval.TotalMilliseconds)
|
||||
{
|
||||
// Client was recently active, skip ping
|
||||
Interlocked.Exchange(ref _pingsOut, 0);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (Volatile.Read(ref _pingsOut) + 1 > _options.MaxPingsOut)
|
||||
{
|
||||
_logger.LogDebug("Client {ClientId} stale connection — closing", Id);
|
||||
await SendErrAndCloseAsync(NatsProtocol.ErrStaleConnection);
|
||||
return;
|
||||
}
|
||||
|
||||
var currentPingsOut = Interlocked.Increment(ref _pingsOut);
|
||||
_logger.LogDebug("Client {ClientId} sending PING ({PingsOut}/{MaxPingsOut})",
|
||||
Id, currentPingsOut, _options.MaxPingsOut);
|
||||
try
|
||||
{
|
||||
await WriteAsync(NatsProtocol.PingBytes, ct);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Client {ClientId} failed to send PING", Id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Normal shutdown
|
||||
}
|
||||
}
|
||||
|
||||
public void RemoveAllSubscriptions(SubList subList)
|
||||
{
|
||||
foreach (var sub in _subs.Values)
|
||||
@@ -291,6 +392,7 @@ public sealed class NatsClient : IDisposable
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_clientCts?.Dispose();
|
||||
_stream.Dispose();
|
||||
_socket.Dispose();
|
||||
_writeLock.Dispose();
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using NATS.Server.Protocol;
|
||||
using NATS.Server.Subscriptions;
|
||||
@@ -56,6 +57,32 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
var socket = await _listener.AcceptAsync(ct);
|
||||
|
||||
// Check MaxConnections before creating the client
|
||||
if (_options.MaxConnections > 0 && _clients.Count >= _options.MaxConnections)
|
||||
{
|
||||
_logger.LogWarning("Client connection rejected: maximum connections ({MaxConnections}) exceeded",
|
||||
_options.MaxConnections);
|
||||
try
|
||||
{
|
||||
var stream = new NetworkStream(socket, ownsSocket: false);
|
||||
var errBytes = Encoding.ASCII.GetBytes(
|
||||
$"-ERR '{NatsProtocol.ErrMaxConnectionsExceeded}'\r\n");
|
||||
await stream.WriteAsync(errBytes, ct);
|
||||
await stream.FlushAsync(ct);
|
||||
stream.Dispose();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Failed to send -ERR to rejected client");
|
||||
}
|
||||
finally
|
||||
{
|
||||
socket.Dispose();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
var clientId = Interlocked.Increment(ref _nextClientId);
|
||||
|
||||
_logger.LogDebug("Client {ClientId} connected from {RemoteEndpoint}", clientId, socket.RemoteEndPoint);
|
||||
|
||||
@@ -203,10 +203,10 @@ public sealed class NatsParser
|
||||
             throw new ProtocolViolationException("Invalid PUB arguments");
         }
 
-        if (size < 0 || size > _maxPayload)
+        if (size < 0)
             throw new ProtocolViolationException("Invalid payload size");
 
-        // Now read payload + \r\n
+        // Now read payload + \r\n (max payload enforcement is done at the client level)
         buffer = buffer.Slice(afterLine);
         _awaitingPayload = true;
         _expectedPayloadSize = size;
|
||||
@@ -253,7 +253,7 @@ public sealed class NatsParser
|
||||
             throw new ProtocolViolationException("Invalid HPUB arguments");
         }
 
-        if (hdrSize < 0 || totalSize < 0 || hdrSize > totalSize || totalSize > _maxPayload)
+        if (hdrSize < 0 || totalSize < 0 || hdrSize > totalSize)
             throw new ProtocolViolationException("Invalid HPUB sizes");
 
         buffer = buffer.Slice(afterLine);
|
||||
|
||||
@@ -19,6 +19,13 @@ public static class NatsProtocol
|
||||
public static readonly byte[] MsgPrefix = "MSG "u8.ToArray();
|
||||
public static readonly byte[] HmsgPrefix = "HMSG "u8.ToArray();
|
||||
public static readonly byte[] ErrPrefix = "-ERR "u8.ToArray();
|
||||
|
||||
// Standard error messages (matching Go server)
|
||||
public const string ErrMaxConnectionsExceeded = "maximum connections exceeded";
|
||||
public const string ErrStaleConnection = "Stale Connection";
|
||||
public const string ErrMaxPayloadViolation = "Maximum Payload Violation";
|
||||
public const string ErrInvalidPublishSubject = "Invalid Publish Subject";
|
||||
public const string ErrInvalidSubject = "Invalid Subject";
|
||||
}
|
||||
|
||||
public sealed class ServerInfo
|
||||
|
||||
@@ -15,8 +15,13 @@ public sealed class SubList : IDisposable
|
||||
     private readonly TrieLevel _root = new();
     private Dictionary<string, SubListResult>? _cache = new(StringComparer.Ordinal);
     private uint _count;
+    private volatile bool _disposed;
 
-    public void Dispose() => _lock.Dispose();
+    public void Dispose()
+    {
+        _disposed = true;
+        _lock.Dispose();
+    }
 
     public uint Count
     {
|
||||
@@ -95,6 +100,7 @@ public sealed class SubList : IDisposable
|
||||
|
||||
public void Remove(Subscription sub)
|
||||
{
|
||||
if (_disposed) return;
|
||||
_lock.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
|
||||
@@ -86,4 +86,48 @@ public class ClientTests : IAsyncDisposable
|
||||
|
||||
await _cts.CancelAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Client_SendErrAsync_writes_correct_wire_format()
|
||||
{
|
||||
var runTask = _natsClient.RunAsync(_cts.Token);
|
||||
|
||||
// Read INFO first
|
||||
var buf = new byte[4096];
|
||||
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
|
||||
|
||||
// Trigger SendErrAsync
|
||||
await _natsClient.SendErrAsync("Invalid Subject");
|
||||
|
||||
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
|
||||
var response = Encoding.ASCII.GetString(buf, 0, n);
|
||||
|
||||
response.ShouldBe("-ERR 'Invalid Subject'\r\n");
|
||||
|
||||
await _cts.CancelAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Client_SendErrAndCloseAsync_sends_error_then_disconnects()
|
||||
{
|
||||
var runTask = _natsClient.RunAsync(_cts.Token);
|
||||
|
||||
// Read INFO first
|
||||
var buf = new byte[4096];
|
||||
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
|
||||
|
||||
// Trigger SendErrAndCloseAsync
|
||||
await _natsClient.SendErrAndCloseAsync("maximum connections exceeded");
|
||||
|
||||
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
|
||||
var response = Encoding.ASCII.GetString(buf, 0, n);
|
||||
|
||||
response.ShouldBe("-ERR 'maximum connections exceeded'\r\n");
|
||||
|
||||
// Connection should be closed — next read returns 0
|
||||
n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
|
||||
n.ShouldBe(0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,4 +123,303 @@ public class ServerTests : IAsyncLifetime
        msg.ShouldContain("MSG foo.bar 1 5\r\n");
    }

    [Fact]
    public async Task Server_pedantic_rejects_invalid_publish_subject()
    {
        using var pub = await ConnectClientAsync();
        using var sub = await ConnectClientAsync();

        // Read INFO from both
        await ReadLineAsync(pub);
        await ReadLineAsync(sub);

        // Connect with pedantic mode ON
        await pub.SendAsync(Encoding.ASCII.GetBytes(
            "CONNECT {\"pedantic\":true}\r\nPING\r\n"));
        var pong = await ReadUntilAsync(pub, "PONG");

        // Subscribe on sub
        await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\nPING\r\n"));
        await ReadUntilAsync(sub, "PONG");

        // PUB with wildcard subject (invalid for publish)
        await pub.SendAsync(Encoding.ASCII.GetBytes("PUB foo.* 5\r\nHello\r\n"));

        // Publisher should get -ERR
        var errResponse = await ReadUntilAsync(pub, "-ERR", timeoutMs: 3000);
        errResponse.ShouldContain("-ERR 'Invalid Publish Subject'");
    }

    [Fact]
    public async Task Server_nonpedantic_allows_wildcard_publish_subject()
    {
        using var pub = await ConnectClientAsync();
        using var sub = await ConnectClientAsync();

        await ReadLineAsync(pub);
        await ReadLineAsync(sub);

        // Connect without pedantic mode (default)
        await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\nPING\r\n"));
        await ReadUntilAsync(sub, "PONG");

        await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo.* 5\r\nHello\r\n"));

        // Sub should still receive the message (no validation in non-pedantic mode)
        var msg = await ReadUntilAsync(sub, "Hello\r\n");
        msg.ShouldContain("MSG foo.* 1 5\r\nHello\r\n");
    }

    [Fact]
    public async Task Server_rejects_max_payload_violation()
    {
        // Create server with tiny max payload
        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(new NatsOptions { Port = port, MaxPayload = 10 }, NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            await client.ConnectAsync(IPAddress.Loopback, port);

            var buf = new byte[4096];
            await client.ReceiveAsync(buf, SocketFlags.None); // INFO

            await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n"));

            // Send PUB with payload larger than MaxPayload (10 bytes)
            await client.SendAsync(Encoding.ASCII.GetBytes("PUB foo 20\r\n12345678901234567890\r\n"));

            using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            var n = await client.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
            var response = Encoding.ASCII.GetString(buf, 0, n);
            response.ShouldContain("-ERR 'Maximum Payload Violation'");

            // Connection should be closed
            n = await client.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
            n.ShouldBe(0);

            client.Dispose();
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }
}

public class MaxConnectionsTests : IAsyncLifetime
{
    private readonly NatsServer _server;
    private readonly int _port;
    private readonly CancellationTokenSource _cts = new();

    public MaxConnectionsTests()
    {
        _port = GetFreePort();
        _server = new NatsServer(new NatsOptions { Port = _port, MaxConnections = 2 }, NullLoggerFactory.Instance);
    }

    public async Task InitializeAsync()
    {
        _ = _server.StartAsync(_cts.Token);
        await _server.WaitForReadyAsync();
    }

    public async Task DisposeAsync()
    {
        await _cts.CancelAsync();
        _server.Dispose();
    }

    private static int GetFreePort()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)sock.LocalEndPoint!).Port;
    }

    [Fact]
    public async Task Server_rejects_connection_when_max_reached()
    {
        using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        // Connect two clients (at limit)
        var client1 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client1.ConnectAsync(IPAddress.Loopback, _port);
        var buf = new byte[4096];
        var n = await client1.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
        Encoding.ASCII.GetString(buf, 0, n).ShouldStartWith("INFO ");

        var client2 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client2.ConnectAsync(IPAddress.Loopback, _port);
        n = await client2.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
        Encoding.ASCII.GetString(buf, 0, n).ShouldStartWith("INFO ");

        // Third client should be rejected
        var client3 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client3.ConnectAsync(IPAddress.Loopback, _port);

        n = await client3.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
        var response = Encoding.ASCII.GetString(buf, 0, n);
        response.ShouldContain("-ERR 'maximum connections exceeded'");

        // Connection should be closed
        n = await client3.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
        n.ShouldBe(0);

        client1.Dispose();
        client2.Dispose();
        client3.Dispose();
    }
}

public class PingKeepaliveTests : IAsyncLifetime
{
    private readonly NatsServer _server;
    private readonly int _port;
    private readonly CancellationTokenSource _cts = new();

    public PingKeepaliveTests()
    {
        _port = GetFreePort();
        // Short intervals for testing: 500ms ping interval, 2 max pings out
        _server = new NatsServer(
            new NatsOptions
            {
                Port = _port,
                PingInterval = TimeSpan.FromMilliseconds(500),
                MaxPingsOut = 2,
            },
            NullLoggerFactory.Instance);
    }

    public async Task InitializeAsync()
    {
        _ = _server.StartAsync(_cts.Token);
        await _server.WaitForReadyAsync();
    }

    public async Task DisposeAsync()
    {
        await _cts.CancelAsync();
        _server.Dispose();
    }

    private static int GetFreePort()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)sock.LocalEndPoint!).Port;
    }

    private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
    {
        using var cts = new CancellationTokenSource(timeoutMs);
        var sb = new StringBuilder();
        var buf = new byte[4096];
        while (!sb.ToString().Contains(expected))
        {
            var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
            if (n == 0) break;
            sb.Append(Encoding.ASCII.GetString(buf, 0, n));
        }
        return sb.ToString();
    }

    [Fact]
    public async Task Server_sends_PING_after_inactivity()
    {
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client.ConnectAsync(IPAddress.Loopback, _port);

        // Read INFO
        var buf = new byte[4096];
        await client.ReceiveAsync(buf, SocketFlags.None);

        // Send CONNECT to start keepalive
        await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n"));

        // Wait for server to send PING (should come within ~500ms)
        var response = await ReadUntilAsync(client, "PING", timeoutMs: 3000);
        response.ShouldContain("PING");

        client.Dispose();
    }

    [Fact]
    public async Task Server_pong_resets_ping_counter()
    {
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client.ConnectAsync(IPAddress.Loopback, _port);

        var buf = new byte[4096];
        await client.ReceiveAsync(buf, SocketFlags.None); // INFO

        await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n"));

        // Wait for first PING
        var response = await ReadUntilAsync(client, "PING", timeoutMs: 3000);
        response.ShouldContain("PING");

        // Respond with PONG, which resets the counter
        await client.SendAsync(Encoding.ASCII.GetBytes("PONG\r\n"));

        // Wait for next PING (counter reset, so we should get another one)
        response = await ReadUntilAsync(client, "PING", timeoutMs: 3000);
        response.ShouldContain("PING");

        // Respond again to keep alive
        await client.SendAsync(Encoding.ASCII.GetBytes("PONG\r\n"));

        // Client should still be alive: send a PING and expect PONG back
        await client.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
        response = await ReadUntilAsync(client, "PONG", timeoutMs: 3000);
        response.ShouldContain("PONG");

        client.Dispose();
    }

    [Fact]
    public async Task Server_disconnects_stale_client()
    {
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await client.ConnectAsync(IPAddress.Loopback, _port);

        var buf = new byte[4096];
        await client.ReceiveAsync(buf, SocketFlags.None); // INFO

        await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\n"));

        // Don't respond to PINGs; wait for stale disconnect
        // With 500ms interval and MaxPingsOut=2:
        // t=500ms: PING #1, pingsOut=1
        // t=1000ms: PING #2, pingsOut=2
        // t=1500ms: pingsOut+1 > MaxPingsOut → -ERR 'Stale Connection' + close
        var sb = new StringBuilder();
        try
        {
            using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            while (true)
            {
                var n = await client.ReceiveAsync(buf, SocketFlags.None, timeout.Token);
                if (n == 0) break;
                sb.Append(Encoding.ASCII.GetString(buf, 0, n));
            }
        }
        catch (OperationCanceledException)
        {
            // Timeout is acceptable; check what we got
        }

        var allData = sb.ToString();
        allData.ShouldContain("-ERR 'Stale Connection'");

        client.Dispose();
    }
}

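The timeline in the comments of `Server_disconnects_stale_client` can be sketched as a small counter, shown here only to make the arithmetic concrete. This is a minimal illustration under stated assumptions, not the server's implementation: `OnPingTimerTick`, `OnPong`, `pingsOut` and `wire` are hypothetical names, and the only rule taken from the diff is the `pingsOut+1 > MaxPingsOut` check with `MaxPingsOut = 2`.

```csharp
using System;
using System.Collections.Generic;

// Illustrative sketch only: these names are hypothetical, not members of the server.
const int maxPingsOut = 2;          // mirrors MaxPingsOut = 2 in the test fixture
var pingsOut = 0;                   // PINGs sent since the last PONG
var wire = new List<string>();      // frames the server would write to the socket

// Runs once per PingInterval. Returns false when the connection is declared stale.
bool OnPingTimerTick()
{
    if (pingsOut + 1 > maxPingsOut)
    {
        wire.Add("-ERR 'Stale Connection'\r\n");
        return false;               // the caller would close the socket here
    }

    pingsOut++;
    wire.Add("PING\r\n");
    return true;
}

// A PONG from the client clears the outstanding count.
void OnPong() => pingsOut = 0;

// The client answers the first PING, then goes silent.
var answered = OnPingTimerTick();   // PING, pingsOut = 1
OnPong();                           // pingsOut back to 0
var first = OnPingTimerTick();      // PING #1, pingsOut = 1
var second = OnPingTimerTick();     // PING #2, pingsOut = 2
var third = OnPingTimerTick();      // 2 + 1 > 2, so stale

Console.WriteLine($"{answered} {first} {second} {third}"); // prints "True True True False"
```

With a 500 ms interval, a silent client is therefore dropped on the third tick after its last PONG (about 1.5 s), which is why the test's 5 second read timeout leaves comfortable headroom.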