Compare commits
14 Commits
4d89661e79
...
2baf8a85bf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2baf8a85bf | ||
|
|
f5c0c4f906 | ||
|
|
2fb14821e0 | ||
|
|
04305447f9 | ||
|
|
bce793fd42 | ||
|
|
c522ce99f5 | ||
|
|
b289041761 | ||
|
|
31660a4187 | ||
|
|
ad6a02b9a2 | ||
|
|
61c6b832e5 | ||
|
|
1a916a3f36 | ||
|
|
8bbfa54058 | ||
|
|
149c852510 | ||
|
|
c2dc503e2e |
402
differences.md
Normal file
402
differences.md
Normal file
@@ -0,0 +1,402 @@
|
||||
# Go vs .NET NATS Server: Functionality Differences
|
||||
|
||||
> Excludes clustering/routes, gateways, leaf nodes, and JetStream.
|
||||
> Generated 2026-02-22 by comparing `golang/nats-server/server/` against `src/NATS.Server/`.
|
||||
|
||||
---
|
||||
|
||||
## 1. Core Server Lifecycle
|
||||
|
||||
### Server Initialization
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| NKey generation (server identity) | Y | Y | Ed25519 key pair via NATS.NKeys at startup |
|
||||
| System account setup | Y | Y | `$SYS` account created; no event publishing yet (stub) |
|
||||
| Config file validation on startup | Y | Stub | `-c` flag parsed, `ConfigFile` stored, but no config parser |
|
||||
| PID file writing | Y | Y | Written on startup, deleted on shutdown |
|
||||
| Profiling HTTP endpoint (`/debug/pprof`) | Y | Stub | `ProfPort` option exists but endpoint not implemented |
|
||||
| Ports file output | Y | Y | JSON ports file written to `PortsFileDir` on startup |
|
||||
|
||||
### Accept Loop
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Exponential backoff on accept errors | Y | Y | .NET backs off from 10ms to 1s on repeated failures |
|
||||
| Config reload lock during client creation | Y | N | Go holds `reloadMu` around `createClient` |
|
||||
| Goroutine/task tracking (WaitGroup) | Y | Y | `Interlocked` counter + drain with 10s timeout on shutdown |
|
||||
| Callback-based error handling | Y | N | Go uses `errFunc` callback pattern |
|
||||
| Random/ephemeral port (port=0) | Y | Y | Port resolved after `Bind`+`Listen`, stored in `_options.Port` |
|
||||
|
||||
### Shutdown
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Graceful shutdown with `WaitForShutdown()` | Y | Y | Idempotent CAS-guarded `ShutdownAsync()` + blocking `WaitForShutdown()` |
|
||||
| Close reason tracking per connection | Y | Y | 37-value `ClosedState` enum, CAS-based first-writer-wins `MarkClosed()` |
|
||||
| Lame duck mode (stop new, drain existing) | Y | Y | `LameDuckShutdownAsync()` with grace period + stagger-close with jitter |
|
||||
| Wait for accept loop completion | Y | Y | `TaskCompletionSource` signaled in accept loop `finally` |
|
||||
| Flush pending data before close | Y | Y | `FlushAndCloseAsync()` with best-effort flush, skip-flush for error conditions |
|
||||
|
||||
### Signal Handling
|
||||
| Signal | Go | .NET | Notes |
|
||||
|--------|:--:|:----:|-------|
|
||||
| SIGINT (Ctrl+C) | Y | Y | Both handle graceful shutdown |
|
||||
| SIGTERM | Y | Y | `PosixSignalRegistration` triggers `ShutdownAsync()` |
|
||||
| SIGUSR1 (reopen logs) | Y | Stub | Signal registered, handler logs "not yet implemented" |
|
||||
| SIGUSR2 (lame duck mode) | Y | Y | Triggers `LameDuckShutdownAsync()` |
|
||||
| SIGHUP (config reload) | Y | Stub | Signal registered, handler logs "not yet implemented" |
|
||||
| Windows Service integration | Y | N | |
|
||||
|
||||
---
|
||||
|
||||
## 2. Client / Connection Handling
|
||||
|
||||
### Concurrency Model
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Separate read + write loops | Y | Y | Channel-based `RunWriteLoopAsync` with `QueueOutbound()` |
|
||||
| Write coalescing / batch flush | Y | Y | Write loop drains all channel items before single `FlushAsync` |
|
||||
| Dynamic buffer sizing (512B-64KB) | Y | N | .NET delegates to `System.IO.Pipelines` |
|
||||
| Output buffer pooling (3-tier) | Y | N | Go pools at 512B, 4KB, 64KB |
|
||||
|
||||
### Connection Types
|
||||
| Type | Go | .NET | Notes |
|
||||
|------|:--:|:----:|-------|
|
||||
| CLIENT | Y | Y | |
|
||||
| ROUTER | Y | N | Excluded per scope |
|
||||
| GATEWAY | Y | N | Excluded per scope |
|
||||
| LEAF | Y | N | Excluded per scope |
|
||||
| SYSTEM (internal) | Y | N | |
|
||||
| JETSTREAM (internal) | Y | N | |
|
||||
| ACCOUNT (internal) | Y | N | |
|
||||
| WebSocket clients | Y | N | |
|
||||
| MQTT clients | Y | N | |
|
||||
|
||||
### Client Features
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Echo suppression (`echo: false`) | Y | Y | .NET checks echo in delivery path (NatsServer.cs:234,253) |
|
||||
| Verbose mode (`+OK` responses) | Y | Y | Sends `+OK` after CONNECT, SUB, UNSUB, PUB when `verbose:true` |
|
||||
| No-responders validation | Y | Y | CONNECT rejects `no_responders` without `headers`; 503 HMSG on no match |
|
||||
| Slow consumer detection | Y | Y | Pending bytes threshold (64MB) + write deadline timeout (10s) |
|
||||
| Write deadline / timeout policies | Y | Y | `WriteDeadline` option with `CancellationTokenSource.CancelAfter` on flush |
|
||||
| RTT measurement | Y | N | Go tracks round-trip time per client |
|
||||
| Per-client trace mode | Y | N | |
|
||||
| Detailed close reason tracking | Y | Y | 17-value `ClientClosedReason` enum (single-server subset of Go's 37) |
|
||||
| Connection state flags (16 flags) | Y | Y | 7-flag `ClientFlagHolder` with `Interlocked.Or`/`And` |
|
||||
|
||||
### Slow Consumer Handling
|
||||
Go implements a sophisticated slow consumer detection system:
|
||||
- Tracks `pendingBytes` per client output buffer
|
||||
- If pending exceeds `maxPending`, enters stall mode (2-5ms waits)
|
||||
- Total stall capped at 10ms per read cycle
|
||||
- Closes with `SlowConsumerPendingBytes` or `SlowConsumerWriteDeadline`
|
||||
- Sets `isSlowConsumer` flag for monitoring
|
||||
|
||||
.NET now implements pending bytes tracking and write deadline enforcement via `Channel<ReadOnlyMemory<byte>>`. Key differences from Go: no stall/retry mode (immediate close on threshold exceeded), write deadline via `CancellationTokenSource.CancelAfter` instead of `SetWriteDeadline`. `IsSlowConsumer` flag and server-level `SlowConsumerCount` stats are tracked for monitoring.
|
||||
|
||||
### Stats Tracking
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Per-connection atomic stats | Y | Y | .NET uses `Interlocked` for stats access |
|
||||
| Per-read-cycle stat batching | Y | Y | Local accumulators flushed via `Interlocked.Add` per read cycle |
|
||||
| Per-account stats | Y | N | |
|
||||
| Slow consumer counters | Y | Y | `SlowConsumers` and `SlowConsumerClients` incremented on detection |
|
||||
|
||||
---
|
||||
|
||||
## 3. Protocol Parsing
|
||||
|
||||
### Parser Architecture
|
||||
| Aspect | Go | .NET |
|
||||
|--------|-----|------|
|
||||
| Approach | Byte-by-byte state machine (74 states) | Two-phase: line extraction + command dispatch |
|
||||
| Case handling | Per-state character checks | Bit-mask lowercase normalization (`\| 0x20`) |
|
||||
| Buffer strategy | Jump-ahead optimization for payloads | Direct size-based reads via Pipe |
|
||||
| Split-buffer handling | argBuf accumulation with scratch buffer | State variables (`_awaitingPayload`, etc.) |
|
||||
| Error model | Inline error sending + error return | Exception-based (`ProtocolViolationException`) |
|
||||
| CRLF in payload | Included in message buffer | Excluded by design |
|
||||
|
||||
### Protocol Operations
|
||||
| Operation | Go | .NET | Notes |
|
||||
|-----------|:--:|:----:|-------|
|
||||
| PUB | Y | Y | |
|
||||
| HPUB (headers) | Y | Y | |
|
||||
| SUB | Y | Y | |
|
||||
| UNSUB | Y | Y | |
|
||||
| CONNECT | Y | Y | |
|
||||
| INFO | Y | Y | |
|
||||
| PING / PONG | Y | Y | |
|
||||
| MSG / HMSG | Y | Y | |
|
||||
| +OK / -ERR | Y | Y | |
|
||||
| RS+/RS-/RMSG (routes) | Y | N | Excluded per scope |
|
||||
| A+/A- (accounts) | Y | N | Excluded per scope |
|
||||
| LS+/LS-/LMSG (leaf) | Y | N | Excluded per scope |
|
||||
|
||||
### Protocol Parsing Gaps
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Multi-client-type command routing | Y | N | Go checks `c.kind` to allow/reject commands |
|
||||
| Protocol tracing in parser | Y | N | Go calls `traceInOp()` per operation |
|
||||
| Subject mapping (input→output) | Y | N | Go transforms subjects via mapping rules |
|
||||
| MIME header parsing | Y | N | .NET delegates header handling to client layer |
|
||||
| Message trace event initialization | Y | N | |
|
||||
|
||||
### Protocol Writing
|
||||
| Aspect | Go | .NET | Notes |
|
||||
|--------|:--:|:----:|-------|
|
||||
| INFO serialization | Once at startup | Every send | .NET re-serializes JSON each time |
|
||||
| MSG/HMSG construction | Direct buffer write | String interpolation → byte encode | More allocations in .NET |
|
||||
| Pre-encoded constants | Y | Y | Both pre-encode PING/PONG/OK |
|
||||
|
||||
---
|
||||
|
||||
## 4. Subscriptions & Subject Matching
|
||||
|
||||
### Trie Implementation
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Basic trie with `*`/`>` wildcards | Y | Y | Core matching identical |
|
||||
| Queue group support | Y | Y | |
|
||||
| Result caching (1024 max) | Y | Y | Same limits |
|
||||
| `plist` optimization (>256 subs) | Y | N | Go converts high-fanout nodes to array |
|
||||
| Async cache sweep (background) | Y | N | .NET sweeps inline under write lock |
|
||||
| Atomic generation ID for invalidation | Y | N | .NET clears cache explicitly |
|
||||
| Cache eviction strategy | Random | First-N | Semantic difference minimal |
|
||||
|
||||
### Missing SubList Features
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| `Stats()` — comprehensive statistics | Y | N | Matches, cache hits, inserts, removes, fanout |
|
||||
| `HasInterest()` — fast bool check | Y | N | |
|
||||
| `NumInterest()` — fast count | Y | N | |
|
||||
| `ReverseMatch()` — pattern→literal query | Y | N | |
|
||||
| `RemoveBatch()` — efficient bulk removal | Y | N | |
|
||||
| `All()` — enumerate all subscriptions | Y | N | |
|
||||
| Notification system (interest changes) | Y | N | |
|
||||
| Local/remote subscription filtering | Y | N | |
|
||||
| Queue weight expansion (remote subs) | Y | N | |
|
||||
| `MatchBytes()` — zero-copy byte API | Y | N | |
|
||||
|
||||
### Subject Validation
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Basic validation (empty tokens, wildcards) | Y | Y | |
|
||||
| Literal subject check | Y | Y | |
|
||||
| UTF-8/null rune validation | Y | N | Go has `checkRunes` parameter |
|
||||
| Collision detection (`SubjectsCollide`) | Y | N | |
|
||||
| Token utilities (`tokenAt`, `numTokens`) | Y | N | |
|
||||
| Stack-allocated token buffer | Y | N | Go uses `[32]string{}` on stack |
|
||||
|
||||
### Subscription Lifecycle
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Per-account subscription limit | Y | N | |
|
||||
| Auto-unsubscribe on max messages | Y | Y | .NET enforces at delivery time (NatsServer.cs:269-270) |
|
||||
| Subscription routing propagation | Y | N | For clusters |
|
||||
| Queue weight (`qw`) field | Y | N | For remote queue load balancing |
|
||||
|
||||
---
|
||||
|
||||
## 5. Authentication & Authorization
|
||||
|
||||
### Auth Mechanisms
|
||||
| Mechanism | Go | .NET | Notes |
|
||||
|-----------|:--:|:----:|-------|
|
||||
| Username/password | Y | Y | |
|
||||
| Token | Y | Y | |
|
||||
| NKeys (Ed25519) | Y | Y | .NET has framework but integration is basic |
|
||||
| JWT validation | Y | N | |
|
||||
| Bcrypt password hashing | Y | Y | .NET supports bcrypt (`$2*` prefix) with constant-time fallback |
|
||||
| TLS certificate mapping | Y | N | Property exists but no implementation |
|
||||
| Custom auth interface | Y | N | |
|
||||
| External auth callout | Y | N | |
|
||||
| Proxy authentication | Y | N | |
|
||||
| Bearer tokens | Y | N | |
|
||||
| User revocation tracking | Y | N | |
|
||||
|
||||
### Account System
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Per-account SubList isolation | Y | Y | |
|
||||
| Multi-account user resolution | Y | N | .NET has basic account, no resolution |
|
||||
| Account exports/imports | Y | N | |
|
||||
| Per-account connection limits | Y | N | |
|
||||
| Per-account subscription limits | Y | N | |
|
||||
| Account JetStream limits | Y | N | Excluded per scope |
|
||||
|
||||
### Permissions
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Publish allow list | Y | Y | |
|
||||
| Subscribe allow list | Y | Y | |
|
||||
| Publish deny list | Y | Partial | .NET has deny in struct but limited enforcement |
|
||||
| Subscribe deny list | Y | Partial | Same |
|
||||
| Message-level deny filtering | Y | N | Go filters at delivery time |
|
||||
| Permission caching (128 entries) | Y | N | |
|
||||
| Response permissions (reply tracking) | Y | N | Dynamic reply subject authorization |
|
||||
| Permission templates (JWT) | Y | N | e.g., `{{name()}}`, `{{account-tag(...)}}` |
|
||||
|
||||
---
|
||||
|
||||
## 6. Configuration
|
||||
|
||||
### CLI Flags
|
||||
| Flag | Go | .NET | Notes |
|
||||
|------|:--:|:----:|-------|
|
||||
| `-p/--port` | Y | Y | |
|
||||
| `-a/--addr` | Y | Y | |
|
||||
| `-n/--name` (ServerName) | Y | Y | |
|
||||
| `-m/--http_port` (monitoring) | Y | Y | |
|
||||
| `-c` (config file) | Y | Stub | Flag parsed, stored in `ConfigFile`, no config parser |
|
||||
| `-D/-V/-DV` (debug/trace) | Y | N | |
|
||||
| `--tlscert/--tlskey/--tlscacert` | Y | Y | |
|
||||
| `--tlsverify` | Y | Y | |
|
||||
| `--http_base_path` | Y | Y | |
|
||||
| `--https_port` | Y | Y | |
|
||||
|
||||
### Configuration System
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Config file parsing | Y | N | Go has custom `conf` parser with includes |
|
||||
| Hot reload (SIGHUP) | Y | N | |
|
||||
| Config change detection | Y | N | Go tracks `inConfig`/`inCmdLine` origins |
|
||||
| ~450 option fields | Y | ~54 | .NET covers core options only |
|
||||
|
||||
### Missing Options Categories
|
||||
- Logging options (file, rotation, syslog, trace levels)
|
||||
- Advanced limits (MaxSubs, MaxSubTokens, MaxPending, WriteDeadline)
|
||||
- Tags/metadata
|
||||
- OCSP configuration
|
||||
- WebSocket/MQTT options
|
||||
- Operator mode / account resolver
|
||||
|
||||
---
|
||||
|
||||
## 7. Monitoring
|
||||
|
||||
### HTTP Endpoints
|
||||
| Endpoint | Go | .NET | Notes |
|
||||
|----------|:--:|:----:|-------|
|
||||
| `/healthz` | Y | Y | |
|
||||
| `/varz` | Y | Y | |
|
||||
| `/connz` | Y | Y | |
|
||||
| `/` (root listing) | Y | Y | |
|
||||
| `/routez` | Y | Stub | Returns empty response |
|
||||
| `/gatewayz` | Y | Stub | Returns empty response |
|
||||
| `/leafz` | Y | Stub | Returns empty response |
|
||||
| `/subz` / `/subscriptionsz` | Y | Stub | Returns empty response |
|
||||
| `/accountz` | Y | Stub | Returns empty response |
|
||||
| `/accstatz` | Y | Stub | Returns empty response |
|
||||
| `/jsz` | Y | Stub | Returns empty response |
|
||||
|
||||
### Varz Response
|
||||
| Field Category | Go | .NET | Notes |
|
||||
|----------------|:--:|:----:|-------|
|
||||
| Identity (ID, Name, Version) | Y | Y | |
|
||||
| Network (Host, Port, URLs) | Y | Y | |
|
||||
| Security (AuthRequired, TLS) | Y | Y | |
|
||||
| Limits (MaxConn, MaxPayload) | Y | Y | |
|
||||
| Timing (Start, Now, Uptime) | Y | Y | |
|
||||
| Runtime (Mem, CPU, Cores) | Y | Y | |
|
||||
| Connections (current, total) | Y | Y | |
|
||||
| Messages (in/out msgs/bytes) | Y | Y | |
|
||||
| SlowConsumer breakdown | Y | N | Go tracks per connection type |
|
||||
| Cluster/Gateway/Leaf blocks | Y | N | Excluded per scope |
|
||||
| JetStream block | Y | N | Excluded per scope |
|
||||
| TLS cert expiry info | Y | N | |
|
||||
|
||||
### Connz Response
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Filtering by CID, user, account | Y | Partial | |
|
||||
| Sorting (11 options) | Y | Y | .NET missing ByStop, ByReason |
|
||||
| Pagination (offset, limit) | Y | Y | |
|
||||
| Subscription detail mode | Y | N | |
|
||||
| TLS peer certificate info | Y | N | |
|
||||
| JWT/IssuerKey/Tags fields | Y | N | |
|
||||
| MQTT client ID filtering | Y | N | |
|
||||
| Proxy info | Y | N | |
|
||||
|
||||
---
|
||||
|
||||
## 8. TLS
|
||||
|
||||
### TLS Modes
|
||||
| Mode | Go | .NET | Notes |
|
||||
|------|:--:|:----:|-------|
|
||||
| No TLS | Y | Y | |
|
||||
| INFO-first (default NATS) | Y | Y | |
|
||||
| TLS-first (before INFO) | Y | Y | |
|
||||
| Mixed/Fallback | Y | Y | |
|
||||
| TLS-required | Y | Y | |
|
||||
|
||||
### TLS Features
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| PEM cert/key loading | Y | Y | |
|
||||
| CA chain validation | Y | Y | |
|
||||
| Mutual TLS (client certs) | Y | Y | |
|
||||
| Certificate pinning (SHA256 SPKI) | Y | Y | |
|
||||
| TLS handshake timeout | Y | Y | |
|
||||
| TLS rate limiting | Y | Property only | .NET has the option but enforcement is partial |
|
||||
| First-byte peeking (0x16 detection) | Y | Y | |
|
||||
| Cert subject→user mapping | Y | N | `TlsMap` property exists, no implementation |
|
||||
| OCSP stapling | Y | N | |
|
||||
| Min TLS version control | Y | Y | |
|
||||
|
||||
---
|
||||
|
||||
## 9. Logging
|
||||
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Structured logging | Partial | Y | .NET uses Serilog with ILogger<T> |
|
||||
| File logging with rotation | Y | N | |
|
||||
| Syslog (local and remote) | Y | N | |
|
||||
| Log reopening (SIGUSR1) | Y | N | |
|
||||
| Trace mode (protocol-level) | Y | N | |
|
||||
| Debug mode | Y | N | |
|
||||
| Per-subsystem log control | Y | N | |
|
||||
| Color output on TTY | Y | N | |
|
||||
| Timestamp format control | Y | N | |
|
||||
|
||||
---
|
||||
|
||||
## 10. Ping/Pong & Keepalive
|
||||
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Server-initiated PING | Y | Y | |
|
||||
| Configurable interval | Y | Y | PingInterval option |
|
||||
| Max pings out | Y | Y | MaxPingsOut option |
|
||||
| Stale connection close | Y | Y | |
|
||||
| RTT-based first PING delay | Y | N | Go delays first PING based on RTT |
|
||||
| RTT tracking | Y | N | |
|
||||
| Stale connection watcher | Y | N | Go has dedicated watcher goroutine |
|
||||
|
||||
---
|
||||
|
||||
## Summary: Critical Gaps for Production Use
|
||||
|
||||
### High Priority
|
||||
1. **Slow consumer detection** — unbounded writes can exhaust memory (stat fields exist but no detection logic)
|
||||
2. **Write coalescing / batch flush** — performance gap for high-throughput scenarios
|
||||
|
||||
### Medium Priority
|
||||
3. **Verbose mode** — clients expect `+OK` when `verbose: true`
|
||||
4. **Permission deny enforcement at delivery** — deny lists checked at SUB/PUB time but not during message delivery
|
||||
5. **Config file parsing** — needed for production deployment (CLI stub exists)
|
||||
6. **Hot reload** — needed for zero-downtime config changes (SIGHUP stub exists)
|
||||
7. **File logging with rotation** — needed for production logging
|
||||
8. **No-responders validation** — flag parsed but not enforced
|
||||
|
||||
### Lower Priority
|
||||
9. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections
|
||||
10. **JWT authentication** — needed for operator mode
|
||||
11. **TLS certificate mapping** — property exists, not implemented
|
||||
12. **OCSP support** — certificate revocation checking
|
||||
13. **Subject mapping** — input→output subject transformation
|
||||
14. **Protocol tracing** — no trace-level logging
|
||||
15. **Subscription statistics** — SubList has no stats collection
|
||||
16. **Per-account limits** — connections, subscriptions per account
|
||||
17. **Reply subject tracking** — dynamic response permissions
|
||||
18. **Windows Service integration** — needed for Windows deployment
|
||||
139
docs/plans/2026-02-22-core-lifecycle-design.md
Normal file
139
docs/plans/2026-02-22-core-lifecycle-design.md
Normal file
@@ -0,0 +1,139 @@
|
||||
# Core Server Lifecycle — Design
|
||||
|
||||
Implements all gaps from section 1 of `differences.md` (Core Server Lifecycle).
|
||||
|
||||
Reference: `golang/nats-server/server/server.go`, `client.go`, `signal.go`
|
||||
|
||||
## Components
|
||||
|
||||
### 1. ClosedState Enum & Close Reason Tracking
|
||||
|
||||
New file `src/NATS.Server/ClosedState.cs` — full Go enum (37 values from `client.go:188-228`).
|
||||
|
||||
- `NatsClient` gets `CloseReason` property, `MarkClosed(ClosedState)` method
|
||||
- Close reason set in `RunAsync` finally blocks based on exception type
|
||||
- Error-related reasons (ReadError, WriteError, TLSHandshakeError) skip flush on close
|
||||
- `NatsServer.RemoveClient` logs close reason via structured logging
|
||||
|
||||
### 2. Accept Loop Exponential Backoff
|
||||
|
||||
Port Go's `acceptError` pattern from `server.go:4607-4627`.
|
||||
|
||||
- Constants: `AcceptMinSleep = 10ms`, `AcceptMaxSleep = 1s`
|
||||
- On `SocketException`: sleep `tmpDelay`, double it, cap at 1s
|
||||
- On success: reset to 10ms
|
||||
- During sleep: check `_quitCts` to abort if shutting down
|
||||
- Non-temporary errors break the loop
|
||||
|
||||
### 3. Ephemeral Port (port=0)
|
||||
|
||||
After `_listener.Bind()` + `Listen()`, resolve actual port:
|
||||
|
||||
```csharp
|
||||
if (_options.Port == 0)
|
||||
{
|
||||
var actualPort = ((IPEndPoint)_listener.LocalEndPoint!).Port;
|
||||
_options.Port = actualPort;
|
||||
_serverInfo.Port = actualPort;
|
||||
}
|
||||
```
|
||||
|
||||
Add public `Port` property on `NatsServer` exposing the resolved port.
|
||||
|
||||
### 4. Graceful Shutdown with WaitForShutdown
|
||||
|
||||
New fields on `NatsServer`:
|
||||
- `_shutdown` (volatile bool)
|
||||
- `_shutdownComplete` (TaskCompletionSource)
|
||||
- `_quitCts` (CancellationTokenSource) — internal shutdown signal
|
||||
|
||||
`ShutdownAsync()` sequence:
|
||||
1. Guard: if already shutting down, return
|
||||
2. Set `_shutdown = true`, cancel `_quitCts`
|
||||
3. Close `_listener` (stops accept loop)
|
||||
4. Close all client connections with `ServerShutdown` reason
|
||||
5. Wait for active client tasks to drain
|
||||
6. Stop monitor server
|
||||
7. Signal `_shutdownComplete`
|
||||
|
||||
`WaitForShutdown()`: blocks on `_shutdownComplete.Task`.
|
||||
|
||||
`Dispose()`: calls `ShutdownAsync` synchronously if not already shut down.
|
||||
|
||||
### 5. Task Tracking
|
||||
|
||||
Track active client tasks for clean shutdown:
|
||||
- `_activeClientCount` (int, Interlocked)
|
||||
- `_allClientsExited` (TaskCompletionSource, signaled when count hits 0 during shutdown)
|
||||
- Increment in `AcceptClientAsync`, decrement in `RunClientAsync` finally block
|
||||
- `ShutdownAsync` waits on `_allClientsExited` with timeout
|
||||
|
||||
### 6. Flush Pending Data Before Close
|
||||
|
||||
`NatsClient.FlushAndCloseAsync(bool minimalFlush)`:
|
||||
- If not skip-flush reason: flush stream with 100ms write deadline
|
||||
- Close socket
|
||||
|
||||
`MarkClosed(ClosedState)` sets skip-flush flag for: ReadError, WriteError, SlowConsumerPendingBytes, SlowConsumerWriteDeadline, TLSHandshakeError.
|
||||
|
||||
### 7. Lame Duck Mode
|
||||
|
||||
New options: `LameDuckDuration` (default 2min), `LameDuckGracePeriod` (default 10s).
|
||||
|
||||
`LameDuckShutdownAsync()`:
|
||||
1. Set `_lameDuckMode = true`
|
||||
2. Close listener (stop new connections)
|
||||
3. Wait `LameDuckGracePeriod` (10s default) for clients to drain naturally
|
||||
4. Stagger-close remaining clients over `LameDuckDuration - GracePeriod`
|
||||
- Sleep interval = remaining duration / client count (min 1ms, max 1s)
|
||||
- Randomize slightly to avoid reconnect storms
|
||||
5. Call `ShutdownAsync()` for final cleanup
|
||||
|
||||
Accept loop: on error, if `_lameDuckMode`, exit cleanly.
|
||||
|
||||
### 8. PID File & Ports File
|
||||
|
||||
New options: `PidFile` (string?), `PortsFileDir` (string?).
|
||||
|
||||
PID file: `File.WriteAllText(pidFile, Process.GetCurrentProcess().Id.ToString())`
|
||||
Ports file: JSON with `{ "client": port, "monitor": monitorPort }` written to `{dir}/{exe}_{pid}.ports`
|
||||
|
||||
Written at startup, deleted at shutdown.
|
||||
|
||||
### 9. Signal Handling
|
||||
|
||||
In `Program.cs`, use `PosixSignalRegistration` (.NET 6+):
|
||||
|
||||
- `SIGTERM` → `server.ShutdownAsync()` then exit
|
||||
- `SIGUSR2` → `server.LameDuckShutdownAsync()`
|
||||
- `SIGUSR1` → log "log reopen not yet supported"
|
||||
- `SIGHUP` → log "config reload not yet supported"
|
||||
|
||||
Keep existing Ctrl+C handler (SIGINT).
|
||||
|
||||
### 10. Server Identity NKey (Stub)
|
||||
|
||||
Generate Ed25519 key pair at construction. Store as `ServerNKey` (public) and `_serverSeed` (private). Not used in protocol yet — placeholder for future cluster identity.
|
||||
|
||||
### 11. System Account (Stub)
|
||||
|
||||
Create `$SYS` account in `_accounts` at construction. Expose as `SystemAccount` property. No internal subscriptions yet.
|
||||
|
||||
### 12. Config File & Profiling (Stubs)
|
||||
|
||||
- `NatsOptions.ConfigFile` — if set, log warning "config file parsing not yet supported"
|
||||
- `NatsOptions.ProfPort` — if set, log warning "profiling endpoint not yet supported"
|
||||
- `Program.cs`: add `-c` CLI flag
|
||||
|
||||
## Testing
|
||||
|
||||
- Accept loop backoff: mock socket that throws N times, verify delays
|
||||
- Ephemeral port: start server with port=0, verify resolved port > 0
|
||||
- Graceful shutdown: start server, connect clients, call ShutdownAsync, verify all disconnected
|
||||
- WaitForShutdown: verify it blocks until shutdown completes
|
||||
- Close reason tracking: verify correct ClosedState for auth timeout, max connections, stale connection
|
||||
- Lame duck mode: start server, connect clients, trigger lame duck, verify staggered closure
|
||||
- PID file: start server with PidFile option, verify file contents, verify deleted on shutdown
|
||||
- Ports file: start server with PortsFileDir, verify JSON contents
|
||||
- Flush before close: verify data is flushed before socket close during shutdown
|
||||
- System account: verify $SYS account exists after construction
|
||||
1631
docs/plans/2026-02-22-core-lifecycle-plan.md
Normal file
1631
docs/plans/2026-02-22-core-lifecycle-plan.md
Normal file
File diff suppressed because it is too large
Load Diff
18
docs/plans/2026-02-22-core-lifecycle-plan.md.tasks.json
Normal file
18
docs/plans/2026-02-22-core-lifecycle-plan.md.tasks.json
Normal file
@@ -0,0 +1,18 @@
|
||||
{
|
||||
"planPath": "docs/plans/2026-02-22-core-lifecycle-plan.md",
|
||||
"tasks": [
|
||||
{"id": 5, "subject": "Task 0: Create ClosedState enum", "status": "pending"},
|
||||
{"id": 6, "subject": "Task 1: Add close reason tracking to NatsClient", "status": "pending", "blockedBy": [5]},
|
||||
{"id": 7, "subject": "Task 2: Add NatsOptions for lifecycle features", "status": "pending"},
|
||||
{"id": 8, "subject": "Task 3: Ephemeral port support", "status": "pending"},
|
||||
{"id": 9, "subject": "Task 4: Graceful shutdown infrastructure", "status": "pending", "blockedBy": [5, 6, 7, 8]},
|
||||
{"id": 10, "subject": "Task 5: Flush pending data before close", "status": "pending", "blockedBy": [9]},
|
||||
{"id": 11, "subject": "Task 6: Lame duck mode", "status": "pending", "blockedBy": [9]},
|
||||
{"id": 12, "subject": "Task 7: PID file and ports file", "status": "pending", "blockedBy": [9]},
|
||||
{"id": 13, "subject": "Task 8: System account and NKey identity stubs", "status": "pending"},
|
||||
{"id": 14, "subject": "Task 9: Signal handling and CLI stubs", "status": "pending", "blockedBy": [11]},
|
||||
{"id": 15, "subject": "Task 10: Update differences.md", "status": "pending", "blockedBy": [14]},
|
||||
{"id": 16, "subject": "Task 11: Final verification", "status": "pending", "blockedBy": [15]}
|
||||
],
|
||||
"lastUpdated": "2026-02-22T00:00:00Z"
|
||||
}
|
||||
51
src/NATS.Server/ClientClosedReason.cs
Normal file
51
src/NATS.Server/ClientClosedReason.cs
Normal file
@@ -0,0 +1,51 @@
|
||||
namespace NATS.Server;
|
||||
|
||||
/// <summary>
|
||||
/// Reason a client connection was closed.
|
||||
/// Corresponds to Go server/client.go ClosedState (subset for single-server scope).
|
||||
/// </summary>
|
||||
public enum ClientClosedReason
|
||||
{
|
||||
None = 0,
|
||||
ClientClosed,
|
||||
AuthenticationTimeout,
|
||||
AuthenticationViolation,
|
||||
TlsHandshakeError,
|
||||
SlowConsumerPendingBytes,
|
||||
SlowConsumerWriteDeadline,
|
||||
WriteError,
|
||||
ReadError,
|
||||
ParseError,
|
||||
StaleConnection,
|
||||
ProtocolViolation,
|
||||
MaxPayloadExceeded,
|
||||
MaxSubscriptionsExceeded,
|
||||
ServerShutdown,
|
||||
MsgHeaderViolation,
|
||||
NoRespondersRequiresHeaders,
|
||||
}
|
||||
|
||||
public static class ClientClosedReasonExtensions
|
||||
{
|
||||
public static string ToReasonString(this ClientClosedReason reason) => reason switch
|
||||
{
|
||||
ClientClosedReason.None => "",
|
||||
ClientClosedReason.ClientClosed => "Client Closed",
|
||||
ClientClosedReason.AuthenticationTimeout => "Authentication Timeout",
|
||||
ClientClosedReason.AuthenticationViolation => "Authorization Violation",
|
||||
ClientClosedReason.TlsHandshakeError => "TLS Handshake Error",
|
||||
ClientClosedReason.SlowConsumerPendingBytes => "Slow Consumer (Pending Bytes)",
|
||||
ClientClosedReason.SlowConsumerWriteDeadline => "Slow Consumer (Write Deadline)",
|
||||
ClientClosedReason.WriteError => "Write Error",
|
||||
ClientClosedReason.ReadError => "Read Error",
|
||||
ClientClosedReason.ParseError => "Parse Error",
|
||||
ClientClosedReason.StaleConnection => "Stale Connection",
|
||||
ClientClosedReason.ProtocolViolation => "Protocol Violation",
|
||||
ClientClosedReason.MaxPayloadExceeded => "Maximum Payload Exceeded",
|
||||
ClientClosedReason.MaxSubscriptionsExceeded => "Maximum Subscriptions Exceeded",
|
||||
ClientClosedReason.ServerShutdown => "Server Shutdown",
|
||||
ClientClosedReason.MsgHeaderViolation => "Message Header Violation",
|
||||
ClientClosedReason.NoRespondersRequiresHeaders => "No Responders Requires Headers",
|
||||
_ => reason.ToString(),
|
||||
};
|
||||
}
|
||||
41
src/NATS.Server/ClientFlags.cs
Normal file
41
src/NATS.Server/ClientFlags.cs
Normal file
@@ -0,0 +1,41 @@
|
||||
namespace NATS.Server;
|
||||
|
||||
/// <summary>
|
||||
/// Connection state flags tracked per client.
|
||||
/// Corresponds to Go server/client.go clientFlag bitfield.
|
||||
/// Thread-safe via Interlocked operations on the backing int.
|
||||
/// </summary>
|
||||
[Flags]
|
||||
public enum ClientFlags
|
||||
{
|
||||
ConnectReceived = 1 << 0,
|
||||
FirstPongSent = 1 << 1,
|
||||
HandshakeComplete = 1 << 2,
|
||||
CloseConnection = 1 << 3,
|
||||
WriteLoopStarted = 1 << 4,
|
||||
IsSlowConsumer = 1 << 5,
|
||||
ConnectProcessFinished = 1 << 6,
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Thread-safe holder for client flags using Interlocked operations.
|
||||
/// </summary>
|
||||
public sealed class ClientFlagHolder
|
||||
{
|
||||
private int _flags;
|
||||
|
||||
public void SetFlag(ClientFlags flag)
|
||||
{
|
||||
Interlocked.Or(ref _flags, (int)flag);
|
||||
}
|
||||
|
||||
public void ClearFlag(ClientFlags flag)
|
||||
{
|
||||
Interlocked.And(ref _flags, ~(int)flag);
|
||||
}
|
||||
|
||||
public bool HasFlag(ClientFlags flag)
|
||||
{
|
||||
return (Volatile.Read(ref _flags) & (int)flag) != 0;
|
||||
}
|
||||
}
|
||||
@@ -69,6 +69,8 @@ public sealed class ConnzHandler(NatsServer server)
|
||||
Name = client.ClientOpts?.Name ?? "",
|
||||
Lang = client.ClientOpts?.Lang ?? "",
|
||||
Version = client.ClientOpts?.Version ?? "",
|
||||
Pending = (int)client.PendingBytes,
|
||||
Reason = client.CloseReason.ToReasonString(),
|
||||
TlsVersion = client.TlsState?.TlsVersion ?? "",
|
||||
TlsCipherSuite = client.TlsState?.CipherSuite ?? "",
|
||||
};
|
||||
|
||||
@@ -66,6 +66,8 @@ public sealed class VarzHandler : IDisposable
|
||||
MaxConnections = _options.MaxConnections,
|
||||
MaxPayload = _options.MaxPayload,
|
||||
MaxControlLine = _options.MaxControlLine,
|
||||
MaxPending = _options.MaxPending,
|
||||
WriteDeadline = (long)_options.WriteDeadline.TotalNanoseconds,
|
||||
MaxPingsOut = _options.MaxPingsOut,
|
||||
PingInterval = (long)_options.PingInterval.TotalNanoseconds,
|
||||
Start = _server.StartTime,
|
||||
|
||||
@@ -5,6 +5,7 @@ using System.Net.Sockets;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Threading.Channels;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Protocol;
|
||||
@@ -34,7 +35,9 @@ public sealed class NatsClient : IDisposable
|
||||
private readonly AuthService _authService;
|
||||
private readonly byte[]? _nonce;
|
||||
private readonly NatsParser _parser;
|
||||
private readonly SemaphoreSlim _writeLock = new(1, 1);
|
||||
private readonly Channel<ReadOnlyMemory<byte>> _outbound = Channel.CreateBounded<ReadOnlyMemory<byte>>(
|
||||
new BoundedChannelOptions(8192) { SingleReader = true, FullMode = BoundedChannelFullMode.Wait });
|
||||
private long _pendingBytes;
|
||||
private CancellationTokenSource? _clientCts;
|
||||
private readonly Dictionary<string, Subscription> _subs = new();
|
||||
private readonly ILogger _logger;
|
||||
@@ -46,9 +49,9 @@ public sealed class NatsClient : IDisposable
|
||||
public IMessageRouter? Router { get; set; }
|
||||
public Account? Account { get; private set; }
|
||||
|
||||
// Thread-safe: read from auth timeout task on threadpool, written from command pipeline
|
||||
private int _connectReceived;
|
||||
public bool ConnectReceived => Volatile.Read(ref _connectReceived) != 0;
|
||||
private readonly ClientFlagHolder _flags = new();
|
||||
public bool ConnectReceived => _flags.HasFlag(ClientFlags.ConnectReceived);
|
||||
public ClientClosedReason CloseReason { get; private set; }
|
||||
|
||||
public DateTime StartTime { get; }
|
||||
private long _lastActivityTicks;
|
||||
@@ -93,6 +96,37 @@ public sealed class NatsClient : IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
public bool QueueOutbound(ReadOnlyMemory<byte> data)
|
||||
{
|
||||
if (_flags.HasFlag(ClientFlags.CloseConnection))
|
||||
return false;
|
||||
|
||||
var pending = Interlocked.Add(ref _pendingBytes, data.Length);
|
||||
if (pending > _options.MaxPending)
|
||||
{
|
||||
Interlocked.Add(ref _pendingBytes, -data.Length);
|
||||
_flags.SetFlag(ClientFlags.IsSlowConsumer);
|
||||
Interlocked.Increment(ref _serverStats.SlowConsumers);
|
||||
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
|
||||
_ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!_outbound.Writer.TryWrite(data))
|
||||
{
|
||||
Interlocked.Add(ref _pendingBytes, -data.Length);
|
||||
_flags.SetFlag(ClientFlags.IsSlowConsumer);
|
||||
Interlocked.Increment(ref _serverStats.SlowConsumers);
|
||||
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
|
||||
_ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public long PendingBytes => Interlocked.Read(ref _pendingBytes);
|
||||
|
||||
public async Task RunAsync(CancellationToken ct)
|
||||
{
|
||||
_clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
@@ -102,7 +136,7 @@ public sealed class NatsClient : IDisposable
|
||||
{
|
||||
// Send INFO (skip if already sent during TLS negotiation)
|
||||
if (!InfoAlreadySent)
|
||||
await SendInfoAsync(_clientCts.Token);
|
||||
SendInfo();
|
||||
|
||||
// Start auth timeout if auth is required
|
||||
Task? authTimeoutTask = null;
|
||||
@@ -116,7 +150,7 @@ public sealed class NatsClient : IDisposable
|
||||
if (!ConnectReceived)
|
||||
{
|
||||
_logger.LogDebug("Client {ClientId} auth timeout", Id);
|
||||
await SendErrAndCloseAsync(NatsProtocol.ErrAuthTimeout);
|
||||
await SendErrAndCloseAsync(NatsProtocol.ErrAuthTimeout, ClientClosedReason.AuthenticationTimeout);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
@@ -126,15 +160,16 @@ public sealed class NatsClient : IDisposable
|
||||
}, _clientCts.Token);
|
||||
}
|
||||
|
||||
// Start read pump, command processing, and ping timer in parallel
|
||||
// Start read pump, command processing, write loop, and ping timer in parallel
|
||||
var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token);
|
||||
var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token);
|
||||
var pingTask = RunPingTimerAsync(_clientCts.Token);
|
||||
var writeTask = RunWriteLoopAsync(_clientCts.Token);
|
||||
|
||||
if (authTimeoutTask != null)
|
||||
await Task.WhenAny(fillTask, processTask, pingTask, authTimeoutTask);
|
||||
await Task.WhenAny(fillTask, processTask, pingTask, writeTask, authTimeoutTask);
|
||||
else
|
||||
await Task.WhenAny(fillTask, processTask, pingTask);
|
||||
await Task.WhenAny(fillTask, processTask, pingTask, writeTask);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
@@ -146,6 +181,7 @@ public sealed class NatsClient : IDisposable
|
||||
}
|
||||
finally
|
||||
{
|
||||
_outbound.Writer.TryComplete();
|
||||
try { _socket.Shutdown(SocketShutdown.Both); }
|
||||
catch (SocketException) { }
|
||||
catch (ObjectDisposedException) { }
|
||||
@@ -185,11 +221,40 @@ public sealed class NatsClient : IDisposable
|
||||
var result = await reader.ReadAsync(ct);
|
||||
var buffer = result.Buffer;
|
||||
|
||||
long localInMsgs = 0;
|
||||
long localInBytes = 0;
|
||||
|
||||
while (_parser.TryParse(ref buffer, out var cmd))
|
||||
{
|
||||
Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
|
||||
|
||||
// Handle Pub/HPub inline to allow ref parameter passing for stat batching.
|
||||
// DispatchCommandAsync is async and cannot accept ref parameters.
|
||||
if (cmd.Type is CommandType.Pub or CommandType.HPub
|
||||
&& (!_authService.IsAuthRequired || ConnectReceived))
|
||||
{
|
||||
Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks);
|
||||
ProcessPub(cmd, ref localInMsgs, ref localInBytes);
|
||||
if (ClientOpts?.Verbose == true)
|
||||
WriteProtocol(NatsProtocol.OkBytes);
|
||||
}
|
||||
else
|
||||
{
|
||||
await DispatchCommandAsync(cmd, ct);
|
||||
}
|
||||
}
|
||||
|
||||
if (localInMsgs > 0)
|
||||
{
|
||||
Interlocked.Add(ref InMsgs, localInMsgs);
|
||||
Interlocked.Add(ref _serverStats.InMsgs, localInMsgs);
|
||||
}
|
||||
|
||||
if (localInBytes > 0)
|
||||
{
|
||||
Interlocked.Add(ref InBytes, localInBytes);
|
||||
Interlocked.Add(ref _serverStats.InBytes, localInBytes);
|
||||
}
|
||||
|
||||
reader.AdvanceTo(buffer.Start, buffer.End);
|
||||
|
||||
@@ -217,7 +282,7 @@ public sealed class NatsClient : IDisposable
|
||||
await ProcessConnectAsync(cmd);
|
||||
return;
|
||||
case CommandType.Ping:
|
||||
await WriteAsync(NatsProtocol.PongBytes, ct);
|
||||
WriteProtocol(NatsProtocol.PongBytes);
|
||||
return;
|
||||
default:
|
||||
// Ignore all other commands until authenticated
|
||||
@@ -229,10 +294,14 @@ public sealed class NatsClient : IDisposable
|
||||
{
|
||||
case CommandType.Connect:
|
||||
await ProcessConnectAsync(cmd);
|
||||
if (ClientOpts?.Verbose == true)
|
||||
WriteProtocol(NatsProtocol.OkBytes);
|
||||
break;
|
||||
|
||||
case CommandType.Ping:
|
||||
await WriteAsync(NatsProtocol.PongBytes, ct);
|
||||
WriteProtocol(NatsProtocol.PongBytes);
|
||||
if (ClientOpts?.Verbose == true)
|
||||
WriteProtocol(NatsProtocol.OkBytes);
|
||||
break;
|
||||
|
||||
case CommandType.Pong:
|
||||
@@ -240,16 +309,20 @@ public sealed class NatsClient : IDisposable
|
||||
break;
|
||||
|
||||
case CommandType.Sub:
|
||||
await ProcessSubAsync(cmd);
|
||||
ProcessSub(cmd);
|
||||
if (ClientOpts?.Verbose == true)
|
||||
WriteProtocol(NatsProtocol.OkBytes);
|
||||
break;
|
||||
|
||||
case CommandType.Unsub:
|
||||
ProcessUnsub(cmd);
|
||||
if (ClientOpts?.Verbose == true)
|
||||
WriteProtocol(NatsProtocol.OkBytes);
|
||||
break;
|
||||
|
||||
case CommandType.Pub:
|
||||
case CommandType.HPub:
|
||||
await ProcessPubAsync(cmd);
|
||||
// Pub/HPub is handled inline in ProcessCommandsAsync for stat batching
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -272,7 +345,7 @@ public sealed class NatsClient : IDisposable
|
||||
if (result == null)
|
||||
{
|
||||
_logger.LogWarning("Client {ClientId} authentication failed", Id);
|
||||
await SendErrAndCloseAsync(NatsProtocol.ErrAuthorizationViolation);
|
||||
await SendErrAndCloseAsync(NatsProtocol.ErrAuthorizationViolation, ClientClosedReason.AuthenticationViolation);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -301,17 +374,27 @@ public sealed class NatsClient : IDisposable
|
||||
Account.AddClient(Id);
|
||||
}
|
||||
|
||||
Volatile.Write(ref _connectReceived, 1);
|
||||
// Validate no_responders requires headers
|
||||
if (ClientOpts.NoResponders && !ClientOpts.Headers)
|
||||
{
|
||||
_logger.LogDebug("Client {ClientId} no_responders requires headers", Id);
|
||||
await CloseWithReasonAsync(ClientClosedReason.NoRespondersRequiresHeaders,
|
||||
NatsProtocol.ErrNoRespondersRequiresHeaders);
|
||||
return;
|
||||
}
|
||||
|
||||
_flags.SetFlag(ClientFlags.ConnectReceived);
|
||||
_flags.SetFlag(ClientFlags.ConnectProcessFinished);
|
||||
_logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
|
||||
}
|
||||
|
||||
private async ValueTask ProcessSubAsync(ParsedCommand cmd)
|
||||
private void ProcessSub(ParsedCommand cmd)
|
||||
{
|
||||
// Permission check for subscribe
|
||||
if (_permissions != null && !_permissions.IsSubscribeAllowed(cmd.Subject!, cmd.Queue))
|
||||
{
|
||||
_logger.LogDebug("Client {ClientId} subscribe permission denied for {Subject}", Id, cmd.Subject);
|
||||
await SendErrAsync(NatsProtocol.ErrPermissionsSubscribe);
|
||||
SendErr(NatsProtocol.ErrPermissionsSubscribe);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -349,19 +432,17 @@ public sealed class NatsClient : IDisposable
|
||||
Account?.SubList.Remove(sub);
|
||||
}
|
||||
|
||||
private async ValueTask ProcessPubAsync(ParsedCommand cmd)
|
||||
private void ProcessPub(ParsedCommand cmd, ref long localInMsgs, ref long localInBytes)
|
||||
{
|
||||
Interlocked.Increment(ref InMsgs);
|
||||
Interlocked.Add(ref InBytes, cmd.Payload.Length);
|
||||
Interlocked.Increment(ref _serverStats.InMsgs);
|
||||
Interlocked.Add(ref _serverStats.InBytes, cmd.Payload.Length);
|
||||
localInMsgs++;
|
||||
localInBytes += cmd.Payload.Length;
|
||||
|
||||
// Max payload validation (always, hard close)
|
||||
if (cmd.Payload.Length > _options.MaxPayload)
|
||||
{
|
||||
_logger.LogWarning("Client {ClientId} exceeded max payload: {Size} > {MaxPayload}",
|
||||
Id, cmd.Payload.Length, _options.MaxPayload);
|
||||
await SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation);
|
||||
_ = SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation, ClientClosedReason.MaxPayloadExceeded);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -369,7 +450,7 @@ public sealed class NatsClient : IDisposable
|
||||
if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(cmd.Subject!))
|
||||
{
|
||||
_logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, cmd.Subject);
|
||||
await SendErrAsync(NatsProtocol.ErrInvalidPublishSubject);
|
||||
SendErr(NatsProtocol.ErrInvalidPublishSubject);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -377,7 +458,7 @@ public sealed class NatsClient : IDisposable
|
||||
if (_permissions != null && !_permissions.IsPublishAllowed(cmd.Subject!))
|
||||
{
|
||||
_logger.LogDebug("Client {ClientId} publish permission denied for {Subject}", Id, cmd.Subject);
|
||||
await SendErrAsync(NatsProtocol.ErrPermissionsPublish);
|
||||
SendErr(NatsProtocol.ErrPermissionsPublish);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -393,15 +474,15 @@ public sealed class NatsClient : IDisposable
|
||||
Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
|
||||
}
|
||||
|
||||
private async Task SendInfoAsync(CancellationToken ct)
|
||||
private void SendInfo()
|
||||
{
|
||||
var infoJson = JsonSerializer.Serialize(_serverInfo);
|
||||
var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n");
|
||||
await WriteAsync(infoLine, ct);
|
||||
QueueOutbound(infoLine);
|
||||
}
|
||||
|
||||
public async Task SendMessageAsync(string subject, string sid, string? replyTo,
|
||||
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
public void SendMessage(string subject, string sid, string? replyTo,
|
||||
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
Interlocked.Increment(ref OutMsgs);
|
||||
Interlocked.Add(ref OutBytes, payload.Length + headers.Length);
|
||||
@@ -419,61 +500,89 @@ public sealed class NatsClient : IDisposable
|
||||
line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n");
|
||||
}
|
||||
|
||||
await _writeLock.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
await _stream.WriteAsync(line, ct);
|
||||
if (headers.Length > 0)
|
||||
await _stream.WriteAsync(headers, ct);
|
||||
if (payload.Length > 0)
|
||||
await _stream.WriteAsync(payload, ct);
|
||||
await _stream.WriteAsync(NatsProtocol.CrLf, ct);
|
||||
await _stream.FlushAsync(ct);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_writeLock.Release();
|
||||
}
|
||||
var totalLen = line.Length + headers.Length + payload.Length + NatsProtocol.CrLf.Length;
|
||||
var msg = new byte[totalLen];
|
||||
var offset = 0;
|
||||
line.CopyTo(msg.AsSpan(offset)); offset += line.Length;
|
||||
if (headers.Length > 0) { headers.Span.CopyTo(msg.AsSpan(offset)); offset += headers.Length; }
|
||||
if (payload.Length > 0) { payload.Span.CopyTo(msg.AsSpan(offset)); offset += payload.Length; }
|
||||
NatsProtocol.CrLf.CopyTo(msg.AsSpan(offset));
|
||||
|
||||
QueueOutbound(msg);
|
||||
}
|
||||
|
||||
private async Task WriteAsync(byte[] data, CancellationToken ct)
|
||||
private void WriteProtocol(byte[] data)
|
||||
{
|
||||
await _writeLock.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
await _stream.WriteAsync(data, ct);
|
||||
await _stream.FlushAsync(ct);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_writeLock.Release();
|
||||
}
|
||||
QueueOutbound(data);
|
||||
}
|
||||
|
||||
public async Task SendErrAsync(string message)
|
||||
public void SendErr(string message)
|
||||
{
|
||||
var errLine = Encoding.ASCII.GetBytes($"-ERR '{message}'\r\n");
|
||||
QueueOutbound(errLine);
|
||||
}
|
||||
|
||||
private async Task RunWriteLoopAsync(CancellationToken ct)
|
||||
{
|
||||
_flags.SetFlag(ClientFlags.WriteLoopStarted);
|
||||
var reader = _outbound.Reader;
|
||||
try
|
||||
{
|
||||
await WriteAsync(errLine, _clientCts?.Token ?? CancellationToken.None);
|
||||
while (await reader.WaitToReadAsync(ct))
|
||||
{
|
||||
long batchBytes = 0;
|
||||
while (reader.TryRead(out var data))
|
||||
{
|
||||
await _stream.WriteAsync(data, ct);
|
||||
batchBytes += data.Length;
|
||||
}
|
||||
|
||||
using var flushCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
flushCts.CancelAfter(_options.WriteDeadline);
|
||||
try
|
||||
{
|
||||
await _stream.FlushAsync(flushCts.Token);
|
||||
}
|
||||
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
|
||||
{
|
||||
_flags.SetFlag(ClientFlags.IsSlowConsumer);
|
||||
Interlocked.Increment(ref _serverStats.SlowConsumers);
|
||||
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
|
||||
await CloseWithReasonAsync(ClientClosedReason.SlowConsumerWriteDeadline, NatsProtocol.ErrSlowConsumer);
|
||||
return;
|
||||
}
|
||||
|
||||
Interlocked.Add(ref _pendingBytes, -batchBytes);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Expected during shutdown
|
||||
// Normal shutdown
|
||||
}
|
||||
catch (IOException ex)
|
||||
catch (IOException)
|
||||
{
|
||||
_logger.LogDebug(ex, "Client {ClientId} failed to send -ERR", Id);
|
||||
}
|
||||
catch (ObjectDisposedException ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Client {ClientId} failed to send -ERR (disposed)", Id);
|
||||
await CloseWithReasonAsync(ClientClosedReason.WriteError);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task SendErrAndCloseAsync(string message)
|
||||
public async Task SendErrAndCloseAsync(string message, ClientClosedReason reason = ClientClosedReason.ProtocolViolation)
|
||||
{
|
||||
await SendErrAsync(message);
|
||||
await CloseWithReasonAsync(reason, message);
|
||||
}
|
||||
|
||||
private async Task CloseWithReasonAsync(ClientClosedReason reason, string? errMessage = null)
|
||||
{
|
||||
CloseReason = reason;
|
||||
if (errMessage != null)
|
||||
SendErr(errMessage);
|
||||
_flags.SetFlag(ClientFlags.CloseConnection);
|
||||
|
||||
// Complete the outbound channel so the write loop drains remaining data
|
||||
_outbound.Writer.TryComplete();
|
||||
|
||||
// Give the write loop a short window to flush the final batch before canceling
|
||||
await Task.Delay(50);
|
||||
|
||||
if (_clientCts is { } cts)
|
||||
await cts.CancelAsync();
|
||||
else
|
||||
@@ -498,22 +607,14 @@ public sealed class NatsClient : IDisposable
|
||||
if (Volatile.Read(ref _pingsOut) + 1 > _options.MaxPingsOut)
|
||||
{
|
||||
_logger.LogDebug("Client {ClientId} stale connection -- closing", Id);
|
||||
await SendErrAndCloseAsync(NatsProtocol.ErrStaleConnection);
|
||||
await SendErrAndCloseAsync(NatsProtocol.ErrStaleConnection, ClientClosedReason.StaleConnection);
|
||||
return;
|
||||
}
|
||||
|
||||
var currentPingsOut = Interlocked.Increment(ref _pingsOut);
|
||||
_logger.LogDebug("Client {ClientId} sending PING ({PingsOut}/{MaxPingsOut})",
|
||||
Id, currentPingsOut, _options.MaxPingsOut);
|
||||
try
|
||||
{
|
||||
await WriteAsync(NatsProtocol.PingBytes, ct);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Client {ClientId} failed to send PING", Id);
|
||||
return;
|
||||
}
|
||||
WriteProtocol(NatsProtocol.PingBytes);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
@@ -532,9 +633,9 @@ public sealed class NatsClient : IDisposable
|
||||
public void Dispose()
|
||||
{
|
||||
_permissions?.Dispose();
|
||||
_outbound.Writer.TryComplete();
|
||||
_clientCts?.Dispose();
|
||||
_stream.Dispose();
|
||||
_socket.Dispose();
|
||||
_writeLock.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ public sealed class NatsOptions
|
||||
public int MaxPayload { get; set; } = 1024 * 1024;
|
||||
public int MaxControlLine { get; set; } = 4096;
|
||||
public int MaxConnections { get; set; } = 65536;
|
||||
public long MaxPending { get; set; } = 64 * 1024 * 1024; // 64MB, matching Go MAX_PENDING_SIZE
|
||||
public TimeSpan WriteDeadline { get; set; } = TimeSpan.FromSeconds(10);
|
||||
public TimeSpan PingInterval { get; set; } = TimeSpan.FromMinutes(2);
|
||||
public int MaxPingsOut { get; set; } = 2;
|
||||
|
||||
|
||||
@@ -227,6 +227,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
{
|
||||
var subList = sender.Account?.SubList ?? _globalAccount.SubList;
|
||||
var result = subList.Match(subject);
|
||||
var delivered = false;
|
||||
|
||||
// Deliver to plain subscribers
|
||||
foreach (var sub in result.PlainSubs)
|
||||
@@ -235,6 +236,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
continue;
|
||||
|
||||
DeliverMessage(sub, subject, replyTo, headers, payload);
|
||||
delivered = true;
|
||||
}
|
||||
|
||||
// Deliver to one member of each queue group (round-robin)
|
||||
@@ -244,7 +246,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
|
||||
// Simple round-robin -- pick based on total delivered across group
|
||||
var idx = Math.Abs((int)Interlocked.Increment(ref sender.OutMsgs)) % queueGroup.Length;
|
||||
// Undo the OutMsgs increment -- it will be incremented properly in SendMessageAsync
|
||||
// Undo the OutMsgs increment -- it will be incremented properly in SendMessage
|
||||
Interlocked.Decrement(ref sender.OutMsgs);
|
||||
|
||||
for (int attempt = 0; attempt < queueGroup.Length; attempt++)
|
||||
@@ -253,10 +255,18 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true)))
|
||||
{
|
||||
DeliverMessage(sub, subject, replyTo, headers, payload);
|
||||
delivered = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// No-responders: if nobody received the message and the publisher
|
||||
// opted in, send back a 503 status HMSG on the reply subject.
|
||||
if (!delivered && replyTo != null && sender.ClientOpts?.NoResponders == true)
|
||||
{
|
||||
SendNoResponders(sender, replyTo);
|
||||
}
|
||||
}
|
||||
|
||||
private static void DeliverMessage(Subscription sub, string subject, string? replyTo,
|
||||
@@ -270,8 +280,37 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
if (sub.MaxMessages > 0 && count > sub.MaxMessages)
|
||||
return;
|
||||
|
||||
// Fire and forget -- deliver asynchronously
|
||||
_ = client.SendMessageAsync(subject, sub.Sid, replyTo, headers, payload, CancellationToken.None);
|
||||
client.SendMessage(subject, sub.Sid, replyTo, headers, payload);
|
||||
}
|
||||
|
||||
private static void SendNoResponders(NatsClient sender, string replyTo)
|
||||
{
|
||||
// Find the sid for a subscription matching the reply subject
|
||||
var sid = string.Empty;
|
||||
foreach (var sub in sender.Subscriptions.Values)
|
||||
{
|
||||
if (SubjectMatch.MatchLiteral(replyTo, sub.Subject))
|
||||
{
|
||||
sid = sub.Sid;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Build: HMSG {replyTo} {sid} {hdrLen} {hdrLen}\r\n{headers}\r\n
|
||||
var headerBlock = "NATS/1.0 503\r\n\r\n"u8;
|
||||
var hdrLen = headerBlock.Length;
|
||||
var controlLine = Encoding.ASCII.GetBytes($"HMSG {replyTo} {sid} {hdrLen} {hdrLen}\r\n");
|
||||
|
||||
var totalLen = controlLine.Length + hdrLen + NatsProtocol.CrLf.Length;
|
||||
var msg = new byte[totalLen];
|
||||
var offset = 0;
|
||||
controlLine.CopyTo(msg.AsSpan(offset));
|
||||
offset += controlLine.Length;
|
||||
headerBlock.CopyTo(msg.AsSpan(offset));
|
||||
offset += hdrLen;
|
||||
NatsProtocol.CrLf.CopyTo(msg.AsSpan(offset));
|
||||
|
||||
sender.QueueOutbound(msg);
|
||||
}
|
||||
|
||||
public Account GetOrCreateAccount(string name)
|
||||
|
||||
@@ -6,6 +6,7 @@ public static class NatsProtocol
|
||||
{
|
||||
public const int MaxControlLineSize = 4096;
|
||||
public const int MaxPayloadSize = 1024 * 1024; // 1MB
|
||||
public const long MaxPendingSize = 64 * 1024 * 1024; // 64MB default max pending
|
||||
public const int DefaultPort = 4222;
|
||||
public const string Version = "0.1.0";
|
||||
public const int ProtoVersion = 1;
|
||||
@@ -30,6 +31,8 @@ public static class NatsProtocol
|
||||
public const string ErrAuthTimeout = "Authentication Timeout";
|
||||
public const string ErrPermissionsPublish = "Permissions Violation for Publish";
|
||||
public const string ErrPermissionsSubscribe = "Permissions Violation for Subscription";
|
||||
public const string ErrSlowConsumer = "Slow Consumer";
|
||||
public const string ErrNoRespondersRequiresHeaders = "No Responders Requires Headers Support";
|
||||
}
|
||||
|
||||
public sealed class ServerInfo
|
||||
|
||||
24
tests/NATS.Server.Tests/ClientClosedReasonTests.cs
Normal file
24
tests/NATS.Server.Tests/ClientClosedReasonTests.cs
Normal file
@@ -0,0 +1,24 @@
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class ClientClosedReasonTests
|
||||
{
|
||||
[Fact]
|
||||
public void All_expected_close_reasons_exist()
|
||||
{
|
||||
// Verify all 17 enum values exist and are distinct (None + 16 named reasons)
|
||||
var values = Enum.GetValues<ClientClosedReason>();
|
||||
values.Length.ShouldBe(17);
|
||||
values.Distinct().Count().ShouldBe(17);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(ClientClosedReason.ClientClosed, "Client Closed")]
|
||||
[InlineData(ClientClosedReason.SlowConsumerPendingBytes, "Slow Consumer (Pending Bytes)")]
|
||||
[InlineData(ClientClosedReason.SlowConsumerWriteDeadline, "Slow Consumer (Write Deadline)")]
|
||||
[InlineData(ClientClosedReason.StaleConnection, "Stale Connection")]
|
||||
[InlineData(ClientClosedReason.ServerShutdown, "Server Shutdown")]
|
||||
public void ToReasonString_returns_human_readable_description(ClientClosedReason reason, string expected)
|
||||
{
|
||||
reason.ToReasonString().ShouldBe(expected);
|
||||
}
|
||||
}
|
||||
53
tests/NATS.Server.Tests/ClientFlagsTests.cs
Normal file
53
tests/NATS.Server.Tests/ClientFlagsTests.cs
Normal file
@@ -0,0 +1,53 @@
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class ClientFlagsTests
|
||||
{
|
||||
[Fact]
|
||||
public void SetFlag_and_HasFlag_work()
|
||||
{
|
||||
var holder = new ClientFlagHolder();
|
||||
holder.HasFlag(ClientFlags.ConnectReceived).ShouldBeFalse();
|
||||
|
||||
holder.SetFlag(ClientFlags.ConnectReceived);
|
||||
holder.HasFlag(ClientFlags.ConnectReceived).ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ClearFlag_removes_flag()
|
||||
{
|
||||
var holder = new ClientFlagHolder();
|
||||
holder.SetFlag(ClientFlags.ConnectReceived);
|
||||
holder.SetFlag(ClientFlags.IsSlowConsumer);
|
||||
|
||||
holder.ClearFlag(ClientFlags.ConnectReceived);
|
||||
|
||||
holder.HasFlag(ClientFlags.ConnectReceived).ShouldBeFalse();
|
||||
holder.HasFlag(ClientFlags.IsSlowConsumer).ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Multiple_flags_can_be_set_independently()
|
||||
{
|
||||
var holder = new ClientFlagHolder();
|
||||
holder.SetFlag(ClientFlags.ConnectReceived);
|
||||
holder.SetFlag(ClientFlags.WriteLoopStarted);
|
||||
holder.SetFlag(ClientFlags.FirstPongSent);
|
||||
|
||||
holder.HasFlag(ClientFlags.ConnectReceived).ShouldBeTrue();
|
||||
holder.HasFlag(ClientFlags.WriteLoopStarted).ShouldBeTrue();
|
||||
holder.HasFlag(ClientFlags.FirstPongSent).ShouldBeTrue();
|
||||
holder.HasFlag(ClientFlags.IsSlowConsumer).ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetFlag_is_thread_safe()
|
||||
{
|
||||
var holder = new ClientFlagHolder();
|
||||
var flags = Enum.GetValues<ClientFlags>();
|
||||
|
||||
Parallel.ForEach(flags, flag => holder.SetFlag(flag));
|
||||
|
||||
foreach (var flag in flags)
|
||||
holder.HasFlag(flag).ShouldBeTrue();
|
||||
}
|
||||
}
|
||||
@@ -99,8 +99,8 @@ public class ClientTests : IAsyncDisposable
|
||||
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
|
||||
|
||||
// Trigger SendErrAsync
|
||||
await _natsClient.SendErrAsync("Invalid Subject");
|
||||
// Trigger SendErr
|
||||
_natsClient.SendErr("Invalid Subject");
|
||||
|
||||
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
|
||||
var response = Encoding.ASCII.GetString(buf, 0, n);
|
||||
|
||||
117
tests/NATS.Server.Tests/NoRespondersTests.cs
Normal file
117
tests/NATS.Server.Tests/NoRespondersTests.cs
Normal file
@@ -0,0 +1,117 @@
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class NoRespondersTests : IAsyncLifetime
|
||||
{
|
||||
private readonly NatsServer _server;
|
||||
private readonly int _port;
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
|
||||
public NoRespondersTests()
|
||||
{
|
||||
_port = GetFreePort();
|
||||
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
|
||||
}
|
||||
|
||||
public async Task InitializeAsync()
|
||||
{
|
||||
_ = _server.StartAsync(_cts.Token);
|
||||
await _server.WaitForReadyAsync();
|
||||
}
|
||||
|
||||
public async Task DisposeAsync()
|
||||
{
|
||||
await _cts.CancelAsync();
|
||||
_server.Dispose();
|
||||
}
|
||||
|
||||
private static int GetFreePort()
|
||||
{
|
||||
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
|
||||
return ((IPEndPoint)sock.LocalEndPoint!).Port;
|
||||
}
|
||||
|
||||
private async Task<Socket> ConnectClientAsync()
|
||||
{
|
||||
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
await sock.ConnectAsync(IPAddress.Loopback, _port);
|
||||
return sock;
|
||||
}
|
||||
|
||||
private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
|
||||
{
|
||||
using var cts = new CancellationTokenSource(timeoutMs);
|
||||
var sb = new StringBuilder();
|
||||
var buf = new byte[4096];
|
||||
while (!sb.ToString().Contains(expected))
|
||||
{
|
||||
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
|
||||
if (n == 0) break;
|
||||
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
|
||||
}
|
||||
return sb.ToString();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task NoResponders_without_headers_closes_connection()
|
||||
{
|
||||
using var client = await ConnectClientAsync();
|
||||
|
||||
// Read INFO
|
||||
await ReadUntilAsync(client, "\r\n");
|
||||
|
||||
// Send CONNECT with no_responders:true but headers:false
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes(
|
||||
"CONNECT {\"no_responders\":true,\"headers\":false}\r\n"));
|
||||
|
||||
// Should receive -ERR and connection should close
|
||||
var response = await ReadUntilAsync(client, "-ERR");
|
||||
response.ShouldContain("-ERR");
|
||||
response.ShouldContain("No Responders Requires Headers Support");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task NoResponders_with_headers_accepted()
|
||||
{
|
||||
using var client = await ConnectClientAsync();
|
||||
|
||||
// Read INFO
|
||||
await ReadUntilAsync(client, "\r\n");
|
||||
|
||||
// Send CONNECT with both no_responders and headers true, then PING
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes(
|
||||
"CONNECT {\"no_responders\":true,\"headers\":true}\r\nPING\r\n"));
|
||||
|
||||
// Should receive PONG (connection stays alive)
|
||||
var response = await ReadUntilAsync(client, "PONG\r\n");
|
||||
response.ShouldContain("PONG\r\n");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task NoResponders_sends_503_when_no_subscribers()
|
||||
{
|
||||
using var client = await ConnectClientAsync();
|
||||
|
||||
// Read INFO
|
||||
await ReadUntilAsync(client, "\r\n");
|
||||
|
||||
// CONNECT with no_responders and headers enabled
|
||||
// SUB to the reply inbox so we can receive the 503
|
||||
// PUB to a subject with no subscribers, using a reply-to subject
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes(
|
||||
"CONNECT {\"no_responders\":true,\"headers\":true}\r\n" +
|
||||
"SUB _INBOX.reply 1\r\n" +
|
||||
"PUB no.subscribers _INBOX.reply 5\r\nHello\r\n"));
|
||||
|
||||
// Should receive HMSG with 503 status on the reply subject
|
||||
var response = await ReadUntilAsync(client, "HMSG");
|
||||
response.ShouldContain("HMSG _INBOX.reply 1");
|
||||
response.ShouldContain("503");
|
||||
}
|
||||
}
|
||||
137
tests/NATS.Server.Tests/VerboseModeTests.cs
Normal file
137
tests/NATS.Server.Tests/VerboseModeTests.cs
Normal file
@@ -0,0 +1,137 @@
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class VerboseModeTests : IAsyncLifetime
|
||||
{
|
||||
private readonly NatsServer _server;
|
||||
private readonly int _port;
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
|
||||
public VerboseModeTests()
|
||||
{
|
||||
_port = GetFreePort();
|
||||
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
|
||||
}
|
||||
|
||||
public async Task InitializeAsync()
|
||||
{
|
||||
_ = _server.StartAsync(_cts.Token);
|
||||
await _server.WaitForReadyAsync();
|
||||
}
|
||||
|
||||
public async Task DisposeAsync()
|
||||
{
|
||||
await _cts.CancelAsync();
|
||||
_server.Dispose();
|
||||
}
|
||||
|
||||
private static int GetFreePort()
|
||||
{
|
||||
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
|
||||
return ((IPEndPoint)sock.LocalEndPoint!).Port;
|
||||
}
|
||||
|
||||
private async Task<Socket> ConnectClientAsync()
|
||||
{
|
||||
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
await sock.ConnectAsync(IPAddress.Loopback, _port);
|
||||
return sock;
|
||||
}
|
||||
|
||||
private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
|
||||
{
|
||||
using var cts = new CancellationTokenSource(timeoutMs);
|
||||
var sb = new StringBuilder();
|
||||
var buf = new byte[4096];
|
||||
while (!sb.ToString().Contains(expected))
|
||||
{
|
||||
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
|
||||
if (n == 0) break;
|
||||
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
|
||||
}
|
||||
return sb.ToString();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Verbose_mode_sends_OK_after_CONNECT()
|
||||
{
|
||||
using var client = await ConnectClientAsync();
|
||||
|
||||
// Read INFO
|
||||
await ReadUntilAsync(client, "\r\n");
|
||||
|
||||
// Send CONNECT with verbose:true
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":true}\r\n"));
|
||||
|
||||
// Should receive +OK after CONNECT
|
||||
var response = await ReadUntilAsync(client, "+OK\r\n");
|
||||
response.ShouldContain("+OK\r\n");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Verbose_mode_sends_OK_after_SUB()
|
||||
{
|
||||
using var client = await ConnectClientAsync();
|
||||
|
||||
// Read INFO
|
||||
await ReadUntilAsync(client, "\r\n");
|
||||
|
||||
// Send CONNECT with verbose:true, then SUB
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":true}\r\n"));
|
||||
|
||||
// Read +OK from CONNECT
|
||||
await ReadUntilAsync(client, "+OK\r\n");
|
||||
|
||||
// Send SUB
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\n"));
|
||||
|
||||
// Should receive +OK after SUB
|
||||
var response = await ReadUntilAsync(client, "+OK\r\n");
|
||||
response.ShouldContain("+OK\r\n");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Verbose_mode_sends_OK_after_PUB()
|
||||
{
|
||||
using var client = await ConnectClientAsync();
|
||||
|
||||
// Read INFO
|
||||
await ReadUntilAsync(client, "\r\n");
|
||||
|
||||
// Send CONNECT with verbose:true
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":true}\r\n"));
|
||||
|
||||
// Read +OK from CONNECT
|
||||
await ReadUntilAsync(client, "+OK\r\n");
|
||||
|
||||
// Send PUB
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nHello\r\n"));
|
||||
|
||||
// Should receive +OK after PUB
|
||||
var response = await ReadUntilAsync(client, "+OK\r\n");
|
||||
response.ShouldContain("+OK\r\n");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Non_verbose_mode_does_not_send_OK()
|
||||
{
|
||||
using var client = await ConnectClientAsync();
|
||||
|
||||
// Read INFO
|
||||
await ReadUntilAsync(client, "\r\n");
|
||||
|
||||
// Send CONNECT without verbose (default false), then SUB, then PING
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo 1\r\nPING\r\n"));
|
||||
|
||||
// Should receive PONG but NOT +OK
|
||||
var response = await ReadUntilAsync(client, "PONG\r\n");
|
||||
response.ShouldContain("PONG\r\n");
|
||||
response.ShouldNotContain("+OK");
|
||||
}
|
||||
}
|
||||
363
tests/NATS.Server.Tests/WriteLoopTests.cs
Normal file
363
tests/NATS.Server.Tests/WriteLoopTests.cs
Normal file
@@ -0,0 +1,363 @@
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Protocol;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class WriteLoopTests : IAsyncDisposable
|
||||
{
|
||||
private readonly Socket _serverSocket;
|
||||
private readonly Socket _clientSocket;
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
|
||||
private readonly ServerInfo _serverInfo = new()
|
||||
{
|
||||
ServerId = "test",
|
||||
ServerName = "test",
|
||||
Version = "0.1.0",
|
||||
Host = "127.0.0.1",
|
||||
Port = 4222,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Creates a connected socket pair via loopback and returns both sockets.
|
||||
/// </summary>
|
||||
private static (Socket serverSocket, Socket clientSocket) CreateSocketPair()
|
||||
{
|
||||
var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
|
||||
listener.Listen(1);
|
||||
var port = ((IPEndPoint)listener.LocalEndPoint!).Port;
|
||||
|
||||
var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
clientSocket.Connect(IPAddress.Loopback, port);
|
||||
var serverSocket = listener.Accept();
|
||||
listener.Dispose();
|
||||
|
||||
return (serverSocket, clientSocket);
|
||||
}
|
||||
|
||||
public WriteLoopTests()
|
||||
{
|
||||
(_serverSocket, _clientSocket) = CreateSocketPair();
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _cts.CancelAsync();
|
||||
_serverSocket.Dispose();
|
||||
_clientSocket.Dispose();
|
||||
}
|
||||
|
||||
private NatsClient CreateClient(NatsOptions? options = null)
|
||||
{
|
||||
options ??= new NatsOptions();
|
||||
var authService = AuthService.Build(options);
|
||||
return new NatsClient(
|
||||
1,
|
||||
new NetworkStream(_serverSocket, ownsSocket: false),
|
||||
_serverSocket,
|
||||
options,
|
||||
_serverInfo,
|
||||
authService,
|
||||
null,
|
||||
NullLogger.Instance,
|
||||
new ServerStats());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reads all available data from the client socket until timeout, accumulating it into a string.
|
||||
/// </summary>
|
||||
private async Task<string> ReadFromClientSocketAsync(int bufferSize = 8192, int timeoutMs = 5000)
|
||||
{
|
||||
var buf = new byte[bufferSize];
|
||||
using var readCts = new CancellationTokenSource(timeoutMs);
|
||||
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
|
||||
return Encoding.ASCII.GetString(buf, 0, n);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reads and discards the INFO line that is sent on client startup.
|
||||
/// </summary>
|
||||
private async Task ConsumeInfoAsync()
|
||||
{
|
||||
var response = await ReadFromClientSocketAsync();
|
||||
response.ShouldStartWith("INFO ");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task QueueOutbound_writes_data_to_client()
|
||||
{
|
||||
using var natsClient = CreateClient();
|
||||
var runTask = natsClient.RunAsync(_cts.Token);
|
||||
|
||||
// Read and discard the initial INFO message
|
||||
await ConsumeInfoAsync();
|
||||
|
||||
// Queue some data via QueueOutbound
|
||||
var testData = "PING\r\n"u8.ToArray();
|
||||
var result = natsClient.QueueOutbound(testData);
|
||||
result.ShouldBeTrue();
|
||||
|
||||
// Read the data from the client socket
|
||||
var received = await ReadFromClientSocketAsync();
|
||||
received.ShouldBe("PING\r\n");
|
||||
|
||||
await _cts.CancelAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task QueueOutbound_writes_multiple_messages_to_client()
|
||||
{
|
||||
using var natsClient = CreateClient();
|
||||
var runTask = natsClient.RunAsync(_cts.Token);
|
||||
|
||||
// Read and discard the initial INFO message
|
||||
await ConsumeInfoAsync();
|
||||
|
||||
// Queue multiple messages
|
||||
natsClient.QueueOutbound("MSG foo 1 5\r\nhello\r\n"u8.ToArray());
|
||||
natsClient.QueueOutbound("MSG bar 2 5\r\nworld\r\n"u8.ToArray());
|
||||
|
||||
// Read all data from the socket -- may arrive in one or two reads
|
||||
var sb = new StringBuilder();
|
||||
var buf = new byte[8192];
|
||||
using var readCts = new CancellationTokenSource(5000);
|
||||
while (sb.Length < "MSG foo 1 5\r\nhello\r\nMSG bar 2 5\r\nworld\r\n".Length)
|
||||
{
|
||||
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
|
||||
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
|
||||
}
|
||||
|
||||
var received = sb.ToString();
|
||||
received.ShouldContain("MSG foo 1 5\r\nhello\r\n");
|
||||
received.ShouldContain("MSG bar 2 5\r\nworld\r\n");
|
||||
|
||||
await _cts.CancelAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SlowConsumer_closes_when_pending_exceeds_max()
|
||||
{
|
||||
// Use a very small MaxPending so we can easily exceed it
|
||||
var options = new NatsOptions { MaxPending = 1024 };
|
||||
using var natsClient = CreateClient(options);
|
||||
var runTask = natsClient.RunAsync(_cts.Token);
|
||||
|
||||
// Read and discard the initial INFO message
|
||||
await ConsumeInfoAsync();
|
||||
|
||||
// Queue data that exceeds MaxPending. The first call adds to pending, then
|
||||
// subsequent calls should eventually trigger slow consumer detection.
|
||||
var largeData = new byte[2048];
|
||||
Array.Fill(largeData, (byte)'X');
|
||||
|
||||
var queued = natsClient.QueueOutbound(largeData);
|
||||
|
||||
// The QueueOutbound should return false since data exceeds MaxPending
|
||||
queued.ShouldBeFalse();
|
||||
|
||||
// CloseReason should be set to SlowConsumerPendingBytes
|
||||
// Give a small delay for the async close to propagate
|
||||
await Task.Delay(200);
|
||||
natsClient.CloseReason.ShouldBe(ClientClosedReason.SlowConsumerPendingBytes);
|
||||
|
||||
await _cts.CancelAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SlowConsumer_sets_reason_before_closing_connection()
|
||||
{
|
||||
// Use very small MaxPending
|
||||
var options = new NatsOptions { MaxPending = 512 };
|
||||
using var natsClient = CreateClient(options);
|
||||
var runTask = natsClient.RunAsync(_cts.Token);
|
||||
|
||||
// Read and discard the initial INFO message
|
||||
await ConsumeInfoAsync();
|
||||
|
||||
// Try to queue data that exceeds MaxPending
|
||||
var largeData = new byte[1024];
|
||||
Array.Fill(largeData, (byte)'Y');
|
||||
natsClient.QueueOutbound(largeData);
|
||||
|
||||
// Wait for the close to propagate
|
||||
await Task.Delay(200);
|
||||
|
||||
natsClient.CloseReason.ShouldBe(ClientClosedReason.SlowConsumerPendingBytes);
|
||||
|
||||
// The connection should eventually be closed -- ReceiveAsync returns 0
|
||||
var buf = new byte[4096];
|
||||
using var readCts = new CancellationTokenSource(5000);
|
||||
try
|
||||
{
|
||||
// Read any remaining data (the -ERR 'Slow Consumer' message) then expect 0
|
||||
var total = 0;
|
||||
while (true)
|
||||
{
|
||||
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
|
||||
if (n == 0)
|
||||
break;
|
||||
total += n;
|
||||
}
|
||||
|
||||
// We should get here -- the socket was closed by the server
|
||||
true.ShouldBeTrue();
|
||||
}
|
||||
catch (SocketException)
|
||||
{
|
||||
// Also acceptable -- connection reset
|
||||
true.ShouldBeTrue();
|
||||
}
|
||||
|
||||
await _cts.CancelAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task PendingBytes_tracks_queued_data()
|
||||
{
|
||||
using var natsClient = CreateClient();
|
||||
var runTask = natsClient.RunAsync(_cts.Token);
|
||||
|
||||
// Read and discard the initial INFO message -- note that the write loop
|
||||
// processes the INFO so by the time we've read it, PendingBytes for INFO
|
||||
// should have been decremented.
|
||||
await ConsumeInfoAsync();
|
||||
|
||||
// After INFO has been written and flushed, pending bytes should be back to 0
|
||||
// (give a tiny delay for the write loop to decrement)
|
||||
await Task.Delay(100);
|
||||
natsClient.PendingBytes.ShouldBe(0);
|
||||
|
||||
// Now queue known data
|
||||
var data1 = new byte[100];
|
||||
var data2 = new byte[200];
|
||||
natsClient.QueueOutbound(data1);
|
||||
|
||||
// PendingBytes should reflect the queued but not-yet-written data.
|
||||
// However, the write loop is running concurrently so it may drain quickly.
|
||||
// At a minimum, after queueing data1 PendingBytes should be >= 0
|
||||
// (it could have been written already). Let's queue both in quick succession
|
||||
// and check the combined pending.
|
||||
natsClient.QueueOutbound(data2);
|
||||
|
||||
// PendingBytes should be at least 0 (write loop may have already drained)
|
||||
natsClient.PendingBytes.ShouldBeGreaterThanOrEqualTo(0);
|
||||
|
||||
// Now let the write loop drain everything and verify it returns to 0
|
||||
await Task.Delay(500);
|
||||
natsClient.PendingBytes.ShouldBe(0);
|
||||
|
||||
await _cts.CancelAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task PendingBytes_increases_with_queued_data_before_drain()
|
||||
{
|
||||
// To observe PendingBytes before the write loop drains it, we create
|
||||
// a scenario where we can check the value atomically after queuing.
|
||||
// We use a fresh client and check PendingBytes right after QueueOutbound.
|
||||
using var natsClient = CreateClient();
|
||||
var runTask = natsClient.RunAsync(_cts.Token);
|
||||
|
||||
// Read the INFO to avoid backpressure
|
||||
await ConsumeInfoAsync();
|
||||
await Task.Delay(100);
|
||||
|
||||
// Queue a message and check that PendingBytes is non-negative
|
||||
// (the write loop may drain it very quickly, so we can't guarantee a specific value,
|
||||
// but we can verify the property works and eventually returns to 0)
|
||||
var data = new byte[500];
|
||||
natsClient.QueueOutbound(data);
|
||||
|
||||
// PendingBytes is either 500 (not yet drained) or 0 (already drained) -- both valid
|
||||
var pending = natsClient.PendingBytes;
|
||||
pending.ShouldBeOneOf(0L, 500L);
|
||||
|
||||
// After draining, it should be 0
|
||||
await Task.Delay(500);
|
||||
natsClient.PendingBytes.ShouldBe(0);
|
||||
|
||||
// Read and discard the written data from the socket so it doesn't block
|
||||
var buf = new byte[8192];
|
||||
using var readCts = new CancellationTokenSource(2000);
|
||||
try
|
||||
{
|
||||
await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// OK -- may have already been consumed
|
||||
}
|
||||
|
||||
await _cts.CancelAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task QueueOutbound_returns_false_when_client_is_closed()
|
||||
{
|
||||
var options = new NatsOptions { MaxPending = 512 };
|
||||
using var natsClient = CreateClient(options);
|
||||
var runTask = natsClient.RunAsync(_cts.Token);
|
||||
|
||||
// Read INFO
|
||||
await ConsumeInfoAsync();
|
||||
|
||||
// Trigger slow consumer close by exceeding MaxPending
|
||||
var largeData = new byte[1024];
|
||||
natsClient.QueueOutbound(largeData);
|
||||
|
||||
// Wait for close to propagate
|
||||
await Task.Delay(200);
|
||||
|
||||
// Subsequent QueueOutbound calls should return false
|
||||
var result = natsClient.QueueOutbound("PING\r\n"u8.ToArray());
|
||||
result.ShouldBeFalse();
|
||||
|
||||
await _cts.CancelAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SlowConsumer_increments_server_stats()
|
||||
{
|
||||
var options = new NatsOptions { MaxPending = 512 };
|
||||
var stats = new ServerStats();
|
||||
var authService = AuthService.Build(options);
|
||||
using var natsClient = new NatsClient(
|
||||
1,
|
||||
new NetworkStream(_serverSocket, ownsSocket: false),
|
||||
_serverSocket,
|
||||
options,
|
||||
_serverInfo,
|
||||
authService,
|
||||
null,
|
||||
NullLogger.Instance,
|
||||
stats);
|
||||
|
||||
var runTask = natsClient.RunAsync(_cts.Token);
|
||||
|
||||
// Read INFO
|
||||
await ConsumeInfoAsync();
|
||||
|
||||
// Initial stats should be 0
|
||||
Interlocked.Read(ref stats.SlowConsumers).ShouldBe(0);
|
||||
Interlocked.Read(ref stats.SlowConsumerClients).ShouldBe(0);
|
||||
|
||||
// Trigger slow consumer
|
||||
var largeData = new byte[1024];
|
||||
natsClient.QueueOutbound(largeData);
|
||||
|
||||
// Wait for propagation
|
||||
await Task.Delay(200);
|
||||
|
||||
// Stats should be incremented
|
||||
Interlocked.Read(ref stats.SlowConsumers).ShouldBeGreaterThan(0);
|
||||
Interlocked.Read(ref stats.SlowConsumerClients).ShouldBeGreaterThan(0);
|
||||
|
||||
await _cts.CancelAsync();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user