Compare commits
18 Commits
3941c85e76
...
42c7c9cb7a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
42c7c9cb7a | ||
|
|
dab8004d6b | ||
|
|
f0b5edd7c6 | ||
|
|
7a897c1087 | ||
|
|
e9b6c7fdd3 | ||
|
|
0347e8a28c | ||
|
|
6afe11ad4d | ||
|
|
cc0fe04f3c | ||
|
|
4ad821394b | ||
|
|
0ec5583422 | ||
|
|
dddced444e | ||
|
|
e87d4c00d9 | ||
|
|
7cf6bb866e | ||
|
|
17a0a217dd | ||
|
|
5b383ada4b | ||
|
|
060e1ee23d | ||
|
|
f4efbcf09e | ||
|
|
f86ea57f43 |
@@ -8,6 +8,7 @@
|
||||
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.3" />
|
||||
<PackageVersion Include="Serilog.Extensions.Hosting" Version="10.0.0" />
|
||||
<PackageVersion Include="Serilog.Sinks.Console" Version="6.1.1" />
|
||||
<PackageVersion Include="Serilog.Sinks.File" Version="6.0.0" />
|
||||
|
||||
<!-- Authentication -->
|
||||
<PackageVersion Include="NATS.NKeys" Version="1.0.0-preview.3" />
|
||||
|
||||
@@ -135,16 +135,16 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Multi-client-type command routing | Y | N | Go checks `c.kind` to allow/reject commands |
|
||||
| Protocol tracing in parser | Y | N | Go calls `traceInOp()` per operation |
|
||||
| Protocol tracing in parser | Y | Y | `TraceInOp()` logs `<<- OP arg` at `LogLevel.Trace` via optional `ILogger` |
|
||||
| Subject mapping (input→output) | Y | N | Go transforms subjects via mapping rules |
|
||||
| MIME header parsing | Y | N | .NET delegates header handling to client layer |
|
||||
| MIME header parsing | Y | Y | `NatsHeaderParser.Parse()` — status line + key-value headers from `ReadOnlySpan<byte>` |
|
||||
| Message trace event initialization | Y | N | |
|
||||
|
||||
### Protocol Writing
|
||||
| Aspect | Go | .NET | Notes |
|
||||
|--------|:--:|:----:|-------|
|
||||
| INFO serialization | Once at startup | Every send | .NET re-serializes JSON each time |
|
||||
| MSG/HMSG construction | Direct buffer write | String interpolation → byte encode | More allocations in .NET |
|
||||
| INFO serialization | Once at startup | Once at startup | Cached at startup; nonce connections serialize per-connection |
|
||||
| MSG/HMSG construction | Direct buffer write | Span-based buffer write | `int.TryFormat` + `CopyTo` into rented buffer, no string allocations |
|
||||
| Pre-encoded constants | Y | Y | Both pre-encode PING/PONG/OK |
|
||||
|
||||
---
|
||||
@@ -159,18 +159,18 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| Result caching (1024 max) | Y | Y | Same limits |
|
||||
| `plist` optimization (>256 subs) | Y | N | Go converts high-fanout nodes to array |
|
||||
| Async cache sweep (background) | Y | N | .NET sweeps inline under write lock |
|
||||
| Atomic generation ID for invalidation | Y | N | .NET clears cache explicitly |
|
||||
| Atomic generation ID for invalidation | Y | Y | `Interlocked.Increment` on insert/remove; cached results store generation |
|
||||
| Cache eviction strategy | Random | First-N | Semantic difference minimal |
|
||||
|
||||
### Missing SubList Features
|
||||
### SubList Features
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| `Stats()` — comprehensive statistics | Y | N | Matches, cache hits, inserts, removes, fanout |
|
||||
| `HasInterest()` — fast bool check | Y | N | |
|
||||
| `NumInterest()` — fast count | Y | N | |
|
||||
| `ReverseMatch()` — pattern→literal query | Y | N | |
|
||||
| `RemoveBatch()` — efficient bulk removal | Y | N | |
|
||||
| `All()` — enumerate all subscriptions | Y | N | |
|
||||
| `Stats()` — comprehensive statistics | Y | Y | Matches, cache hits, inserts, removes tracked via `Interlocked` |
|
||||
| `HasInterest()` — fast bool check | Y | Y | Walks trie without allocating result list |
|
||||
| `NumInterest()` — fast count | Y | Y | Counts plain + queue subs without allocation |
|
||||
| `ReverseMatch()` — pattern→literal query | Y | Y | Finds subscriptions whose wildcards match a literal subject |
|
||||
| `RemoveBatch()` — efficient bulk removal | Y | Y | Single generation increment for batch; increments `_removes` per sub |
|
||||
| `All()` — enumerate all subscriptions | Y | Y | Recursive trie walk returning all subscriptions |
|
||||
| Notification system (interest changes) | Y | N | |
|
||||
| Local/remote subscription filtering | Y | N | |
|
||||
| Queue weight expansion (remote subs) | Y | N | |
|
||||
@@ -181,16 +181,16 @@ Go implements a sophisticated slow consumer detection system:
|
||||
|---------|:--:|:----:|-------|
|
||||
| Basic validation (empty tokens, wildcards) | Y | Y | |
|
||||
| Literal subject check | Y | Y | |
|
||||
| UTF-8/null rune validation | Y | N | Go has `checkRunes` parameter |
|
||||
| Collision detection (`SubjectsCollide`) | Y | N | |
|
||||
| Token utilities (`tokenAt`, `numTokens`) | Y | N | |
|
||||
| UTF-8/null rune validation | Y | Y | `IsValidSubject(string, bool checkRunes)` rejects null bytes |
|
||||
| Collision detection (`SubjectsCollide`) | Y | Y | Token-by-token wildcard comparison; O(n) via upfront `Split` |
|
||||
| Token utilities (`tokenAt`, `numTokens`) | Y | Y | `TokenAt` returns `ReadOnlySpan<char>`; `NumTokens` counts separators |
|
||||
| Stack-allocated token buffer | Y | N | Go uses `[32]string{}` on stack |
|
||||
|
||||
### Subscription Lifecycle
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Per-account subscription limit | Y | N | |
|
||||
| Auto-unsubscribe on max messages | Y | Y | .NET enforces at delivery time (NatsServer.cs:269-270) |
|
||||
| Per-account subscription limit | Y | Y | `Account.IncrementSubscriptions()` returns false when `MaxSubscriptions` exceeded |
|
||||
| Auto-unsubscribe on max messages | Y | Y | Enforced at delivery; sub removed from trie + client dict when exhausted |
|
||||
| Subscription routing propagation | Y | N | For clusters |
|
||||
| Queue weight (`qw`) field | Y | N | For remote queue load balancing |
|
||||
|
||||
@@ -217,10 +217,10 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Per-account SubList isolation | Y | Y | |
|
||||
| Multi-account user resolution | Y | N | .NET has basic account, no resolution |
|
||||
| Multi-account user resolution | Y | Y | `AccountConfig` per account in `NatsOptions.Accounts`; `GetOrCreateAccount` wires limits |
|
||||
| Account exports/imports | Y | N | |
|
||||
| Per-account connection limits | Y | N | |
|
||||
| Per-account subscription limits | Y | N | |
|
||||
| Per-account connection limits | Y | Y | `Account.AddClient()` returns false when `MaxConnections` exceeded |
|
||||
| Per-account subscription limits | Y | Y | `Account.IncrementSubscriptions()` enforced in `ProcessSub()` |
|
||||
| Account JetStream limits | Y | N | Excluded per scope |
|
||||
|
||||
### Permissions
|
||||
@@ -228,11 +228,12 @@ Go implements a sophisticated slow consumer detection system:
|
||||
|---------|:--:|:----:|-------|
|
||||
| Publish allow list | Y | Y | |
|
||||
| Subscribe allow list | Y | Y | |
|
||||
| Publish deny list | Y | Partial | .NET has deny in struct but limited enforcement |
|
||||
| Subscribe deny list | Y | Partial | Same |
|
||||
| Message-level deny filtering | Y | N | Go filters at delivery time |
|
||||
| Permission caching (128 entries) | Y | N | |
|
||||
| Response permissions (reply tracking) | Y | N | Dynamic reply subject authorization |
|
||||
| Publish deny list | Y | Y | Full enforcement with LRU-cached results |
|
||||
| Subscribe deny list | Y | Y | Queue-aware deny checking in `IsSubscribeAllowed` |
|
||||
| Message-level deny filtering | Y | Y | `IsDeliveryAllowed()` checked before MSG send; auto-unsub cleanup on deny |
|
||||
| Permission caching (128 entries) | Y | Y | `PermissionLruCache` — Dictionary+LinkedList LRU, matching Go's `maxPermCacheSize` |
|
||||
| Response permissions (reply tracking) | Y | Y | `ResponseTracker` with configurable TTL + max messages; not LRU-cached |
|
||||
| Auth expiry enforcement | Y | Y | `Task.Delay` timer closes client when JWT/auth expires |
|
||||
| Permission templates (JWT) | Y | N | e.g., `{{name()}}`, `{{account-tag(...)}}` |
|
||||
|
||||
---
|
||||
@@ -247,7 +248,7 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| `-n/--name` (ServerName) | Y | Y | |
|
||||
| `-m/--http_port` (monitoring) | Y | Y | |
|
||||
| `-c` (config file) | Y | Stub | Flag parsed, stored in `ConfigFile`, no config parser |
|
||||
| `-D/-V/-DV` (debug/trace) | Y | N | |
|
||||
| `-D/-V/-DV` (debug/trace) | Y | Y | Sets `Debug`/`Trace` on `NatsOptions`, adjusts Serilog minimum level |
|
||||
| `--tlscert/--tlskey/--tlscacert` | Y | Y | |
|
||||
| `--tlsverify` | Y | Y | |
|
||||
| `--http_base_path` | Y | Y | |
|
||||
@@ -259,12 +260,12 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| Config file parsing | Y | N | Go has custom `conf` parser with includes |
|
||||
| Hot reload (SIGHUP) | Y | N | |
|
||||
| Config change detection | Y | N | Go tracks `inConfig`/`inCmdLine` origins |
|
||||
| ~450 option fields | Y | ~54 | .NET covers core options only |
|
||||
| ~450 option fields | Y | ~62 | .NET covers core + debug/trace/logging/limits/tags options |
|
||||
|
||||
### Missing Options Categories
|
||||
- Logging options (file, rotation, syslog, trace levels)
|
||||
- Advanced limits (MaxSubs, MaxSubTokens, MaxPending, WriteDeadline)
|
||||
- Tags/metadata
|
||||
- ~~Logging options (file, rotation, syslog, trace levels)~~ — File logging (`-l`), `LogSizeLimit`, Debug/Trace implemented; syslog/color/timestamp not yet
|
||||
- ~~Advanced limits (MaxSubs, MaxSubTokens, MaxPending, WriteDeadline)~~ — `MaxSubs`, `MaxSubTokens` implemented; MaxPending/WriteDeadline already existed
|
||||
- ~~Tags/metadata~~ — `Tags` dictionary implemented in `NatsOptions`
|
||||
- OCSP configuration
|
||||
- WebSocket/MQTT options
|
||||
- Operator mode / account resolver
|
||||
@@ -350,11 +351,11 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| Structured logging | Partial | Y | .NET uses Serilog with ILogger<T> |
|
||||
| File logging with rotation | Y | N | |
|
||||
| File logging with rotation | Y | Y | `-l` flag + `LogSizeLimit` option via Serilog.Sinks.File with `fileSizeLimitBytes` |
|
||||
| Syslog (local and remote) | Y | N | |
|
||||
| Log reopening (SIGUSR1) | Y | N | |
|
||||
| Trace mode (protocol-level) | Y | N | |
|
||||
| Debug mode | Y | N | |
|
||||
| Trace mode (protocol-level) | Y | Y | `-V`/`-DV` flags; parser `TraceInOp()` logs `<<- OP arg` at Trace level |
|
||||
| Debug mode | Y | Y | `-D`/`-DV` flags lower Serilog minimum to Debug/Verbose |
|
||||
| Per-subsystem log control | Y | N | |
|
||||
| Color output on TTY | Y | N | |
|
||||
| Timestamp format control | Y | N | |
|
||||
@@ -378,16 +379,16 @@ Go implements a sophisticated slow consumer detection system:
|
||||
## Summary: Critical Gaps for Production Use
|
||||
|
||||
### High Priority
|
||||
1. **Slow consumer detection** — unbounded writes can exhaust memory (stat fields exist but no detection logic)
|
||||
2. **Write coalescing / batch flush** — performance gap for high-throughput scenarios
|
||||
1. ~~**Slow consumer detection**~~ — implemented (pending bytes threshold + write deadline)
|
||||
2. ~~**Write coalescing / batch flush**~~ — implemented (channel-based write loop)
|
||||
|
||||
### Medium Priority
|
||||
3. **Verbose mode** — clients expect `+OK` when `verbose: true`
|
||||
4. **Permission deny enforcement at delivery** — deny lists checked at SUB/PUB time but not during message delivery
|
||||
3. ~~**Verbose mode**~~ — implemented (`+OK` on CONNECT/SUB/UNSUB/PUB)
|
||||
4. ~~**Permission deny enforcement at delivery**~~ — implemented (`IsDeliveryAllowed` + auto-unsub cleanup)
|
||||
5. **Config file parsing** — needed for production deployment (CLI stub exists)
|
||||
6. **Hot reload** — needed for zero-downtime config changes (SIGHUP stub exists)
|
||||
7. **File logging with rotation** — needed for production logging
|
||||
8. **No-responders validation** — flag parsed but not enforced
|
||||
7. ~~**File logging with rotation**~~ — implemented (Serilog.Sinks.File with `-l` flag)
|
||||
8. ~~**No-responders validation**~~ — implemented (CONNECT validation + 503 HMSG)
|
||||
|
||||
### Lower Priority
|
||||
9. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections
|
||||
@@ -395,8 +396,8 @@ Go implements a sophisticated slow consumer detection system:
|
||||
11. **TLS certificate mapping** — property exists, not implemented
|
||||
12. **OCSP support** — certificate revocation checking
|
||||
13. **Subject mapping** — input→output subject transformation
|
||||
14. **Protocol tracing** — no trace-level logging
|
||||
15. **Subscription statistics** — SubList has no stats collection
|
||||
16. **Per-account limits** — connections, subscriptions per account
|
||||
17. **Reply subject tracking** — dynamic response permissions
|
||||
14. ~~**Protocol tracing**~~ — implemented (`TraceInOp` at `LogLevel.Trace`)
|
||||
15. ~~**Subscription statistics**~~ — implemented (`Stats()`, `HasInterest()`, `NumInterest()`, etc.)
|
||||
16. ~~**Per-account limits**~~ — implemented (connection + subscription limits via `AccountConfig`)
|
||||
17. ~~**Reply subject tracking**~~ — implemented (`ResponseTracker` with TTL + max messages)
|
||||
18. **Windows Service integration** — needed for Windows deployment
|
||||
|
||||
226
docs/plans/2026-02-23-sections-7-10-gaps-design.md
Normal file
226
docs/plans/2026-02-23-sections-7-10-gaps-design.md
Normal file
@@ -0,0 +1,226 @@
|
||||
# Sections 7-10 Gaps Design: Monitoring, TLS, Logging, Ping/Pong
|
||||
|
||||
**Date:** 2026-02-23
|
||||
**Scope:** Implement remaining gaps in differences.md sections 7 (Monitoring), 8 (TLS), 9 (Logging), 10 (Ping/Pong)
|
||||
**Goal:** Go parity for all features within scope
|
||||
|
||||
---
|
||||
|
||||
## Section 7: Monitoring
|
||||
|
||||
### 7a. `/subz` Endpoint
|
||||
|
||||
Replace the empty stub with a full `SubszHandler`.
|
||||
|
||||
**Models:**
|
||||
- `Subsz` — response envelope: `Id`, `Now`, `SublistStats`, `Total`, `Offset`, `Limit`, `Subs[]`
|
||||
- `SubszOptions` — `Offset`, `Limit`, `Subscriptions` (bool for detail), `Account` (filter), `Test` (literal subject filter)
|
||||
- Reuse existing `SubDetail` from Connz
|
||||
|
||||
**Algorithm:**
|
||||
1. Iterate all accounts (or filter by `Account` param)
|
||||
2. Collect all subscriptions from each account's SubList
|
||||
3. If `Test` subject provided, filter using `SubjectMatch.MatchLiteral()` to only return subs that would receive that message
|
||||
4. Apply pagination (offset/limit)
|
||||
5. If `Subscriptions` is true, include `SubDetail[]` array
|
||||
|
||||
**SubList stats** — add a `Stats()` method to `SubList` returning `SublistStats` (count, cache size, inserts, removes, matches, cache hits).
|
||||
|
||||
**Files:** New `Monitoring/SubszHandler.cs`, `Monitoring/Subsz.cs`. Modify `MonitorServer.cs`, `SubList.cs`.
|
||||
|
||||
### 7b. Connz `ByStop` / `ByReason` Sorting
|
||||
|
||||
Add two missing sort options for closed connection queries.
|
||||
|
||||
- Add `ByStop` and `ByReason` to `SortOpt` enum
|
||||
- Parse `sort=stop` and `sort=reason` in query params
|
||||
- Validate: these sorts only work with `state=closed` — return error if used with open connections
|
||||
|
||||
### 7c. Connz State Filtering & Closed Connections
|
||||
|
||||
Track closed connections and support state-based filtering.
|
||||
|
||||
**Closed connection tracking:**
|
||||
- `ClosedClient` record: `Cid`, `Ip`, `Port`, `Start`, `Stop`, `Reason`, `Name`, `Lang`, `Version`, `InMsgs`, `OutMsgs`, `InBytes`, `OutBytes`, `NumSubs`, `Rtt`, `TlsVersion`, `TlsCipherSuite`
|
||||
- `ConcurrentQueue<ClosedClient>` on `NatsServer` (capped at 10,000 entries)
|
||||
- Populate in `RemoveClient()` from client state before disposal
|
||||
|
||||
**State filter:**
|
||||
- Parse `state=open|closed|all` query param
|
||||
- `open` (default): current live connections only
|
||||
- `closed`: only from closed connections list
|
||||
- `all`: merge both
|
||||
|
||||
**Files:** Modify `NatsServer.cs`, `ConnzHandler.cs`, new `Monitoring/ClosedClient.cs`.
|
||||
|
||||
### 7d. Varz Slow Consumer Stats
|
||||
|
||||
Already at parity. `SlowConsumersStats` is populated from `ServerStats` counters. No changes needed.
|
||||
|
||||
---
|
||||
|
||||
## Section 8: TLS
|
||||
|
||||
### 8a. TLS Rate Limiting
|
||||
|
||||
Already implemented via `TlsRateLimiter` (semaphore + periodic refill timer). Wired into `AcceptClientAsync`. Only a unit test needed.
|
||||
|
||||
### 8b. TLS Cert-to-User Mapping (TlsMap)
|
||||
|
||||
Full DN parsing using .NET built-in `X500DistinguishedName`.
|
||||
|
||||
**New `TlsMapAuthenticator`:**
|
||||
- Implements `IAuthenticator`
|
||||
- Receives the list of configured `User` objects
|
||||
- On `Authenticate()`:
|
||||
1. Extract `X509Certificate2` from auth context (passed from `TlsConnectionState`)
|
||||
2. Parse subject DN via `cert.SubjectName` (`X500DistinguishedName`)
|
||||
3. Build normalized DN string from RDN components
|
||||
4. Try exact DN match against user map (key = DN string)
|
||||
5. If no exact match, try CN-only match
|
||||
6. Return `AuthResult` with matched user's permissions
|
||||
|
||||
**Auth context extension:**
|
||||
- Add `X509Certificate2? ClientCertificate` to `ClientAuthContext`
|
||||
- Pass certificate from `TlsConnectionState` in `ProcessConnectAsync`
|
||||
|
||||
**AuthService integration:**
|
||||
- When `options.TlsMap && options.TlsVerify`, add `TlsMapAuthenticator` to authenticator chain
|
||||
- TlsMap auth runs before other authenticators (cert-based auth takes priority)
|
||||
|
||||
**Files:** New `Auth/TlsMapAuthenticator.cs`. Modify `Auth/AuthService.cs`, `Auth/ClientAuthContext.cs`, `NatsClient.cs`.
|
||||
|
||||
---
|
||||
|
||||
## Section 9: Logging
|
||||
|
||||
### 9a. File Logging with Rotation
|
||||
|
||||
**New options on `NatsOptions`:**
|
||||
- `LogFile` (string?) — path to log file
|
||||
- `LogSizeLimit` (long) — file size in bytes before rotation (0 = unlimited)
|
||||
- `LogMaxFiles` (int) — max retained rotated files (0 = unlimited)
|
||||
|
||||
**CLI flags:** `--log_file`, `--log_size_limit`, `--log_max_files`
|
||||
|
||||
**Serilog config:** Add `WriteTo.File()` with `fileSizeLimitBytes` and `retainedFileCountLimit` when `LogFile` is set.
|
||||
|
||||
### 9b. Debug/Trace Modes
|
||||
|
||||
**New options on `NatsOptions`:**
|
||||
- `Debug` (bool) — enable debug-level logging
|
||||
- `Trace` (bool) — enable trace/verbose-level logging
|
||||
|
||||
**CLI flags:** `-D` (debug), `-V` or `-T` (trace), `-DV` (both)
|
||||
|
||||
**Serilog config:**
|
||||
- Default: `MinimumLevel.Information()`
|
||||
- `-D`: `MinimumLevel.Debug()`
|
||||
- `-V`/`-T`: `MinimumLevel.Verbose()`
|
||||
|
||||
### 9c. Color Output
|
||||
|
||||
Auto-detect TTY via `Console.IsOutputRedirected`.
|
||||
- TTY: use `Serilog.Sinks.Console` with `AnsiConsoleTheme.Code`
|
||||
- Non-TTY: use `ConsoleTheme.None`
|
||||
|
||||
Matches Go's behavior of disabling color when stderr is not a terminal.
|
||||
|
||||
### 9d. Timestamp Format Control
|
||||
|
||||
**New options on `NatsOptions`:**
|
||||
- `Logtime` (bool, default true) — include timestamps
|
||||
- `LogtimeUTC` (bool, default false) — use UTC format
|
||||
|
||||
**CLI flags:** `--logtime` (true/false), `--logtime_utc`
|
||||
|
||||
**Output template adjustment:**
|
||||
- With timestamps: `[{Timestamp:yyyy/MM/dd HH:mm:ss.ffffff} {Level:u3}] {Message:lj}{NewLine}{Exception}`
|
||||
- Without timestamps: `[{Level:u3}] {Message:lj}{NewLine}{Exception}`
|
||||
- UTC: set `Serilog.Formatting` culture to UTC
|
||||
|
||||
### 9e. Log Reopening (SIGUSR1)
|
||||
|
||||
When file logging is configured:
|
||||
- SIGUSR1 handler calls `ReOpenLogFile()` on the server
|
||||
- `ReOpenLogFile()` flushes and closes current Serilog logger, creates new one with same config
|
||||
- This enables external log rotation tools (logrotate)
|
||||
|
||||
**Files:** Modify `NatsOptions.cs`, `Program.cs`, `NatsServer.cs`.
|
||||
|
||||
---
|
||||
|
||||
## Section 10: Ping/Pong
|
||||
|
||||
### 10a. RTT Tracking
|
||||
|
||||
**New fields on `NatsClient`:**
|
||||
- `_rttStartTicks` (long) — UTC ticks when PING sent
|
||||
- `_rtt` (long) — computed RTT in ticks
|
||||
- `Rtt` property (TimeSpan) — computed from `_rtt`
|
||||
|
||||
**Logic:**
|
||||
- In `RunPingTimerAsync`, before writing PING: `_rttStartTicks = DateTime.UtcNow.Ticks`
|
||||
- In `DispatchCommandAsync` PONG handler: compute `_rtt = DateTime.UtcNow.Ticks - _rttStartTicks` (min 1 tick)
|
||||
- `computeRTT()` helper ensures minimum 1 tick (handles clock granularity on Windows)
|
||||
|
||||
**Monitoring exposure:**
|
||||
- Populate `ConnInfo.Rtt` as formatted string (e.g., `"1.234ms"`)
|
||||
- Add `ByRtt` sort option to Connz
|
||||
|
||||
### 10b. RTT-Based First PING Delay
|
||||
|
||||
**New state on `NatsClient`:**
|
||||
- `_firstPongSent` flag in `ClientFlags`
|
||||
|
||||
**Logic in `RunPingTimerAsync`:**
|
||||
- Before first PING, check: `_firstPongSent || timeSinceStart > 2 seconds`
|
||||
- If neither condition met, skip this PING cycle
|
||||
- Set `_firstPongSent` on first PONG after CONNECT (in PONG handler)
|
||||
|
||||
This prevents the server from sending PING (for RTT) before the client has had a chance to respond to the initial INFO with CONNECT+PING.
|
||||
|
||||
### 10c. Stale Connection Stats
|
||||
|
||||
**New model:**
|
||||
- `StaleConnectionStats` — `Clients`, `Routes`, `Gateways`, `Leafs` (matching Go)
|
||||
|
||||
**ServerStats extension:**
|
||||
- Add `StaleConnectionClients`, `StaleConnectionRoutes`, etc. fields
|
||||
- Increment in `MarkClosed(StaleConnection)` based on connection kind
|
||||
|
||||
**Varz exposure:**
|
||||
- Add `StaleConnectionStats` field to `Varz`
|
||||
- Populate from `ServerStats` counters
|
||||
|
||||
**Files:** Modify `NatsClient.cs`, `ServerStats.cs`, `Varz.cs`, `VarzHandler.cs`, `Connz.cs`, `ConnzHandler.cs`.
|
||||
|
||||
---
|
||||
|
||||
## Test Coverage
|
||||
|
||||
Each section includes unit tests:
|
||||
|
||||
| Feature | Test File | Tests |
|
||||
|---------|-----------|-------|
|
||||
| Subz endpoint | SubszHandlerTests.cs | Empty response, with subs, account filter, test subject filter, pagination |
|
||||
| Connz closed state | ConnzHandlerTests.cs | State=closed, ByStop sort, ByReason sort, validation errors |
|
||||
| TLS rate limiter | TlsRateLimiterTests.cs | Rate enforcement, refill behavior |
|
||||
| TlsMap auth | TlsMapAuthenticatorTests.cs | DN matching, CN fallback, no match |
|
||||
| File logging | LoggingTests.cs | File creation, rotation on size limit |
|
||||
| RTT tracking | ClientTests.cs | RTT computed on PONG, exposed in connz, ByRtt sort |
|
||||
| First PING delay | ClientTests.cs | PING delayed until first PONG or 2s |
|
||||
| Stale stats | ServerTests.cs | Stale counters incremented, exposed in varz |
|
||||
|
||||
---
|
||||
|
||||
## Parallelization Strategy
|
||||
|
||||
These work streams are independent and can be developed by parallel subagents:
|
||||
|
||||
1. **Monitoring stream** (7a, 7b, 7c): SubszHandler + Connz closed connections + state filter
|
||||
2. **TLS stream** (8b): TlsMapAuthenticator
|
||||
3. **Logging stream** (9a-9e): All logging improvements
|
||||
4. **Ping/Pong stream** (10a-10c): RTT tracking + first PING delay + stale stats
|
||||
|
||||
Streams 1-4 touch different files with minimal overlap. The only shared touch point is `NatsOptions.cs` (new options for logging and ping/pong), which can be handled by one stream first and the others will build on it.
|
||||
1911
docs/plans/2026-02-23-sections-7-10-gaps-plan.md
Normal file
1911
docs/plans/2026-02-23-sections-7-10-gaps-plan.md
Normal file
File diff suppressed because it is too large
Load Diff
19
docs/plans/2026-02-23-sections-7-10-gaps-plan.md.tasks.json
Normal file
19
docs/plans/2026-02-23-sections-7-10-gaps-plan.md.tasks.json
Normal file
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"planPath": "docs/plans/2026-02-23-sections-7-10-gaps-plan.md",
|
||||
"tasks": [
|
||||
{"id": 6, "subject": "Task 0: Add NuGet dependencies for logging sinks", "status": "pending"},
|
||||
{"id": 7, "subject": "Task 1: Add logging and ping/pong options to NatsOptions", "status": "pending", "blockedBy": [6]},
|
||||
{"id": 8, "subject": "Task 2: Add CLI flag parsing for logging and debug/trace", "status": "pending", "blockedBy": [6, 7]},
|
||||
{"id": 9, "subject": "Task 3: Implement log reopening on SIGUSR1", "status": "pending", "blockedBy": [6, 7]},
|
||||
{"id": 10, "subject": "Task 4: Add RTT tracking to NatsClient", "status": "pending", "blockedBy": [6, 7]},
|
||||
{"id": 11, "subject": "Task 5: Add stale connection stats and expose in varz", "status": "pending", "blockedBy": [6, 7]},
|
||||
{"id": 12, "subject": "Task 6: Add closed connection tracking and connz state filtering", "status": "pending", "blockedBy": [10]},
|
||||
{"id": 13, "subject": "Task 7: Implement /subz endpoint", "status": "pending", "blockedBy": [6, 7]},
|
||||
{"id": 14, "subject": "Task 8: Implement TLS cert-to-user mapping (TlsMap)", "status": "pending", "blockedBy": [6, 7]},
|
||||
{"id": 15, "subject": "Task 9: Add TLS rate limiter test", "status": "pending", "blockedBy": [6, 7]},
|
||||
{"id": 16, "subject": "Task 10: File logging tests", "status": "pending", "blockedBy": [6, 7, 8]},
|
||||
{"id": 17, "subject": "Task 11: Run full test suite and verify", "status": "pending", "blockedBy": [8, 9, 10, 11, 12, 13, 14, 15, 16]},
|
||||
{"id": 18, "subject": "Task 12: Update differences.md", "status": "pending", "blockedBy": [17]}
|
||||
],
|
||||
"lastUpdated": "2026-02-23T00:00:00Z"
|
||||
}
|
||||
190
docs/plans/2026-02-23-sections3-6-gaps-design.md
Normal file
190
docs/plans/2026-02-23-sections3-6-gaps-design.md
Normal file
@@ -0,0 +1,190 @@
|
||||
# Sections 3-6 Gaps Implementation Design
|
||||
|
||||
> Approved 2026-02-23. Implements all remaining gaps in Protocol Parsing, Subscriptions & Subject Matching, Authentication & Authorization, and Configuration.
|
||||
|
||||
---
|
||||
|
||||
## Section 3 — Protocol Parsing
|
||||
|
||||
### 3a. INFO Serialization Caching
|
||||
|
||||
Add `byte[] _infoJsonCache` on `NatsServer`. Build once in `StartAsync()` after `ServerInfo` is populated. Rebuild only on config reload.
|
||||
|
||||
`NatsClient.SendInfo()` uses cached bytes for non-nonce connections. For NKey connections (which need a per-connection nonce), clone `ServerInfo`, set nonce, serialize fresh.
|
||||
|
||||
**Files:** `NatsServer.cs`, `NatsClient.cs`
|
||||
|
||||
### 3b. Protocol Tracing
|
||||
|
||||
Add `ILogger` to `NatsParser`. Add `TraceInOp(ReadOnlySpan<byte> op, ReadOnlySpan<byte> arg)` that logs at `LogLevel.Trace` after each command dispatch in `TryParse()`.
|
||||
|
||||
Controlled by `NatsOptions.Trace` (which sets log level filter).
|
||||
|
||||
**Files:** `NatsParser.cs`, `NatsOptions.cs`
|
||||
|
||||
### 3c. MIME Header Parsing
|
||||
|
||||
New `NatsHeaderParser` static class in `Protocol/`. Parses `NATS/1.0 <status> <description>\r\n` status line + `Key: Value\r\n` pairs.
|
||||
|
||||
Returns `NatsHeaders` readonly struct with `Status` (int), `Description` (string), and key-value `Dictionary<string, string[]>` lookup.
|
||||
|
||||
Used in `ProcessMessage()` for header inspection (no-responders status, future message tracing).
|
||||
|
||||
**Files:** New `Protocol/NatsHeaderParser.cs`, `NatsServer.cs`
|
||||
|
||||
### 3d. MSG/HMSG Construction Optimization
|
||||
|
||||
Pre-allocate buffers for MSG/HMSG prefix using `Span<byte>` and `Utf8Formatter` instead of string interpolation + `Encoding.UTF8.GetBytes()`. Reduces per-message allocations.
|
||||
|
||||
**Files:** `NatsClient.cs`
|
||||
|
||||
---
|
||||
|
||||
## Section 4 — Subscriptions & Subject Matching
|
||||
|
||||
### 4a. Atomic Generation ID for Cache Invalidation
|
||||
|
||||
Add `long _generation` to `SubList`. Increment via `Interlocked.Increment` on every `Insert()` and `Remove()`. Cache entries store generation at computation time. On `Match()`, stale generation = cache miss.
|
||||
|
||||
Replaces current explicit per-key cache removal. O(1) invalidation.
|
||||
|
||||
**Files:** `SubList.cs`
|
||||
|
||||
### 4b. Async Background Cache Sweep
|
||||
|
||||
Replace inline sweep (runs under write lock) with `PeriodicTimer`-based background task. Acquires write lock briefly to snapshot + evict stale entries. Triggered when cache count > 1024, sweeps to 256.
|
||||
|
||||
**Files:** `SubList.cs`
|
||||
|
||||
### 4c. `plist` Optimization for High-Fanout Nodes
|
||||
|
||||
On `TrieNode`, when `PlainSubs.Count > 256`, convert `HashSet<Subscription>` to `Subscription[]` flat array in `PList` field. `Match()` iterates `PList` when present. On count < 128, convert back to HashSet.
|
||||
|
||||
**Files:** `SubList.cs`
|
||||
|
||||
### 4d. SubList Utility Methods
|
||||
|
||||
| Method | Description |
|
||||
|--------|-------------|
|
||||
| `Stats()` | Returns `SubListStats` record with NumSubs, NumCache, NumInserts, NumRemoves, NumMatches, CacheHitRate, MaxFanout. Track via `Interlocked` counters. |
|
||||
| `HasInterest(string)` | Walk trie without building result, return true on first hit. |
|
||||
| `NumInterest(string)` | Walk trie, count without allocating result arrays. |
|
||||
| `ReverseMatch(string)` | Walk trie with literal tokens, collect all subscription patterns matching the literal. |
|
||||
| `RemoveBatch(IEnumerable<Subscription>)` | Single write-lock, batch removes, single generation bump. |
|
||||
| `All()` | Depth-first trie walk, yield all subscriptions. Returns `IReadOnlyList<Subscription>`. |
|
||||
| `MatchBytes(ReadOnlySpan<byte>)` | Zero-copy match using byte-based `TokenEnumerator`. |
|
||||
|
||||
**Files:** `SubList.cs`, new `SubListStats.cs`
|
||||
|
||||
### 4e. SubjectMatch Utilities
|
||||
|
||||
| Method | Description |
|
||||
|--------|-------------|
|
||||
| `SubjectsCollide(string, string)` | Token-by-token comparison handling `*` and `>`. Two patterns collide if any literal could match both. |
|
||||
| `TokenAt(string, int)` | Return nth dot-delimited token as `ReadOnlySpan<char>`. |
|
||||
| `NumTokens(string)` | Count dots + 1. |
|
||||
| UTF-8/null validation | Add `checkRunes` parameter to `IsValidSubject()`. When true, scan for `\0` bytes and validate UTF-8. |
|
||||
|
||||
**Files:** `SubjectMatch.cs`
|
||||
|
||||
### 4f. Per-Account Subscription Limits
|
||||
|
||||
Add `MaxSubscriptions` to `Account`. Track `SubscriptionCount` via `Interlocked`. Enforce in `ProcessSub()`. Close with `ClientClosedReason.MaxSubscriptionsExceeded`.
|
||||
|
||||
**Files:** `Account.cs`, `NatsClient.cs`
|
||||
|
||||
---
|
||||
|
||||
## Section 5 — Authentication & Authorization
|
||||
|
||||
### 5a. Deny List Enforcement at Delivery Time
|
||||
|
||||
In `NatsServer.DeliverMessage()`, before sending MSG, check `subscriber.Client.Permissions?.IsDeliveryAllowed(subject)`. New method checks publish deny list for the receiving client ("msg delivery filter"). Cache results in pub cache.
|
||||
|
||||
**Files:** `NatsServer.cs`, `ClientPermissions.cs`
|
||||
|
||||
### 5b. Permission Caching with 128-Entry LRU
|
||||
|
||||
Replace `ConcurrentDictionary<string, bool>` with custom `PermissionLruCache` (128 entries). `Dictionary<string, LinkedListNode>` + `LinkedList` for LRU ordering. Lock-protected (per-client, low contention).
|
||||
|
||||
**Files:** `ClientPermissions.cs`, new `Auth/PermissionLruCache.cs`
|
||||
|
||||
### 5c. Subscribe Deny Queue-Group Checking
|
||||
|
||||
`IsSubscribeAllowed(subject, queue)` checks queue group against subscribe deny list (currently ignores queue parameter).
|
||||
|
||||
**Files:** `ClientPermissions.cs`
|
||||
|
||||
### 5d. Response Permissions (Reply Tracking)
|
||||
|
||||
New `ResponseTracker` class on `NatsClient`. Created when `Permissions.Response` is non-null. Tracks reply subjects with TTL (`ResponsePermission.Expires`) and max count (`ResponsePermission.MaxMsgs`). `IsPublishAllowed()` consults tracker for reply subjects not in static allow list. Expired entries cleaned lazily + in ping timer.
|
||||
|
||||
**Files:** New `Auth/ResponseTracker.cs`, `ClientPermissions.cs`, `NatsClient.cs`
|
||||
|
||||
### 5e. Per-Account Connection Limits
|
||||
|
||||
Add `MaxConnections` to `Account`. Enforce in `ProcessConnectAsync()` after account assignment. Reject with `-ERR maximum connections for account exceeded`.
|
||||
|
||||
**Files:** `Account.cs`, `NatsClient.cs`
|
||||
|
||||
### 5f. Multi-Account User Resolution
|
||||
|
||||
Add `NatsOptions.Accounts` as `Dictionary<string, AccountConfig>` with per-account MaxConnections, MaxSubscriptions, DefaultPermissions. `AuthService` resolves account name to configured `Account` with limits.
|
||||
|
||||
**Files:** `NatsOptions.cs`, new `Auth/AccountConfig.cs`, `AuthService.cs`, `NatsServer.cs`
|
||||
|
||||
### 5g. Auth Expiry Enforcement
|
||||
|
||||
In `ProcessConnectAsync()`, if `AuthResult.Expiry` is set, start `CancellationTokenSource.CancelAfter(expiry - now)`. Link to client lifetime. On fire, close with `ClientClosedReason.AuthenticationExpired`.
|
||||
|
||||
**Files:** `NatsClient.cs`
|
||||
|
||||
### 5h. Auto-Unsub Cleanup
|
||||
|
||||
In `DeliverMessage()`, when `sub.MessageCount >= sub.MaxMessages`, call `sub.Client.RemoveSubscription(sub.Sid)` and `subList.Remove(sub)` to clean up both tracking dict and trie. Currently only skips delivery.
|
||||
|
||||
**Files:** `NatsServer.cs`, `NatsClient.cs`
|
||||
|
||||
---
|
||||
|
||||
## Section 6 — Configuration
|
||||
|
||||
### 6a. Debug/Trace CLI Flags
|
||||
|
||||
Add `-D`/`--debug`, `-V`/`--trace`, `-DV` to `Program.cs`. Set `NatsOptions.Debug` and `NatsOptions.Trace`. Wire into Serilog minimum level.
|
||||
|
||||
**Files:** `Program.cs`
|
||||
|
||||
### 6b. New NatsOptions Fields
|
||||
|
||||
| Field | Type | Default | Purpose |
|
||||
|-------|------|---------|---------|
|
||||
| `MaxSubs` | int | 0 (unlimited) | Per-connection subscription limit |
|
||||
| `MaxSubTokens` | int | 0 (unlimited) | Max tokens in a subject |
|
||||
| `Debug` | bool | false | Enable debug-level logging |
|
||||
| `Trace` | bool | false | Enable trace-level protocol logging |
|
||||
| `LogFile` | string? | null | Log to file instead of console |
|
||||
| `LogSizeLimit` | long | 0 | Max log file size before rotation |
|
||||
| `Tags` | Dictionary<string, string>? | null | Server metadata tags |
|
||||
|
||||
**Files:** `NatsOptions.cs`
|
||||
|
||||
### 6c. Logging Options Wiring
|
||||
|
||||
In `Program.cs`, if `LogFile` is set, add Serilog `File` sink with `LogSizeLimit`. If `Debug`/`Trace`, override Serilog minimum level.
|
||||
|
||||
**Files:** `Program.cs`
|
||||
|
||||
---
|
||||
|
||||
## Implementation Strategy
|
||||
|
||||
Execute in a git worktree. Parallelize where files don't overlap:
|
||||
|
||||
- **Parallel batch 1:** SubjectMatch utilities (4e) | NatsOptions + CLI flags (6a, 6b) | NatsHeaderParser (3c) | PermissionLruCache (5b) | SubListStats (4d stats class)
|
||||
- **Parallel batch 2:** SubList overhaul (4a, 4b, 4c, 4d methods) | Account limits + config (4f, 5e, 5f, 5g) | Response tracker (5d)
|
||||
- **Parallel batch 3:** Protocol tracing (3b) | INFO caching (3a) | MSG optimization (3d)
|
||||
- **Sequential:** Delivery-time enforcement (5a, 5c, 5h) — touches NatsServer.cs + ClientPermissions.cs, must be coordinated
|
||||
- **Final:** Logging wiring (6c) | differences.md update
|
||||
|
||||
Tests added alongside each feature in the appropriate test file.
|
||||
2384
docs/plans/2026-02-23-sections3-6-gaps-plan.md
Normal file
2384
docs/plans/2026-02-23-sections3-6-gaps-plan.md
Normal file
File diff suppressed because it is too large
Load Diff
20
docs/plans/2026-02-23-sections3-6-gaps-plan.md.tasks.json
Normal file
20
docs/plans/2026-02-23-sections3-6-gaps-plan.md.tasks.json
Normal file
@@ -0,0 +1,20 @@
|
||||
{
|
||||
"planPath": "docs/plans/2026-02-23-sections3-6-gaps-plan.md",
|
||||
"tasks": [
|
||||
{"id": 6, "subject": "Task 1: NatsOptions — Add New Configuration Fields", "status": "pending"},
|
||||
{"id": 7, "subject": "Task 2: CLI Flags — Add -D/-V/-DV and Logging Options", "status": "pending", "blockedBy": [6]},
|
||||
{"id": 8, "subject": "Task 3: SubjectMatch Utilities", "status": "pending"},
|
||||
{"id": 9, "subject": "Task 4: SubList — Generation ID, Stats, and Utility Methods", "status": "pending"},
|
||||
{"id": 10, "subject": "Task 5: NatsHeaderParser — MIME Header Parsing", "status": "pending"},
|
||||
{"id": 11, "subject": "Task 6: PermissionLruCache — 128-Entry LRU", "status": "pending"},
|
||||
{"id": 12, "subject": "Task 7: Account Limits and AccountConfig", "status": "pending", "blockedBy": [6]},
|
||||
{"id": 13, "subject": "Task 8: MaxSubs Enforcement, Subscribe Deny Queue, Delivery-Time Deny", "status": "pending", "blockedBy": [11, 12]},
|
||||
{"id": 14, "subject": "Task 9: Response Permissions (Reply Tracking)", "status": "pending", "blockedBy": [11, 13]},
|
||||
{"id": 15, "subject": "Task 10: Auth Expiry Enforcement", "status": "pending"},
|
||||
{"id": 16, "subject": "Task 11: INFO Serialization Caching", "status": "pending"},
|
||||
{"id": 17, "subject": "Task 12: Protocol Tracing", "status": "pending", "blockedBy": [6]},
|
||||
{"id": 18, "subject": "Task 13: MSG/HMSG Construction Optimization", "status": "pending"},
|
||||
{"id": 19, "subject": "Task 14: Verify, Run Full Test Suite, Update differences.md", "status": "pending", "blockedBy": [13, 14, 15, 16, 17, 18]}
|
||||
],
|
||||
"lastUpdated": "2026-02-23T00:00:00Z"
|
||||
}
|
||||
@@ -11,6 +11,7 @@
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Serilog.Extensions.Hosting" />
|
||||
<PackageReference Include="Serilog.Sinks.Console" />
|
||||
<PackageReference Include="Serilog.Sinks.File" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@@ -1,12 +1,6 @@
|
||||
using NATS.Server;
|
||||
using Serilog;
|
||||
|
||||
Log.Logger = new LoggerConfiguration()
|
||||
.MinimumLevel.Debug()
|
||||
.Enrich.FromLogContext()
|
||||
.WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}")
|
||||
.CreateLogger();
|
||||
|
||||
var options = new NatsOptions();
|
||||
|
||||
// Simple CLI argument parsing
|
||||
@@ -55,9 +49,47 @@ for (int i = 0; i < args.Length; i++)
|
||||
case "--tlsverify":
|
||||
options.TlsVerify = true;
|
||||
break;
|
||||
case "-D" or "--debug":
|
||||
options.Debug = true;
|
||||
break;
|
||||
case "-V" or "--trace":
|
||||
options.Trace = true;
|
||||
break;
|
||||
case "-DV":
|
||||
options.Debug = true;
|
||||
options.Trace = true;
|
||||
break;
|
||||
case "-l" or "--log" when i + 1 < args.Length:
|
||||
options.LogFile = args[++i];
|
||||
break;
|
||||
case "--log_size_limit" when i + 1 < args.Length:
|
||||
options.LogSizeLimit = long.Parse(args[++i]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Configure Serilog based on options
|
||||
var logConfig = new LoggerConfiguration()
|
||||
.Enrich.FromLogContext()
|
||||
.WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}");
|
||||
|
||||
if (options.Trace)
|
||||
logConfig.MinimumLevel.Verbose();
|
||||
else if (options.Debug)
|
||||
logConfig.MinimumLevel.Debug();
|
||||
else
|
||||
logConfig.MinimumLevel.Information();
|
||||
|
||||
if (options.LogFile != null)
|
||||
{
|
||||
logConfig.WriteTo.File(
|
||||
options.LogFile,
|
||||
fileSizeLimitBytes: options.LogSizeLimit > 0 ? options.LogSizeLimit : null,
|
||||
rollOnFileSizeLimit: options.LogSizeLimit > 0);
|
||||
}
|
||||
|
||||
Log.Logger = logConfig.CreateLogger();
|
||||
|
||||
using var loggerFactory = new Serilog.Extensions.Logging.SerilogLoggerFactory(Log.Logger);
|
||||
using var server = new NatsServer(options, loggerFactory);
|
||||
|
||||
|
||||
@@ -10,8 +10,11 @@ public sealed class Account : IDisposable
|
||||
public string Name { get; }
|
||||
public SubList SubList { get; } = new();
|
||||
public Permissions? DefaultPermissions { get; set; }
|
||||
public int MaxConnections { get; set; } // 0 = unlimited
|
||||
public int MaxSubscriptions { get; set; } // 0 = unlimited
|
||||
|
||||
private readonly ConcurrentDictionary<ulong, byte> _clients = new();
|
||||
private int _subscriptionCount;
|
||||
|
||||
public Account(string name)
|
||||
{
|
||||
@@ -19,10 +22,31 @@ public sealed class Account : IDisposable
|
||||
}
|
||||
|
||||
public int ClientCount => _clients.Count;
|
||||
public int SubscriptionCount => Volatile.Read(ref _subscriptionCount);
|
||||
|
||||
public void AddClient(ulong clientId) => _clients[clientId] = 0;
|
||||
/// <summary>Returns false if max connections exceeded.</summary>
|
||||
public bool AddClient(ulong clientId)
|
||||
{
|
||||
if (MaxConnections > 0 && _clients.Count >= MaxConnections)
|
||||
return false;
|
||||
_clients[clientId] = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
public void RemoveClient(ulong clientId) => _clients.TryRemove(clientId, out _);
|
||||
|
||||
public bool IncrementSubscriptions()
|
||||
{
|
||||
if (MaxSubscriptions > 0 && Volatile.Read(ref _subscriptionCount) >= MaxSubscriptions)
|
||||
return false;
|
||||
Interlocked.Increment(ref _subscriptionCount);
|
||||
return true;
|
||||
}
|
||||
|
||||
public void DecrementSubscriptions()
|
||||
{
|
||||
Interlocked.Decrement(ref _subscriptionCount);
|
||||
}
|
||||
|
||||
public void Dispose() => SubList.Dispose();
|
||||
}
|
||||
|
||||
8
src/NATS.Server/Auth/AccountConfig.cs
Normal file
8
src/NATS.Server/Auth/AccountConfig.cs
Normal file
@@ -0,0 +1,8 @@
|
||||
namespace NATS.Server.Auth;
|
||||
|
||||
public sealed class AccountConfig
|
||||
{
|
||||
public int MaxConnections { get; init; } // 0 = unlimited
|
||||
public int MaxSubscriptions { get; init; } // 0 = unlimited
|
||||
public Permissions? DefaultPermissions { get; init; }
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
using System.Collections.Concurrent;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Auth;
|
||||
@@ -7,12 +6,14 @@ public sealed class ClientPermissions : IDisposable
|
||||
{
|
||||
private readonly PermissionSet? _publish;
|
||||
private readonly PermissionSet? _subscribe;
|
||||
private readonly ConcurrentDictionary<string, bool> _pubCache = new(StringComparer.Ordinal);
|
||||
private readonly ResponseTracker? _responseTracker;
|
||||
private readonly PermissionLruCache _pubCache = new(128);
|
||||
|
||||
private ClientPermissions(PermissionSet? publish, PermissionSet? subscribe)
|
||||
private ClientPermissions(PermissionSet? publish, PermissionSet? subscribe, ResponseTracker? responseTracker)
|
||||
{
|
||||
_publish = publish;
|
||||
_subscribe = subscribe;
|
||||
_responseTracker = responseTracker;
|
||||
}
|
||||
|
||||
public static ClientPermissions? Build(Permissions? permissions)
|
||||
@@ -22,27 +23,55 @@ public sealed class ClientPermissions : IDisposable
|
||||
|
||||
var pub = PermissionSet.Build(permissions.Publish);
|
||||
var sub = PermissionSet.Build(permissions.Subscribe);
|
||||
ResponseTracker? responseTracker = null;
|
||||
if (permissions.Response != null)
|
||||
responseTracker = new ResponseTracker(permissions.Response.MaxMsgs, permissions.Response.Expires);
|
||||
|
||||
if (pub == null && sub == null)
|
||||
if (pub == null && sub == null && responseTracker == null)
|
||||
return null;
|
||||
|
||||
return new ClientPermissions(pub, sub);
|
||||
return new ClientPermissions(pub, sub, responseTracker);
|
||||
}
|
||||
|
||||
public ResponseTracker? ResponseTracker => _responseTracker;
|
||||
|
||||
public bool IsPublishAllowed(string subject)
|
||||
{
|
||||
if (_publish == null)
|
||||
return true;
|
||||
|
||||
return _pubCache.GetOrAdd(subject, _publish.IsAllowed);
|
||||
if (_pubCache.TryGet(subject, out var cached))
|
||||
return cached;
|
||||
|
||||
var allowed = _publish.IsAllowed(subject);
|
||||
|
||||
// If denied but response tracking is enabled, check reply table
|
||||
if (!allowed && _responseTracker != null)
|
||||
{
|
||||
if (_responseTracker.IsReplyAllowed(subject))
|
||||
return true; // Don't cache dynamic reply permissions
|
||||
}
|
||||
|
||||
_pubCache.Set(subject, allowed);
|
||||
return allowed;
|
||||
}
|
||||
|
||||
public bool IsSubscribeAllowed(string subject, string? queue = null)
|
||||
{
|
||||
if (_subscribe == null)
|
||||
return true;
|
||||
if (!_subscribe.IsAllowed(subject))
|
||||
return false;
|
||||
if (queue != null && _subscribe.IsDenied(queue))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
return _subscribe.IsAllowed(subject);
|
||||
public bool IsDeliveryAllowed(string subject)
|
||||
{
|
||||
if (_subscribe == null)
|
||||
return true;
|
||||
return _subscribe.IsDeliveryAllowed(subject);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
@@ -113,6 +142,21 @@ public sealed class PermissionSet : IDisposable
|
||||
return allowed;
|
||||
}
|
||||
|
||||
public bool IsDenied(string subject)
|
||||
{
|
||||
if (_deny == null) return false;
|
||||
var result = _deny.Match(subject);
|
||||
return result.PlainSubs.Length > 0 || result.QueueSubs.Length > 0;
|
||||
}
|
||||
|
||||
public bool IsDeliveryAllowed(string subject)
|
||||
{
|
||||
if (_deny == null)
|
||||
return true;
|
||||
var result = _deny.Match(subject);
|
||||
return result.PlainSubs.Length == 0 && result.QueueSubs.Length == 0;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_allow?.Dispose();
|
||||
|
||||
73
src/NATS.Server/Auth/PermissionLruCache.cs
Normal file
73
src/NATS.Server/Auth/PermissionLruCache.cs
Normal file
@@ -0,0 +1,73 @@
|
||||
namespace NATS.Server.Auth;
|
||||
|
||||
/// <summary>
|
||||
/// Fixed-capacity LRU cache for permission results.
|
||||
/// Lock-protected (per-client, low contention).
|
||||
/// Reference: Go client.go maxPermCacheSize=128.
|
||||
/// </summary>
|
||||
public sealed class PermissionLruCache
|
||||
{
|
||||
private readonly int _capacity;
|
||||
private readonly Dictionary<string, LinkedListNode<(string Key, bool Value)>> _map;
|
||||
private readonly LinkedList<(string Key, bool Value)> _list = new();
|
||||
private readonly object _lock = new();
|
||||
|
||||
public PermissionLruCache(int capacity = 128)
|
||||
{
|
||||
_capacity = capacity;
|
||||
_map = new Dictionary<string, LinkedListNode<(string Key, bool Value)>>(capacity, StringComparer.Ordinal);
|
||||
}
|
||||
|
||||
public bool TryGet(string key, out bool value)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_map.TryGetValue(key, out var node))
|
||||
{
|
||||
value = node.Value.Value;
|
||||
_list.Remove(node);
|
||||
_list.AddFirst(node);
|
||||
return true;
|
||||
}
|
||||
|
||||
value = default;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public int Count
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
return _map.Count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Set(string key, bool value)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_map.TryGetValue(key, out var existing))
|
||||
{
|
||||
_list.Remove(existing);
|
||||
existing.Value = (key, value);
|
||||
_list.AddFirst(existing);
|
||||
return;
|
||||
}
|
||||
|
||||
if (_map.Count >= _capacity)
|
||||
{
|
||||
var last = _list.Last!;
|
||||
_map.Remove(last.Value.Key);
|
||||
_list.RemoveLast();
|
||||
}
|
||||
|
||||
var node = new LinkedListNode<(string Key, bool Value)>((key, value));
|
||||
_list.AddFirst(node);
|
||||
_map[key] = node;
|
||||
}
|
||||
}
|
||||
}
|
||||
78
src/NATS.Server/Auth/ResponseTracker.cs
Normal file
78
src/NATS.Server/Auth/ResponseTracker.cs
Normal file
@@ -0,0 +1,78 @@
|
||||
namespace NATS.Server.Auth;
|
||||
|
||||
/// <summary>
|
||||
/// Tracks reply subjects that a client is temporarily allowed to publish to.
|
||||
/// Reference: Go client.go resp struct, setResponsePermissionIfNeeded.
|
||||
/// </summary>
|
||||
public sealed class ResponseTracker
|
||||
{
|
||||
private readonly int _maxMsgs; // 0 = unlimited
|
||||
private readonly TimeSpan _expires; // TimeSpan.Zero = no TTL
|
||||
private readonly Dictionary<string, (DateTime RegisteredAt, int Count)> _replies = new(StringComparer.Ordinal);
|
||||
private readonly object _lock = new();
|
||||
|
||||
public ResponseTracker(int maxMsgs, TimeSpan expires)
|
||||
{
|
||||
_maxMsgs = maxMsgs;
|
||||
_expires = expires;
|
||||
}
|
||||
|
||||
public int Count
|
||||
{
|
||||
get { lock (_lock) return _replies.Count; }
|
||||
}
|
||||
|
||||
public void RegisterReply(string replySubject)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
_replies[replySubject] = (DateTime.UtcNow, 0);
|
||||
}
|
||||
}
|
||||
|
||||
public bool IsReplyAllowed(string subject)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (!_replies.TryGetValue(subject, out var entry))
|
||||
return false;
|
||||
|
||||
if (_expires > TimeSpan.Zero && DateTime.UtcNow - entry.RegisteredAt > _expires)
|
||||
{
|
||||
_replies.Remove(subject);
|
||||
return false;
|
||||
}
|
||||
|
||||
var newCount = entry.Count + 1;
|
||||
if (_maxMsgs > 0 && newCount > _maxMsgs)
|
||||
{
|
||||
_replies.Remove(subject);
|
||||
return false;
|
||||
}
|
||||
|
||||
_replies[subject] = (entry.RegisteredAt, newCount);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public void Prune()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_expires <= TimeSpan.Zero && _maxMsgs <= 0)
|
||||
return;
|
||||
|
||||
var now = DateTime.UtcNow;
|
||||
var toRemove = new List<string>();
|
||||
foreach (var (key, entry) in _replies)
|
||||
{
|
||||
if (_expires > TimeSpan.Zero && now - entry.RegisteredAt > _expires)
|
||||
toRemove.Add(key);
|
||||
else if (_maxMsgs > 0 && entry.Count >= _maxMsgs)
|
||||
toRemove.Add(key);
|
||||
}
|
||||
foreach (var key in toRemove)
|
||||
_replies.Remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -23,6 +23,7 @@ public enum ClientClosedReason
|
||||
ServerShutdown,
|
||||
MsgHeaderViolation,
|
||||
NoRespondersRequiresHeaders,
|
||||
AuthenticationExpired,
|
||||
}
|
||||
|
||||
public static class ClientClosedReasonExtensions
|
||||
@@ -46,6 +47,7 @@ public static class ClientClosedReasonExtensions
|
||||
ClientClosedReason.ServerShutdown => "Server Shutdown",
|
||||
ClientClosedReason.MsgHeaderViolation => "Message Header Violation",
|
||||
ClientClosedReason.NoRespondersRequiresHeaders => "No Responders Requires Headers",
|
||||
ClientClosedReason.AuthenticationExpired => "Authentication Expired",
|
||||
_ => reason.ToString(),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -48,6 +48,7 @@ public sealed class NatsClient : IDisposable
|
||||
public ClientOptions? ClientOpts { get; private set; }
|
||||
public IMessageRouter? Router { get; set; }
|
||||
public Account? Account { get; private set; }
|
||||
public ClientPermissions? Permissions => _permissions;
|
||||
|
||||
private readonly ClientFlagHolder _flags = new();
|
||||
public bool ConnectReceived => _flags.HasFlag(ClientFlags.ConnectReceived);
|
||||
@@ -90,7 +91,7 @@ public sealed class NatsClient : IDisposable
|
||||
_nonce = nonce;
|
||||
_logger = logger;
|
||||
_serverStats = serverStats;
|
||||
_parser = new NatsParser(options.MaxPayload);
|
||||
_parser = new NatsParser(options.MaxPayload, options.Trace ? logger : null);
|
||||
StartTime = DateTime.UtcNow;
|
||||
_lastActivityTicks = StartTime.Ticks;
|
||||
if (socket.RemoteEndPoint is IPEndPoint ep)
|
||||
@@ -348,6 +349,7 @@ public sealed class NatsClient : IDisposable
|
||||
?? new ClientOptions();
|
||||
|
||||
// Authenticate if auth is required
|
||||
AuthResult? authResult = null;
|
||||
if (_authService.IsAuthRequired)
|
||||
{
|
||||
var context = new ClientAuthContext
|
||||
@@ -356,8 +358,8 @@ public sealed class NatsClient : IDisposable
|
||||
Nonce = _nonce ?? [],
|
||||
};
|
||||
|
||||
var result = _authService.Authenticate(context);
|
||||
if (result == null)
|
||||
authResult = _authService.Authenticate(context);
|
||||
if (authResult == null)
|
||||
{
|
||||
_logger.LogWarning("Client {ClientId} authentication failed", Id);
|
||||
await SendErrAndCloseAsync(NatsProtocol.ErrAuthorizationViolation, ClientClosedReason.AuthenticationViolation);
|
||||
@@ -365,17 +367,23 @@ public sealed class NatsClient : IDisposable
|
||||
}
|
||||
|
||||
// Build permissions from auth result
|
||||
_permissions = ClientPermissions.Build(result.Permissions);
|
||||
_permissions = ClientPermissions.Build(authResult.Permissions);
|
||||
|
||||
// Resolve account
|
||||
if (Router is NatsServer server)
|
||||
{
|
||||
var accountName = result.AccountName ?? Account.GlobalAccountName;
|
||||
var accountName = authResult.AccountName ?? Account.GlobalAccountName;
|
||||
Account = server.GetOrCreateAccount(accountName);
|
||||
Account.AddClient(Id);
|
||||
if (!Account.AddClient(Id))
|
||||
{
|
||||
Account = null;
|
||||
await SendErrAndCloseAsync("maximum connections for account exceeded",
|
||||
ClientClosedReason.AuthenticationViolation);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
_logger.LogDebug("Client {ClientId} authenticated as {Identity}", Id, result.Identity);
|
||||
_logger.LogDebug("Client {ClientId} authenticated as {Identity}", Id, authResult.Identity);
|
||||
|
||||
// Clear nonce after use -- defense-in-depth against memory dumps
|
||||
if (_nonce != null)
|
||||
@@ -386,7 +394,13 @@ public sealed class NatsClient : IDisposable
|
||||
if (Account == null && Router is NatsServer server2)
|
||||
{
|
||||
Account = server2.GetOrCreateAccount(Account.GlobalAccountName);
|
||||
Account.AddClient(Id);
|
||||
if (!Account.AddClient(Id))
|
||||
{
|
||||
Account = null;
|
||||
await SendErrAndCloseAsync("maximum connections for account exceeded",
|
||||
ClientClosedReason.AuthenticationViolation);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Validate no_responders requires headers
|
||||
@@ -401,6 +415,32 @@ public sealed class NatsClient : IDisposable
|
||||
_flags.SetFlag(ClientFlags.ConnectReceived);
|
||||
_flags.SetFlag(ClientFlags.ConnectProcessFinished);
|
||||
_logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
|
||||
|
||||
// Start auth expiry timer if needed
|
||||
if (_authService.IsAuthRequired && authResult?.Expiry is { } expiry)
|
||||
{
|
||||
var remaining = expiry - DateTimeOffset.UtcNow;
|
||||
if (remaining > TimeSpan.Zero)
|
||||
{
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(remaining, _clientCts!.Token);
|
||||
_logger.LogDebug("Client {ClientId} authentication expired", Id);
|
||||
await SendErrAndCloseAsync("Authentication Expired",
|
||||
ClientClosedReason.AuthenticationExpired);
|
||||
}
|
||||
catch (OperationCanceledException) { }
|
||||
}, _clientCts!.Token);
|
||||
}
|
||||
else
|
||||
{
|
||||
await SendErrAndCloseAsync("Authentication Expired",
|
||||
ClientClosedReason.AuthenticationExpired);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void ProcessSub(ParsedCommand cmd)
|
||||
@@ -413,6 +453,24 @@ public sealed class NatsClient : IDisposable
|
||||
return;
|
||||
}
|
||||
|
||||
// Per-connection subscription limit
|
||||
if (_options.MaxSubs > 0 && _subs.Count >= _options.MaxSubs)
|
||||
{
|
||||
_logger.LogDebug("Client {ClientId} max subscriptions exceeded", Id);
|
||||
_ = SendErrAndCloseAsync(NatsProtocol.ErrMaxSubscriptionsExceeded,
|
||||
ClientClosedReason.MaxSubscriptionsExceeded);
|
||||
return;
|
||||
}
|
||||
|
||||
// Per-account subscription limit
|
||||
if (Account != null && !Account.IncrementSubscriptions())
|
||||
{
|
||||
_logger.LogDebug("Client {ClientId} account subscription limit exceeded", Id);
|
||||
_ = SendErrAndCloseAsync(NatsProtocol.ErrMaxSubscriptionsExceeded,
|
||||
ClientClosedReason.MaxSubscriptionsExceeded);
|
||||
return;
|
||||
}
|
||||
|
||||
var sub = new Subscription
|
||||
{
|
||||
Subject = cmd.Subject!,
|
||||
@@ -443,6 +501,7 @@ public sealed class NatsClient : IDisposable
|
||||
}
|
||||
|
||||
_subs.Remove(cmd.Sid!);
|
||||
Account?.DecrementSubscriptions();
|
||||
|
||||
Account?.SubList.Remove(sub);
|
||||
}
|
||||
@@ -491,9 +550,20 @@ public sealed class NatsClient : IDisposable
|
||||
|
||||
private void SendInfo()
|
||||
{
|
||||
var infoJson = JsonSerializer.Serialize(_serverInfo);
|
||||
var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n");
|
||||
QueueOutbound(infoLine);
|
||||
// Use the cached INFO bytes from the server when there is no per-connection
|
||||
// nonce (i.e. NKey auth is not active for this connection). When a nonce is
|
||||
// present the _serverInfo was already cloned with the nonce embedded, so we
|
||||
// must serialise it individually.
|
||||
if (_nonce == null && Router is NatsServer server)
|
||||
{
|
||||
QueueOutbound(server.CachedInfoLine);
|
||||
}
|
||||
else
|
||||
{
|
||||
var infoJson = JsonSerializer.Serialize(_serverInfo);
|
||||
var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n");
|
||||
QueueOutbound(infoLine);
|
||||
}
|
||||
}
|
||||
|
||||
public void SendMessage(string subject, string sid, string? replyTo,
|
||||
@@ -504,26 +574,78 @@ public sealed class NatsClient : IDisposable
|
||||
Interlocked.Increment(ref _serverStats.OutMsgs);
|
||||
Interlocked.Add(ref _serverStats.OutBytes, payload.Length + headers.Length);
|
||||
|
||||
byte[] line;
|
||||
// Estimate control line size
|
||||
var estimatedLineSize = 5 + subject.Length + 1 + sid.Length + 1
|
||||
+ (replyTo != null ? replyTo.Length + 1 : 0) + 20 + 2;
|
||||
|
||||
var totalPayloadLen = headers.Length + payload.Length;
|
||||
var totalLen = estimatedLineSize + totalPayloadLen + 2;
|
||||
var buffer = new byte[totalLen];
|
||||
var span = buffer.AsSpan();
|
||||
int pos = 0;
|
||||
|
||||
// Write prefix
|
||||
if (headers.Length > 0)
|
||||
{
|
||||
int totalSize = headers.Length + payload.Length;
|
||||
line = Encoding.ASCII.GetBytes($"HMSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{headers.Length} {totalSize}\r\n");
|
||||
"HMSG "u8.CopyTo(span);
|
||||
pos = 5;
|
||||
}
|
||||
else
|
||||
{
|
||||
line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n");
|
||||
"MSG "u8.CopyTo(span);
|
||||
pos = 4;
|
||||
}
|
||||
|
||||
var totalLen = line.Length + headers.Length + payload.Length + NatsProtocol.CrLf.Length;
|
||||
var msg = new byte[totalLen];
|
||||
var offset = 0;
|
||||
line.CopyTo(msg.AsSpan(offset)); offset += line.Length;
|
||||
if (headers.Length > 0) { headers.Span.CopyTo(msg.AsSpan(offset)); offset += headers.Length; }
|
||||
if (payload.Length > 0) { payload.Span.CopyTo(msg.AsSpan(offset)); offset += payload.Length; }
|
||||
NatsProtocol.CrLf.CopyTo(msg.AsSpan(offset));
|
||||
// Subject
|
||||
pos += Encoding.ASCII.GetBytes(subject, span[pos..]);
|
||||
span[pos++] = (byte)' ';
|
||||
|
||||
QueueOutbound(msg);
|
||||
// SID
|
||||
pos += Encoding.ASCII.GetBytes(sid, span[pos..]);
|
||||
span[pos++] = (byte)' ';
|
||||
|
||||
// Reply-to
|
||||
if (replyTo != null)
|
||||
{
|
||||
pos += Encoding.ASCII.GetBytes(replyTo, span[pos..]);
|
||||
span[pos++] = (byte)' ';
|
||||
}
|
||||
|
||||
// Sizes
|
||||
if (headers.Length > 0)
|
||||
{
|
||||
int totalSize = headers.Length + payload.Length;
|
||||
headers.Length.TryFormat(span[pos..], out int written);
|
||||
pos += written;
|
||||
span[pos++] = (byte)' ';
|
||||
totalSize.TryFormat(span[pos..], out written);
|
||||
pos += written;
|
||||
}
|
||||
else
|
||||
{
|
||||
payload.Length.TryFormat(span[pos..], out int written);
|
||||
pos += written;
|
||||
}
|
||||
|
||||
// CRLF
|
||||
span[pos++] = (byte)'\r';
|
||||
span[pos++] = (byte)'\n';
|
||||
|
||||
// Headers + payload + trailing CRLF
|
||||
if (headers.Length > 0)
|
||||
{
|
||||
headers.Span.CopyTo(span[pos..]);
|
||||
pos += headers.Length;
|
||||
}
|
||||
if (payload.Length > 0)
|
||||
{
|
||||
payload.Span.CopyTo(span[pos..]);
|
||||
pos += payload.Length;
|
||||
}
|
||||
span[pos++] = (byte)'\r';
|
||||
span[pos++] = (byte)'\n';
|
||||
|
||||
QueueOutbound(buffer.AsMemory(0, pos));
|
||||
}
|
||||
|
||||
private void WriteProtocol(byte[] data)
|
||||
@@ -689,6 +811,12 @@ public sealed class NatsClient : IDisposable
|
||||
catch (ObjectDisposedException) { }
|
||||
}
|
||||
|
||||
public void RemoveSubscription(string sid)
|
||||
{
|
||||
if (_subs.Remove(sid))
|
||||
Account?.DecrementSubscriptions();
|
||||
}
|
||||
|
||||
public void RemoveAllSubscriptions(SubList subList)
|
||||
{
|
||||
foreach (var sub in _subs.Values)
|
||||
|
||||
@@ -16,6 +16,22 @@ public sealed class NatsOptions
|
||||
public TimeSpan PingInterval { get; set; } = TimeSpan.FromMinutes(2);
|
||||
public int MaxPingsOut { get; set; } = 2;
|
||||
|
||||
// Subscription limits
|
||||
public int MaxSubs { get; set; } // 0 = unlimited (per-connection)
|
||||
public int MaxSubTokens { get; set; } // 0 = unlimited
|
||||
|
||||
// Logging / diagnostics
|
||||
public bool Debug { get; set; }
|
||||
public bool Trace { get; set; }
|
||||
public string? LogFile { get; set; }
|
||||
public long LogSizeLimit { get; set; }
|
||||
|
||||
// Server tags (exposed via /varz)
|
||||
public Dictionary<string, string>? Tags { get; set; }
|
||||
|
||||
// Account configuration
|
||||
public Dictionary<string, AccountConfig>? Accounts { get; set; }
|
||||
|
||||
// Simple auth (single user)
|
||||
public string? Username { get; set; }
|
||||
public string? Password { get; set; }
|
||||
|
||||
@@ -43,6 +43,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
|
||||
private int _lameDuck;
|
||||
|
||||
private byte[] _cachedInfoLine = [];
|
||||
|
||||
private readonly List<PosixSignalRegistration> _signalRegistrations = [];
|
||||
|
||||
private string? _portsFilePath;
|
||||
@@ -51,6 +53,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
private static readonly TimeSpan AcceptMaxSleep = TimeSpan.FromSeconds(1);
|
||||
|
||||
public SubList SubList => _globalAccount.SubList;
|
||||
public byte[] CachedInfoLine => _cachedInfoLine;
|
||||
public ServerStats Stats => _stats;
|
||||
public DateTime StartTime => new(Interlocked.Read(ref _startTimeTicks), DateTimeKind.Utc);
|
||||
public string ServerId => _serverInfo.ServerId;
|
||||
@@ -272,6 +275,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
if (options.TlsRateLimit > 0)
|
||||
_tlsRateLimiter = new TlsRateLimiter(options.TlsRateLimit);
|
||||
}
|
||||
|
||||
BuildCachedInfo();
|
||||
}
|
||||
|
||||
private void BuildCachedInfo()
|
||||
{
|
||||
var infoJson = System.Text.Json.JsonSerializer.Serialize(_serverInfo);
|
||||
_cachedInfoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n");
|
||||
}
|
||||
|
||||
public async Task StartAsync(CancellationToken ct)
|
||||
@@ -292,6 +303,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
var actualPort = ((IPEndPoint)_listener.LocalEndPoint!).Port;
|
||||
_options.Port = actualPort;
|
||||
_serverInfo.Port = actualPort;
|
||||
BuildCachedInfo();
|
||||
}
|
||||
|
||||
_listeningStarted.TrySetResult();
|
||||
@@ -523,7 +535,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
private static void DeliverMessage(Subscription sub, string subject, string? replyTo,
|
||||
private void DeliverMessage(Subscription sub, string subject, string? replyTo,
|
||||
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
var client = sub.Client;
|
||||
@@ -532,9 +544,26 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
// Check auto-unsub
|
||||
var count = Interlocked.Increment(ref sub.MessageCount);
|
||||
if (sub.MaxMessages > 0 && count > sub.MaxMessages)
|
||||
{
|
||||
// Clean up exhausted subscription from trie and client tracking
|
||||
var subList = client.Account?.SubList ?? _globalAccount.SubList;
|
||||
subList.Remove(sub);
|
||||
client.RemoveSubscription(sub.Sid);
|
||||
return;
|
||||
}
|
||||
|
||||
// Deny-list delivery filter
|
||||
if (client.Permissions?.IsDeliveryAllowed(subject) == false)
|
||||
return;
|
||||
|
||||
client.SendMessage(subject, sub.Sid, replyTo, headers, payload);
|
||||
|
||||
// Track reply subject for response permissions
|
||||
if (replyTo != null && client.Permissions?.ResponseTracker != null)
|
||||
{
|
||||
if (client.Permissions.IsPublishAllowed(replyTo) == false)
|
||||
client.Permissions.ResponseTracker.RegisterReply(replyTo);
|
||||
}
|
||||
}
|
||||
|
||||
private static void SendNoResponders(NatsClient sender, string replyTo)
|
||||
@@ -569,7 +598,17 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
|
||||
public Account GetOrCreateAccount(string name)
|
||||
{
|
||||
return _accounts.GetOrAdd(name, n => new Account(n));
|
||||
return _accounts.GetOrAdd(name, n =>
|
||||
{
|
||||
var acc = new Account(n);
|
||||
if (_options.Accounts != null && _options.Accounts.TryGetValue(n, out var config))
|
||||
{
|
||||
acc.MaxConnections = config.MaxConnections;
|
||||
acc.MaxSubscriptions = config.MaxSubscriptions;
|
||||
acc.DefaultPermissions = config.DefaultPermissions;
|
||||
}
|
||||
return acc;
|
||||
});
|
||||
}
|
||||
|
||||
public void RemoveClient(NatsClient client)
|
||||
|
||||
108
src/NATS.Server/Protocol/NatsHeaderParser.cs
Normal file
108
src/NATS.Server/Protocol/NatsHeaderParser.cs
Normal file
@@ -0,0 +1,108 @@
|
||||
using System.Collections.ObjectModel;
|
||||
using System.Text;
|
||||
|
||||
namespace NATS.Server.Protocol;
|
||||
|
||||
public readonly struct NatsHeaders()
|
||||
{
|
||||
public int Status { get; init; }
|
||||
public string Description { get; init; } = string.Empty;
|
||||
public IReadOnlyDictionary<string, string[]> Headers { get; init; } = ReadOnlyDictionary<string, string[]>.Empty;
|
||||
|
||||
public static readonly NatsHeaders Invalid = new()
|
||||
{
|
||||
Status = -1,
|
||||
Description = string.Empty,
|
||||
Headers = ReadOnlyDictionary<string, string[]>.Empty,
|
||||
};
|
||||
}
|
||||
|
||||
public static class NatsHeaderParser
|
||||
{
|
||||
private static ReadOnlySpan<byte> CrLf => "\r\n"u8;
|
||||
private static ReadOnlySpan<byte> Prefix => "NATS/1.0"u8;
|
||||
|
||||
public static NatsHeaders Parse(ReadOnlySpan<byte> data)
|
||||
{
|
||||
if (data.Length < Prefix.Length)
|
||||
return NatsHeaders.Invalid;
|
||||
|
||||
if (!data[..Prefix.Length].SequenceEqual(Prefix))
|
||||
return NatsHeaders.Invalid;
|
||||
|
||||
int pos = Prefix.Length;
|
||||
int status = 0;
|
||||
string description = string.Empty;
|
||||
|
||||
// Parse status line: NATS/1.0[ status[ description]]\r\n
|
||||
int lineEnd = data[pos..].IndexOf(CrLf);
|
||||
if (lineEnd < 0)
|
||||
return NatsHeaders.Invalid;
|
||||
|
||||
var statusLine = data[pos..(pos + lineEnd)];
|
||||
pos += lineEnd + 2; // skip \r\n
|
||||
|
||||
if (statusLine.Length > 0)
|
||||
{
|
||||
int si = 0;
|
||||
while (si < statusLine.Length && statusLine[si] == (byte)' ')
|
||||
si++;
|
||||
|
||||
int numStart = si;
|
||||
while (si < statusLine.Length && statusLine[si] >= (byte)'0' && statusLine[si] <= (byte)'9')
|
||||
si++;
|
||||
|
||||
if (si > numStart && si - numStart <= 5) // max 5 digits to avoid overflow
|
||||
{
|
||||
for (int idx = numStart; idx < si; idx++)
|
||||
status = status * 10 + (statusLine[idx] - '0');
|
||||
|
||||
while (si < statusLine.Length && statusLine[si] == (byte)' ')
|
||||
si++;
|
||||
if (si < statusLine.Length)
|
||||
description = Encoding.ASCII.GetString(statusLine[si..]);
|
||||
}
|
||||
}
|
||||
|
||||
// Parse key-value headers until empty line
|
||||
var headers = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
|
||||
while (pos < data.Length)
|
||||
{
|
||||
var remaining = data[pos..];
|
||||
if (remaining.Length >= 2 && remaining[0] == (byte)'\r' && remaining[1] == (byte)'\n')
|
||||
break;
|
||||
|
||||
lineEnd = remaining.IndexOf(CrLf);
|
||||
if (lineEnd < 0)
|
||||
break;
|
||||
|
||||
var headerLine = remaining[..lineEnd];
|
||||
pos += lineEnd + 2;
|
||||
|
||||
int colon = headerLine.IndexOf((byte)':');
|
||||
if (colon < 0)
|
||||
continue;
|
||||
|
||||
var key = Encoding.ASCII.GetString(headerLine[..colon]).Trim();
|
||||
var value = Encoding.ASCII.GetString(headerLine[(colon + 1)..]).Trim();
|
||||
|
||||
if (!headers.TryGetValue(key, out var values))
|
||||
{
|
||||
values = [];
|
||||
headers[key] = values;
|
||||
}
|
||||
values.Add(value);
|
||||
}
|
||||
|
||||
var result = new Dictionary<string, string[]>(headers.Count, StringComparer.OrdinalIgnoreCase);
|
||||
foreach (var (k, v) in headers)
|
||||
result[k] = v.ToArray();
|
||||
|
||||
return new NatsHeaders
|
||||
{
|
||||
Status = status,
|
||||
Description = description,
|
||||
Headers = result,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
using System.Buffers;
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace NATS.Server.Protocol;
|
||||
|
||||
@@ -35,6 +36,7 @@ public sealed class NatsParser
|
||||
{
|
||||
private static readonly byte[] CrLfBytes = "\r\n"u8.ToArray();
|
||||
private readonly int _maxPayload;
|
||||
private readonly ILogger? _logger;
|
||||
|
||||
// State for split-packet payload reading
|
||||
private bool _awaitingPayload;
|
||||
@@ -44,9 +46,20 @@ public sealed class NatsParser
|
||||
private int _pendingHeaderSize;
|
||||
private CommandType _pendingType;
|
||||
|
||||
public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize)
|
||||
public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize, ILogger? logger = null)
|
||||
{
|
||||
_maxPayload = maxPayload;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
private void TraceInOp(string op, ReadOnlySpan<byte> arg = default)
|
||||
{
|
||||
if (_logger == null || !_logger.IsEnabled(LogLevel.Trace))
|
||||
return;
|
||||
if (arg.IsEmpty)
|
||||
_logger.LogTrace("<<- {Op}", op);
|
||||
else
|
||||
_logger.LogTrace("<<- {Op} {Arg}", op, Encoding.ASCII.GetString(arg));
|
||||
}
|
||||
|
||||
public bool TryParse(ref ReadOnlySequence<byte> buffer, out ParsedCommand command)
|
||||
@@ -91,6 +104,7 @@ public sealed class NatsParser
|
||||
{
|
||||
command = ParsedCommand.Simple(CommandType.Ping);
|
||||
buffer = buffer.Slice(reader.Position);
|
||||
TraceInOp("PING");
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -98,6 +112,7 @@ public sealed class NatsParser
|
||||
{
|
||||
command = ParsedCommand.Simple(CommandType.Pong);
|
||||
buffer = buffer.Slice(reader.Position);
|
||||
TraceInOp("PONG");
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -121,6 +136,7 @@ public sealed class NatsParser
|
||||
{
|
||||
command = ParseSub(lineSpan);
|
||||
buffer = buffer.Slice(reader.Position);
|
||||
TraceInOp("SUB", lineSpan[4..]);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -131,6 +147,7 @@ public sealed class NatsParser
|
||||
{
|
||||
command = ParseUnsub(lineSpan);
|
||||
buffer = buffer.Slice(reader.Position);
|
||||
TraceInOp("UNSUB", lineSpan[6..]);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -141,6 +158,7 @@ public sealed class NatsParser
|
||||
{
|
||||
command = ParseConnect(lineSpan);
|
||||
buffer = buffer.Slice(reader.Position);
|
||||
TraceInOp("CONNECT");
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -151,6 +169,7 @@ public sealed class NatsParser
|
||||
{
|
||||
command = ParseInfo(lineSpan);
|
||||
buffer = buffer.Slice(reader.Position);
|
||||
TraceInOp("INFO");
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -159,11 +178,13 @@ public sealed class NatsParser
|
||||
case (byte)'+': // +OK
|
||||
command = ParsedCommand.Simple(CommandType.Ok);
|
||||
buffer = buffer.Slice(reader.Position);
|
||||
TraceInOp("+OK");
|
||||
return true;
|
||||
|
||||
case (byte)'-': // -ERR
|
||||
command = ParsedCommand.Simple(CommandType.Err);
|
||||
buffer = buffer.Slice(reader.Position);
|
||||
TraceInOp("-ERR");
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -215,6 +236,7 @@ public sealed class NatsParser
|
||||
_pendingHeaderSize = -1;
|
||||
_pendingType = CommandType.Pub;
|
||||
|
||||
TraceInOp("PUB", argsSpan);
|
||||
return TryReadPayload(ref buffer, out command);
|
||||
}
|
||||
|
||||
@@ -264,6 +286,7 @@ public sealed class NatsParser
|
||||
_pendingHeaderSize = hdrSize;
|
||||
_pendingType = CommandType.HPub;
|
||||
|
||||
TraceInOp("HPUB", argsSpan);
|
||||
return TryReadPayload(ref buffer, out command);
|
||||
}
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@ public static class NatsProtocol
|
||||
public const string ErrPermissionsSubscribe = "Permissions Violation for Subscription";
|
||||
public const string ErrSlowConsumer = "Slow Consumer";
|
||||
public const string ErrNoRespondersRequiresHeaders = "No Responders Requires Headers Support";
|
||||
public const string ErrMaxSubscriptionsExceeded = "Maximum Subscriptions Exceeded";
|
||||
}
|
||||
|
||||
public sealed class ServerInfo
|
||||
|
||||
@@ -13,9 +13,16 @@ public sealed class SubList : IDisposable
|
||||
|
||||
private readonly ReaderWriterLockSlim _lock = new();
|
||||
private readonly TrieLevel _root = new();
|
||||
private Dictionary<string, SubListResult>? _cache = new(StringComparer.Ordinal);
|
||||
private Dictionary<string, CachedResult>? _cache = new(StringComparer.Ordinal);
|
||||
private uint _count;
|
||||
private volatile bool _disposed;
|
||||
private long _generation;
|
||||
private ulong _matches;
|
||||
private ulong _cacheHits;
|
||||
private ulong _inserts;
|
||||
private ulong _removes;
|
||||
|
||||
private readonly record struct CachedResult(SubListResult Result, long Generation);
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
@@ -90,7 +97,8 @@ public sealed class SubList : IDisposable
|
||||
}
|
||||
|
||||
_count++;
|
||||
AddToCache(subject, sub);
|
||||
_inserts++;
|
||||
Interlocked.Increment(ref _generation);
|
||||
}
|
||||
finally
|
||||
{
|
||||
@@ -104,78 +112,10 @@ public sealed class SubList : IDisposable
|
||||
_lock.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
var level = _root;
|
||||
TrieNode? node = null;
|
||||
bool sawFwc = false;
|
||||
|
||||
var pathList = new List<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)>();
|
||||
|
||||
foreach (var token in new TokenEnumerator(sub.Subject))
|
||||
if (RemoveInternal(sub))
|
||||
{
|
||||
if (token.Length == 0 || sawFwc)
|
||||
return;
|
||||
|
||||
bool isPwc = token.Length == 1 && token[0] == SubjectMatch.Pwc;
|
||||
bool isFwc = token.Length == 1 && token[0] == SubjectMatch.Fwc;
|
||||
|
||||
if (isPwc)
|
||||
{
|
||||
node = level.Pwc;
|
||||
}
|
||||
else if (isFwc)
|
||||
{
|
||||
node = level.Fwc;
|
||||
sawFwc = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
level.Nodes.TryGetValue(token.ToString(), out node);
|
||||
}
|
||||
|
||||
if (node == null)
|
||||
return; // not found
|
||||
|
||||
var tokenStr = token.ToString();
|
||||
pathList.Add((level, node, tokenStr, isPwc, isFwc));
|
||||
if (node.Next == null)
|
||||
return; // corrupted trie state
|
||||
level = node.Next;
|
||||
}
|
||||
|
||||
if (node == null) return;
|
||||
|
||||
// Remove from node
|
||||
bool removed;
|
||||
if (sub.Queue == null)
|
||||
{
|
||||
removed = node.PlainSubs.Remove(sub);
|
||||
}
|
||||
else
|
||||
{
|
||||
removed = false;
|
||||
if (node.QueueSubs.TryGetValue(sub.Queue, out var qset))
|
||||
{
|
||||
removed = qset.Remove(sub);
|
||||
if (qset.Count == 0)
|
||||
node.QueueSubs.Remove(sub.Queue);
|
||||
}
|
||||
}
|
||||
|
||||
if (!removed) return;
|
||||
|
||||
_count--;
|
||||
RemoveFromCache(sub.Subject);
|
||||
|
||||
// Prune empty nodes (walk backwards)
|
||||
for (int i = pathList.Count - 1; i >= 0; i--)
|
||||
{
|
||||
var (l, n, t, isPwc, isFwc) = pathList[i];
|
||||
if (n.IsEmpty)
|
||||
{
|
||||
if (isPwc) l.Pwc = null;
|
||||
else if (isFwc) l.Fwc = null;
|
||||
else l.Nodes.Remove(t);
|
||||
}
|
||||
_removes++;
|
||||
Interlocked.Increment(ref _generation);
|
||||
}
|
||||
}
|
||||
finally
|
||||
@@ -184,22 +124,107 @@ public sealed class SubList : IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Core remove logic without lock acquisition or generation bumping.
|
||||
/// Assumes write lock is held. Returns true if a subscription was actually removed.
|
||||
/// </summary>
|
||||
private bool RemoveInternal(Subscription sub)
|
||||
{
|
||||
var level = _root;
|
||||
TrieNode? node = null;
|
||||
bool sawFwc = false;
|
||||
|
||||
var pathList = new List<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)>();
|
||||
|
||||
foreach (var token in new TokenEnumerator(sub.Subject))
|
||||
{
|
||||
if (token.Length == 0 || sawFwc)
|
||||
return false;
|
||||
|
||||
bool isPwc = token.Length == 1 && token[0] == SubjectMatch.Pwc;
|
||||
bool isFwc = token.Length == 1 && token[0] == SubjectMatch.Fwc;
|
||||
|
||||
if (isPwc)
|
||||
{
|
||||
node = level.Pwc;
|
||||
}
|
||||
else if (isFwc)
|
||||
{
|
||||
node = level.Fwc;
|
||||
sawFwc = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
level.Nodes.TryGetValue(token.ToString(), out node);
|
||||
}
|
||||
|
||||
if (node == null)
|
||||
return false; // not found
|
||||
|
||||
var tokenStr = token.ToString();
|
||||
pathList.Add((level, node, tokenStr, isPwc, isFwc));
|
||||
if (node.Next == null)
|
||||
return false; // corrupted trie state
|
||||
level = node.Next;
|
||||
}
|
||||
|
||||
if (node == null) return false;
|
||||
|
||||
// Remove from node
|
||||
bool removed;
|
||||
if (sub.Queue == null)
|
||||
{
|
||||
removed = node.PlainSubs.Remove(sub);
|
||||
}
|
||||
else
|
||||
{
|
||||
removed = false;
|
||||
if (node.QueueSubs.TryGetValue(sub.Queue, out var qset))
|
||||
{
|
||||
removed = qset.Remove(sub);
|
||||
if (qset.Count == 0)
|
||||
node.QueueSubs.Remove(sub.Queue);
|
||||
}
|
||||
}
|
||||
|
||||
if (!removed) return false;
|
||||
|
||||
_count--;
|
||||
|
||||
// Prune empty nodes (walk backwards)
|
||||
for (int i = pathList.Count - 1; i >= 0; i--)
|
||||
{
|
||||
var (l, n, t, isPwc, isFwc) = pathList[i];
|
||||
if (n.IsEmpty)
|
||||
{
|
||||
if (isPwc) l.Pwc = null;
|
||||
else if (isFwc) l.Fwc = null;
|
||||
else l.Nodes.Remove(t);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public SubListResult Match(string subject)
|
||||
{
|
||||
// Check cache under read lock first.
|
||||
Interlocked.Increment(ref _matches);
|
||||
var currentGen = Interlocked.Read(ref _generation);
|
||||
|
||||
_lock.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (_cache != null && _cache.TryGetValue(subject, out var cached))
|
||||
return cached;
|
||||
if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen)
|
||||
{
|
||||
Interlocked.Increment(ref _cacheHits);
|
||||
return cached.Result;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitReadLock();
|
||||
}
|
||||
|
||||
// Cache miss -- tokenize and match under write lock (needed for cache update).
|
||||
// Tokenize the subject.
|
||||
var tokens = Tokenize(subject);
|
||||
if (tokens == null)
|
||||
return SubListResult.Empty;
|
||||
@@ -207,13 +232,15 @@ public sealed class SubList : IDisposable
|
||||
_lock.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
// Re-check cache after acquiring write lock.
|
||||
if (_cache != null && _cache.TryGetValue(subject, out var cached))
|
||||
return cached;
|
||||
currentGen = Interlocked.Read(ref _generation);
|
||||
if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen)
|
||||
{
|
||||
Interlocked.Increment(ref _cacheHits);
|
||||
return cached.Result;
|
||||
}
|
||||
|
||||
var plainSubs = new List<Subscription>();
|
||||
var queueSubs = new List<List<Subscription>>();
|
||||
|
||||
MatchLevel(_root, tokens, 0, plainSubs, queueSubs);
|
||||
|
||||
SubListResult result;
|
||||
@@ -226,19 +253,14 @@ public sealed class SubList : IDisposable
|
||||
var queueSubsArr = new Subscription[queueSubs.Count][];
|
||||
for (int i = 0; i < queueSubs.Count; i++)
|
||||
queueSubsArr[i] = queueSubs[i].ToArray();
|
||||
|
||||
result = new SubListResult(
|
||||
plainSubs.ToArray(),
|
||||
queueSubsArr);
|
||||
result = new SubListResult(plainSubs.ToArray(), queueSubsArr);
|
||||
}
|
||||
|
||||
if (_cache != null)
|
||||
{
|
||||
_cache[subject] = result;
|
||||
|
||||
_cache[subject] = new CachedResult(result, currentGen);
|
||||
if (_cache.Count > CacheMax)
|
||||
{
|
||||
// Sweep: remove entries until at CacheSweep count.
|
||||
var keys = _cache.Keys.Take(_cache.Count - CacheSweep).ToList();
|
||||
foreach (var key in keys)
|
||||
_cache.Remove(key);
|
||||
@@ -356,119 +378,355 @@ public sealed class SubList : IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds a subscription to matching cache entries.
|
||||
/// Assumes write lock is held.
|
||||
/// </summary>
|
||||
private void AddToCache(string subject, Subscription sub)
|
||||
public SubListStats Stats()
|
||||
{
|
||||
if (_cache == null)
|
||||
return;
|
||||
|
||||
// If literal subject, we can do a direct lookup.
|
||||
if (SubjectMatch.IsLiteral(subject))
|
||||
_lock.EnterReadLock();
|
||||
uint numSubs, numCache;
|
||||
ulong inserts, removes;
|
||||
try
|
||||
{
|
||||
if (_cache.TryGetValue(subject, out var r))
|
||||
{
|
||||
_cache[subject] = AddSubToResult(r, sub);
|
||||
}
|
||||
return;
|
||||
numSubs = _count;
|
||||
numCache = (uint)(_cache?.Count ?? 0);
|
||||
inserts = _inserts;
|
||||
removes = _removes;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitReadLock();
|
||||
}
|
||||
|
||||
// Wildcard subscription -- check all cached keys.
|
||||
var keysToUpdate = new List<(string key, SubListResult result)>();
|
||||
foreach (var (key, r) in _cache)
|
||||
var matches = Interlocked.Read(ref _matches);
|
||||
var cacheHits = Interlocked.Read(ref _cacheHits);
|
||||
var hitRate = matches > 0 ? (double)cacheHits / matches : 0.0;
|
||||
|
||||
uint maxFanout = 0;
|
||||
long totalFanout = 0;
|
||||
int cacheEntries = 0;
|
||||
|
||||
_lock.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (SubjectMatch.MatchLiteral(key, subject))
|
||||
if (_cache != null)
|
||||
{
|
||||
keysToUpdate.Add((key, r));
|
||||
foreach (var (_, entry) in _cache)
|
||||
{
|
||||
var r = entry.Result;
|
||||
var f = r.PlainSubs.Length + r.QueueSubs.Length;
|
||||
totalFanout += f;
|
||||
if (f > maxFanout) maxFanout = (uint)f;
|
||||
cacheEntries++;
|
||||
}
|
||||
}
|
||||
}
|
||||
foreach (var (key, r) in keysToUpdate)
|
||||
finally
|
||||
{
|
||||
_cache[key] = AddSubToResult(r, sub);
|
||||
_lock.ExitReadLock();
|
||||
}
|
||||
|
||||
return new SubListStats
|
||||
{
|
||||
NumSubs = numSubs,
|
||||
NumCache = numCache,
|
||||
NumInserts = inserts,
|
||||
NumRemoves = removes,
|
||||
NumMatches = matches,
|
||||
CacheHitRate = hitRate,
|
||||
MaxFanout = maxFanout,
|
||||
AvgFanout = cacheEntries > 0 ? (double)totalFanout / cacheEntries : 0.0,
|
||||
};
|
||||
}
|
||||
|
||||
public bool HasInterest(string subject)
|
||||
{
|
||||
var currentGen = Interlocked.Read(ref _generation);
|
||||
_lock.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen)
|
||||
{
|
||||
var r = cached.Result;
|
||||
return r.PlainSubs.Length > 0 || r.QueueSubs.Length > 0;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitReadLock();
|
||||
}
|
||||
|
||||
var tokens = Tokenize(subject);
|
||||
if (tokens == null) return false;
|
||||
|
||||
_lock.EnterReadLock();
|
||||
try
|
||||
{
|
||||
return HasInterestLevel(_root, tokens, 0);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes cache entries that match the given subject.
|
||||
/// Assumes write lock is held.
|
||||
/// </summary>
|
||||
private void RemoveFromCache(string subject)
|
||||
public (int plainCount, int queueCount) NumInterest(string subject)
|
||||
{
|
||||
if (_cache == null)
|
||||
return;
|
||||
var tokens = Tokenize(subject);
|
||||
if (tokens == null) return (0, 0);
|
||||
|
||||
// If literal subject, we can do a direct removal.
|
||||
if (SubjectMatch.IsLiteral(subject))
|
||||
_lock.EnterReadLock();
|
||||
try
|
||||
{
|
||||
_cache.Remove(subject);
|
||||
return;
|
||||
int np = 0, nq = 0;
|
||||
CountInterestLevel(_root, tokens, 0, ref np, ref nq);
|
||||
return (np, nq);
|
||||
}
|
||||
|
||||
// Wildcard subscription -- remove all matching cached keys.
|
||||
var keysToRemove = new List<string>();
|
||||
foreach (var key in _cache.Keys)
|
||||
finally
|
||||
{
|
||||
if (SubjectMatch.MatchLiteral(key, subject))
|
||||
{
|
||||
keysToRemove.Add(key);
|
||||
}
|
||||
}
|
||||
foreach (var key in keysToRemove)
|
||||
{
|
||||
_cache.Remove(key);
|
||||
_lock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new result with the given subscription added.
|
||||
/// </summary>
|
||||
private static SubListResult AddSubToResult(SubListResult result, Subscription sub)
|
||||
public void RemoveBatch(IEnumerable<Subscription> subs)
|
||||
{
|
||||
if (sub.Queue == null)
|
||||
_lock.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
var newPlain = new Subscription[result.PlainSubs.Length + 1];
|
||||
result.PlainSubs.CopyTo(newPlain, 0);
|
||||
newPlain[^1] = sub;
|
||||
return new SubListResult(newPlain, result.QueueSubs);
|
||||
var wasEnabled = _cache != null;
|
||||
_cache = null;
|
||||
|
||||
foreach (var sub in subs)
|
||||
{
|
||||
if (RemoveInternal(sub))
|
||||
_removes++;
|
||||
}
|
||||
|
||||
Interlocked.Increment(ref _generation);
|
||||
|
||||
if (wasEnabled)
|
||||
_cache = new Dictionary<string, CachedResult>(StringComparer.Ordinal);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
public IReadOnlyList<Subscription> All()
|
||||
{
|
||||
var subs = new List<Subscription>();
|
||||
_lock.EnterReadLock();
|
||||
try
|
||||
{
|
||||
CollectAllSubs(_root, subs);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitReadLock();
|
||||
}
|
||||
return subs;
|
||||
}
|
||||
|
||||
public SubListResult ReverseMatch(string subject)
|
||||
{
|
||||
var tokens = Tokenize(subject);
|
||||
if (tokens == null)
|
||||
return SubListResult.Empty;
|
||||
|
||||
_lock.EnterReadLock();
|
||||
try
|
||||
{
|
||||
var plainSubs = new List<Subscription>();
|
||||
var queueSubs = new List<List<Subscription>>();
|
||||
ReverseMatchLevel(_root, tokens, 0, plainSubs, queueSubs);
|
||||
|
||||
if (plainSubs.Count == 0 && queueSubs.Count == 0)
|
||||
return SubListResult.Empty;
|
||||
|
||||
var queueSubsArr = new Subscription[queueSubs.Count][];
|
||||
for (int i = 0; i < queueSubs.Count; i++)
|
||||
queueSubsArr[i] = queueSubs[i].ToArray();
|
||||
return new SubListResult(plainSubs.ToArray(), queueSubsArr);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
private static bool HasInterestLevel(TrieLevel? level, string[] tokens, int tokenIndex)
|
||||
{
|
||||
TrieNode? pwc = null;
|
||||
TrieNode? node = null;
|
||||
|
||||
for (int i = tokenIndex; i < tokens.Length; i++)
|
||||
{
|
||||
if (level == null) return false;
|
||||
if (level.Fwc != null && NodeHasInterest(level.Fwc)) return true;
|
||||
|
||||
pwc = level.Pwc;
|
||||
if (pwc != null && HasInterestLevel(pwc.Next, tokens, i + 1)) return true;
|
||||
|
||||
node = null;
|
||||
if (level.Nodes.TryGetValue(tokens[i], out var found))
|
||||
{
|
||||
node = found;
|
||||
level = node.Next;
|
||||
}
|
||||
else
|
||||
{
|
||||
level = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (node != null && NodeHasInterest(node)) return true;
|
||||
if (pwc != null && NodeHasInterest(pwc)) return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static bool NodeHasInterest(TrieNode node)
|
||||
{
|
||||
return node.PlainSubs.Count > 0 || node.QueueSubs.Count > 0;
|
||||
}
|
||||
|
||||
private static void CountInterestLevel(TrieLevel? level, string[] tokens, int tokenIndex,
|
||||
ref int np, ref int nq)
|
||||
{
|
||||
TrieNode? pwc = null;
|
||||
TrieNode? node = null;
|
||||
|
||||
for (int i = tokenIndex; i < tokens.Length; i++)
|
||||
{
|
||||
if (level == null) return;
|
||||
if (level.Fwc != null) AddNodeCounts(level.Fwc, ref np, ref nq);
|
||||
|
||||
pwc = level.Pwc;
|
||||
if (pwc != null) CountInterestLevel(pwc.Next, tokens, i + 1, ref np, ref nq);
|
||||
|
||||
node = null;
|
||||
if (level.Nodes.TryGetValue(tokens[i], out var found))
|
||||
{
|
||||
node = found;
|
||||
level = node.Next;
|
||||
}
|
||||
else
|
||||
{
|
||||
level = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (node != null) AddNodeCounts(node, ref np, ref nq);
|
||||
if (pwc != null) AddNodeCounts(pwc, ref np, ref nq);
|
||||
}
|
||||
|
||||
private static void AddNodeCounts(TrieNode node, ref int np, ref int nq)
|
||||
{
|
||||
np += node.PlainSubs.Count;
|
||||
foreach (var (_, qset) in node.QueueSubs)
|
||||
nq += qset.Count;
|
||||
}
|
||||
|
||||
private static void CollectAllSubs(TrieLevel level, List<Subscription> subs)
|
||||
{
|
||||
foreach (var (_, node) in level.Nodes)
|
||||
{
|
||||
foreach (var sub in node.PlainSubs)
|
||||
subs.Add(sub);
|
||||
foreach (var (_, qset) in node.QueueSubs)
|
||||
foreach (var sub in qset)
|
||||
subs.Add(sub);
|
||||
if (node.Next != null)
|
||||
CollectAllSubs(node.Next, subs);
|
||||
}
|
||||
if (level.Pwc != null)
|
||||
{
|
||||
foreach (var sub in level.Pwc.PlainSubs)
|
||||
subs.Add(sub);
|
||||
foreach (var (_, qset) in level.Pwc.QueueSubs)
|
||||
foreach (var sub in qset)
|
||||
subs.Add(sub);
|
||||
if (level.Pwc.Next != null)
|
||||
CollectAllSubs(level.Pwc.Next, subs);
|
||||
}
|
||||
if (level.Fwc != null)
|
||||
{
|
||||
foreach (var sub in level.Fwc.PlainSubs)
|
||||
subs.Add(sub);
|
||||
foreach (var (_, qset) in level.Fwc.QueueSubs)
|
||||
foreach (var sub in qset)
|
||||
subs.Add(sub);
|
||||
if (level.Fwc.Next != null)
|
||||
CollectAllSubs(level.Fwc.Next, subs);
|
||||
}
|
||||
}
|
||||
|
||||
private static void ReverseMatchLevel(TrieLevel? level, string[] tokens, int tokenIndex,
|
||||
List<Subscription> plainSubs, List<List<Subscription>> queueSubs)
|
||||
{
|
||||
if (level == null || tokenIndex >= tokens.Length)
|
||||
return;
|
||||
|
||||
var token = tokens[tokenIndex];
|
||||
bool isLast = tokenIndex == tokens.Length - 1;
|
||||
|
||||
if (token == ">")
|
||||
{
|
||||
CollectAllNodes(level, plainSubs, queueSubs);
|
||||
return;
|
||||
}
|
||||
|
||||
if (token == "*")
|
||||
{
|
||||
foreach (var (_, node) in level.Nodes)
|
||||
{
|
||||
if (isLast)
|
||||
AddNodeToResults(node, plainSubs, queueSubs);
|
||||
else
|
||||
ReverseMatchLevel(node.Next, tokens, tokenIndex + 1, plainSubs, queueSubs);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Find existing queue group
|
||||
var queueSubs = result.QueueSubs;
|
||||
int slot = -1;
|
||||
for (int i = 0; i < queueSubs.Length; i++)
|
||||
if (level.Nodes.TryGetValue(token, out var node))
|
||||
{
|
||||
if (queueSubs[i].Length > 0 && queueSubs[i][0].Queue == sub.Queue)
|
||||
{
|
||||
slot = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Deep copy queue subs
|
||||
var newQueueSubs = new Subscription[queueSubs.Length + (slot < 0 ? 1 : 0)][];
|
||||
for (int i = 0; i < queueSubs.Length; i++)
|
||||
{
|
||||
if (i == slot)
|
||||
{
|
||||
var newGroup = new Subscription[queueSubs[i].Length + 1];
|
||||
queueSubs[i].CopyTo(newGroup, 0);
|
||||
newGroup[^1] = sub;
|
||||
newQueueSubs[i] = newGroup;
|
||||
}
|
||||
if (isLast)
|
||||
AddNodeToResults(node, plainSubs, queueSubs);
|
||||
else
|
||||
{
|
||||
newQueueSubs[i] = (Subscription[])queueSubs[i].Clone();
|
||||
}
|
||||
}
|
||||
if (slot < 0)
|
||||
{
|
||||
newQueueSubs[^1] = [sub];
|
||||
ReverseMatchLevel(node.Next, tokens, tokenIndex + 1, plainSubs, queueSubs);
|
||||
}
|
||||
}
|
||||
|
||||
return new SubListResult(result.PlainSubs, newQueueSubs);
|
||||
if (level.Pwc != null)
|
||||
{
|
||||
if (isLast)
|
||||
AddNodeToResults(level.Pwc, plainSubs, queueSubs);
|
||||
else
|
||||
ReverseMatchLevel(level.Pwc.Next, tokens, tokenIndex + 1, plainSubs, queueSubs);
|
||||
}
|
||||
if (level.Fwc != null)
|
||||
{
|
||||
AddNodeToResults(level.Fwc, plainSubs, queueSubs);
|
||||
}
|
||||
}
|
||||
|
||||
private static void CollectAllNodes(TrieLevel level, List<Subscription> plainSubs,
|
||||
List<List<Subscription>> queueSubs)
|
||||
{
|
||||
foreach (var (_, node) in level.Nodes)
|
||||
{
|
||||
AddNodeToResults(node, plainSubs, queueSubs);
|
||||
if (node.Next != null)
|
||||
CollectAllNodes(node.Next, plainSubs, queueSubs);
|
||||
}
|
||||
if (level.Pwc != null)
|
||||
{
|
||||
AddNodeToResults(level.Pwc, plainSubs, queueSubs);
|
||||
if (level.Pwc.Next != null)
|
||||
CollectAllNodes(level.Pwc.Next, plainSubs, queueSubs);
|
||||
}
|
||||
if (level.Fwc != null)
|
||||
{
|
||||
AddNodeToResults(level.Fwc, plainSubs, queueSubs);
|
||||
if (level.Fwc.Next != null)
|
||||
CollectAllNodes(level.Fwc.Next, plainSubs, queueSubs);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
13
src/NATS.Server/Subscriptions/SubListStats.cs
Normal file
13
src/NATS.Server/Subscriptions/SubListStats.cs
Normal file
@@ -0,0 +1,13 @@
|
||||
namespace NATS.Server.Subscriptions;
|
||||
|
||||
public sealed class SubListStats
|
||||
{
|
||||
public uint NumSubs { get; init; }
|
||||
public uint NumCache { get; init; }
|
||||
public ulong NumInserts { get; init; }
|
||||
public ulong NumRemoves { get; init; }
|
||||
public ulong NumMatches { get; init; }
|
||||
public double CacheHitRate { get; init; }
|
||||
public uint MaxFanout { get; init; }
|
||||
public double AvgFanout { get; init; }
|
||||
}
|
||||
@@ -113,4 +113,112 @@ public static class SubjectMatch
|
||||
|
||||
return li >= literal.Length; // both exhausted
|
||||
}
|
||||
|
||||
/// <summary>Count dot-delimited tokens. Empty string returns 0.</summary>
|
||||
public static int NumTokens(string subject)
|
||||
{
|
||||
if (string.IsNullOrEmpty(subject))
|
||||
return 0;
|
||||
int count = 1;
|
||||
for (int i = 0; i < subject.Length; i++)
|
||||
{
|
||||
if (subject[i] == Sep)
|
||||
count++;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
/// <summary>Return the 0-based nth token as a span. Returns empty if out of range.</summary>
|
||||
public static ReadOnlySpan<char> TokenAt(string subject, int index)
|
||||
{
|
||||
if (string.IsNullOrEmpty(subject))
|
||||
return default;
|
||||
|
||||
var span = subject.AsSpan();
|
||||
int current = 0;
|
||||
int start = 0;
|
||||
for (int i = 0; i < span.Length; i++)
|
||||
{
|
||||
if (span[i] == Sep)
|
||||
{
|
||||
if (current == index)
|
||||
return span[start..i];
|
||||
start = i + 1;
|
||||
current++;
|
||||
}
|
||||
}
|
||||
if (current == index)
|
||||
return span[start..];
|
||||
return default;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Determines if two subject patterns (possibly containing wildcards) can both
|
||||
/// match the same literal subject. Reference: Go sublist.go SubjectsCollide.
|
||||
/// </summary>
|
||||
public static bool SubjectsCollide(string subj1, string subj2)
|
||||
{
|
||||
if (subj1 == subj2)
|
||||
return true;
|
||||
|
||||
bool lit1 = IsLiteral(subj1);
|
||||
bool lit2 = IsLiteral(subj2);
|
||||
|
||||
if (lit1 && lit2)
|
||||
return false;
|
||||
|
||||
if (lit1 && !lit2)
|
||||
return MatchLiteral(subj1, subj2);
|
||||
if (lit2 && !lit1)
|
||||
return MatchLiteral(subj2, subj1);
|
||||
|
||||
// Both have wildcards — split once to avoid O(n²) TokenAt calls
|
||||
var tokens1 = subj1.Split(Sep);
|
||||
var tokens2 = subj2.Split(Sep);
|
||||
int n1 = tokens1.Length;
|
||||
int n2 = tokens2.Length;
|
||||
bool hasFwc1 = tokens1[^1] == ">";
|
||||
bool hasFwc2 = tokens2[^1] == ">";
|
||||
|
||||
if (!hasFwc1 && !hasFwc2 && n1 != n2)
|
||||
return false;
|
||||
if (n1 < n2 && !hasFwc1)
|
||||
return false;
|
||||
if (n2 < n1 && !hasFwc2)
|
||||
return false;
|
||||
|
||||
int stop = Math.Min(n1, n2);
|
||||
for (int i = 0; i < stop; i++)
|
||||
{
|
||||
if (!TokensCanMatch(tokens1[i], tokens2[i]))
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private static bool TokensCanMatch(ReadOnlySpan<char> t1, ReadOnlySpan<char> t2)
|
||||
{
|
||||
if (t1.Length == 1 && (t1[0] == Pwc || t1[0] == Fwc))
|
||||
return true;
|
||||
if (t2.Length == 1 && (t2[0] == Pwc || t2[0] == Fwc))
|
||||
return true;
|
||||
return t1.SequenceEqual(t2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Validates subject. When checkRunes is true, also rejects null bytes.
|
||||
/// </summary>
|
||||
public static bool IsValidSubject(string subject, bool checkRunes)
|
||||
{
|
||||
if (!IsValidSubject(subject))
|
||||
return false;
|
||||
if (!checkRunes)
|
||||
return true;
|
||||
for (int i = 0; i < subject.Length; i++)
|
||||
{
|
||||
if (subject[i] == '\0')
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,4 +32,40 @@ public class AccountTests
|
||||
{
|
||||
Account.GlobalAccountName.ShouldBe("$G");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Account_enforces_max_connections()
|
||||
{
|
||||
var acc = new Account("test") { MaxConnections = 2 };
|
||||
acc.AddClient(1).ShouldBeTrue();
|
||||
acc.AddClient(2).ShouldBeTrue();
|
||||
acc.AddClient(3).ShouldBeFalse(); // exceeds limit
|
||||
acc.ClientCount.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Account_unlimited_connections_when_zero()
|
||||
{
|
||||
var acc = new Account("test") { MaxConnections = 0 };
|
||||
acc.AddClient(1).ShouldBeTrue();
|
||||
acc.AddClient(2).ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Account_enforces_max_subscriptions()
|
||||
{
|
||||
var acc = new Account("test") { MaxSubscriptions = 2 };
|
||||
acc.IncrementSubscriptions().ShouldBeTrue();
|
||||
acc.IncrementSubscriptions().ShouldBeTrue();
|
||||
acc.IncrementSubscriptions().ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Account_decrement_subscriptions()
|
||||
{
|
||||
var acc = new Account("test") { MaxSubscriptions = 1 };
|
||||
acc.IncrementSubscriptions().ShouldBeTrue();
|
||||
acc.DecrementSubscriptions();
|
||||
acc.IncrementSubscriptions().ShouldBeTrue(); // slot freed
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,10 +5,10 @@ public class ClientClosedReasonTests
|
||||
[Fact]
|
||||
public void All_expected_close_reasons_exist()
|
||||
{
|
||||
// Verify all 17 enum values exist and are distinct (None + 16 named reasons)
|
||||
// Verify all 18 enum values exist and are distinct (None + 17 named reasons)
|
||||
var values = Enum.GetValues<ClientClosedReason>();
|
||||
values.Length.ShouldBe(17);
|
||||
values.Distinct().Count().ShouldBe(17);
|
||||
values.Length.ShouldBe(18);
|
||||
values.Distinct().Count().ShouldBe(18);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
|
||||
51
tests/NATS.Server.Tests/NatsHeaderParserTests.cs
Normal file
51
tests/NATS.Server.Tests/NatsHeaderParserTests.cs
Normal file
@@ -0,0 +1,51 @@
|
||||
using NATS.Server.Protocol;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class NatsHeaderParserTests
|
||||
{
|
||||
[Fact]
|
||||
public void Parse_status_line_only()
|
||||
{
|
||||
var input = "NATS/1.0 503\r\n\r\n"u8;
|
||||
var result = NatsHeaderParser.Parse(input);
|
||||
result.Status.ShouldBe(503);
|
||||
result.Description.ShouldBeEmpty();
|
||||
result.Headers.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Parse_status_with_description()
|
||||
{
|
||||
var input = "NATS/1.0 503 No Responders\r\n\r\n"u8;
|
||||
var result = NatsHeaderParser.Parse(input);
|
||||
result.Status.ShouldBe(503);
|
||||
result.Description.ShouldBe("No Responders");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Parse_headers_with_values()
|
||||
{
|
||||
var input = "NATS/1.0\r\nFoo: bar\r\nBaz: qux\r\n\r\n"u8;
|
||||
var result = NatsHeaderParser.Parse(input);
|
||||
result.Status.ShouldBe(0);
|
||||
result.Headers["Foo"].ShouldBe(["bar"]);
|
||||
result.Headers["Baz"].ShouldBe(["qux"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Parse_multi_value_header()
|
||||
{
|
||||
var input = "NATS/1.0\r\nX-Tag: a\r\nX-Tag: b\r\n\r\n"u8;
|
||||
var result = NatsHeaderParser.Parse(input);
|
||||
result.Headers["X-Tag"].ShouldBe(["a", "b"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Parse_invalid_returns_defaults()
|
||||
{
|
||||
var input = "GARBAGE\r\n\r\n"u8;
|
||||
var result = NatsHeaderParser.Parse(input);
|
||||
result.Status.ShouldBe(-1);
|
||||
}
|
||||
}
|
||||
17
tests/NATS.Server.Tests/NatsOptionsTests.cs
Normal file
17
tests/NATS.Server.Tests/NatsOptionsTests.cs
Normal file
@@ -0,0 +1,17 @@
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class NatsOptionsTests
|
||||
{
|
||||
[Fact]
|
||||
public void Defaults_are_correct()
|
||||
{
|
||||
var opts = new NatsOptions();
|
||||
opts.MaxSubs.ShouldBe(0);
|
||||
opts.MaxSubTokens.ShouldBe(0);
|
||||
opts.Debug.ShouldBe(false);
|
||||
opts.Trace.ShouldBe(false);
|
||||
opts.LogFile.ShouldBeNull();
|
||||
opts.LogSizeLimit.ShouldBe(0L);
|
||||
opts.Tags.ShouldBeNull();
|
||||
}
|
||||
}
|
||||
52
tests/NATS.Server.Tests/PermissionLruCacheTests.cs
Normal file
52
tests/NATS.Server.Tests/PermissionLruCacheTests.cs
Normal file
@@ -0,0 +1,52 @@
|
||||
using NATS.Server.Auth;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class PermissionLruCacheTests
|
||||
{
|
||||
[Fact]
|
||||
public void Get_returns_none_for_unknown_key()
|
||||
{
|
||||
var cache = new PermissionLruCache(128);
|
||||
cache.TryGet("foo", out _).ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Set_and_get_returns_value()
|
||||
{
|
||||
var cache = new PermissionLruCache(128);
|
||||
cache.Set("foo", true);
|
||||
cache.TryGet("foo", out var v).ShouldBeTrue();
|
||||
v.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Evicts_oldest_when_full()
|
||||
{
|
||||
var cache = new PermissionLruCache(3);
|
||||
cache.Set("a", true);
|
||||
cache.Set("b", true);
|
||||
cache.Set("c", true);
|
||||
cache.Set("d", true); // evicts "a"
|
||||
|
||||
cache.TryGet("a", out _).ShouldBeFalse();
|
||||
cache.TryGet("b", out _).ShouldBeTrue();
|
||||
cache.TryGet("d", out _).ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Get_promotes_to_front()
|
||||
{
|
||||
var cache = new PermissionLruCache(3);
|
||||
cache.Set("a", true);
|
||||
cache.Set("b", true);
|
||||
cache.Set("c", true);
|
||||
|
||||
// Access "a" to promote it
|
||||
cache.TryGet("a", out _);
|
||||
|
||||
cache.Set("d", true); // should evict "b" (oldest untouched)
|
||||
cache.TryGet("a", out _).ShouldBeTrue();
|
||||
cache.TryGet("b", out _).ShouldBeFalse();
|
||||
}
|
||||
}
|
||||
51
tests/NATS.Server.Tests/ResponseTrackerTests.cs
Normal file
51
tests/NATS.Server.Tests/ResponseTrackerTests.cs
Normal file
@@ -0,0 +1,51 @@
|
||||
using NATS.Server.Auth;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class ResponseTrackerTests
|
||||
{
|
||||
[Fact]
|
||||
public void Allows_reply_subject_after_registration()
|
||||
{
|
||||
var tracker = new ResponseTracker(maxMsgs: 1, expires: TimeSpan.FromMinutes(5));
|
||||
tracker.RegisterReply("_INBOX.abc123");
|
||||
tracker.IsReplyAllowed("_INBOX.abc123").ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Denies_unknown_reply_subject()
|
||||
{
|
||||
var tracker = new ResponseTracker(maxMsgs: 1, expires: TimeSpan.FromMinutes(5));
|
||||
tracker.IsReplyAllowed("_INBOX.unknown").ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Enforces_max_messages()
|
||||
{
|
||||
var tracker = new ResponseTracker(maxMsgs: 2, expires: TimeSpan.FromMinutes(5));
|
||||
tracker.RegisterReply("_INBOX.abc");
|
||||
tracker.IsReplyAllowed("_INBOX.abc").ShouldBeTrue();
|
||||
tracker.IsReplyAllowed("_INBOX.abc").ShouldBeTrue();
|
||||
tracker.IsReplyAllowed("_INBOX.abc").ShouldBeFalse(); // exceeded
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Enforces_expiry()
|
||||
{
|
||||
var tracker = new ResponseTracker(maxMsgs: 0, expires: TimeSpan.FromMilliseconds(1));
|
||||
tracker.RegisterReply("_INBOX.abc");
|
||||
Thread.Sleep(50);
|
||||
tracker.IsReplyAllowed("_INBOX.abc").ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Prune_removes_expired()
|
||||
{
|
||||
var tracker = new ResponseTracker(maxMsgs: 0, expires: TimeSpan.FromMilliseconds(1));
|
||||
tracker.RegisterReply("_INBOX.a");
|
||||
tracker.RegisterReply("_INBOX.b");
|
||||
Thread.Sleep(50);
|
||||
tracker.Prune();
|
||||
tracker.Count.ShouldBe(0);
|
||||
}
|
||||
}
|
||||
@@ -156,4 +156,125 @@ public class SubListTests
|
||||
var r = sl.Match("foo.bar.baz");
|
||||
r.PlainSubs.Length.ShouldBe(3);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Stats_returns_correct_values()
|
||||
{
|
||||
var sl = new SubList();
|
||||
sl.Insert(MakeSub("foo.bar", sid: "1"));
|
||||
sl.Insert(MakeSub("foo.baz", sid: "2"));
|
||||
sl.Match("foo.bar");
|
||||
sl.Match("foo.bar"); // cache hit
|
||||
|
||||
var stats = sl.Stats();
|
||||
stats.NumSubs.ShouldBe(2u);
|
||||
stats.NumInserts.ShouldBe(2ul);
|
||||
stats.NumMatches.ShouldBe(2ul);
|
||||
stats.CacheHitRate.ShouldBeGreaterThan(0.0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void HasInterest_returns_true_when_subscribers_exist()
|
||||
{
|
||||
var sl = new SubList();
|
||||
sl.Insert(MakeSub("foo.bar"));
|
||||
sl.HasInterest("foo.bar").ShouldBeTrue();
|
||||
sl.HasInterest("foo.baz").ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void HasInterest_with_wildcards()
|
||||
{
|
||||
var sl = new SubList();
|
||||
sl.Insert(MakeSub("foo.*"));
|
||||
sl.HasInterest("foo.bar").ShouldBeTrue();
|
||||
sl.HasInterest("bar.baz").ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void NumInterest_counts_subscribers()
|
||||
{
|
||||
var sl = new SubList();
|
||||
sl.Insert(MakeSub("foo.bar", sid: "1"));
|
||||
sl.Insert(MakeSub("foo.*", sid: "2"));
|
||||
sl.Insert(MakeSub("foo.bar", queue: "q1", sid: "3"));
|
||||
|
||||
var (np, nq) = sl.NumInterest("foo.bar");
|
||||
np.ShouldBe(2); // foo.bar + foo.*
|
||||
nq.ShouldBe(1); // queue sub
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RemoveBatch_removes_all()
|
||||
{
|
||||
var sl = new SubList();
|
||||
var sub1 = MakeSub("foo.bar", sid: "1");
|
||||
var sub2 = MakeSub("foo.baz", sid: "2");
|
||||
var sub3 = MakeSub("bar.qux", sid: "3");
|
||||
sl.Insert(sub1);
|
||||
sl.Insert(sub2);
|
||||
sl.Insert(sub3);
|
||||
sl.Count.ShouldBe(3u);
|
||||
|
||||
sl.RemoveBatch([sub1, sub2]);
|
||||
sl.Count.ShouldBe(1u);
|
||||
sl.Match("foo.bar").PlainSubs.ShouldBeEmpty();
|
||||
sl.Match("bar.qux").PlainSubs.ShouldHaveSingleItem();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void All_returns_every_subscription()
|
||||
{
|
||||
var sl = new SubList();
|
||||
var sub1 = MakeSub("foo.bar", sid: "1");
|
||||
var sub2 = MakeSub("foo.*", sid: "2");
|
||||
var sub3 = MakeSub("bar.>", queue: "q", sid: "3");
|
||||
sl.Insert(sub1);
|
||||
sl.Insert(sub2);
|
||||
sl.Insert(sub3);
|
||||
|
||||
var all = sl.All();
|
||||
all.Count.ShouldBe(3);
|
||||
all.ShouldContain(sub1);
|
||||
all.ShouldContain(sub2);
|
||||
all.ShouldContain(sub3);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ReverseMatch_finds_patterns_matching_literal()
|
||||
{
|
||||
var sl = new SubList();
|
||||
var sub1 = MakeSub("foo.bar", sid: "1");
|
||||
var sub2 = MakeSub("foo.*", sid: "2");
|
||||
var sub3 = MakeSub("foo.>", sid: "3");
|
||||
var sub4 = MakeSub("bar.baz", sid: "4");
|
||||
sl.Insert(sub1);
|
||||
sl.Insert(sub2);
|
||||
sl.Insert(sub3);
|
||||
sl.Insert(sub4);
|
||||
|
||||
var result = sl.ReverseMatch("foo.bar");
|
||||
result.PlainSubs.Length.ShouldBe(3); // foo.bar, foo.*, foo.>
|
||||
result.PlainSubs.ShouldContain(sub1);
|
||||
result.PlainSubs.ShouldContain(sub2);
|
||||
result.PlainSubs.ShouldContain(sub3);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Generation_ID_invalidates_cache()
|
||||
{
|
||||
var sl = new SubList();
|
||||
sl.Insert(MakeSub("foo.bar", sid: "1"));
|
||||
|
||||
// Prime cache
|
||||
var r1 = sl.Match("foo.bar");
|
||||
r1.PlainSubs.Length.ShouldBe(1);
|
||||
|
||||
// Insert another sub (bumps generation)
|
||||
sl.Insert(MakeSub("foo.bar", sid: "2"));
|
||||
|
||||
// Cache should be invalidated by generation mismatch
|
||||
var r2 = sl.Match("foo.bar");
|
||||
r2.PlainSubs.Length.ShouldBe(2);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,4 +54,50 @@ public class SubjectMatchTests
|
||||
{
|
||||
SubjectMatch.MatchLiteral(literal, pattern).ShouldBe(expected);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("foo.bar.baz", 3)]
|
||||
[InlineData("foo", 1)]
|
||||
[InlineData("a.b.c.d.e", 5)]
|
||||
[InlineData("", 0)]
|
||||
public void NumTokens(string subject, int expected)
|
||||
{
|
||||
SubjectMatch.NumTokens(subject).ShouldBe(expected);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("foo.bar.baz", 0, "foo")]
|
||||
[InlineData("foo.bar.baz", 1, "bar")]
|
||||
[InlineData("foo.bar.baz", 2, "baz")]
|
||||
[InlineData("foo", 0, "foo")]
|
||||
[InlineData("foo.bar.baz", 5, "")]
|
||||
public void TokenAt(string subject, int index, string expected)
|
||||
{
|
||||
SubjectMatch.TokenAt(subject, index).ToString().ShouldBe(expected);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("foo.bar", "foo.bar", true)]
|
||||
[InlineData("foo.bar", "foo.baz", false)]
|
||||
[InlineData("foo.*", "foo.bar", true)]
|
||||
[InlineData("foo.*", "foo.>", true)]
|
||||
[InlineData("foo.>", "foo.bar.baz", true)]
|
||||
[InlineData(">", "foo.bar", true)]
|
||||
[InlineData("foo.*", "bar.*", false)]
|
||||
[InlineData("foo.*.baz", "foo.bar.*", true)]
|
||||
[InlineData("*.bar", "foo.*", true)]
|
||||
[InlineData("foo.*", "bar.>", false)]
|
||||
public void SubjectsCollide(string subj1, string subj2, bool expected)
|
||||
{
|
||||
SubjectMatch.SubjectsCollide(subj1, subj2).ShouldBe(expected);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("foo\0bar", true, false)]
|
||||
[InlineData("foo\0bar", false, true)]
|
||||
[InlineData("foo.bar", true, true)]
|
||||
public void IsValidSubject_checkRunes(string subject, bool checkRunes, bool expected)
|
||||
{
|
||||
SubjectMatch.IsValidSubject(subject, checkRunes).ShouldBe(expected);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user