Compare commits

...

14 Commits

Author SHA1 Message Date
Joseph Doherty
2baf8a85bf docs: update differences.md section 2 to reflect implemented features 2026-02-22 23:59:19 -05:00
Joseph Doherty
f5c0c4f906 feat: wire pending bytes and close reason into connz monitoring 2026-02-22 23:57:39 -05:00
Joseph Doherty
2fb14821e0 feat: add no-responders CONNECT validation and tests
Reject connections that send no_responders:true without headers:true,
since the 503 HMSG response requires header support. Add three tests:
connection rejection, acceptance with headers, and 503 delivery flow.
2026-02-22 23:56:49 -05:00
Joseph Doherty
04305447f9 feat: implement verbose mode (+OK after commands)
When a client sends CONNECT {"verbose":true}, the server now responds
with +OK\r\n after successfully processing CONNECT, PING, SUB, UNSUB,
and PUB/HPUB commands, matching the Go NATS server behavior.
2026-02-22 23:54:41 -05:00
Joseph Doherty
bce793fd42 perf: batch stat increments per read cycle in ProcessCommandsAsync
Accumulate InMsgs/InBytes locally per ReadAsync cycle and flush once,
reducing from 4 Interlocked operations per published message to 2 per
read cycle. This matches the Go server's approach of batching stats.
2026-02-22 23:52:09 -05:00
Joseph Doherty
c522ce99f5 feat: add delivery tracking and no-responders 503 support to ProcessMessage
When a PUB with a reply-to subject has no matching subscribers and the
sender opted into no_responders, send a 503 HMSG back on the reply
subject so request-reply callers can fail fast instead of timing out.
2026-02-22 23:49:39 -05:00
Joseph Doherty
b289041761 test: add write loop and slow consumer detection tests
Verify channel-based write loop behavior: QueueOutbound writes data
to client socket, PendingBytes tracking, slow consumer detection
when MaxPending is exceeded, close reason propagation, and server
stats incrementation on slow consumer events.
2026-02-22 23:47:31 -05:00
Joseph Doherty
31660a4187 feat: replace inline writes with channel-based write loop and batch flush 2026-02-22 23:41:44 -05:00
Joseph Doherty
ad6a02b9a2 refactor: replace _connectReceived with ClientFlagHolder and add CloseReason tracking 2026-02-22 23:35:35 -05:00
Joseph Doherty
61c6b832e5 feat: add MaxPending, WriteDeadline options and error constants 2026-02-22 23:33:49 -05:00
Joseph Doherty
1a916a3f36 feat: add ClientFlags bitfield with thread-safe holder 2026-02-22 23:33:21 -05:00
Joseph Doherty
8bbfa54058 feat: add ClientClosedReason enum with 16 close reason values 2026-02-22 23:33:13 -05:00
Joseph Doherty
149c852510 docs: add core lifecycle implementation plan with 12 tasks
Detailed step-by-step plan covering ClosedState enum, close reason
tracking, ephemeral port, graceful shutdown, flush-before-close,
lame duck mode, PID/ports files, NKey stubs, signal handling, and
differences.md update.
2026-02-22 23:31:01 -05:00
Joseph Doherty
c2dc503e2e docs: add core server lifecycle design for section 1 gaps
Covers ClosedState enum, accept loop backoff, ephemeral port,
graceful shutdown, lame duck mode, PID/ports files, signal
handling, and stub components.
2026-02-22 23:25:53 -05:00
18 changed files with 3208 additions and 83 deletions

402
differences.md Normal file
View File

@@ -0,0 +1,402 @@
# Go vs .NET NATS Server: Functionality Differences
> Excludes clustering/routes, gateways, leaf nodes, and JetStream.
> Generated 2026-02-22 by comparing `golang/nats-server/server/` against `src/NATS.Server/`.
---
## 1. Core Server Lifecycle
### Server Initialization
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| NKey generation (server identity) | Y | Y | Ed25519 key pair via NATS.NKeys at startup |
| System account setup | Y | Y | `$SYS` account created; no event publishing yet (stub) |
| Config file validation on startup | Y | Stub | `-c` flag parsed, `ConfigFile` stored, but no config parser |
| PID file writing | Y | Y | Written on startup, deleted on shutdown |
| Profiling HTTP endpoint (`/debug/pprof`) | Y | Stub | `ProfPort` option exists but endpoint not implemented |
| Ports file output | Y | Y | JSON ports file written to `PortsFileDir` on startup |
### Accept Loop
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Exponential backoff on accept errors | Y | Y | .NET backs off from 10ms to 1s on repeated failures |
| Config reload lock during client creation | Y | N | Go holds `reloadMu` around `createClient` |
| Goroutine/task tracking (WaitGroup) | Y | Y | `Interlocked` counter + drain with 10s timeout on shutdown |
| Callback-based error handling | Y | N | Go uses `errFunc` callback pattern |
| Random/ephemeral port (port=0) | Y | Y | Port resolved after `Bind`+`Listen`, stored in `_options.Port` |
### Shutdown
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Graceful shutdown with `WaitForShutdown()` | Y | Y | Idempotent CAS-guarded `ShutdownAsync()` + blocking `WaitForShutdown()` |
| Close reason tracking per connection | Y | Y | 37-value `ClosedState` enum, CAS-based first-writer-wins `MarkClosed()` |
| Lame duck mode (stop new, drain existing) | Y | Y | `LameDuckShutdownAsync()` with grace period + stagger-close with jitter |
| Wait for accept loop completion | Y | Y | `TaskCompletionSource` signaled in accept loop `finally` |
| Flush pending data before close | Y | Y | `FlushAndCloseAsync()` with best-effort flush, skip-flush for error conditions |
### Signal Handling
| Signal | Go | .NET | Notes |
|--------|:--:|:----:|-------|
| SIGINT (Ctrl+C) | Y | Y | Both handle graceful shutdown |
| SIGTERM | Y | Y | `PosixSignalRegistration` triggers `ShutdownAsync()` |
| SIGUSR1 (reopen logs) | Y | Stub | Signal registered, handler logs "not yet implemented" |
| SIGUSR2 (lame duck mode) | Y | Y | Triggers `LameDuckShutdownAsync()` |
| SIGHUP (config reload) | Y | Stub | Signal registered, handler logs "not yet implemented" |
| Windows Service integration | Y | N | |
---
## 2. Client / Connection Handling
### Concurrency Model
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Separate read + write loops | Y | Y | Channel-based `RunWriteLoopAsync` with `QueueOutbound()` |
| Write coalescing / batch flush | Y | Y | Write loop drains all channel items before single `FlushAsync` |
| Dynamic buffer sizing (512B-64KB) | Y | N | .NET delegates to `System.IO.Pipelines` |
| Output buffer pooling (3-tier) | Y | N | Go pools at 512B, 4KB, 64KB |
### Connection Types
| Type | Go | .NET | Notes |
|------|:--:|:----:|-------|
| CLIENT | Y | Y | |
| ROUTER | Y | N | Excluded per scope |
| GATEWAY | Y | N | Excluded per scope |
| LEAF | Y | N | Excluded per scope |
| SYSTEM (internal) | Y | N | |
| JETSTREAM (internal) | Y | N | |
| ACCOUNT (internal) | Y | N | |
| WebSocket clients | Y | N | |
| MQTT clients | Y | N | |
### Client Features
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Echo suppression (`echo: false`) | Y | Y | .NET checks echo in delivery path (NatsServer.cs:234,253) |
| Verbose mode (`+OK` responses) | Y | Y | Sends `+OK` after CONNECT, SUB, UNSUB, PUB when `verbose:true` |
| No-responders validation | Y | Y | CONNECT rejects `no_responders` without `headers`; 503 HMSG on no match |
| Slow consumer detection | Y | Y | Pending bytes threshold (64MB) + write deadline timeout (10s) |
| Write deadline / timeout policies | Y | Y | `WriteDeadline` option with `CancellationTokenSource.CancelAfter` on flush |
| RTT measurement | Y | N | Go tracks round-trip time per client |
| Per-client trace mode | Y | N | |
| Detailed close reason tracking | Y | Y | 17-value `ClientClosedReason` enum (single-server subset of Go's 37) |
| Connection state flags (16 flags) | Y | Y | 7-flag `ClientFlagHolder` with `Interlocked.Or`/`And` |
### Slow Consumer Handling
Go implements a sophisticated slow consumer detection system:
- Tracks `pendingBytes` per client output buffer
- If pending exceeds `maxPending`, enters stall mode (2-5ms waits)
- Total stall capped at 10ms per read cycle
- Closes with `SlowConsumerPendingBytes` or `SlowConsumerWriteDeadline`
- Sets `isSlowConsumer` flag for monitoring
.NET now implements pending bytes tracking and write deadline enforcement via `Channel<ReadOnlyMemory<byte>>`. Key differences from Go: no stall/retry mode (immediate close on threshold exceeded), write deadline via `CancellationTokenSource.CancelAfter` instead of `SetWriteDeadline`. `IsSlowConsumer` flag and server-level `SlowConsumerCount` stats are tracked for monitoring.
### Stats Tracking
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Per-connection atomic stats | Y | Y | .NET uses `Interlocked` for stats access |
| Per-read-cycle stat batching | Y | Y | Local accumulators flushed via `Interlocked.Add` per read cycle |
| Per-account stats | Y | N | |
| Slow consumer counters | Y | Y | `SlowConsumers` and `SlowConsumerClients` incremented on detection |
---
## 3. Protocol Parsing
### Parser Architecture
| Aspect | Go | .NET |
|--------|-----|------|
| Approach | Byte-by-byte state machine (74 states) | Two-phase: line extraction + command dispatch |
| Case handling | Per-state character checks | Bit-mask lowercase normalization (`\| 0x20`) |
| Buffer strategy | Jump-ahead optimization for payloads | Direct size-based reads via Pipe |
| Split-buffer handling | argBuf accumulation with scratch buffer | State variables (`_awaitingPayload`, etc.) |
| Error model | Inline error sending + error return | Exception-based (`ProtocolViolationException`) |
| CRLF in payload | Included in message buffer | Excluded by design |
### Protocol Operations
| Operation | Go | .NET | Notes |
|-----------|:--:|:----:|-------|
| PUB | Y | Y | |
| HPUB (headers) | Y | Y | |
| SUB | Y | Y | |
| UNSUB | Y | Y | |
| CONNECT | Y | Y | |
| INFO | Y | Y | |
| PING / PONG | Y | Y | |
| MSG / HMSG | Y | Y | |
| +OK / -ERR | Y | Y | |
| RS+/RS-/RMSG (routes) | Y | N | Excluded per scope |
| A+/A- (accounts) | Y | N | Excluded per scope |
| LS+/LS-/LMSG (leaf) | Y | N | Excluded per scope |
### Protocol Parsing Gaps
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Multi-client-type command routing | Y | N | Go checks `c.kind` to allow/reject commands |
| Protocol tracing in parser | Y | N | Go calls `traceInOp()` per operation |
| Subject mapping (input→output) | Y | N | Go transforms subjects via mapping rules |
| MIME header parsing | Y | N | .NET delegates header handling to client layer |
| Message trace event initialization | Y | N | |
### Protocol Writing
| Aspect | Go | .NET | Notes |
|--------|:--:|:----:|-------|
| INFO serialization | Once at startup | Every send | .NET re-serializes JSON each time |
| MSG/HMSG construction | Direct buffer write | String interpolation → byte encode | More allocations in .NET |
| Pre-encoded constants | Y | Y | Both pre-encode PING/PONG/OK |
---
## 4. Subscriptions & Subject Matching
### Trie Implementation
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Basic trie with `*`/`>` wildcards | Y | Y | Core matching identical |
| Queue group support | Y | Y | |
| Result caching (1024 max) | Y | Y | Same limits |
| `plist` optimization (>256 subs) | Y | N | Go converts high-fanout nodes to array |
| Async cache sweep (background) | Y | N | .NET sweeps inline under write lock |
| Atomic generation ID for invalidation | Y | N | .NET clears cache explicitly |
| Cache eviction strategy | Random | First-N | Semantic difference minimal |
### Missing SubList Features
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| `Stats()` — comprehensive statistics | Y | N | Matches, cache hits, inserts, removes, fanout |
| `HasInterest()` — fast bool check | Y | N | |
| `NumInterest()` — fast count | Y | N | |
| `ReverseMatch()` — pattern→literal query | Y | N | |
| `RemoveBatch()` — efficient bulk removal | Y | N | |
| `All()` — enumerate all subscriptions | Y | N | |
| Notification system (interest changes) | Y | N | |
| Local/remote subscription filtering | Y | N | |
| Queue weight expansion (remote subs) | Y | N | |
| `MatchBytes()` — zero-copy byte API | Y | N | |
### Subject Validation
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Basic validation (empty tokens, wildcards) | Y | Y | |
| Literal subject check | Y | Y | |
| UTF-8/null rune validation | Y | N | Go has `checkRunes` parameter |
| Collision detection (`SubjectsCollide`) | Y | N | |
| Token utilities (`tokenAt`, `numTokens`) | Y | N | |
| Stack-allocated token buffer | Y | N | Go uses `[32]string{}` on stack |
### Subscription Lifecycle
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Per-account subscription limit | Y | N | |
| Auto-unsubscribe on max messages | Y | Y | .NET enforces at delivery time (NatsServer.cs:269-270) |
| Subscription routing propagation | Y | N | For clusters |
| Queue weight (`qw`) field | Y | N | For remote queue load balancing |
---
## 5. Authentication & Authorization
### Auth Mechanisms
| Mechanism | Go | .NET | Notes |
|-----------|:--:|:----:|-------|
| Username/password | Y | Y | |
| Token | Y | Y | |
| NKeys (Ed25519) | Y | Y | .NET has framework but integration is basic |
| JWT validation | Y | N | |
| Bcrypt password hashing | Y | Y | .NET supports bcrypt (`$2*` prefix) with constant-time fallback |
| TLS certificate mapping | Y | N | Property exists but no implementation |
| Custom auth interface | Y | N | |
| External auth callout | Y | N | |
| Proxy authentication | Y | N | |
| Bearer tokens | Y | N | |
| User revocation tracking | Y | N | |
### Account System
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Per-account SubList isolation | Y | Y | |
| Multi-account user resolution | Y | N | .NET has basic account, no resolution |
| Account exports/imports | Y | N | |
| Per-account connection limits | Y | N | |
| Per-account subscription limits | Y | N | |
| Account JetStream limits | Y | N | Excluded per scope |
### Permissions
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Publish allow list | Y | Y | |
| Subscribe allow list | Y | Y | |
| Publish deny list | Y | Partial | .NET has deny in struct but limited enforcement |
| Subscribe deny list | Y | Partial | Same |
| Message-level deny filtering | Y | N | Go filters at delivery time |
| Permission caching (128 entries) | Y | N | |
| Response permissions (reply tracking) | Y | N | Dynamic reply subject authorization |
| Permission templates (JWT) | Y | N | e.g., `{{name()}}`, `{{account-tag(...)}}` |
---
## 6. Configuration
### CLI Flags
| Flag | Go | .NET | Notes |
|------|:--:|:----:|-------|
| `-p/--port` | Y | Y | |
| `-a/--addr` | Y | Y | |
| `-n/--name` (ServerName) | Y | Y | |
| `-m/--http_port` (monitoring) | Y | Y | |
| `-c` (config file) | Y | Stub | Flag parsed, stored in `ConfigFile`, no config parser |
| `-D/-V/-DV` (debug/trace) | Y | N | |
| `--tlscert/--tlskey/--tlscacert` | Y | Y | |
| `--tlsverify` | Y | Y | |
| `--http_base_path` | Y | Y | |
| `--https_port` | Y | Y | |
### Configuration System
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Config file parsing | Y | N | Go has custom `conf` parser with includes |
| Hot reload (SIGHUP) | Y | N | |
| Config change detection | Y | N | Go tracks `inConfig`/`inCmdLine` origins |
| ~450 option fields | Y | ~54 | .NET covers core options only |
### Missing Options Categories
- Logging options (file, rotation, syslog, trace levels)
- Advanced limits (MaxSubs, MaxSubTokens, MaxPending, WriteDeadline)
- Tags/metadata
- OCSP configuration
- WebSocket/MQTT options
- Operator mode / account resolver
---
## 7. Monitoring
### HTTP Endpoints
| Endpoint | Go | .NET | Notes |
|----------|:--:|:----:|-------|
| `/healthz` | Y | Y | |
| `/varz` | Y | Y | |
| `/connz` | Y | Y | |
| `/` (root listing) | Y | Y | |
| `/routez` | Y | Stub | Returns empty response |
| `/gatewayz` | Y | Stub | Returns empty response |
| `/leafz` | Y | Stub | Returns empty response |
| `/subz` / `/subscriptionsz` | Y | Stub | Returns empty response |
| `/accountz` | Y | Stub | Returns empty response |
| `/accstatz` | Y | Stub | Returns empty response |
| `/jsz` | Y | Stub | Returns empty response |
### Varz Response
| Field Category | Go | .NET | Notes |
|----------------|:--:|:----:|-------|
| Identity (ID, Name, Version) | Y | Y | |
| Network (Host, Port, URLs) | Y | Y | |
| Security (AuthRequired, TLS) | Y | Y | |
| Limits (MaxConn, MaxPayload) | Y | Y | |
| Timing (Start, Now, Uptime) | Y | Y | |
| Runtime (Mem, CPU, Cores) | Y | Y | |
| Connections (current, total) | Y | Y | |
| Messages (in/out msgs/bytes) | Y | Y | |
| SlowConsumer breakdown | Y | N | Go tracks per connection type |
| Cluster/Gateway/Leaf blocks | Y | N | Excluded per scope |
| JetStream block | Y | N | Excluded per scope |
| TLS cert expiry info | Y | N | |
### Connz Response
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Filtering by CID, user, account | Y | Partial | |
| Sorting (11 options) | Y | Y | .NET missing ByStop, ByReason |
| Pagination (offset, limit) | Y | Y | |
| Subscription detail mode | Y | N | |
| TLS peer certificate info | Y | N | |
| JWT/IssuerKey/Tags fields | Y | N | |
| MQTT client ID filtering | Y | N | |
| Proxy info | Y | N | |
---
## 8. TLS
### TLS Modes
| Mode | Go | .NET | Notes |
|------|:--:|:----:|-------|
| No TLS | Y | Y | |
| INFO-first (default NATS) | Y | Y | |
| TLS-first (before INFO) | Y | Y | |
| Mixed/Fallback | Y | Y | |
| TLS-required | Y | Y | |
### TLS Features
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| PEM cert/key loading | Y | Y | |
| CA chain validation | Y | Y | |
| Mutual TLS (client certs) | Y | Y | |
| Certificate pinning (SHA256 SPKI) | Y | Y | |
| TLS handshake timeout | Y | Y | |
| TLS rate limiting | Y | Property only | .NET has the option but enforcement is partial |
| First-byte peeking (0x16 detection) | Y | Y | |
| Cert subject→user mapping | Y | N | `TlsMap` property exists, no implementation |
| OCSP stapling | Y | N | |
| Min TLS version control | Y | Y | |
---
## 9. Logging
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Structured logging | Partial | Y | .NET uses Serilog with ILogger<T> |
| File logging with rotation | Y | N | |
| Syslog (local and remote) | Y | N | |
| Log reopening (SIGUSR1) | Y | N | |
| Trace mode (protocol-level) | Y | N | |
| Debug mode | Y | N | |
| Per-subsystem log control | Y | N | |
| Color output on TTY | Y | N | |
| Timestamp format control | Y | N | |
---
## 10. Ping/Pong & Keepalive
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Server-initiated PING | Y | Y | |
| Configurable interval | Y | Y | PingInterval option |
| Max pings out | Y | Y | MaxPingsOut option |
| Stale connection close | Y | Y | |
| RTT-based first PING delay | Y | N | Go delays first PING based on RTT |
| RTT tracking | Y | N | |
| Stale connection watcher | Y | N | Go has dedicated watcher goroutine |
---
## Summary: Critical Gaps for Production Use
### High Priority
1. **Slow consumer detection** — unbounded writes can exhaust memory (stat fields exist but no detection logic)
2. **Write coalescing / batch flush** — performance gap for high-throughput scenarios
### Medium Priority
3. **Verbose mode** — clients expect `+OK` when `verbose: true`
4. **Permission deny enforcement at delivery** — deny lists checked at SUB/PUB time but not during message delivery
5. **Config file parsing** — needed for production deployment (CLI stub exists)
6. **Hot reload** — needed for zero-downtime config changes (SIGHUP stub exists)
7. **File logging with rotation** — needed for production logging
8. **No-responders validation** — flag parsed but not enforced
### Lower Priority
9. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections
10. **JWT authentication** — needed for operator mode
11. **TLS certificate mapping** — property exists, not implemented
12. **OCSP support** — certificate revocation checking
13. **Subject mapping** — input→output subject transformation
14. **Protocol tracing** — no trace-level logging
15. **Subscription statistics** — SubList has no stats collection
16. **Per-account limits** — connections, subscriptions per account
17. **Reply subject tracking** — dynamic response permissions
18. **Windows Service integration** — needed for Windows deployment

View File

@@ -0,0 +1,139 @@
# Core Server Lifecycle — Design
Implements all gaps from section 1 of `differences.md` (Core Server Lifecycle).
Reference: `golang/nats-server/server/server.go`, `client.go`, `signal.go`
## Components
### 1. ClosedState Enum & Close Reason Tracking
New file `src/NATS.Server/ClosedState.cs` — full Go enum (37 values from `client.go:188-228`).
- `NatsClient` gets `CloseReason` property, `MarkClosed(ClosedState)` method
- Close reason set in `RunAsync` finally blocks based on exception type
- Error-related reasons (ReadError, WriteError, TLSHandshakeError) skip flush on close
- `NatsServer.RemoveClient` logs close reason via structured logging
### 2. Accept Loop Exponential Backoff
Port Go's `acceptError` pattern from `server.go:4607-4627`.
- Constants: `AcceptMinSleep = 10ms`, `AcceptMaxSleep = 1s`
- On `SocketException`: sleep `tmpDelay`, double it, cap at 1s
- On success: reset to 10ms
- During sleep: check `_quitCts` to abort if shutting down
- Non-temporary errors break the loop
### 3. Ephemeral Port (port=0)
After `_listener.Bind()` + `Listen()`, resolve actual port:
```csharp
if (_options.Port == 0)
{
var actualPort = ((IPEndPoint)_listener.LocalEndPoint!).Port;
_options.Port = actualPort;
_serverInfo.Port = actualPort;
}
```
Add public `Port` property on `NatsServer` exposing the resolved port.
### 4. Graceful Shutdown with WaitForShutdown
New fields on `NatsServer`:
- `_shutdown` (volatile bool)
- `_shutdownComplete` (TaskCompletionSource)
- `_quitCts` (CancellationTokenSource) — internal shutdown signal
`ShutdownAsync()` sequence:
1. Guard: if already shutting down, return
2. Set `_shutdown = true`, cancel `_quitCts`
3. Close `_listener` (stops accept loop)
4. Close all client connections with `ServerShutdown` reason
5. Wait for active client tasks to drain
6. Stop monitor server
7. Signal `_shutdownComplete`
`WaitForShutdown()`: blocks on `_shutdownComplete.Task`.
`Dispose()`: calls `ShutdownAsync` synchronously if not already shut down.
### 5. Task Tracking
Track active client tasks for clean shutdown:
- `_activeClientCount` (int, Interlocked)
- `_allClientsExited` (TaskCompletionSource, signaled when count hits 0 during shutdown)
- Increment in `AcceptClientAsync`, decrement in `RunClientAsync` finally block
- `ShutdownAsync` waits on `_allClientsExited` with timeout
### 6. Flush Pending Data Before Close
`NatsClient.FlushAndCloseAsync(bool minimalFlush)`:
- If not skip-flush reason: flush stream with 100ms write deadline
- Close socket
`MarkClosed(ClosedState)` sets skip-flush flag for: ReadError, WriteError, SlowConsumerPendingBytes, SlowConsumerWriteDeadline, TLSHandshakeError.
### 7. Lame Duck Mode
New options: `LameDuckDuration` (default 2min), `LameDuckGracePeriod` (default 10s).
`LameDuckShutdownAsync()`:
1. Set `_lameDuckMode = true`
2. Close listener (stop new connections)
3. Wait `LameDuckGracePeriod` (10s default) for clients to drain naturally
4. Stagger-close remaining clients over `LameDuckDuration - GracePeriod`
- Sleep interval = remaining duration / client count (min 1ms, max 1s)
- Randomize slightly to avoid reconnect storms
5. Call `ShutdownAsync()` for final cleanup
Accept loop: on error, if `_lameDuckMode`, exit cleanly.
### 8. PID File & Ports File
New options: `PidFile` (string?), `PortsFileDir` (string?).
PID file: `File.WriteAllText(pidFile, Process.GetCurrentProcess().Id.ToString())`
Ports file: JSON with `{ "client": port, "monitor": monitorPort }` written to `{dir}/{exe}_{pid}.ports`
Written at startup, deleted at shutdown.
### 9. Signal Handling
In `Program.cs`, use `PosixSignalRegistration` (.NET 6+):
- `SIGTERM``server.ShutdownAsync()` then exit
- `SIGUSR2``server.LameDuckShutdownAsync()`
- `SIGUSR1` → log "log reopen not yet supported"
- `SIGHUP` → log "config reload not yet supported"
Keep existing Ctrl+C handler (SIGINT).
### 10. Server Identity NKey (Stub)
Generate Ed25519 key pair at construction. Store as `ServerNKey` (public) and `_serverSeed` (private). Not used in protocol yet — placeholder for future cluster identity.
### 11. System Account (Stub)
Create `$SYS` account in `_accounts` at construction. Expose as `SystemAccount` property. No internal subscriptions yet.
### 12. Config File & Profiling (Stubs)
- `NatsOptions.ConfigFile` — if set, log warning "config file parsing not yet supported"
- `NatsOptions.ProfPort` — if set, log warning "profiling endpoint not yet supported"
- `Program.cs`: add `-c` CLI flag
## Testing
- Accept loop backoff: mock socket that throws N times, verify delays
- Ephemeral port: start server with port=0, verify resolved port > 0
- Graceful shutdown: start server, connect clients, call ShutdownAsync, verify all disconnected
- WaitForShutdown: verify it blocks until shutdown completes
- Close reason tracking: verify correct ClosedState for auth timeout, max connections, stale connection
- Lame duck mode: start server, connect clients, trigger lame duck, verify staggered closure
- PID file: start server with PidFile option, verify file contents, verify deleted on shutdown
- Ports file: start server with PortsFileDir, verify JSON contents
- Flush before close: verify data is flushed before socket close during shutdown
- System account: verify $SYS account exists after construction

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,18 @@
{
"planPath": "docs/plans/2026-02-22-core-lifecycle-plan.md",
"tasks": [
{"id": 5, "subject": "Task 0: Create ClosedState enum", "status": "pending"},
{"id": 6, "subject": "Task 1: Add close reason tracking to NatsClient", "status": "pending", "blockedBy": [5]},
{"id": 7, "subject": "Task 2: Add NatsOptions for lifecycle features", "status": "pending"},
{"id": 8, "subject": "Task 3: Ephemeral port support", "status": "pending"},
{"id": 9, "subject": "Task 4: Graceful shutdown infrastructure", "status": "pending", "blockedBy": [5, 6, 7, 8]},
{"id": 10, "subject": "Task 5: Flush pending data before close", "status": "pending", "blockedBy": [9]},
{"id": 11, "subject": "Task 6: Lame duck mode", "status": "pending", "blockedBy": [9]},
{"id": 12, "subject": "Task 7: PID file and ports file", "status": "pending", "blockedBy": [9]},
{"id": 13, "subject": "Task 8: System account and NKey identity stubs", "status": "pending"},
{"id": 14, "subject": "Task 9: Signal handling and CLI stubs", "status": "pending", "blockedBy": [11]},
{"id": 15, "subject": "Task 10: Update differences.md", "status": "pending", "blockedBy": [14]},
{"id": 16, "subject": "Task 11: Final verification", "status": "pending", "blockedBy": [15]}
],
"lastUpdated": "2026-02-22T00:00:00Z"
}

View File

@@ -0,0 +1,51 @@
namespace NATS.Server;
/// <summary>
/// Reason a client connection was closed.
/// Corresponds to Go server/client.go ClosedState (subset for single-server scope).
/// </summary>
public enum ClientClosedReason
{
None = 0,
ClientClosed,
AuthenticationTimeout,
AuthenticationViolation,
TlsHandshakeError,
SlowConsumerPendingBytes,
SlowConsumerWriteDeadline,
WriteError,
ReadError,
ParseError,
StaleConnection,
ProtocolViolation,
MaxPayloadExceeded,
MaxSubscriptionsExceeded,
ServerShutdown,
MsgHeaderViolation,
NoRespondersRequiresHeaders,
}
public static class ClientClosedReasonExtensions
{
public static string ToReasonString(this ClientClosedReason reason) => reason switch
{
ClientClosedReason.None => "",
ClientClosedReason.ClientClosed => "Client Closed",
ClientClosedReason.AuthenticationTimeout => "Authentication Timeout",
ClientClosedReason.AuthenticationViolation => "Authorization Violation",
ClientClosedReason.TlsHandshakeError => "TLS Handshake Error",
ClientClosedReason.SlowConsumerPendingBytes => "Slow Consumer (Pending Bytes)",
ClientClosedReason.SlowConsumerWriteDeadline => "Slow Consumer (Write Deadline)",
ClientClosedReason.WriteError => "Write Error",
ClientClosedReason.ReadError => "Read Error",
ClientClosedReason.ParseError => "Parse Error",
ClientClosedReason.StaleConnection => "Stale Connection",
ClientClosedReason.ProtocolViolation => "Protocol Violation",
ClientClosedReason.MaxPayloadExceeded => "Maximum Payload Exceeded",
ClientClosedReason.MaxSubscriptionsExceeded => "Maximum Subscriptions Exceeded",
ClientClosedReason.ServerShutdown => "Server Shutdown",
ClientClosedReason.MsgHeaderViolation => "Message Header Violation",
ClientClosedReason.NoRespondersRequiresHeaders => "No Responders Requires Headers",
_ => reason.ToString(),
};
}

View File

@@ -0,0 +1,41 @@
namespace NATS.Server;
/// <summary>
/// Connection state flags tracked per client.
/// Corresponds to Go server/client.go clientFlag bitfield.
/// Thread-safe via Interlocked operations on the backing int.
/// </summary>
[Flags]
public enum ClientFlags
{
ConnectReceived = 1 << 0,
FirstPongSent = 1 << 1,
HandshakeComplete = 1 << 2,
CloseConnection = 1 << 3,
WriteLoopStarted = 1 << 4,
IsSlowConsumer = 1 << 5,
ConnectProcessFinished = 1 << 6,
}
/// <summary>
/// Thread-safe holder for client flags using Interlocked operations.
/// </summary>
public sealed class ClientFlagHolder
{
private int _flags;
public void SetFlag(ClientFlags flag)
{
Interlocked.Or(ref _flags, (int)flag);
}
public void ClearFlag(ClientFlags flag)
{
Interlocked.And(ref _flags, ~(int)flag);
}
public bool HasFlag(ClientFlags flag)
{
return (Volatile.Read(ref _flags) & (int)flag) != 0;
}
}

View File

@@ -69,6 +69,8 @@ public sealed class ConnzHandler(NatsServer server)
Name = client.ClientOpts?.Name ?? "", Name = client.ClientOpts?.Name ?? "",
Lang = client.ClientOpts?.Lang ?? "", Lang = client.ClientOpts?.Lang ?? "",
Version = client.ClientOpts?.Version ?? "", Version = client.ClientOpts?.Version ?? "",
Pending = (int)client.PendingBytes,
Reason = client.CloseReason.ToReasonString(),
TlsVersion = client.TlsState?.TlsVersion ?? "", TlsVersion = client.TlsState?.TlsVersion ?? "",
TlsCipherSuite = client.TlsState?.CipherSuite ?? "", TlsCipherSuite = client.TlsState?.CipherSuite ?? "",
}; };

View File

@@ -66,6 +66,8 @@ public sealed class VarzHandler : IDisposable
MaxConnections = _options.MaxConnections, MaxConnections = _options.MaxConnections,
MaxPayload = _options.MaxPayload, MaxPayload = _options.MaxPayload,
MaxControlLine = _options.MaxControlLine, MaxControlLine = _options.MaxControlLine,
MaxPending = _options.MaxPending,
WriteDeadline = (long)_options.WriteDeadline.TotalNanoseconds,
MaxPingsOut = _options.MaxPingsOut, MaxPingsOut = _options.MaxPingsOut,
PingInterval = (long)_options.PingInterval.TotalNanoseconds, PingInterval = (long)_options.PingInterval.TotalNanoseconds,
Start = _server.StartTime, Start = _server.StartTime,

View File

@@ -5,6 +5,7 @@ using System.Net.Sockets;
using System.Security.Cryptography; using System.Security.Cryptography;
using System.Text; using System.Text;
using System.Text.Json; using System.Text.Json;
using System.Threading.Channels;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using NATS.Server.Auth; using NATS.Server.Auth;
using NATS.Server.Protocol; using NATS.Server.Protocol;
@@ -34,7 +35,9 @@ public sealed class NatsClient : IDisposable
private readonly AuthService _authService; private readonly AuthService _authService;
private readonly byte[]? _nonce; private readonly byte[]? _nonce;
private readonly NatsParser _parser; private readonly NatsParser _parser;
private readonly SemaphoreSlim _writeLock = new(1, 1); private readonly Channel<ReadOnlyMemory<byte>> _outbound = Channel.CreateBounded<ReadOnlyMemory<byte>>(
new BoundedChannelOptions(8192) { SingleReader = true, FullMode = BoundedChannelFullMode.Wait });
private long _pendingBytes;
private CancellationTokenSource? _clientCts; private CancellationTokenSource? _clientCts;
private readonly Dictionary<string, Subscription> _subs = new(); private readonly Dictionary<string, Subscription> _subs = new();
private readonly ILogger _logger; private readonly ILogger _logger;
@@ -46,9 +49,9 @@ public sealed class NatsClient : IDisposable
public IMessageRouter? Router { get; set; } public IMessageRouter? Router { get; set; }
public Account? Account { get; private set; } public Account? Account { get; private set; }
// Thread-safe: read from auth timeout task on threadpool, written from command pipeline private readonly ClientFlagHolder _flags = new();
private int _connectReceived; public bool ConnectReceived => _flags.HasFlag(ClientFlags.ConnectReceived);
public bool ConnectReceived => Volatile.Read(ref _connectReceived) != 0; public ClientClosedReason CloseReason { get; private set; }
public DateTime StartTime { get; } public DateTime StartTime { get; }
private long _lastActivityTicks; private long _lastActivityTicks;
@@ -93,6 +96,37 @@ public sealed class NatsClient : IDisposable
} }
} }
public bool QueueOutbound(ReadOnlyMemory<byte> data)
{
if (_flags.HasFlag(ClientFlags.CloseConnection))
return false;
var pending = Interlocked.Add(ref _pendingBytes, data.Length);
if (pending > _options.MaxPending)
{
Interlocked.Add(ref _pendingBytes, -data.Length);
_flags.SetFlag(ClientFlags.IsSlowConsumer);
Interlocked.Increment(ref _serverStats.SlowConsumers);
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
_ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
return false;
}
if (!_outbound.Writer.TryWrite(data))
{
Interlocked.Add(ref _pendingBytes, -data.Length);
_flags.SetFlag(ClientFlags.IsSlowConsumer);
Interlocked.Increment(ref _serverStats.SlowConsumers);
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
_ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
return false;
}
return true;
}
public long PendingBytes => Interlocked.Read(ref _pendingBytes);
public async Task RunAsync(CancellationToken ct) public async Task RunAsync(CancellationToken ct)
{ {
_clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct); _clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
@@ -102,7 +136,7 @@ public sealed class NatsClient : IDisposable
{ {
// Send INFO (skip if already sent during TLS negotiation) // Send INFO (skip if already sent during TLS negotiation)
if (!InfoAlreadySent) if (!InfoAlreadySent)
await SendInfoAsync(_clientCts.Token); SendInfo();
// Start auth timeout if auth is required // Start auth timeout if auth is required
Task? authTimeoutTask = null; Task? authTimeoutTask = null;
@@ -116,7 +150,7 @@ public sealed class NatsClient : IDisposable
if (!ConnectReceived) if (!ConnectReceived)
{ {
_logger.LogDebug("Client {ClientId} auth timeout", Id); _logger.LogDebug("Client {ClientId} auth timeout", Id);
await SendErrAndCloseAsync(NatsProtocol.ErrAuthTimeout); await SendErrAndCloseAsync(NatsProtocol.ErrAuthTimeout, ClientClosedReason.AuthenticationTimeout);
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)
@@ -126,15 +160,16 @@ public sealed class NatsClient : IDisposable
}, _clientCts.Token); }, _clientCts.Token);
} }
// Start read pump, command processing, and ping timer in parallel // Start read pump, command processing, write loop, and ping timer in parallel
var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token); var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token);
var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token); var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token);
var pingTask = RunPingTimerAsync(_clientCts.Token); var pingTask = RunPingTimerAsync(_clientCts.Token);
var writeTask = RunWriteLoopAsync(_clientCts.Token);
if (authTimeoutTask != null) if (authTimeoutTask != null)
await Task.WhenAny(fillTask, processTask, pingTask, authTimeoutTask); await Task.WhenAny(fillTask, processTask, pingTask, writeTask, authTimeoutTask);
else else
await Task.WhenAny(fillTask, processTask, pingTask); await Task.WhenAny(fillTask, processTask, pingTask, writeTask);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
@@ -146,6 +181,7 @@ public sealed class NatsClient : IDisposable
} }
finally finally
{ {
_outbound.Writer.TryComplete();
try { _socket.Shutdown(SocketShutdown.Both); } try { _socket.Shutdown(SocketShutdown.Both); }
catch (SocketException) { } catch (SocketException) { }
catch (ObjectDisposedException) { } catch (ObjectDisposedException) { }
@@ -185,11 +221,40 @@ public sealed class NatsClient : IDisposable
var result = await reader.ReadAsync(ct); var result = await reader.ReadAsync(ct);
var buffer = result.Buffer; var buffer = result.Buffer;
long localInMsgs = 0;
long localInBytes = 0;
while (_parser.TryParse(ref buffer, out var cmd)) while (_parser.TryParse(ref buffer, out var cmd))
{ {
Interlocked.Exchange(ref _lastIn, Environment.TickCount64); Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
// Handle Pub/HPub inline to allow ref parameter passing for stat batching.
// DispatchCommandAsync is async and cannot accept ref parameters.
if (cmd.Type is CommandType.Pub or CommandType.HPub
&& (!_authService.IsAuthRequired || ConnectReceived))
{
Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks);
ProcessPub(cmd, ref localInMsgs, ref localInBytes);
if (ClientOpts?.Verbose == true)
WriteProtocol(NatsProtocol.OkBytes);
}
else
{
await DispatchCommandAsync(cmd, ct); await DispatchCommandAsync(cmd, ct);
} }
}
if (localInMsgs > 0)
{
Interlocked.Add(ref InMsgs, localInMsgs);
Interlocked.Add(ref _serverStats.InMsgs, localInMsgs);
}
if (localInBytes > 0)
{
Interlocked.Add(ref InBytes, localInBytes);
Interlocked.Add(ref _serverStats.InBytes, localInBytes);
}
reader.AdvanceTo(buffer.Start, buffer.End); reader.AdvanceTo(buffer.Start, buffer.End);
@@ -217,7 +282,7 @@ public sealed class NatsClient : IDisposable
await ProcessConnectAsync(cmd); await ProcessConnectAsync(cmd);
return; return;
case CommandType.Ping: case CommandType.Ping:
await WriteAsync(NatsProtocol.PongBytes, ct); WriteProtocol(NatsProtocol.PongBytes);
return; return;
default: default:
// Ignore all other commands until authenticated // Ignore all other commands until authenticated
@@ -229,10 +294,14 @@ public sealed class NatsClient : IDisposable
{ {
case CommandType.Connect: case CommandType.Connect:
await ProcessConnectAsync(cmd); await ProcessConnectAsync(cmd);
if (ClientOpts?.Verbose == true)
WriteProtocol(NatsProtocol.OkBytes);
break; break;
case CommandType.Ping: case CommandType.Ping:
await WriteAsync(NatsProtocol.PongBytes, ct); WriteProtocol(NatsProtocol.PongBytes);
if (ClientOpts?.Verbose == true)
WriteProtocol(NatsProtocol.OkBytes);
break; break;
case CommandType.Pong: case CommandType.Pong:
@@ -240,16 +309,20 @@ public sealed class NatsClient : IDisposable
break; break;
case CommandType.Sub: case CommandType.Sub:
await ProcessSubAsync(cmd); ProcessSub(cmd);
if (ClientOpts?.Verbose == true)
WriteProtocol(NatsProtocol.OkBytes);
break; break;
case CommandType.Unsub: case CommandType.Unsub:
ProcessUnsub(cmd); ProcessUnsub(cmd);
if (ClientOpts?.Verbose == true)
WriteProtocol(NatsProtocol.OkBytes);
break; break;
case CommandType.Pub: case CommandType.Pub:
case CommandType.HPub: case CommandType.HPub:
await ProcessPubAsync(cmd); // Pub/HPub is handled inline in ProcessCommandsAsync for stat batching
break; break;
} }
} }
@@ -272,7 +345,7 @@ public sealed class NatsClient : IDisposable
if (result == null) if (result == null)
{ {
_logger.LogWarning("Client {ClientId} authentication failed", Id); _logger.LogWarning("Client {ClientId} authentication failed", Id);
await SendErrAndCloseAsync(NatsProtocol.ErrAuthorizationViolation); await SendErrAndCloseAsync(NatsProtocol.ErrAuthorizationViolation, ClientClosedReason.AuthenticationViolation);
return; return;
} }
@@ -301,17 +374,27 @@ public sealed class NatsClient : IDisposable
Account.AddClient(Id); Account.AddClient(Id);
} }
Volatile.Write(ref _connectReceived, 1); // Validate no_responders requires headers
if (ClientOpts.NoResponders && !ClientOpts.Headers)
{
_logger.LogDebug("Client {ClientId} no_responders requires headers", Id);
await CloseWithReasonAsync(ClientClosedReason.NoRespondersRequiresHeaders,
NatsProtocol.ErrNoRespondersRequiresHeaders);
return;
}
_flags.SetFlag(ClientFlags.ConnectReceived);
_flags.SetFlag(ClientFlags.ConnectProcessFinished);
_logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name); _logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
} }
private async ValueTask ProcessSubAsync(ParsedCommand cmd) private void ProcessSub(ParsedCommand cmd)
{ {
// Permission check for subscribe // Permission check for subscribe
if (_permissions != null && !_permissions.IsSubscribeAllowed(cmd.Subject!, cmd.Queue)) if (_permissions != null && !_permissions.IsSubscribeAllowed(cmd.Subject!, cmd.Queue))
{ {
_logger.LogDebug("Client {ClientId} subscribe permission denied for {Subject}", Id, cmd.Subject); _logger.LogDebug("Client {ClientId} subscribe permission denied for {Subject}", Id, cmd.Subject);
await SendErrAsync(NatsProtocol.ErrPermissionsSubscribe); SendErr(NatsProtocol.ErrPermissionsSubscribe);
return; return;
} }
@@ -349,19 +432,17 @@ public sealed class NatsClient : IDisposable
Account?.SubList.Remove(sub); Account?.SubList.Remove(sub);
} }
private async ValueTask ProcessPubAsync(ParsedCommand cmd) private void ProcessPub(ParsedCommand cmd, ref long localInMsgs, ref long localInBytes)
{ {
Interlocked.Increment(ref InMsgs); localInMsgs++;
Interlocked.Add(ref InBytes, cmd.Payload.Length); localInBytes += cmd.Payload.Length;
Interlocked.Increment(ref _serverStats.InMsgs);
Interlocked.Add(ref _serverStats.InBytes, cmd.Payload.Length);
// Max payload validation (always, hard close) // Max payload validation (always, hard close)
if (cmd.Payload.Length > _options.MaxPayload) if (cmd.Payload.Length > _options.MaxPayload)
{ {
_logger.LogWarning("Client {ClientId} exceeded max payload: {Size} > {MaxPayload}", _logger.LogWarning("Client {ClientId} exceeded max payload: {Size} > {MaxPayload}",
Id, cmd.Payload.Length, _options.MaxPayload); Id, cmd.Payload.Length, _options.MaxPayload);
await SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation); _ = SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation, ClientClosedReason.MaxPayloadExceeded);
return; return;
} }
@@ -369,7 +450,7 @@ public sealed class NatsClient : IDisposable
if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(cmd.Subject!)) if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(cmd.Subject!))
{ {
_logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, cmd.Subject); _logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, cmd.Subject);
await SendErrAsync(NatsProtocol.ErrInvalidPublishSubject); SendErr(NatsProtocol.ErrInvalidPublishSubject);
return; return;
} }
@@ -377,7 +458,7 @@ public sealed class NatsClient : IDisposable
if (_permissions != null && !_permissions.IsPublishAllowed(cmd.Subject!)) if (_permissions != null && !_permissions.IsPublishAllowed(cmd.Subject!))
{ {
_logger.LogDebug("Client {ClientId} publish permission denied for {Subject}", Id, cmd.Subject); _logger.LogDebug("Client {ClientId} publish permission denied for {Subject}", Id, cmd.Subject);
await SendErrAsync(NatsProtocol.ErrPermissionsPublish); SendErr(NatsProtocol.ErrPermissionsPublish);
return; return;
} }
@@ -393,15 +474,15 @@ public sealed class NatsClient : IDisposable
Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this); Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
} }
private async Task SendInfoAsync(CancellationToken ct) private void SendInfo()
{ {
var infoJson = JsonSerializer.Serialize(_serverInfo); var infoJson = JsonSerializer.Serialize(_serverInfo);
var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n"); var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n");
await WriteAsync(infoLine, ct); QueueOutbound(infoLine);
} }
public async Task SendMessageAsync(string subject, string sid, string? replyTo, public void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, CancellationToken ct) ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{ {
Interlocked.Increment(ref OutMsgs); Interlocked.Increment(ref OutMsgs);
Interlocked.Add(ref OutBytes, payload.Length + headers.Length); Interlocked.Add(ref OutBytes, payload.Length + headers.Length);
@@ -419,61 +500,89 @@ public sealed class NatsClient : IDisposable
line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n"); line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n");
} }
await _writeLock.WaitAsync(ct); var totalLen = line.Length + headers.Length + payload.Length + NatsProtocol.CrLf.Length;
try var msg = new byte[totalLen];
{ var offset = 0;
await _stream.WriteAsync(line, ct); line.CopyTo(msg.AsSpan(offset)); offset += line.Length;
if (headers.Length > 0) if (headers.Length > 0) { headers.Span.CopyTo(msg.AsSpan(offset)); offset += headers.Length; }
await _stream.WriteAsync(headers, ct); if (payload.Length > 0) { payload.Span.CopyTo(msg.AsSpan(offset)); offset += payload.Length; }
if (payload.Length > 0) NatsProtocol.CrLf.CopyTo(msg.AsSpan(offset));
await _stream.WriteAsync(payload, ct);
await _stream.WriteAsync(NatsProtocol.CrLf, ct); QueueOutbound(msg);
await _stream.FlushAsync(ct);
}
finally
{
_writeLock.Release();
}
} }
private async Task WriteAsync(byte[] data, CancellationToken ct) private void WriteProtocol(byte[] data)
{ {
await _writeLock.WaitAsync(ct); QueueOutbound(data);
try
{
await _stream.WriteAsync(data, ct);
await _stream.FlushAsync(ct);
}
finally
{
_writeLock.Release();
}
} }
public async Task SendErrAsync(string message) public void SendErr(string message)
{ {
var errLine = Encoding.ASCII.GetBytes($"-ERR '{message}'\r\n"); var errLine = Encoding.ASCII.GetBytes($"-ERR '{message}'\r\n");
QueueOutbound(errLine);
}
private async Task RunWriteLoopAsync(CancellationToken ct)
{
_flags.SetFlag(ClientFlags.WriteLoopStarted);
var reader = _outbound.Reader;
try try
{ {
await WriteAsync(errLine, _clientCts?.Token ?? CancellationToken.None); while (await reader.WaitToReadAsync(ct))
{
long batchBytes = 0;
while (reader.TryRead(out var data))
{
await _stream.WriteAsync(data, ct);
batchBytes += data.Length;
}
using var flushCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
flushCts.CancelAfter(_options.WriteDeadline);
try
{
await _stream.FlushAsync(flushCts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
_flags.SetFlag(ClientFlags.IsSlowConsumer);
Interlocked.Increment(ref _serverStats.SlowConsumers);
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
await CloseWithReasonAsync(ClientClosedReason.SlowConsumerWriteDeadline, NatsProtocol.ErrSlowConsumer);
return;
}
Interlocked.Add(ref _pendingBytes, -batchBytes);
}
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
// Expected during shutdown // Normal shutdown
} }
catch (IOException ex) catch (IOException)
{ {
_logger.LogDebug(ex, "Client {ClientId} failed to send -ERR", Id); await CloseWithReasonAsync(ClientClosedReason.WriteError);
}
catch (ObjectDisposedException ex)
{
_logger.LogDebug(ex, "Client {ClientId} failed to send -ERR (disposed)", Id);
} }
} }
public async Task SendErrAndCloseAsync(string message) public async Task SendErrAndCloseAsync(string message, ClientClosedReason reason = ClientClosedReason.ProtocolViolation)
{ {
await SendErrAsync(message); await CloseWithReasonAsync(reason, message);
}
private async Task CloseWithReasonAsync(ClientClosedReason reason, string? errMessage = null)
{
CloseReason = reason;
if (errMessage != null)
SendErr(errMessage);
_flags.SetFlag(ClientFlags.CloseConnection);
// Complete the outbound channel so the write loop drains remaining data
_outbound.Writer.TryComplete();
// Give the write loop a short window to flush the final batch before canceling
await Task.Delay(50);
if (_clientCts is { } cts) if (_clientCts is { } cts)
await cts.CancelAsync(); await cts.CancelAsync();
else else
@@ -498,22 +607,14 @@ public sealed class NatsClient : IDisposable
if (Volatile.Read(ref _pingsOut) + 1 > _options.MaxPingsOut) if (Volatile.Read(ref _pingsOut) + 1 > _options.MaxPingsOut)
{ {
_logger.LogDebug("Client {ClientId} stale connection -- closing", Id); _logger.LogDebug("Client {ClientId} stale connection -- closing", Id);
await SendErrAndCloseAsync(NatsProtocol.ErrStaleConnection); await SendErrAndCloseAsync(NatsProtocol.ErrStaleConnection, ClientClosedReason.StaleConnection);
return; return;
} }
var currentPingsOut = Interlocked.Increment(ref _pingsOut); var currentPingsOut = Interlocked.Increment(ref _pingsOut);
_logger.LogDebug("Client {ClientId} sending PING ({PingsOut}/{MaxPingsOut})", _logger.LogDebug("Client {ClientId} sending PING ({PingsOut}/{MaxPingsOut})",
Id, currentPingsOut, _options.MaxPingsOut); Id, currentPingsOut, _options.MaxPingsOut);
try WriteProtocol(NatsProtocol.PingBytes);
{
await WriteAsync(NatsProtocol.PingBytes, ct);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Client {ClientId} failed to send PING", Id);
return;
}
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)
@@ -532,9 +633,9 @@ public sealed class NatsClient : IDisposable
public void Dispose() public void Dispose()
{ {
_permissions?.Dispose(); _permissions?.Dispose();
_outbound.Writer.TryComplete();
_clientCts?.Dispose(); _clientCts?.Dispose();
_stream.Dispose(); _stream.Dispose();
_socket.Dispose(); _socket.Dispose();
_writeLock.Dispose();
} }
} }

View File

@@ -11,6 +11,8 @@ public sealed class NatsOptions
public int MaxPayload { get; set; } = 1024 * 1024; public int MaxPayload { get; set; } = 1024 * 1024;
public int MaxControlLine { get; set; } = 4096; public int MaxControlLine { get; set; } = 4096;
public int MaxConnections { get; set; } = 65536; public int MaxConnections { get; set; } = 65536;
public long MaxPending { get; set; } = 64 * 1024 * 1024; // 64MB, matching Go MAX_PENDING_SIZE
public TimeSpan WriteDeadline { get; set; } = TimeSpan.FromSeconds(10);
public TimeSpan PingInterval { get; set; } = TimeSpan.FromMinutes(2); public TimeSpan PingInterval { get; set; } = TimeSpan.FromMinutes(2);
public int MaxPingsOut { get; set; } = 2; public int MaxPingsOut { get; set; } = 2;

View File

@@ -227,6 +227,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{ {
var subList = sender.Account?.SubList ?? _globalAccount.SubList; var subList = sender.Account?.SubList ?? _globalAccount.SubList;
var result = subList.Match(subject); var result = subList.Match(subject);
var delivered = false;
// Deliver to plain subscribers // Deliver to plain subscribers
foreach (var sub in result.PlainSubs) foreach (var sub in result.PlainSubs)
@@ -235,6 +236,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
continue; continue;
DeliverMessage(sub, subject, replyTo, headers, payload); DeliverMessage(sub, subject, replyTo, headers, payload);
delivered = true;
} }
// Deliver to one member of each queue group (round-robin) // Deliver to one member of each queue group (round-robin)
@@ -244,7 +246,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
// Simple round-robin -- pick based on total delivered across group // Simple round-robin -- pick based on total delivered across group
var idx = Math.Abs((int)Interlocked.Increment(ref sender.OutMsgs)) % queueGroup.Length; var idx = Math.Abs((int)Interlocked.Increment(ref sender.OutMsgs)) % queueGroup.Length;
// Undo the OutMsgs increment -- it will be incremented properly in SendMessageAsync // Undo the OutMsgs increment -- it will be incremented properly in SendMessage
Interlocked.Decrement(ref sender.OutMsgs); Interlocked.Decrement(ref sender.OutMsgs);
for (int attempt = 0; attempt < queueGroup.Length; attempt++) for (int attempt = 0; attempt < queueGroup.Length; attempt++)
@@ -253,10 +255,18 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true))) if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true)))
{ {
DeliverMessage(sub, subject, replyTo, headers, payload); DeliverMessage(sub, subject, replyTo, headers, payload);
delivered = true;
break; break;
} }
} }
} }
// No-responders: if nobody received the message and the publisher
// opted in, send back a 503 status HMSG on the reply subject.
if (!delivered && replyTo != null && sender.ClientOpts?.NoResponders == true)
{
SendNoResponders(sender, replyTo);
}
} }
private static void DeliverMessage(Subscription sub, string subject, string? replyTo, private static void DeliverMessage(Subscription sub, string subject, string? replyTo,
@@ -270,8 +280,37 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (sub.MaxMessages > 0 && count > sub.MaxMessages) if (sub.MaxMessages > 0 && count > sub.MaxMessages)
return; return;
// Fire and forget -- deliver asynchronously client.SendMessage(subject, sub.Sid, replyTo, headers, payload);
_ = client.SendMessageAsync(subject, sub.Sid, replyTo, headers, payload, CancellationToken.None); }
private static void SendNoResponders(NatsClient sender, string replyTo)
{
// Find the sid for a subscription matching the reply subject
var sid = string.Empty;
foreach (var sub in sender.Subscriptions.Values)
{
if (SubjectMatch.MatchLiteral(replyTo, sub.Subject))
{
sid = sub.Sid;
break;
}
}
// Build: HMSG {replyTo} {sid} {hdrLen} {hdrLen}\r\n{headers}\r\n
var headerBlock = "NATS/1.0 503\r\n\r\n"u8;
var hdrLen = headerBlock.Length;
var controlLine = Encoding.ASCII.GetBytes($"HMSG {replyTo} {sid} {hdrLen} {hdrLen}\r\n");
var totalLen = controlLine.Length + hdrLen + NatsProtocol.CrLf.Length;
var msg = new byte[totalLen];
var offset = 0;
controlLine.CopyTo(msg.AsSpan(offset));
offset += controlLine.Length;
headerBlock.CopyTo(msg.AsSpan(offset));
offset += hdrLen;
NatsProtocol.CrLf.CopyTo(msg.AsSpan(offset));
sender.QueueOutbound(msg);
} }
public Account GetOrCreateAccount(string name) public Account GetOrCreateAccount(string name)

View File

@@ -6,6 +6,7 @@ public static class NatsProtocol
{ {
public const int MaxControlLineSize = 4096; public const int MaxControlLineSize = 4096;
public const int MaxPayloadSize = 1024 * 1024; // 1MB public const int MaxPayloadSize = 1024 * 1024; // 1MB
public const long MaxPendingSize = 64 * 1024 * 1024; // 64MB default max pending
public const int DefaultPort = 4222; public const int DefaultPort = 4222;
public const string Version = "0.1.0"; public const string Version = "0.1.0";
public const int ProtoVersion = 1; public const int ProtoVersion = 1;
@@ -30,6 +31,8 @@ public static class NatsProtocol
public const string ErrAuthTimeout = "Authentication Timeout"; public const string ErrAuthTimeout = "Authentication Timeout";
public const string ErrPermissionsPublish = "Permissions Violation for Publish"; public const string ErrPermissionsPublish = "Permissions Violation for Publish";
public const string ErrPermissionsSubscribe = "Permissions Violation for Subscription"; public const string ErrPermissionsSubscribe = "Permissions Violation for Subscription";
public const string ErrSlowConsumer = "Slow Consumer";
public const string ErrNoRespondersRequiresHeaders = "No Responders Requires Headers Support";
} }
public sealed class ServerInfo public sealed class ServerInfo

View File

@@ -0,0 +1,24 @@
namespace NATS.Server.Tests;
public class ClientClosedReasonTests
{
[Fact]
public void All_expected_close_reasons_exist()
{
// Verify all 17 enum values exist and are distinct (None + 16 named reasons)
var values = Enum.GetValues<ClientClosedReason>();
values.Length.ShouldBe(17);
values.Distinct().Count().ShouldBe(17);
}
[Theory]
[InlineData(ClientClosedReason.ClientClosed, "Client Closed")]
[InlineData(ClientClosedReason.SlowConsumerPendingBytes, "Slow Consumer (Pending Bytes)")]
[InlineData(ClientClosedReason.SlowConsumerWriteDeadline, "Slow Consumer (Write Deadline)")]
[InlineData(ClientClosedReason.StaleConnection, "Stale Connection")]
[InlineData(ClientClosedReason.ServerShutdown, "Server Shutdown")]
public void ToReasonString_returns_human_readable_description(ClientClosedReason reason, string expected)
{
reason.ToReasonString().ShouldBe(expected);
}
}

View File

@@ -0,0 +1,53 @@
namespace NATS.Server.Tests;
public class ClientFlagsTests
{
[Fact]
public void SetFlag_and_HasFlag_work()
{
var holder = new ClientFlagHolder();
holder.HasFlag(ClientFlags.ConnectReceived).ShouldBeFalse();
holder.SetFlag(ClientFlags.ConnectReceived);
holder.HasFlag(ClientFlags.ConnectReceived).ShouldBeTrue();
}
[Fact]
public void ClearFlag_removes_flag()
{
var holder = new ClientFlagHolder();
holder.SetFlag(ClientFlags.ConnectReceived);
holder.SetFlag(ClientFlags.IsSlowConsumer);
holder.ClearFlag(ClientFlags.ConnectReceived);
holder.HasFlag(ClientFlags.ConnectReceived).ShouldBeFalse();
holder.HasFlag(ClientFlags.IsSlowConsumer).ShouldBeTrue();
}
[Fact]
public void Multiple_flags_can_be_set_independently()
{
var holder = new ClientFlagHolder();
holder.SetFlag(ClientFlags.ConnectReceived);
holder.SetFlag(ClientFlags.WriteLoopStarted);
holder.SetFlag(ClientFlags.FirstPongSent);
holder.HasFlag(ClientFlags.ConnectReceived).ShouldBeTrue();
holder.HasFlag(ClientFlags.WriteLoopStarted).ShouldBeTrue();
holder.HasFlag(ClientFlags.FirstPongSent).ShouldBeTrue();
holder.HasFlag(ClientFlags.IsSlowConsumer).ShouldBeFalse();
}
[Fact]
public void SetFlag_is_thread_safe()
{
var holder = new ClientFlagHolder();
var flags = Enum.GetValues<ClientFlags>();
Parallel.ForEach(flags, flag => holder.SetFlag(flag));
foreach (var flag in flags)
holder.HasFlag(flag).ShouldBeTrue();
}
}

View File

@@ -99,8 +99,8 @@ public class ClientTests : IAsyncDisposable
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token); await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
// Trigger SendErrAsync // Trigger SendErr
await _natsClient.SendErrAsync("Invalid Subject"); _natsClient.SendErr("Invalid Subject");
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token); var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
var response = Encoding.ASCII.GetString(buf, 0, n); var response = Encoding.ASCII.GetString(buf, 0, n);

View File

@@ -0,0 +1,117 @@
using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server;
namespace NATS.Server.Tests;
public class NoRespondersTests : IAsyncLifetime
{
private readonly NatsServer _server;
private readonly int _port;
private readonly CancellationTokenSource _cts = new();
public NoRespondersTests()
{
_port = GetFreePort();
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
}
public async Task InitializeAsync()
{
_ = _server.StartAsync(_cts.Token);
await _server.WaitForReadyAsync();
}
public async Task DisposeAsync()
{
await _cts.CancelAsync();
_server.Dispose();
}
private static int GetFreePort()
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
return ((IPEndPoint)sock.LocalEndPoint!).Port;
}
private async Task<Socket> ConnectClientAsync()
{
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, _port);
return sock;
}
private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
{
using var cts = new CancellationTokenSource(timeoutMs);
var sb = new StringBuilder();
var buf = new byte[4096];
while (!sb.ToString().Contains(expected))
{
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
if (n == 0) break;
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
}
return sb.ToString();
}
[Fact]
public async Task NoResponders_without_headers_closes_connection()
{
using var client = await ConnectClientAsync();
// Read INFO
await ReadUntilAsync(client, "\r\n");
// Send CONNECT with no_responders:true but headers:false
await client.SendAsync(Encoding.ASCII.GetBytes(
"CONNECT {\"no_responders\":true,\"headers\":false}\r\n"));
// Should receive -ERR and connection should close
var response = await ReadUntilAsync(client, "-ERR");
response.ShouldContain("-ERR");
response.ShouldContain("No Responders Requires Headers Support");
}
[Fact]
public async Task NoResponders_with_headers_accepted()
{
using var client = await ConnectClientAsync();
// Read INFO
await ReadUntilAsync(client, "\r\n");
// Send CONNECT with both no_responders and headers true, then PING
await client.SendAsync(Encoding.ASCII.GetBytes(
"CONNECT {\"no_responders\":true,\"headers\":true}\r\nPING\r\n"));
// Should receive PONG (connection stays alive)
var response = await ReadUntilAsync(client, "PONG\r\n");
response.ShouldContain("PONG\r\n");
}
[Fact]
public async Task NoResponders_sends_503_when_no_subscribers()
{
using var client = await ConnectClientAsync();
// Read INFO
await ReadUntilAsync(client, "\r\n");
// CONNECT with no_responders and headers enabled
// SUB to the reply inbox so we can receive the 503
// PUB to a subject with no subscribers, using a reply-to subject
await client.SendAsync(Encoding.ASCII.GetBytes(
"CONNECT {\"no_responders\":true,\"headers\":true}\r\n" +
"SUB _INBOX.reply 1\r\n" +
"PUB no.subscribers _INBOX.reply 5\r\nHello\r\n"));
// Should receive HMSG with 503 status on the reply subject
var response = await ReadUntilAsync(client, "HMSG");
response.ShouldContain("HMSG _INBOX.reply 1");
response.ShouldContain("503");
}
}

View File

@@ -0,0 +1,137 @@
using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server;
namespace NATS.Server.Tests;
public class VerboseModeTests : IAsyncLifetime
{
private readonly NatsServer _server;
private readonly int _port;
private readonly CancellationTokenSource _cts = new();
public VerboseModeTests()
{
_port = GetFreePort();
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
}
public async Task InitializeAsync()
{
_ = _server.StartAsync(_cts.Token);
await _server.WaitForReadyAsync();
}
public async Task DisposeAsync()
{
await _cts.CancelAsync();
_server.Dispose();
}
private static int GetFreePort()
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
return ((IPEndPoint)sock.LocalEndPoint!).Port;
}
private async Task<Socket> ConnectClientAsync()
{
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(IPAddress.Loopback, _port);
return sock;
}
private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
{
using var cts = new CancellationTokenSource(timeoutMs);
var sb = new StringBuilder();
var buf = new byte[4096];
while (!sb.ToString().Contains(expected))
{
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
if (n == 0) break;
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
}
return sb.ToString();
}
[Fact]
public async Task Verbose_mode_sends_OK_after_CONNECT()
{
using var client = await ConnectClientAsync();
// Read INFO
await ReadUntilAsync(client, "\r\n");
// Send CONNECT with verbose:true
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":true}\r\n"));
// Should receive +OK after CONNECT
var response = await ReadUntilAsync(client, "+OK\r\n");
response.ShouldContain("+OK\r\n");
}
[Fact]
public async Task Verbose_mode_sends_OK_after_SUB()
{
using var client = await ConnectClientAsync();
// Read INFO
await ReadUntilAsync(client, "\r\n");
// Send CONNECT with verbose:true, then SUB
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":true}\r\n"));
// Read +OK from CONNECT
await ReadUntilAsync(client, "+OK\r\n");
// Send SUB
await client.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\n"));
// Should receive +OK after SUB
var response = await ReadUntilAsync(client, "+OK\r\n");
response.ShouldContain("+OK\r\n");
}
[Fact]
public async Task Verbose_mode_sends_OK_after_PUB()
{
using var client = await ConnectClientAsync();
// Read INFO
await ReadUntilAsync(client, "\r\n");
// Send CONNECT with verbose:true
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":true}\r\n"));
// Read +OK from CONNECT
await ReadUntilAsync(client, "+OK\r\n");
// Send PUB
await client.SendAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nHello\r\n"));
// Should receive +OK after PUB
var response = await ReadUntilAsync(client, "+OK\r\n");
response.ShouldContain("+OK\r\n");
}
[Fact]
public async Task Non_verbose_mode_does_not_send_OK()
{
using var client = await ConnectClientAsync();
// Read INFO
await ReadUntilAsync(client, "\r\n");
// Send CONNECT without verbose (default false), then SUB, then PING
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo 1\r\nPING\r\n"));
// Should receive PONG but NOT +OK
var response = await ReadUntilAsync(client, "PONG\r\n");
response.ShouldContain("PONG\r\n");
response.ShouldNotContain("+OK");
}
}

View File

@@ -0,0 +1,363 @@
using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server;
using NATS.Server.Auth;
using NATS.Server.Protocol;
namespace NATS.Server.Tests;
public class WriteLoopTests : IAsyncDisposable
{
private readonly Socket _serverSocket;
private readonly Socket _clientSocket;
private readonly CancellationTokenSource _cts = new();
private readonly ServerInfo _serverInfo = new()
{
ServerId = "test",
ServerName = "test",
Version = "0.1.0",
Host = "127.0.0.1",
Port = 4222,
};
/// <summary>
/// Creates a connected socket pair via loopback and returns both sockets.
/// </summary>
private static (Socket serverSocket, Socket clientSocket) CreateSocketPair()
{
var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
listener.Listen(1);
var port = ((IPEndPoint)listener.LocalEndPoint!).Port;
var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
clientSocket.Connect(IPAddress.Loopback, port);
var serverSocket = listener.Accept();
listener.Dispose();
return (serverSocket, clientSocket);
}
public WriteLoopTests()
{
(_serverSocket, _clientSocket) = CreateSocketPair();
}
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
_serverSocket.Dispose();
_clientSocket.Dispose();
}
private NatsClient CreateClient(NatsOptions? options = null)
{
options ??= new NatsOptions();
var authService = AuthService.Build(options);
return new NatsClient(
1,
new NetworkStream(_serverSocket, ownsSocket: false),
_serverSocket,
options,
_serverInfo,
authService,
null,
NullLogger.Instance,
new ServerStats());
}
/// <summary>
/// Reads all available data from the client socket until timeout, accumulating it into a string.
/// </summary>
private async Task<string> ReadFromClientSocketAsync(int bufferSize = 8192, int timeoutMs = 5000)
{
var buf = new byte[bufferSize];
using var readCts = new CancellationTokenSource(timeoutMs);
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
return Encoding.ASCII.GetString(buf, 0, n);
}
/// <summary>
/// Reads and discards the INFO line that is sent on client startup.
/// </summary>
private async Task ConsumeInfoAsync()
{
var response = await ReadFromClientSocketAsync();
response.ShouldStartWith("INFO ");
}
[Fact]
public async Task QueueOutbound_writes_data_to_client()
{
using var natsClient = CreateClient();
var runTask = natsClient.RunAsync(_cts.Token);
// Read and discard the initial INFO message
await ConsumeInfoAsync();
// Queue some data via QueueOutbound
var testData = "PING\r\n"u8.ToArray();
var result = natsClient.QueueOutbound(testData);
result.ShouldBeTrue();
// Read the data from the client socket
var received = await ReadFromClientSocketAsync();
received.ShouldBe("PING\r\n");
await _cts.CancelAsync();
}
[Fact]
public async Task QueueOutbound_writes_multiple_messages_to_client()
{
using var natsClient = CreateClient();
var runTask = natsClient.RunAsync(_cts.Token);
// Read and discard the initial INFO message
await ConsumeInfoAsync();
// Queue multiple messages
natsClient.QueueOutbound("MSG foo 1 5\r\nhello\r\n"u8.ToArray());
natsClient.QueueOutbound("MSG bar 2 5\r\nworld\r\n"u8.ToArray());
// Read all data from the socket -- may arrive in one or two reads
var sb = new StringBuilder();
var buf = new byte[8192];
using var readCts = new CancellationTokenSource(5000);
while (sb.Length < "MSG foo 1 5\r\nhello\r\nMSG bar 2 5\r\nworld\r\n".Length)
{
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
}
var received = sb.ToString();
received.ShouldContain("MSG foo 1 5\r\nhello\r\n");
received.ShouldContain("MSG bar 2 5\r\nworld\r\n");
await _cts.CancelAsync();
}
[Fact]
public async Task SlowConsumer_closes_when_pending_exceeds_max()
{
// Use a very small MaxPending so we can easily exceed it
var options = new NatsOptions { MaxPending = 1024 };
using var natsClient = CreateClient(options);
var runTask = natsClient.RunAsync(_cts.Token);
// Read and discard the initial INFO message
await ConsumeInfoAsync();
// Queue data that exceeds MaxPending. The first call adds to pending, then
// subsequent calls should eventually trigger slow consumer detection.
var largeData = new byte[2048];
Array.Fill(largeData, (byte)'X');
var queued = natsClient.QueueOutbound(largeData);
// The QueueOutbound should return false since data exceeds MaxPending
queued.ShouldBeFalse();
// CloseReason should be set to SlowConsumerPendingBytes
// Give a small delay for the async close to propagate
await Task.Delay(200);
natsClient.CloseReason.ShouldBe(ClientClosedReason.SlowConsumerPendingBytes);
await _cts.CancelAsync();
}
[Fact]
public async Task SlowConsumer_sets_reason_before_closing_connection()
{
// Use very small MaxPending
var options = new NatsOptions { MaxPending = 512 };
using var natsClient = CreateClient(options);
var runTask = natsClient.RunAsync(_cts.Token);
// Read and discard the initial INFO message
await ConsumeInfoAsync();
// Try to queue data that exceeds MaxPending
var largeData = new byte[1024];
Array.Fill(largeData, (byte)'Y');
natsClient.QueueOutbound(largeData);
// Wait for the close to propagate
await Task.Delay(200);
natsClient.CloseReason.ShouldBe(ClientClosedReason.SlowConsumerPendingBytes);
// The connection should eventually be closed -- ReceiveAsync returns 0
var buf = new byte[4096];
using var readCts = new CancellationTokenSource(5000);
try
{
// Read any remaining data (the -ERR 'Slow Consumer' message) then expect 0
var total = 0;
while (true)
{
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
if (n == 0)
break;
total += n;
}
// We should get here -- the socket was closed by the server
true.ShouldBeTrue();
}
catch (SocketException)
{
// Also acceptable -- connection reset
true.ShouldBeTrue();
}
await _cts.CancelAsync();
}
[Fact]
public async Task PendingBytes_tracks_queued_data()
{
using var natsClient = CreateClient();
var runTask = natsClient.RunAsync(_cts.Token);
// Read and discard the initial INFO message -- note that the write loop
// processes the INFO so by the time we've read it, PendingBytes for INFO
// should have been decremented.
await ConsumeInfoAsync();
// After INFO has been written and flushed, pending bytes should be back to 0
// (give a tiny delay for the write loop to decrement)
await Task.Delay(100);
natsClient.PendingBytes.ShouldBe(0);
// Now queue known data
var data1 = new byte[100];
var data2 = new byte[200];
natsClient.QueueOutbound(data1);
// PendingBytes should reflect the queued but not-yet-written data.
// However, the write loop is running concurrently so it may drain quickly.
// At a minimum, after queueing data1 PendingBytes should be >= 0
// (it could have been written already). Let's queue both in quick succession
// and check the combined pending.
natsClient.QueueOutbound(data2);
// PendingBytes should be at least 0 (write loop may have already drained)
natsClient.PendingBytes.ShouldBeGreaterThanOrEqualTo(0);
// Now let the write loop drain everything and verify it returns to 0
await Task.Delay(500);
natsClient.PendingBytes.ShouldBe(0);
await _cts.CancelAsync();
}
[Fact]
public async Task PendingBytes_increases_with_queued_data_before_drain()
{
// To observe PendingBytes before the write loop drains it, we create
// a scenario where we can check the value atomically after queuing.
// We use a fresh client and check PendingBytes right after QueueOutbound.
using var natsClient = CreateClient();
var runTask = natsClient.RunAsync(_cts.Token);
// Read the INFO to avoid backpressure
await ConsumeInfoAsync();
await Task.Delay(100);
// Queue a message and check that PendingBytes is non-negative
// (the write loop may drain it very quickly, so we can't guarantee a specific value,
// but we can verify the property works and eventually returns to 0)
var data = new byte[500];
natsClient.QueueOutbound(data);
// PendingBytes is either 500 (not yet drained) or 0 (already drained) -- both valid
var pending = natsClient.PendingBytes;
pending.ShouldBeOneOf(0L, 500L);
// After draining, it should be 0
await Task.Delay(500);
natsClient.PendingBytes.ShouldBe(0);
// Read and discard the written data from the socket so it doesn't block
var buf = new byte[8192];
using var readCts = new CancellationTokenSource(2000);
try
{
await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
}
catch (OperationCanceledException)
{
// OK -- may have already been consumed
}
await _cts.CancelAsync();
}
[Fact]
public async Task QueueOutbound_returns_false_when_client_is_closed()
{
var options = new NatsOptions { MaxPending = 512 };
using var natsClient = CreateClient(options);
var runTask = natsClient.RunAsync(_cts.Token);
// Read INFO
await ConsumeInfoAsync();
// Trigger slow consumer close by exceeding MaxPending
var largeData = new byte[1024];
natsClient.QueueOutbound(largeData);
// Wait for close to propagate
await Task.Delay(200);
// Subsequent QueueOutbound calls should return false
var result = natsClient.QueueOutbound("PING\r\n"u8.ToArray());
result.ShouldBeFalse();
await _cts.CancelAsync();
}
[Fact]
public async Task SlowConsumer_increments_server_stats()
{
var options = new NatsOptions { MaxPending = 512 };
var stats = new ServerStats();
var authService = AuthService.Build(options);
using var natsClient = new NatsClient(
1,
new NetworkStream(_serverSocket, ownsSocket: false),
_serverSocket,
options,
_serverInfo,
authService,
null,
NullLogger.Instance,
stats);
var runTask = natsClient.RunAsync(_cts.Token);
// Read INFO
await ConsumeInfoAsync();
// Initial stats should be 0
Interlocked.Read(ref stats.SlowConsumers).ShouldBe(0);
Interlocked.Read(ref stats.SlowConsumerClients).ShouldBe(0);
// Trigger slow consumer
var largeData = new byte[1024];
natsClient.QueueOutbound(largeData);
// Wait for propagation
await Task.Delay(200);
// Stats should be incremented
Interlocked.Read(ref stats.SlowConsumers).ShouldBeGreaterThan(0);
Interlocked.Read(ref stats.SlowConsumerClients).ShouldBeGreaterThan(0);
await _cts.CancelAsync();
}
}