Compare commits

...

18 Commits

Author SHA1 Message Date
Joseph Doherty
42c7c9cb7a docs: update differences.md sections 3-6 and 9 to reflect implemented features
Update comparison tables for protocol parsing (tracing, MIME headers, INFO caching,
Span-based MSG), subscriptions (generation ID, Stats, SubjectsCollide, token utils,
account limits), auth (deny enforcement, LRU cache, response permissions, auth expiry),
configuration (CLI flags, MaxSubs, Tags, file logging), and logging (trace/debug modes,
file output). Mark 11 summary items as resolved.
2026-02-23 01:07:14 -05:00
Joseph Doherty
dab8004d6b feat: cache INFO serialization — build once at startup instead of per-connection
Avoids re-serializing the same ServerInfo JSON on every new connection. The
cache is rebuilt when the ephemeral port is resolved. Connections that carry a
per-connection nonce (NKey auth) continue to serialize individually so the nonce
is included correctly.
2026-02-23 01:01:38 -05:00
Joseph Doherty
f0b5edd7c6 feat: add response permission tracking for dynamic reply subject authorization 2026-02-23 00:59:15 -05:00
Joseph Doherty
7a897c1087 feat: add MaxSubs enforcement, delivery-time deny filtering, auto-unsub cleanup 2026-02-23 00:53:15 -05:00
Joseph Doherty
e9b6c7fdd3 feat: add protocol tracing (<<- op arg) at LogLevel.Trace 2026-02-23 00:52:00 -05:00
Joseph Doherty
0347e8a28c fix: increment _removes counter in RemoveBatch for accurate stats 2026-02-23 00:48:53 -05:00
Joseph Doherty
6afe11ad4d feat: add per-account connection/subscription limits with AccountConfig 2026-02-23 00:46:16 -05:00
Joseph Doherty
cc0fe04f3c feat: add generation-based cache, Stats, HasInterest, NumInterest, RemoveBatch, All, ReverseMatch to SubList 2026-02-23 00:45:28 -05:00
Joseph Doherty
4ad821394b feat: add -D/-V/-DV debug/trace CLI flags and file logging support 2026-02-23 00:41:49 -05:00
Joseph Doherty
0ec5583422 fix: address code quality review findings for batch 1
- SubjectsCollide: split tokens once upfront instead of O(n²) TokenAt calls
- NatsHeaderParser: manual digit accumulation avoids string allocation and overflow
- NatsHeaders: use IReadOnlyDictionary for Headers, immutable Invalid sentinel
- PermissionLruCache: add missing Count property
2026-02-23 00:40:14 -05:00
Joseph Doherty
dddced444e feat: add NumTokens, TokenAt, SubjectsCollide, UTF-8 validation to SubjectMatch 2026-02-23 00:33:43 -05:00
Joseph Doherty
e87d4c00d9 feat: add NatsHeaderParser for MIME header parsing 2026-02-23 00:33:24 -05:00
Joseph Doherty
7cf6bb866e feat: add PermissionLruCache (128-entry LRU) and wire into ClientPermissions 2026-02-23 00:33:15 -05:00
Joseph Doherty
17a0a217dd feat: add MaxSubs, MaxSubTokens, Debug, Trace, LogFile, LogSizeLimit, Tags to NatsOptions 2026-02-23 00:32:12 -05:00
Joseph Doherty
5b383ada4b docs: add implementation plan for sections 3-6 gaps 2026-02-23 00:28:31 -05:00
Joseph Doherty
060e1ee23d docs: add implementation plan for sections 7-10 gaps 2026-02-23 00:25:04 -05:00
Joseph Doherty
f4efbcf09e docs: add design for sections 7-10 gaps implementation 2026-02-23 00:17:35 -05:00
Joseph Doherty
f86ea57f43 docs: add design for sections 3-6 gaps implementation 2026-02-23 00:17:24 -05:00
33 changed files with 6341 additions and 259 deletions

View File

@@ -8,6 +8,7 @@
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.3" />
<PackageVersion Include="Serilog.Extensions.Hosting" Version="10.0.0" />
<PackageVersion Include="Serilog.Sinks.Console" Version="6.1.1" />
<PackageVersion Include="Serilog.Sinks.File" Version="6.0.0" />
<!-- Authentication -->
<PackageVersion Include="NATS.NKeys" Version="1.0.0-preview.3" />

View File

@@ -135,16 +135,16 @@ Go implements a sophisticated slow consumer detection system:
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Multi-client-type command routing | Y | N | Go checks `c.kind` to allow/reject commands |
| Protocol tracing in parser | Y | N | Go calls `traceInOp()` per operation |
| Protocol tracing in parser | Y | Y | `TraceInOp()` logs `<<- OP arg` at `LogLevel.Trace` via optional `ILogger` |
| Subject mapping (input→output) | Y | N | Go transforms subjects via mapping rules |
| MIME header parsing | Y | N | .NET delegates header handling to client layer |
| MIME header parsing | Y | Y | `NatsHeaderParser.Parse()` — status line + key-value headers from `ReadOnlySpan<byte>` |
| Message trace event initialization | Y | N | |
### Protocol Writing
| Aspect | Go | .NET | Notes |
|--------|:--:|:----:|-------|
| INFO serialization | Once at startup | Every send | .NET re-serializes JSON each time |
| MSG/HMSG construction | Direct buffer write | String interpolation → byte encode | More allocations in .NET |
| INFO serialization | Once at startup | Once at startup | Cached at startup; nonce connections serialize per-connection |
| MSG/HMSG construction | Direct buffer write | Span-based buffer write | `int.TryFormat` + `CopyTo` into rented buffer, no string allocations |
| Pre-encoded constants | Y | Y | Both pre-encode PING/PONG/OK |
---
@@ -159,18 +159,18 @@ Go implements a sophisticated slow consumer detection system:
| Result caching (1024 max) | Y | Y | Same limits |
| `plist` optimization (>256 subs) | Y | N | Go converts high-fanout nodes to array |
| Async cache sweep (background) | Y | N | .NET sweeps inline under write lock |
| Atomic generation ID for invalidation | Y | N | .NET clears cache explicitly |
| Atomic generation ID for invalidation | Y | Y | `Interlocked.Increment` on insert/remove; cached results store generation |
| Cache eviction strategy | Random | First-N | Semantic difference minimal |
### Missing SubList Features
### SubList Features
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| `Stats()` — comprehensive statistics | Y | N | Matches, cache hits, inserts, removes, fanout |
| `HasInterest()` — fast bool check | Y | N | |
| `NumInterest()` — fast count | Y | N | |
| `ReverseMatch()` — pattern→literal query | Y | N | |
| `RemoveBatch()` — efficient bulk removal | Y | N | |
| `All()` — enumerate all subscriptions | Y | N | |
| `Stats()` — comprehensive statistics | Y | Y | Matches, cache hits, inserts, removes tracked via `Interlocked` |
| `HasInterest()` — fast bool check | Y | Y | Walks trie without allocating result list |
| `NumInterest()` — fast count | Y | Y | Counts plain + queue subs without allocation |
| `ReverseMatch()` — pattern→literal query | Y | Y | Finds subscriptions whose wildcards match a literal subject |
| `RemoveBatch()` — efficient bulk removal | Y | Y | Single generation increment for batch; increments `_removes` per sub |
| `All()` — enumerate all subscriptions | Y | Y | Recursive trie walk returning all subscriptions |
| Notification system (interest changes) | Y | N | |
| Local/remote subscription filtering | Y | N | |
| Queue weight expansion (remote subs) | Y | N | |
@@ -181,16 +181,16 @@ Go implements a sophisticated slow consumer detection system:
|---------|:--:|:----:|-------|
| Basic validation (empty tokens, wildcards) | Y | Y | |
| Literal subject check | Y | Y | |
| UTF-8/null rune validation | Y | N | Go has `checkRunes` parameter |
| Collision detection (`SubjectsCollide`) | Y | N | |
| Token utilities (`tokenAt`, `numTokens`) | Y | N | |
| UTF-8/null rune validation | Y | Y | `IsValidSubject(string, bool checkRunes)` rejects null bytes |
| Collision detection (`SubjectsCollide`) | Y | Y | Token-by-token wildcard comparison; O(n) via upfront `Split` |
| Token utilities (`tokenAt`, `numTokens`) | Y | Y | `TokenAt` returns `ReadOnlySpan<char>`; `NumTokens` counts separators |
| Stack-allocated token buffer | Y | N | Go uses `[32]string{}` on stack |
### Subscription Lifecycle
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Per-account subscription limit | Y | N | |
| Auto-unsubscribe on max messages | Y | Y | .NET enforces at delivery time (NatsServer.cs:269-270) |
| Per-account subscription limit | Y | Y | `Account.IncrementSubscriptions()` returns false when `MaxSubscriptions` exceeded |
| Auto-unsubscribe on max messages | Y | Y | Enforced at delivery; sub removed from trie + client dict when exhausted |
| Subscription routing propagation | Y | N | For clusters |
| Queue weight (`qw`) field | Y | N | For remote queue load balancing |
@@ -217,10 +217,10 @@ Go implements a sophisticated slow consumer detection system:
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Per-account SubList isolation | Y | Y | |
| Multi-account user resolution | Y | N | .NET has basic account, no resolution |
| Multi-account user resolution | Y | Y | `AccountConfig` per account in `NatsOptions.Accounts`; `GetOrCreateAccount` wires limits |
| Account exports/imports | Y | N | |
| Per-account connection limits | Y | N | |
| Per-account subscription limits | Y | N | |
| Per-account connection limits | Y | Y | `Account.AddClient()` returns false when `MaxConnections` exceeded |
| Per-account subscription limits | Y | Y | `Account.IncrementSubscriptions()` enforced in `ProcessSub()` |
| Account JetStream limits | Y | N | Excluded per scope |
### Permissions
@@ -228,11 +228,12 @@ Go implements a sophisticated slow consumer detection system:
|---------|:--:|:----:|-------|
| Publish allow list | Y | Y | |
| Subscribe allow list | Y | Y | |
| Publish deny list | Y | Partial | .NET has deny in struct but limited enforcement |
| Subscribe deny list | Y | Partial | Same |
| Message-level deny filtering | Y | N | Go filters at delivery time |
| Permission caching (128 entries) | Y | N | |
| Response permissions (reply tracking) | Y | N | Dynamic reply subject authorization |
| Publish deny list | Y | Y | Full enforcement with LRU-cached results |
| Subscribe deny list | Y | Y | Queue-aware deny checking in `IsSubscribeAllowed` |
| Message-level deny filtering | Y | Y | `IsDeliveryAllowed()` checked before MSG send; auto-unsub cleanup on deny |
| Permission caching (128 entries) | Y | Y | `PermissionLruCache` — Dictionary+LinkedList LRU, matching Go's `maxPermCacheSize` |
| Response permissions (reply tracking) | Y | Y | `ResponseTracker` with configurable TTL + max messages; not LRU-cached |
| Auth expiry enforcement | Y | Y | `Task.Delay` timer closes client when JWT/auth expires |
| Permission templates (JWT) | Y | N | e.g., `{{name()}}`, `{{account-tag(...)}}` |
---
@@ -247,7 +248,7 @@ Go implements a sophisticated slow consumer detection system:
| `-n/--name` (ServerName) | Y | Y | |
| `-m/--http_port` (monitoring) | Y | Y | |
| `-c` (config file) | Y | Stub | Flag parsed, stored in `ConfigFile`, no config parser |
| `-D/-V/-DV` (debug/trace) | Y | N | |
| `-D/-V/-DV` (debug/trace) | Y | Y | Sets `Debug`/`Trace` on `NatsOptions`, adjusts Serilog minimum level |
| `--tlscert/--tlskey/--tlscacert` | Y | Y | |
| `--tlsverify` | Y | Y | |
| `--http_base_path` | Y | Y | |
@@ -259,12 +260,12 @@ Go implements a sophisticated slow consumer detection system:
| Config file parsing | Y | N | Go has custom `conf` parser with includes |
| Hot reload (SIGHUP) | Y | N | |
| Config change detection | Y | N | Go tracks `inConfig`/`inCmdLine` origins |
| ~450 option fields | Y | ~54 | .NET covers core options only |
| ~450 option fields | Y | ~62 | .NET covers core + debug/trace/logging/limits/tags options |
### Missing Options Categories
- Logging options (file, rotation, syslog, trace levels)
- Advanced limits (MaxSubs, MaxSubTokens, MaxPending, WriteDeadline)
- Tags/metadata
- ~~Logging options (file, rotation, syslog, trace levels)~~ — File logging (`-l`), `LogSizeLimit`, Debug/Trace implemented; syslog/color/timestamp not yet
- ~~Advanced limits (MaxSubs, MaxSubTokens, MaxPending, WriteDeadline)~~ — `MaxSubs`, `MaxSubTokens` implemented; MaxPending/WriteDeadline already existed
- ~~Tags/metadata~~ — `Tags` dictionary implemented in `NatsOptions`
- OCSP configuration
- WebSocket/MQTT options
- Operator mode / account resolver
@@ -350,11 +351,11 @@ Go implements a sophisticated slow consumer detection system:
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Structured logging | Partial | Y | .NET uses Serilog with ILogger<T> |
| File logging with rotation | Y | N | |
| File logging with rotation | Y | Y | `-l` flag + `LogSizeLimit` option via Serilog.Sinks.File with `fileSizeLimitBytes` |
| Syslog (local and remote) | Y | N | |
| Log reopening (SIGUSR1) | Y | N | |
| Trace mode (protocol-level) | Y | N | |
| Debug mode | Y | N | |
| Trace mode (protocol-level) | Y | Y | `-V`/`-DV` flags; parser `TraceInOp()` logs `<<- OP arg` at Trace level |
| Debug mode | Y | Y | `-D`/`-DV` flags lower Serilog minimum to Debug/Verbose |
| Per-subsystem log control | Y | N | |
| Color output on TTY | Y | N | |
| Timestamp format control | Y | N | |
@@ -378,16 +379,16 @@ Go implements a sophisticated slow consumer detection system:
## Summary: Critical Gaps for Production Use
### High Priority
1. **Slow consumer detection** — unbounded writes can exhaust memory (stat fields exist but no detection logic)
2. **Write coalescing / batch flush** — performance gap for high-throughput scenarios
1. ~~**Slow consumer detection**~~ — implemented (pending bytes threshold + write deadline)
2. ~~**Write coalescing / batch flush**~~ — implemented (channel-based write loop)
### Medium Priority
3. **Verbose mode** — clients expect `+OK` when `verbose: true`
4. **Permission deny enforcement at delivery** — deny lists checked at SUB/PUB time but not during message delivery
3. ~~**Verbose mode**~~ — implemented (`+OK` on CONNECT/SUB/UNSUB/PUB)
4. ~~**Permission deny enforcement at delivery**~~ — implemented (`IsDeliveryAllowed` + auto-unsub cleanup)
5. **Config file parsing** — needed for production deployment (CLI stub exists)
6. **Hot reload** — needed for zero-downtime config changes (SIGHUP stub exists)
7. **File logging with rotation** — needed for production logging
8. **No-responders validation** — flag parsed but not enforced
7. ~~**File logging with rotation**~~ — implemented (Serilog.Sinks.File with `-l` flag)
8. ~~**No-responders validation**~~ — implemented (CONNECT validation + 503 HMSG)
### Lower Priority
9. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections
@@ -395,8 +396,8 @@ Go implements a sophisticated slow consumer detection system:
11. **TLS certificate mapping** — property exists, not implemented
12. **OCSP support** — certificate revocation checking
13. **Subject mapping** — input→output subject transformation
14. **Protocol tracing** — no trace-level logging
15. **Subscription statistics** — SubList has no stats collection
16. **Per-account limits** — connections, subscriptions per account
17. **Reply subject tracking** — dynamic response permissions
14. ~~**Protocol tracing**~~ — implemented (`TraceInOp` at `LogLevel.Trace`)
15. ~~**Subscription statistics**~~ — implemented (`Stats()`, `HasInterest()`, `NumInterest()`, etc.)
16. ~~**Per-account limits**~~ — implemented (connection + subscription limits via `AccountConfig`)
17. ~~**Reply subject tracking**~~ — implemented (`ResponseTracker` with TTL + max messages)
18. **Windows Service integration** — needed for Windows deployment

View File

@@ -0,0 +1,226 @@
# Sections 7-10 Gaps Design: Monitoring, TLS, Logging, Ping/Pong
**Date:** 2026-02-23
**Scope:** Implement remaining gaps in differences.md sections 7 (Monitoring), 8 (TLS), 9 (Logging), 10 (Ping/Pong)
**Goal:** Go parity for all features within scope
---
## Section 7: Monitoring
### 7a. `/subz` Endpoint
Replace the empty stub with a full `SubszHandler`.
**Models:**
- `Subsz` — response envelope: `Id`, `Now`, `SublistStats`, `Total`, `Offset`, `Limit`, `Subs[]`
- `SubszOptions` — `Offset`, `Limit`, `Subscriptions` (bool for detail), `Account` (filter), `Test` (literal subject filter)
- Reuse existing `SubDetail` from Connz
**Algorithm:**
1. Iterate all accounts (or filter by `Account` param)
2. Collect all subscriptions from each account's SubList
3. If `Test` subject provided, filter using `SubjectMatch.MatchLiteral()` to only return subs that would receive that message
4. Apply pagination (offset/limit)
5. If `Subscriptions` is true, include `SubDetail[]` array
**SubList stats** — add a `Stats()` method to `SubList` returning `SublistStats` (count, cache size, inserts, removes, matches, cache hits).
**Files:** New `Monitoring/SubszHandler.cs`, `Monitoring/Subsz.cs`. Modify `MonitorServer.cs`, `SubList.cs`.
### 7b. Connz `ByStop` / `ByReason` Sorting
Add two missing sort options for closed connection queries.
- Add `ByStop` and `ByReason` to `SortOpt` enum
- Parse `sort=stop` and `sort=reason` in query params
- Validate: these sorts only work with `state=closed` — return error if used with open connections
### 7c. Connz State Filtering & Closed Connections
Track closed connections and support state-based filtering.
**Closed connection tracking:**
- `ClosedClient` record: `Cid`, `Ip`, `Port`, `Start`, `Stop`, `Reason`, `Name`, `Lang`, `Version`, `InMsgs`, `OutMsgs`, `InBytes`, `OutBytes`, `NumSubs`, `Rtt`, `TlsVersion`, `TlsCipherSuite`
- `ConcurrentQueue<ClosedClient>` on `NatsServer` (capped at 10,000 entries)
- Populate in `RemoveClient()` from client state before disposal
**State filter:**
- Parse `state=open|closed|all` query param
- `open` (default): current live connections only
- `closed`: only from closed connections list
- `all`: merge both
**Files:** Modify `NatsServer.cs`, `ConnzHandler.cs`, new `Monitoring/ClosedClient.cs`.
### 7d. Varz Slow Consumer Stats
Already at parity. `SlowConsumersStats` is populated from `ServerStats` counters. No changes needed.
---
## Section 8: TLS
### 8a. TLS Rate Limiting
Already implemented via `TlsRateLimiter` (semaphore + periodic refill timer). Wired into `AcceptClientAsync`. Only a unit test needed.
### 8b. TLS Cert-to-User Mapping (TlsMap)
Full DN parsing using .NET built-in `X500DistinguishedName`.
**New `TlsMapAuthenticator`:**
- Implements `IAuthenticator`
- Receives the list of configured `User` objects
- On `Authenticate()`:
1. Extract `X509Certificate2` from auth context (passed from `TlsConnectionState`)
2. Parse subject DN via `cert.SubjectName` (`X500DistinguishedName`)
3. Build normalized DN string from RDN components
4. Try exact DN match against user map (key = DN string)
5. If no exact match, try CN-only match
6. Return `AuthResult` with matched user's permissions
**Auth context extension:**
- Add `X509Certificate2? ClientCertificate` to `ClientAuthContext`
- Pass certificate from `TlsConnectionState` in `ProcessConnectAsync`
**AuthService integration:**
- When `options.TlsMap && options.TlsVerify`, add `TlsMapAuthenticator` to authenticator chain
- TlsMap auth runs before other authenticators (cert-based auth takes priority)
**Files:** New `Auth/TlsMapAuthenticator.cs`. Modify `Auth/AuthService.cs`, `Auth/ClientAuthContext.cs`, `NatsClient.cs`.
---
## Section 9: Logging
### 9a. File Logging with Rotation
**New options on `NatsOptions`:**
- `LogFile` (string?) — path to log file
- `LogSizeLimit` (long) — file size in bytes before rotation (0 = unlimited)
- `LogMaxFiles` (int) — max retained rotated files (0 = unlimited)
**CLI flags:** `--log_file`, `--log_size_limit`, `--log_max_files`
**Serilog config:** Add `WriteTo.File()` with `fileSizeLimitBytes` and `retainedFileCountLimit` when `LogFile` is set.
### 9b. Debug/Trace Modes
**New options on `NatsOptions`:**
- `Debug` (bool) — enable debug-level logging
- `Trace` (bool) — enable trace/verbose-level logging
**CLI flags:** `-D` (debug), `-V` or `-T` (trace), `-DV` (both)
**Serilog config:**
- Default: `MinimumLevel.Information()`
- `-D`: `MinimumLevel.Debug()`
- `-V`/`-T`: `MinimumLevel.Verbose()`
### 9c. Color Output
Auto-detect TTY via `Console.IsOutputRedirected`.
- TTY: use `Serilog.Sinks.Console` with `AnsiConsoleTheme.Code`
- Non-TTY: use `ConsoleTheme.None`
Matches Go's behavior of disabling color when stderr is not a terminal.
### 9d. Timestamp Format Control
**New options on `NatsOptions`:**
- `Logtime` (bool, default true) — include timestamps
- `LogtimeUTC` (bool, default false) — use UTC format
**CLI flags:** `--logtime` (true/false), `--logtime_utc`
**Output template adjustment:**
- With timestamps: `[{Timestamp:yyyy/MM/dd HH:mm:ss.ffffff} {Level:u3}] {Message:lj}{NewLine}{Exception}`
- Without timestamps: `[{Level:u3}] {Message:lj}{NewLine}{Exception}`
- UTC: set `Serilog.Formatting` culture to UTC
### 9e. Log Reopening (SIGUSR1)
When file logging is configured:
- SIGUSR1 handler calls `ReOpenLogFile()` on the server
- `ReOpenLogFile()` flushes and closes current Serilog logger, creates new one with same config
- This enables external log rotation tools (logrotate)
**Files:** Modify `NatsOptions.cs`, `Program.cs`, `NatsServer.cs`.
---
## Section 10: Ping/Pong
### 10a. RTT Tracking
**New fields on `NatsClient`:**
- `_rttStartTicks` (long) — UTC ticks when PING sent
- `_rtt` (long) — computed RTT in ticks
- `Rtt` property (TimeSpan) — computed from `_rtt`
**Logic:**
- In `RunPingTimerAsync`, before writing PING: `_rttStartTicks = DateTime.UtcNow.Ticks`
- In `DispatchCommandAsync` PONG handler: compute `_rtt = DateTime.UtcNow.Ticks - _rttStartTicks` (min 1 tick)
- `computeRTT()` helper ensures minimum 1 tick (handles clock granularity on Windows)
**Monitoring exposure:**
- Populate `ConnInfo.Rtt` as formatted string (e.g., `"1.234ms"`)
- Add `ByRtt` sort option to Connz
### 10b. RTT-Based First PING Delay
**New state on `NatsClient`:**
- `_firstPongSent` flag in `ClientFlags`
**Logic in `RunPingTimerAsync`:**
- Before first PING, check: `_firstPongSent || timeSinceStart > 2 seconds`
- If neither condition met, skip this PING cycle
- Set `_firstPongSent` on first PONG after CONNECT (in PONG handler)
This prevents the server from sending PING (for RTT) before the client has had a chance to respond to the initial INFO with CONNECT+PING.
### 10c. Stale Connection Stats
**New model:**
- `StaleConnectionStats` — `Clients`, `Routes`, `Gateways`, `Leafs` (matching Go)
**ServerStats extension:**
- Add `StaleConnectionClients`, `StaleConnectionRoutes`, etc. fields
- Increment in `MarkClosed(StaleConnection)` based on connection kind
**Varz exposure:**
- Add `StaleConnectionStats` field to `Varz`
- Populate from `ServerStats` counters
**Files:** Modify `NatsClient.cs`, `ServerStats.cs`, `Varz.cs`, `VarzHandler.cs`, `Connz.cs`, `ConnzHandler.cs`.
---
## Test Coverage
Each section includes unit tests:
| Feature | Test File | Tests |
|---------|-----------|-------|
| Subz endpoint | SubszHandlerTests.cs | Empty response, with subs, account filter, test subject filter, pagination |
| Connz closed state | ConnzHandlerTests.cs | State=closed, ByStop sort, ByReason sort, validation errors |
| TLS rate limiter | TlsRateLimiterTests.cs | Rate enforcement, refill behavior |
| TlsMap auth | TlsMapAuthenticatorTests.cs | DN matching, CN fallback, no match |
| File logging | LoggingTests.cs | File creation, rotation on size limit |
| RTT tracking | ClientTests.cs | RTT computed on PONG, exposed in connz, ByRtt sort |
| First PING delay | ClientTests.cs | PING delayed until first PONG or 2s |
| Stale stats | ServerTests.cs | Stale counters incremented, exposed in varz |
---
## Parallelization Strategy
These work streams are independent and can be developed by parallel subagents:
1. **Monitoring stream** (7a, 7b, 7c): SubszHandler + Connz closed connections + state filter
2. **TLS stream** (8b): TlsMapAuthenticator
3. **Logging stream** (9a-9e): All logging improvements
4. **Ping/Pong stream** (10a-10c): RTT tracking + first PING delay + stale stats
Streams 1-4 touch different files with minimal overlap. The only shared touch point is `NatsOptions.cs` (new options for logging and ping/pong), which can be handled by one stream first and the others will build on it.

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,19 @@
{
"planPath": "docs/plans/2026-02-23-sections-7-10-gaps-plan.md",
"tasks": [
{"id": 6, "subject": "Task 0: Add NuGet dependencies for logging sinks", "status": "pending"},
{"id": 7, "subject": "Task 1: Add logging and ping/pong options to NatsOptions", "status": "pending", "blockedBy": [6]},
{"id": 8, "subject": "Task 2: Add CLI flag parsing for logging and debug/trace", "status": "pending", "blockedBy": [6, 7]},
{"id": 9, "subject": "Task 3: Implement log reopening on SIGUSR1", "status": "pending", "blockedBy": [6, 7]},
{"id": 10, "subject": "Task 4: Add RTT tracking to NatsClient", "status": "pending", "blockedBy": [6, 7]},
{"id": 11, "subject": "Task 5: Add stale connection stats and expose in varz", "status": "pending", "blockedBy": [6, 7]},
{"id": 12, "subject": "Task 6: Add closed connection tracking and connz state filtering", "status": "pending", "blockedBy": [10]},
{"id": 13, "subject": "Task 7: Implement /subz endpoint", "status": "pending", "blockedBy": [6, 7]},
{"id": 14, "subject": "Task 8: Implement TLS cert-to-user mapping (TlsMap)", "status": "pending", "blockedBy": [6, 7]},
{"id": 15, "subject": "Task 9: Add TLS rate limiter test", "status": "pending", "blockedBy": [6, 7]},
{"id": 16, "subject": "Task 10: File logging tests", "status": "pending", "blockedBy": [6, 7, 8]},
{"id": 17, "subject": "Task 11: Run full test suite and verify", "status": "pending", "blockedBy": [8, 9, 10, 11, 12, 13, 14, 15, 16]},
{"id": 18, "subject": "Task 12: Update differences.md", "status": "pending", "blockedBy": [17]}
],
"lastUpdated": "2026-02-23T00:00:00Z"
}

View File

@@ -0,0 +1,190 @@
# Sections 3-6 Gaps Implementation Design
> Approved 2026-02-23. Implements all remaining gaps in Protocol Parsing, Subscriptions & Subject Matching, Authentication & Authorization, and Configuration.
---
## Section 3 — Protocol Parsing
### 3a. INFO Serialization Caching
Add `byte[] _infoJsonCache` on `NatsServer`. Build once in `StartAsync()` after `ServerInfo` is populated. Rebuild only on config reload.
`NatsClient.SendInfo()` uses cached bytes for non-nonce connections. For NKey connections (which need a per-connection nonce), clone `ServerInfo`, set nonce, serialize fresh.
**Files:** `NatsServer.cs`, `NatsClient.cs`
### 3b. Protocol Tracing
Add `ILogger` to `NatsParser`. Add `TraceInOp(ReadOnlySpan<byte> op, ReadOnlySpan<byte> arg)` that logs at `LogLevel.Trace` after each command dispatch in `TryParse()`.
Controlled by `NatsOptions.Trace` (which sets log level filter).
**Files:** `NatsParser.cs`, `NatsOptions.cs`
### 3c. MIME Header Parsing
New `NatsHeaderParser` static class in `Protocol/`. Parses `NATS/1.0 <status> <description>\r\n` status line + `Key: Value\r\n` pairs.
Returns `NatsHeaders` readonly struct with `Status` (int), `Description` (string), and key-value `Dictionary<string, string[]>` lookup.
Used in `ProcessMessage()` for header inspection (no-responders status, future message tracing).
**Files:** New `Protocol/NatsHeaderParser.cs`, `NatsServer.cs`
### 3d. MSG/HMSG Construction Optimization
Pre-allocate buffers for MSG/HMSG prefix using `Span<byte>` and `Utf8Formatter` instead of string interpolation + `Encoding.UTF8.GetBytes()`. Reduces per-message allocations.
**Files:** `NatsClient.cs`
---
## Section 4 — Subscriptions & Subject Matching
### 4a. Atomic Generation ID for Cache Invalidation
Add `long _generation` to `SubList`. Increment via `Interlocked.Increment` on every `Insert()` and `Remove()`. Cache entries store generation at computation time. On `Match()`, stale generation = cache miss.
Replaces current explicit per-key cache removal. O(1) invalidation.
**Files:** `SubList.cs`
### 4b. Async Background Cache Sweep
Replace inline sweep (runs under write lock) with `PeriodicTimer`-based background task. Acquires write lock briefly to snapshot + evict stale entries. Triggered when cache count > 1024, sweeps to 256.
**Files:** `SubList.cs`
### 4c. `plist` Optimization for High-Fanout Nodes
On `TrieNode`, when `PlainSubs.Count > 256`, convert `HashSet<Subscription>` to `Subscription[]` flat array in `PList` field. `Match()` iterates `PList` when present. On count < 128, convert back to HashSet.
**Files:** `SubList.cs`
### 4d. SubList Utility Methods
| Method | Description |
|--------|-------------|
| `Stats()` | Returns `SubListStats` record with NumSubs, NumCache, NumInserts, NumRemoves, NumMatches, CacheHitRate, MaxFanout. Track via `Interlocked` counters. |
| `HasInterest(string)` | Walk trie without building result, return true on first hit. |
| `NumInterest(string)` | Walk trie, count without allocating result arrays. |
| `ReverseMatch(string)` | Walk trie with literal tokens, collect all subscription patterns matching the literal. |
| `RemoveBatch(IEnumerable<Subscription>)` | Single write-lock, batch removes, single generation bump. |
| `All()` | Depth-first trie walk, yield all subscriptions. Returns `IReadOnlyList<Subscription>`. |
| `MatchBytes(ReadOnlySpan<byte>)` | Zero-copy match using byte-based `TokenEnumerator`. |
**Files:** `SubList.cs`, new `SubListStats.cs`
### 4e. SubjectMatch Utilities
| Method | Description |
|--------|-------------|
| `SubjectsCollide(string, string)` | Token-by-token comparison handling `*` and `>`. Two patterns collide if any literal could match both. |
| `TokenAt(string, int)` | Return nth dot-delimited token as `ReadOnlySpan<char>`. |
| `NumTokens(string)` | Count dots + 1. |
| UTF-8/null validation | Add `checkRunes` parameter to `IsValidSubject()`. When true, scan for `\0` bytes and validate UTF-8. |
**Files:** `SubjectMatch.cs`
### 4f. Per-Account Subscription Limits
Add `MaxSubscriptions` to `Account`. Track `SubscriptionCount` via `Interlocked`. Enforce in `ProcessSub()`. Close with `ClientClosedReason.MaxSubscriptionsExceeded`.
**Files:** `Account.cs`, `NatsClient.cs`
---
## Section 5 — Authentication & Authorization
### 5a. Deny List Enforcement at Delivery Time
In `NatsServer.DeliverMessage()`, before sending MSG, check `subscriber.Client.Permissions?.IsDeliveryAllowed(subject)`. New method checks publish deny list for the receiving client ("msg delivery filter"). Cache results in pub cache.
**Files:** `NatsServer.cs`, `ClientPermissions.cs`
### 5b. Permission Caching with 128-Entry LRU
Replace `ConcurrentDictionary<string, bool>` with custom `PermissionLruCache` (128 entries). `Dictionary<string, LinkedListNode>` + `LinkedList` for LRU ordering. Lock-protected (per-client, low contention).
**Files:** `ClientPermissions.cs`, new `Auth/PermissionLruCache.cs`
### 5c. Subscribe Deny Queue-Group Checking
`IsSubscribeAllowed(subject, queue)` checks queue group against subscribe deny list (currently ignores queue parameter).
**Files:** `ClientPermissions.cs`
### 5d. Response Permissions (Reply Tracking)
New `ResponseTracker` class on `NatsClient`. Created when `Permissions.Response` is non-null. Tracks reply subjects with TTL (`ResponsePermission.Expires`) and max count (`ResponsePermission.MaxMsgs`). `IsPublishAllowed()` consults tracker for reply subjects not in static allow list. Expired entries cleaned lazily + in ping timer.
**Files:** New `Auth/ResponseTracker.cs`, `ClientPermissions.cs`, `NatsClient.cs`
### 5e. Per-Account Connection Limits
Add `MaxConnections` to `Account`. Enforce in `ProcessConnectAsync()` after account assignment. Reject with `-ERR maximum connections for account exceeded`.
**Files:** `Account.cs`, `NatsClient.cs`
### 5f. Multi-Account User Resolution
Add `NatsOptions.Accounts` as `Dictionary<string, AccountConfig>` with per-account MaxConnections, MaxSubscriptions, DefaultPermissions. `AuthService` resolves account name to configured `Account` with limits.
**Files:** `NatsOptions.cs`, new `Auth/AccountConfig.cs`, `AuthService.cs`, `NatsServer.cs`
### 5g. Auth Expiry Enforcement
In `ProcessConnectAsync()`, if `AuthResult.Expiry` is set, start `CancellationTokenSource.CancelAfter(expiry - now)`. Link to client lifetime. On fire, close with `ClientClosedReason.AuthenticationExpired`.
**Files:** `NatsClient.cs`
### 5h. Auto-Unsub Cleanup
In `DeliverMessage()`, when `sub.MessageCount >= sub.MaxMessages`, call `sub.Client.RemoveSubscription(sub.Sid)` and `subList.Remove(sub)` to clean up both tracking dict and trie. Currently only skips delivery.
**Files:** `NatsServer.cs`, `NatsClient.cs`
---
## Section 6 — Configuration
### 6a. Debug/Trace CLI Flags
Add `-D`/`--debug`, `-V`/`--trace`, `-DV` to `Program.cs`. Set `NatsOptions.Debug` and `NatsOptions.Trace`. Wire into Serilog minimum level.
**Files:** `Program.cs`
### 6b. New NatsOptions Fields
| Field | Type | Default | Purpose |
|-------|------|---------|---------|
| `MaxSubs` | int | 0 (unlimited) | Per-connection subscription limit |
| `MaxSubTokens` | int | 0 (unlimited) | Max tokens in a subject |
| `Debug` | bool | false | Enable debug-level logging |
| `Trace` | bool | false | Enable trace-level protocol logging |
| `LogFile` | string? | null | Log to file instead of console |
| `LogSizeLimit` | long | 0 | Max log file size before rotation |
| `Tags` | Dictionary<string, string>? | null | Server metadata tags |
**Files:** `NatsOptions.cs`
### 6c. Logging Options Wiring
In `Program.cs`, if `LogFile` is set, add Serilog `File` sink with `LogSizeLimit`. If `Debug`/`Trace`, override Serilog minimum level.
**Files:** `Program.cs`
---
## Implementation Strategy
Execute in a git worktree. Parallelize where files don't overlap:
- **Parallel batch 1:** SubjectMatch utilities (4e) | NatsOptions + CLI flags (6a, 6b) | NatsHeaderParser (3c) | PermissionLruCache (5b) | SubListStats (4d stats class)
- **Parallel batch 2:** SubList overhaul (4a, 4b, 4c, 4d methods) | Account limits + config (4f, 5e, 5f, 5g) | Response tracker (5d)
- **Parallel batch 3:** Protocol tracing (3b) | INFO caching (3a) | MSG optimization (3d)
- **Sequential:** Delivery-time enforcement (5a, 5c, 5h) — touches NatsServer.cs + ClientPermissions.cs, must be coordinated
- **Final:** Logging wiring (6c) | differences.md update
Tests added alongside each feature in the appropriate test file.

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,20 @@
{
"planPath": "docs/plans/2026-02-23-sections3-6-gaps-plan.md",
"tasks": [
{"id": 6, "subject": "Task 1: NatsOptions — Add New Configuration Fields", "status": "pending"},
{"id": 7, "subject": "Task 2: CLI Flags — Add -D/-V/-DV and Logging Options", "status": "pending", "blockedBy": [6]},
{"id": 8, "subject": "Task 3: SubjectMatch Utilities", "status": "pending"},
{"id": 9, "subject": "Task 4: SubList — Generation ID, Stats, and Utility Methods", "status": "pending"},
{"id": 10, "subject": "Task 5: NatsHeaderParser — MIME Header Parsing", "status": "pending"},
{"id": 11, "subject": "Task 6: PermissionLruCache — 128-Entry LRU", "status": "pending"},
{"id": 12, "subject": "Task 7: Account Limits and AccountConfig", "status": "pending", "blockedBy": [6]},
{"id": 13, "subject": "Task 8: MaxSubs Enforcement, Subscribe Deny Queue, Delivery-Time Deny", "status": "pending", "blockedBy": [11, 12]},
{"id": 14, "subject": "Task 9: Response Permissions (Reply Tracking)", "status": "pending", "blockedBy": [11, 13]},
{"id": 15, "subject": "Task 10: Auth Expiry Enforcement", "status": "pending"},
{"id": 16, "subject": "Task 11: INFO Serialization Caching", "status": "pending"},
{"id": 17, "subject": "Task 12: Protocol Tracing", "status": "pending", "blockedBy": [6]},
{"id": 18, "subject": "Task 13: MSG/HMSG Construction Optimization", "status": "pending"},
{"id": 19, "subject": "Task 14: Verify, Run Full Test Suite, Update differences.md", "status": "pending", "blockedBy": [13, 14, 15, 16, 17, 18]}
],
"lastUpdated": "2026-02-23T00:00:00Z"
}

View File

@@ -11,6 +11,7 @@
<ItemGroup>
<PackageReference Include="Serilog.Extensions.Hosting" />
<PackageReference Include="Serilog.Sinks.Console" />
<PackageReference Include="Serilog.Sinks.File" />
</ItemGroup>
</Project>

View File

@@ -1,12 +1,6 @@
using NATS.Server;
using Serilog;
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.Enrich.FromLogContext()
.WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}")
.CreateLogger();
var options = new NatsOptions();
// Simple CLI argument parsing
@@ -55,9 +49,47 @@ for (int i = 0; i < args.Length; i++)
case "--tlsverify":
options.TlsVerify = true;
break;
case "-D" or "--debug":
options.Debug = true;
break;
case "-V" or "--trace":
options.Trace = true;
break;
case "-DV":
options.Debug = true;
options.Trace = true;
break;
case "-l" or "--log" when i + 1 < args.Length:
options.LogFile = args[++i];
break;
case "--log_size_limit" when i + 1 < args.Length:
options.LogSizeLimit = long.Parse(args[++i]);
break;
}
}
// Configure Serilog based on options
var logConfig = new LoggerConfiguration()
.Enrich.FromLogContext()
.WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}");
if (options.Trace)
logConfig.MinimumLevel.Verbose();
else if (options.Debug)
logConfig.MinimumLevel.Debug();
else
logConfig.MinimumLevel.Information();
if (options.LogFile != null)
{
logConfig.WriteTo.File(
options.LogFile,
fileSizeLimitBytes: options.LogSizeLimit > 0 ? options.LogSizeLimit : null,
rollOnFileSizeLimit: options.LogSizeLimit > 0);
}
Log.Logger = logConfig.CreateLogger();
using var loggerFactory = new Serilog.Extensions.Logging.SerilogLoggerFactory(Log.Logger);
using var server = new NatsServer(options, loggerFactory);

View File

@@ -10,8 +10,11 @@ public sealed class Account : IDisposable
public string Name { get; }
public SubList SubList { get; } = new();
public Permissions? DefaultPermissions { get; set; }
public int MaxConnections { get; set; } // 0 = unlimited
public int MaxSubscriptions { get; set; } // 0 = unlimited
private readonly ConcurrentDictionary<ulong, byte> _clients = new();
private int _subscriptionCount;
public Account(string name)
{
@@ -19,10 +22,31 @@ public sealed class Account : IDisposable
}
public int ClientCount => _clients.Count;
public int SubscriptionCount => Volatile.Read(ref _subscriptionCount);
public void AddClient(ulong clientId) => _clients[clientId] = 0;
/// <summary>Returns false if max connections exceeded.</summary>
public bool AddClient(ulong clientId)
{
if (MaxConnections > 0 && _clients.Count >= MaxConnections)
return false;
_clients[clientId] = 0;
return true;
}
public void RemoveClient(ulong clientId) => _clients.TryRemove(clientId, out _);
public bool IncrementSubscriptions()
{
if (MaxSubscriptions > 0 && Volatile.Read(ref _subscriptionCount) >= MaxSubscriptions)
return false;
Interlocked.Increment(ref _subscriptionCount);
return true;
}
public void DecrementSubscriptions()
{
Interlocked.Decrement(ref _subscriptionCount);
}
public void Dispose() => SubList.Dispose();
}

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.Auth;
/// <summary>
/// Per-account limits and defaults, applied when the account is first
/// materialized by the server. A value of 0 disables the corresponding limit.
/// </summary>
public sealed class AccountConfig
{
    /// <summary>Maximum concurrent client connections for the account; 0 = unlimited.</summary>
    public int MaxConnections { get; init; } // 0 = unlimited
    /// <summary>Maximum total subscriptions across all of the account's clients; 0 = unlimited.</summary>
    public int MaxSubscriptions { get; init; } // 0 = unlimited
    /// <summary>Permissions applied to the account's clients by default (null = no restriction).</summary>
    public Permissions? DefaultPermissions { get; init; }
}

View File

@@ -1,4 +1,3 @@
using System.Collections.Concurrent;
using NATS.Server.Subscriptions;
namespace NATS.Server.Auth;
@@ -7,12 +6,14 @@ public sealed class ClientPermissions : IDisposable
{
private readonly PermissionSet? _publish;
private readonly PermissionSet? _subscribe;
private readonly ConcurrentDictionary<string, bool> _pubCache = new(StringComparer.Ordinal);
private readonly ResponseTracker? _responseTracker;
private readonly PermissionLruCache _pubCache = new(128);
private ClientPermissions(PermissionSet? publish, PermissionSet? subscribe)
private ClientPermissions(PermissionSet? publish, PermissionSet? subscribe, ResponseTracker? responseTracker)
{
_publish = publish;
_subscribe = subscribe;
_responseTracker = responseTracker;
}
public static ClientPermissions? Build(Permissions? permissions)
@@ -22,27 +23,55 @@ public sealed class ClientPermissions : IDisposable
var pub = PermissionSet.Build(permissions.Publish);
var sub = PermissionSet.Build(permissions.Subscribe);
ResponseTracker? responseTracker = null;
if (permissions.Response != null)
responseTracker = new ResponseTracker(permissions.Response.MaxMsgs, permissions.Response.Expires);
if (pub == null && sub == null)
if (pub == null && sub == null && responseTracker == null)
return null;
return new ClientPermissions(pub, sub);
return new ClientPermissions(pub, sub, responseTracker);
}
public ResponseTracker? ResponseTracker => _responseTracker;
public bool IsPublishAllowed(string subject)
{
if (_publish == null)
return true;
return _pubCache.GetOrAdd(subject, _publish.IsAllowed);
if (_pubCache.TryGet(subject, out var cached))
return cached;
var allowed = _publish.IsAllowed(subject);
// If denied but response tracking is enabled, check reply table
if (!allowed && _responseTracker != null)
{
if (_responseTracker.IsReplyAllowed(subject))
return true; // Don't cache dynamic reply permissions
}
_pubCache.Set(subject, allowed);
return allowed;
}
public bool IsSubscribeAllowed(string subject, string? queue = null)
{
if (_subscribe == null)
return true;
if (!_subscribe.IsAllowed(subject))
return false;
if (queue != null && _subscribe.IsDenied(queue))
return false;
return true;
}
return _subscribe.IsAllowed(subject);
public bool IsDeliveryAllowed(string subject)
{
if (_subscribe == null)
return true;
return _subscribe.IsDeliveryAllowed(subject);
}
public void Dispose()
@@ -113,6 +142,21 @@ public sealed class PermissionSet : IDisposable
return allowed;
}
public bool IsDenied(string subject)
{
if (_deny == null) return false;
var result = _deny.Match(subject);
return result.PlainSubs.Length > 0 || result.QueueSubs.Length > 0;
}
public bool IsDeliveryAllowed(string subject)
{
if (_deny == null)
return true;
var result = _deny.Match(subject);
return result.PlainSubs.Length == 0 && result.QueueSubs.Length == 0;
}
public void Dispose()
{
_allow?.Dispose();

View File

@@ -0,0 +1,73 @@
namespace NATS.Server.Auth;
/// <summary>
/// Fixed-capacity LRU cache for permission results.
/// Lock-protected (per-client, low contention).
/// Reference: Go client.go maxPermCacheSize=128.
/// </summary>
public sealed class PermissionLruCache
{
    private readonly int _capacity;
    // Key -> linked-list node; the list is kept in most-recently-used-first order,
    // so the eviction victim is always the tail.
    private readonly Dictionary<string, LinkedListNode<(string Key, bool Value)>> _map;
    private readonly LinkedList<(string Key, bool Value)> _list = new();
    private readonly object _lock = new();

    /// <summary>Creates a cache that holds at most <paramref name="capacity"/> entries.</summary>
    /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="capacity"/> is zero or negative.</exception>
    public PermissionLruCache(int capacity = 128)
    {
        // Guard: with capacity <= 0 the first Set() would take the eviction path
        // on an empty list and dereference _list.Last! (NullReferenceException).
        ArgumentOutOfRangeException.ThrowIfNegativeOrZero(capacity);
        _capacity = capacity;
        _map = new Dictionary<string, LinkedListNode<(string Key, bool Value)>>(capacity, StringComparer.Ordinal);
    }

    /// <summary>Number of entries currently cached.</summary>
    public int Count
    {
        get
        {
            lock (_lock)
            {
                return _map.Count;
            }
        }
    }

    /// <summary>
    /// Looks up a cached permission result. A hit promotes the entry to the
    /// most-recently-used position.
    /// </summary>
    public bool TryGet(string key, out bool value)
    {
        lock (_lock)
        {
            if (_map.TryGetValue(key, out var node))
            {
                value = node.Value.Value;
                // Promote to front (MRU).
                _list.Remove(node);
                _list.AddFirst(node);
                return true;
            }
            value = default;
            return false;
        }
    }

    /// <summary>
    /// Inserts or updates an entry; when full, evicts the least-recently-used
    /// entry to make room.
    /// </summary>
    public void Set(string key, bool value)
    {
        lock (_lock)
        {
            if (_map.TryGetValue(key, out var existing))
            {
                // Update in place and promote to front.
                _list.Remove(existing);
                existing.Value = (key, value);
                _list.AddFirst(existing);
                return;
            }
            if (_map.Count >= _capacity)
            {
                // Evict the LRU entry (list tail). Safe: capacity >= 1 and the
                // map is non-empty here, so the list has at least one node.
                var last = _list.Last!;
                _map.Remove(last.Value.Key);
                _list.RemoveLast();
            }
            var node = new LinkedListNode<(string Key, bool Value)>((key, value));
            _list.AddFirst(node);
            _map[key] = node;
        }
    }
}

View File

@@ -0,0 +1,78 @@
namespace NATS.Server.Auth;
/// <summary>
/// Tracks reply subjects that a client is temporarily allowed to publish to.
/// Reference: Go client.go resp struct, setResponsePermissionIfNeeded.
/// </summary>
public sealed class ResponseTracker
{
    private readonly int _maxMsgs;      // 0 = unlimited messages per reply subject
    private readonly TimeSpan _expires; // TimeSpan.Zero = entries never time out
    private readonly Dictionary<string, (DateTime RegisteredAt, int Used)> _table = new(StringComparer.Ordinal);
    private readonly object _gate = new();

    public ResponseTracker(int maxMsgs, TimeSpan expires)
    {
        _maxMsgs = maxMsgs;
        _expires = expires;
    }

    /// <summary>Number of reply subjects currently tracked.</summary>
    public int Count
    {
        get
        {
            lock (_gate)
            {
                return _table.Count;
            }
        }
    }

    /// <summary>Starts (or restarts) tracking a reply subject with a fresh timestamp and a zero use count.</summary>
    public void RegisterReply(string replySubject)
    {
        lock (_gate)
        {
            _table[replySubject] = (DateTime.UtcNow, 0);
        }
    }

    /// <summary>
    /// Returns true when the subject is a tracked reply that is still within its
    /// TTL and message budget. Each allowed call consumes one message from the
    /// budget; expired or exhausted entries are removed eagerly.
    /// </summary>
    public bool IsReplyAllowed(string subject)
    {
        lock (_gate)
        {
            if (!_table.TryGetValue(subject, out var entry))
                return false;

            var (registeredAt, used) = entry;

            if (HasExpired(registeredAt, DateTime.UtcNow))
            {
                _table.Remove(subject);
                return false;
            }

            used++;
            if (_maxMsgs > 0 && used > _maxMsgs)
            {
                _table.Remove(subject);
                return false;
            }

            _table[subject] = (registeredAt, used);
            return true;
        }
    }

    /// <summary>Drops entries that have timed out or already used up their message budget.</summary>
    public void Prune()
    {
        lock (_gate)
        {
            // Nothing can ever go stale when neither limit is configured.
            if (_expires <= TimeSpan.Zero && _maxMsgs <= 0)
                return;

            var now = DateTime.UtcNow;
            List<string>? stale = null;
            foreach (var pair in _table)
            {
                var (registeredAt, used) = pair.Value;
                if (HasExpired(registeredAt, now) || (_maxMsgs > 0 && used >= _maxMsgs))
                    (stale ??= new List<string>()).Add(pair.Key);
            }

            if (stale != null)
            {
                foreach (var key in stale)
                    _table.Remove(key);
            }
        }
    }

    // True when a TTL is configured and the entry's age exceeds it.
    private bool HasExpired(DateTime registeredAt, DateTime now)
        => _expires > TimeSpan.Zero && now - registeredAt > _expires;
}

View File

@@ -23,6 +23,7 @@ public enum ClientClosedReason
ServerShutdown,
MsgHeaderViolation,
NoRespondersRequiresHeaders,
AuthenticationExpired,
}
public static class ClientClosedReasonExtensions
@@ -46,6 +47,7 @@ public static class ClientClosedReasonExtensions
ClientClosedReason.ServerShutdown => "Server Shutdown",
ClientClosedReason.MsgHeaderViolation => "Message Header Violation",
ClientClosedReason.NoRespondersRequiresHeaders => "No Responders Requires Headers",
ClientClosedReason.AuthenticationExpired => "Authentication Expired",
_ => reason.ToString(),
};
}

View File

@@ -48,6 +48,7 @@ public sealed class NatsClient : IDisposable
public ClientOptions? ClientOpts { get; private set; }
public IMessageRouter? Router { get; set; }
public Account? Account { get; private set; }
public ClientPermissions? Permissions => _permissions;
private readonly ClientFlagHolder _flags = new();
public bool ConnectReceived => _flags.HasFlag(ClientFlags.ConnectReceived);
@@ -90,7 +91,7 @@ public sealed class NatsClient : IDisposable
_nonce = nonce;
_logger = logger;
_serverStats = serverStats;
_parser = new NatsParser(options.MaxPayload);
_parser = new NatsParser(options.MaxPayload, options.Trace ? logger : null);
StartTime = DateTime.UtcNow;
_lastActivityTicks = StartTime.Ticks;
if (socket.RemoteEndPoint is IPEndPoint ep)
@@ -348,6 +349,7 @@ public sealed class NatsClient : IDisposable
?? new ClientOptions();
// Authenticate if auth is required
AuthResult? authResult = null;
if (_authService.IsAuthRequired)
{
var context = new ClientAuthContext
@@ -356,8 +358,8 @@ public sealed class NatsClient : IDisposable
Nonce = _nonce ?? [],
};
var result = _authService.Authenticate(context);
if (result == null)
authResult = _authService.Authenticate(context);
if (authResult == null)
{
_logger.LogWarning("Client {ClientId} authentication failed", Id);
await SendErrAndCloseAsync(NatsProtocol.ErrAuthorizationViolation, ClientClosedReason.AuthenticationViolation);
@@ -365,17 +367,23 @@ public sealed class NatsClient : IDisposable
}
// Build permissions from auth result
_permissions = ClientPermissions.Build(result.Permissions);
_permissions = ClientPermissions.Build(authResult.Permissions);
// Resolve account
if (Router is NatsServer server)
{
var accountName = result.AccountName ?? Account.GlobalAccountName;
var accountName = authResult.AccountName ?? Account.GlobalAccountName;
Account = server.GetOrCreateAccount(accountName);
Account.AddClient(Id);
if (!Account.AddClient(Id))
{
Account = null;
await SendErrAndCloseAsync("maximum connections for account exceeded",
ClientClosedReason.AuthenticationViolation);
return;
}
}
_logger.LogDebug("Client {ClientId} authenticated as {Identity}", Id, result.Identity);
_logger.LogDebug("Client {ClientId} authenticated as {Identity}", Id, authResult.Identity);
// Clear nonce after use -- defense-in-depth against memory dumps
if (_nonce != null)
@@ -386,7 +394,13 @@ public sealed class NatsClient : IDisposable
if (Account == null && Router is NatsServer server2)
{
Account = server2.GetOrCreateAccount(Account.GlobalAccountName);
Account.AddClient(Id);
if (!Account.AddClient(Id))
{
Account = null;
await SendErrAndCloseAsync("maximum connections for account exceeded",
ClientClosedReason.AuthenticationViolation);
return;
}
}
// Validate no_responders requires headers
@@ -401,6 +415,32 @@ public sealed class NatsClient : IDisposable
_flags.SetFlag(ClientFlags.ConnectReceived);
_flags.SetFlag(ClientFlags.ConnectProcessFinished);
_logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
// Start auth expiry timer if needed
if (_authService.IsAuthRequired && authResult?.Expiry is { } expiry)
{
var remaining = expiry - DateTimeOffset.UtcNow;
if (remaining > TimeSpan.Zero)
{
_ = Task.Run(async () =>
{
try
{
await Task.Delay(remaining, _clientCts!.Token);
_logger.LogDebug("Client {ClientId} authentication expired", Id);
await SendErrAndCloseAsync("Authentication Expired",
ClientClosedReason.AuthenticationExpired);
}
catch (OperationCanceledException) { }
}, _clientCts!.Token);
}
else
{
await SendErrAndCloseAsync("Authentication Expired",
ClientClosedReason.AuthenticationExpired);
return;
}
}
}
private void ProcessSub(ParsedCommand cmd)
@@ -413,6 +453,24 @@ public sealed class NatsClient : IDisposable
return;
}
// Per-connection subscription limit
if (_options.MaxSubs > 0 && _subs.Count >= _options.MaxSubs)
{
_logger.LogDebug("Client {ClientId} max subscriptions exceeded", Id);
_ = SendErrAndCloseAsync(NatsProtocol.ErrMaxSubscriptionsExceeded,
ClientClosedReason.MaxSubscriptionsExceeded);
return;
}
// Per-account subscription limit
if (Account != null && !Account.IncrementSubscriptions())
{
_logger.LogDebug("Client {ClientId} account subscription limit exceeded", Id);
_ = SendErrAndCloseAsync(NatsProtocol.ErrMaxSubscriptionsExceeded,
ClientClosedReason.MaxSubscriptionsExceeded);
return;
}
var sub = new Subscription
{
Subject = cmd.Subject!,
@@ -443,6 +501,7 @@ public sealed class NatsClient : IDisposable
}
_subs.Remove(cmd.Sid!);
Account?.DecrementSubscriptions();
Account?.SubList.Remove(sub);
}
@@ -491,9 +550,20 @@ public sealed class NatsClient : IDisposable
private void SendInfo()
{
var infoJson = JsonSerializer.Serialize(_serverInfo);
var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n");
QueueOutbound(infoLine);
// Use the cached INFO bytes from the server when there is no per-connection
// nonce (i.e. NKey auth is not active for this connection). When a nonce is
// present the _serverInfo was already cloned with the nonce embedded, so we
// must serialise it individually.
if (_nonce == null && Router is NatsServer server)
{
QueueOutbound(server.CachedInfoLine);
}
else
{
var infoJson = JsonSerializer.Serialize(_serverInfo);
var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n");
QueueOutbound(infoLine);
}
}
public void SendMessage(string subject, string sid, string? replyTo,
@@ -504,26 +574,78 @@ public sealed class NatsClient : IDisposable
Interlocked.Increment(ref _serverStats.OutMsgs);
Interlocked.Add(ref _serverStats.OutBytes, payload.Length + headers.Length);
byte[] line;
// Estimate control line size
var estimatedLineSize = 5 + subject.Length + 1 + sid.Length + 1
+ (replyTo != null ? replyTo.Length + 1 : 0) + 20 + 2;
var totalPayloadLen = headers.Length + payload.Length;
var totalLen = estimatedLineSize + totalPayloadLen + 2;
var buffer = new byte[totalLen];
var span = buffer.AsSpan();
int pos = 0;
// Write prefix
if (headers.Length > 0)
{
int totalSize = headers.Length + payload.Length;
line = Encoding.ASCII.GetBytes($"HMSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{headers.Length} {totalSize}\r\n");
"HMSG "u8.CopyTo(span);
pos = 5;
}
else
{
line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n");
"MSG "u8.CopyTo(span);
pos = 4;
}
var totalLen = line.Length + headers.Length + payload.Length + NatsProtocol.CrLf.Length;
var msg = new byte[totalLen];
var offset = 0;
line.CopyTo(msg.AsSpan(offset)); offset += line.Length;
if (headers.Length > 0) { headers.Span.CopyTo(msg.AsSpan(offset)); offset += headers.Length; }
if (payload.Length > 0) { payload.Span.CopyTo(msg.AsSpan(offset)); offset += payload.Length; }
NatsProtocol.CrLf.CopyTo(msg.AsSpan(offset));
// Subject
pos += Encoding.ASCII.GetBytes(subject, span[pos..]);
span[pos++] = (byte)' ';
QueueOutbound(msg);
// SID
pos += Encoding.ASCII.GetBytes(sid, span[pos..]);
span[pos++] = (byte)' ';
// Reply-to
if (replyTo != null)
{
pos += Encoding.ASCII.GetBytes(replyTo, span[pos..]);
span[pos++] = (byte)' ';
}
// Sizes
if (headers.Length > 0)
{
int totalSize = headers.Length + payload.Length;
headers.Length.TryFormat(span[pos..], out int written);
pos += written;
span[pos++] = (byte)' ';
totalSize.TryFormat(span[pos..], out written);
pos += written;
}
else
{
payload.Length.TryFormat(span[pos..], out int written);
pos += written;
}
// CRLF
span[pos++] = (byte)'\r';
span[pos++] = (byte)'\n';
// Headers + payload + trailing CRLF
if (headers.Length > 0)
{
headers.Span.CopyTo(span[pos..]);
pos += headers.Length;
}
if (payload.Length > 0)
{
payload.Span.CopyTo(span[pos..]);
pos += payload.Length;
}
span[pos++] = (byte)'\r';
span[pos++] = (byte)'\n';
QueueOutbound(buffer.AsMemory(0, pos));
}
private void WriteProtocol(byte[] data)
@@ -689,6 +811,12 @@ public sealed class NatsClient : IDisposable
catch (ObjectDisposedException) { }
}
public void RemoveSubscription(string sid)
{
if (_subs.Remove(sid))
Account?.DecrementSubscriptions();
}
public void RemoveAllSubscriptions(SubList subList)
{
foreach (var sub in _subs.Values)

View File

@@ -16,6 +16,22 @@ public sealed class NatsOptions
public TimeSpan PingInterval { get; set; } = TimeSpan.FromMinutes(2);
public int MaxPingsOut { get; set; } = 2;
// Subscription limits
public int MaxSubs { get; set; } // 0 = unlimited (per-connection)
public int MaxSubTokens { get; set; } // 0 = unlimited
// Logging / diagnostics
public bool Debug { get; set; }
public bool Trace { get; set; }
public string? LogFile { get; set; }
public long LogSizeLimit { get; set; }
// Server tags (exposed via /varz)
public Dictionary<string, string>? Tags { get; set; }
// Account configuration
public Dictionary<string, AccountConfig>? Accounts { get; set; }
// Simple auth (single user)
public string? Username { get; set; }
public string? Password { get; set; }

View File

@@ -43,6 +43,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private int _lameDuck;
private byte[] _cachedInfoLine = [];
private readonly List<PosixSignalRegistration> _signalRegistrations = [];
private string? _portsFilePath;
@@ -51,6 +53,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private static readonly TimeSpan AcceptMaxSleep = TimeSpan.FromSeconds(1);
public SubList SubList => _globalAccount.SubList;
public byte[] CachedInfoLine => _cachedInfoLine;
public ServerStats Stats => _stats;
public DateTime StartTime => new(Interlocked.Read(ref _startTimeTicks), DateTimeKind.Utc);
public string ServerId => _serverInfo.ServerId;
@@ -272,6 +275,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (options.TlsRateLimit > 0)
_tlsRateLimiter = new TlsRateLimiter(options.TlsRateLimit);
}
BuildCachedInfo();
}
private void BuildCachedInfo()
{
var infoJson = System.Text.Json.JsonSerializer.Serialize(_serverInfo);
_cachedInfoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n");
}
public async Task StartAsync(CancellationToken ct)
@@ -292,6 +303,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
var actualPort = ((IPEndPoint)_listener.LocalEndPoint!).Port;
_options.Port = actualPort;
_serverInfo.Port = actualPort;
BuildCachedInfo();
}
_listeningStarted.TrySetResult();
@@ -523,7 +535,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
private static void DeliverMessage(Subscription sub, string subject, string? replyTo,
private void DeliverMessage(Subscription sub, string subject, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
var client = sub.Client;
@@ -532,9 +544,26 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
// Check auto-unsub
var count = Interlocked.Increment(ref sub.MessageCount);
if (sub.MaxMessages > 0 && count > sub.MaxMessages)
{
// Clean up exhausted subscription from trie and client tracking
var subList = client.Account?.SubList ?? _globalAccount.SubList;
subList.Remove(sub);
client.RemoveSubscription(sub.Sid);
return;
}
// Deny-list delivery filter
if (client.Permissions?.IsDeliveryAllowed(subject) == false)
return;
client.SendMessage(subject, sub.Sid, replyTo, headers, payload);
// Track reply subject for response permissions
if (replyTo != null && client.Permissions?.ResponseTracker != null)
{
if (client.Permissions.IsPublishAllowed(replyTo) == false)
client.Permissions.ResponseTracker.RegisterReply(replyTo);
}
}
private static void SendNoResponders(NatsClient sender, string replyTo)
@@ -569,7 +598,17 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public Account GetOrCreateAccount(string name)
{
return _accounts.GetOrAdd(name, n => new Account(n));
return _accounts.GetOrAdd(name, n =>
{
var acc = new Account(n);
if (_options.Accounts != null && _options.Accounts.TryGetValue(n, out var config))
{
acc.MaxConnections = config.MaxConnections;
acc.MaxSubscriptions = config.MaxSubscriptions;
acc.DefaultPermissions = config.DefaultPermissions;
}
return acc;
});
}
public void RemoveClient(NatsClient client)

View File

@@ -0,0 +1,108 @@
using System.Collections.ObjectModel;
using System.Text;
namespace NATS.Server.Protocol;
/// <summary>
/// Parsed representation of a NATS header block: the status line (code plus
/// optional description) and the key/value header fields. A default-constructed
/// instance has Status 0, an empty description, and no headers.
/// </summary>
public readonly struct NatsHeaders()
{
    /// <summary>Sentinel returned for malformed header blocks.</summary>
    public static readonly NatsHeaders Invalid = new() { Status = -1 };

    /// <summary>Status code from the "NATS/1.0 nnn" line; 0 when absent, -1 for <see cref="Invalid"/>.</summary>
    public int Status { get; init; }

    /// <summary>Optional human-readable text following the status code.</summary>
    public string Description { get; init; } = string.Empty;

    /// <summary>Header fields; a key maps to every value it occurred with, in order.</summary>
    public IReadOnlyDictionary<string, string[]> Headers { get; init; } = ReadOnlyDictionary<string, string[]>.Empty;
}
/// <summary>
/// Span-based parser for a NATS header block: a "NATS/1.0" status line,
/// MIME-style "Key: value" lines, terminated by an empty line (CRLF CRLF).
/// </summary>
public static class NatsHeaderParser
{
    private static ReadOnlySpan<byte> CrLf => "\r\n"u8;
    private static ReadOnlySpan<byte> Prefix => "NATS/1.0"u8;

    /// <summary>
    /// Parses a raw header block. Returns <see cref="NatsHeaders.Invalid"/> when the
    /// data does not start with "NATS/1.0" or the status line lacks a CRLF terminator.
    /// Malformed header lines (no colon, or no trailing CRLF) are skipped, not fatal.
    /// </summary>
    public static NatsHeaders Parse(ReadOnlySpan<byte> data)
    {
        if (data.Length < Prefix.Length)
            return NatsHeaders.Invalid;
        if (!data[..Prefix.Length].SequenceEqual(Prefix))
            return NatsHeaders.Invalid;
        int pos = Prefix.Length;
        int status = 0;
        string description = string.Empty;
        // Parse status line: NATS/1.0[ status[ description]]\r\n
        int lineEnd = data[pos..].IndexOf(CrLf);
        if (lineEnd < 0)
            return NatsHeaders.Invalid;
        var statusLine = data[pos..(pos + lineEnd)];
        pos += lineEnd + 2; // skip \r\n
        if (statusLine.Length > 0)
        {
            // Skip spaces, then scan a run of ASCII digits for the status code.
            int si = 0;
            while (si < statusLine.Length && statusLine[si] == (byte)' ')
                si++;
            int numStart = si;
            while (si < statusLine.Length && statusLine[si] >= (byte)'0' && statusLine[si] <= (byte)'9')
                si++;
            if (si > numStart && si - numStart <= 5) // max 5 digits to avoid overflow
            {
                for (int idx = numStart; idx < si; idx++)
                    status = status * 10 + (statusLine[idx] - '0');
                // Whatever follows the digits (after spaces) is the description.
                // NOTE: a description with no leading status code is discarded.
                while (si < statusLine.Length && statusLine[si] == (byte)' ')
                    si++;
                if (si < statusLine.Length)
                    description = Encoding.ASCII.GetString(statusLine[si..]);
            }
        }
        // Parse key-value headers until empty line
        var headers = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
        while (pos < data.Length)
        {
            var remaining = data[pos..];
            // Bare CRLF terminates the header block.
            if (remaining.Length >= 2 && remaining[0] == (byte)'\r' && remaining[1] == (byte)'\n')
                break;
            lineEnd = remaining.IndexOf(CrLf);
            if (lineEnd < 0)
                break;
            var headerLine = remaining[..lineEnd];
            pos += lineEnd + 2;
            int colon = headerLine.IndexOf((byte)':');
            if (colon < 0)
                continue; // no colon: skip the malformed line silently
            var key = Encoding.ASCII.GetString(headerLine[..colon]).Trim();
            var value = Encoding.ASCII.GetString(headerLine[(colon + 1)..]).Trim();
            // Repeated keys accumulate their values in order of appearance.
            if (!headers.TryGetValue(key, out var values))
            {
                values = [];
                headers[key] = values;
            }
            values.Add(value);
        }
        // Freeze List<string> values into string[] for the read-only result.
        var result = new Dictionary<string, string[]>(headers.Count, StringComparer.OrdinalIgnoreCase);
        foreach (var (k, v) in headers)
            result[k] = v.ToArray();
        return new NatsHeaders
        {
            Status = status,
            Description = description,
            Headers = result,
        };
    }
}

View File

@@ -1,5 +1,6 @@
using System.Buffers;
using System.Text;
using Microsoft.Extensions.Logging;
namespace NATS.Server.Protocol;
@@ -35,6 +36,7 @@ public sealed class NatsParser
{
private static readonly byte[] CrLfBytes = "\r\n"u8.ToArray();
private readonly int _maxPayload;
private readonly ILogger? _logger;
// State for split-packet payload reading
private bool _awaitingPayload;
@@ -44,9 +46,20 @@ public sealed class NatsParser
private int _pendingHeaderSize;
private CommandType _pendingType;
public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize)
public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize, ILogger? logger = null)
{
_maxPayload = maxPayload;
_logger = logger;
}
private void TraceInOp(string op, ReadOnlySpan<byte> arg = default)
{
if (_logger == null || !_logger.IsEnabled(LogLevel.Trace))
return;
if (arg.IsEmpty)
_logger.LogTrace("<<- {Op}", op);
else
_logger.LogTrace("<<- {Op} {Arg}", op, Encoding.ASCII.GetString(arg));
}
public bool TryParse(ref ReadOnlySequence<byte> buffer, out ParsedCommand command)
@@ -91,6 +104,7 @@ public sealed class NatsParser
{
command = ParsedCommand.Simple(CommandType.Ping);
buffer = buffer.Slice(reader.Position);
TraceInOp("PING");
return true;
}
@@ -98,6 +112,7 @@ public sealed class NatsParser
{
command = ParsedCommand.Simple(CommandType.Pong);
buffer = buffer.Slice(reader.Position);
TraceInOp("PONG");
return true;
}
@@ -121,6 +136,7 @@ public sealed class NatsParser
{
command = ParseSub(lineSpan);
buffer = buffer.Slice(reader.Position);
TraceInOp("SUB", lineSpan[4..]);
return true;
}
@@ -131,6 +147,7 @@ public sealed class NatsParser
{
command = ParseUnsub(lineSpan);
buffer = buffer.Slice(reader.Position);
TraceInOp("UNSUB", lineSpan[6..]);
return true;
}
@@ -141,6 +158,7 @@ public sealed class NatsParser
{
command = ParseConnect(lineSpan);
buffer = buffer.Slice(reader.Position);
TraceInOp("CONNECT");
return true;
}
@@ -151,6 +169,7 @@ public sealed class NatsParser
{
command = ParseInfo(lineSpan);
buffer = buffer.Slice(reader.Position);
TraceInOp("INFO");
return true;
}
@@ -159,11 +178,13 @@ public sealed class NatsParser
case (byte)'+': // +OK
command = ParsedCommand.Simple(CommandType.Ok);
buffer = buffer.Slice(reader.Position);
TraceInOp("+OK");
return true;
case (byte)'-': // -ERR
command = ParsedCommand.Simple(CommandType.Err);
buffer = buffer.Slice(reader.Position);
TraceInOp("-ERR");
return true;
}
@@ -215,6 +236,7 @@ public sealed class NatsParser
_pendingHeaderSize = -1;
_pendingType = CommandType.Pub;
TraceInOp("PUB", argsSpan);
return TryReadPayload(ref buffer, out command);
}
@@ -264,6 +286,7 @@ public sealed class NatsParser
_pendingHeaderSize = hdrSize;
_pendingType = CommandType.HPub;
TraceInOp("HPUB", argsSpan);
return TryReadPayload(ref buffer, out command);
}

View File

@@ -33,6 +33,7 @@ public static class NatsProtocol
public const string ErrPermissionsSubscribe = "Permissions Violation for Subscription";
public const string ErrSlowConsumer = "Slow Consumer";
public const string ErrNoRespondersRequiresHeaders = "No Responders Requires Headers Support";
public const string ErrMaxSubscriptionsExceeded = "Maximum Subscriptions Exceeded";
}
public sealed class ServerInfo

View File

@@ -13,9 +13,16 @@ public sealed class SubList : IDisposable
private readonly ReaderWriterLockSlim _lock = new();
private readonly TrieLevel _root = new();
private Dictionary<string, SubListResult>? _cache = new(StringComparer.Ordinal);
private Dictionary<string, CachedResult>? _cache = new(StringComparer.Ordinal);
private uint _count;
private volatile bool _disposed;
private long _generation;
private ulong _matches;
private ulong _cacheHits;
private ulong _inserts;
private ulong _removes;
private readonly record struct CachedResult(SubListResult Result, long Generation);
public void Dispose()
{
@@ -90,7 +97,8 @@ public sealed class SubList : IDisposable
}
_count++;
AddToCache(subject, sub);
_inserts++;
Interlocked.Increment(ref _generation);
}
finally
{
@@ -104,78 +112,10 @@ public sealed class SubList : IDisposable
_lock.EnterWriteLock();
try
{
var level = _root;
TrieNode? node = null;
bool sawFwc = false;
var pathList = new List<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)>();
foreach (var token in new TokenEnumerator(sub.Subject))
if (RemoveInternal(sub))
{
if (token.Length == 0 || sawFwc)
return;
bool isPwc = token.Length == 1 && token[0] == SubjectMatch.Pwc;
bool isFwc = token.Length == 1 && token[0] == SubjectMatch.Fwc;
if (isPwc)
{
node = level.Pwc;
}
else if (isFwc)
{
node = level.Fwc;
sawFwc = true;
}
else
{
level.Nodes.TryGetValue(token.ToString(), out node);
}
if (node == null)
return; // not found
var tokenStr = token.ToString();
pathList.Add((level, node, tokenStr, isPwc, isFwc));
if (node.Next == null)
return; // corrupted trie state
level = node.Next;
}
if (node == null) return;
// Remove from node
bool removed;
if (sub.Queue == null)
{
removed = node.PlainSubs.Remove(sub);
}
else
{
removed = false;
if (node.QueueSubs.TryGetValue(sub.Queue, out var qset))
{
removed = qset.Remove(sub);
if (qset.Count == 0)
node.QueueSubs.Remove(sub.Queue);
}
}
if (!removed) return;
_count--;
RemoveFromCache(sub.Subject);
// Prune empty nodes (walk backwards)
for (int i = pathList.Count - 1; i >= 0; i--)
{
var (l, n, t, isPwc, isFwc) = pathList[i];
if (n.IsEmpty)
{
if (isPwc) l.Pwc = null;
else if (isFwc) l.Fwc = null;
else l.Nodes.Remove(t);
}
_removes++;
Interlocked.Increment(ref _generation);
}
}
finally
@@ -184,22 +124,107 @@ public sealed class SubList : IDisposable
}
}
/// <summary>
/// Core remove logic without lock acquisition or generation bumping.
/// Assumes write lock is held. Returns true if a subscription was actually removed.
/// </summary>
private bool RemoveInternal(Subscription sub)
{
    var level = _root;
    TrieNode? node = null;
    bool sawFwc = false;
    // Path from root to the target node, recorded so empty nodes can be pruned afterwards.
    var pathList = new List<(TrieLevel level, TrieNode node, string token, bool isPwc, bool isFwc)>();
    foreach (var token in new TokenEnumerator(sub.Subject))
    {
        // An empty token, or any token after a full wildcard, means the subject
        // cannot exist in the trie.
        if (token.Length == 0 || sawFwc)
            return false;
        bool isPwc = token.Length == 1 && token[0] == SubjectMatch.Pwc;
        bool isFwc = token.Length == 1 && token[0] == SubjectMatch.Fwc;
        if (isPwc)
        {
            node = level.Pwc;
        }
        else if (isFwc)
        {
            node = level.Fwc;
            sawFwc = true;
        }
        else
        {
            level.Nodes.TryGetValue(token.ToString(), out node);
        }
        if (node == null)
            return false; // not found
        var tokenStr = token.ToString();
        pathList.Add((level, node, tokenStr, isPwc, isFwc));
        if (node.Next == null)
            return false; // corrupted trie state
        level = node.Next;
    }
    if (node == null) return false;
    // Remove from node
    bool removed;
    if (sub.Queue == null)
    {
        removed = node.PlainSubs.Remove(sub);
    }
    else
    {
        removed = false;
        if (node.QueueSubs.TryGetValue(sub.Queue, out var qset))
        {
            removed = qset.Remove(sub);
            // Drop the queue group entry once its last member is gone.
            if (qset.Count == 0)
                node.QueueSubs.Remove(sub.Queue);
        }
    }
    if (!removed) return false;
    _count--;
    // Prune empty nodes (walk backwards)
    for (int i = pathList.Count - 1; i >= 0; i--)
    {
        var (l, n, t, isPwc, isFwc) = pathList[i];
        if (n.IsEmpty)
        {
            if (isPwc) l.Pwc = null;
            else if (isFwc) l.Fwc = null;
            else l.Nodes.Remove(t);
        }
    }
    return true;
}
public SubListResult Match(string subject)
{
// Check cache under read lock first.
Interlocked.Increment(ref _matches);
var currentGen = Interlocked.Read(ref _generation);
_lock.EnterReadLock();
try
{
if (_cache != null && _cache.TryGetValue(subject, out var cached))
return cached;
if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen)
{
Interlocked.Increment(ref _cacheHits);
return cached.Result;
}
}
finally
{
_lock.ExitReadLock();
}
// Cache miss -- tokenize and match under write lock (needed for cache update).
// Tokenize the subject.
var tokens = Tokenize(subject);
if (tokens == null)
return SubListResult.Empty;
@@ -207,13 +232,15 @@ public sealed class SubList : IDisposable
_lock.EnterWriteLock();
try
{
// Re-check cache after acquiring write lock.
if (_cache != null && _cache.TryGetValue(subject, out var cached))
return cached;
currentGen = Interlocked.Read(ref _generation);
if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen)
{
Interlocked.Increment(ref _cacheHits);
return cached.Result;
}
var plainSubs = new List<Subscription>();
var queueSubs = new List<List<Subscription>>();
MatchLevel(_root, tokens, 0, plainSubs, queueSubs);
SubListResult result;
@@ -226,19 +253,14 @@ public sealed class SubList : IDisposable
var queueSubsArr = new Subscription[queueSubs.Count][];
for (int i = 0; i < queueSubs.Count; i++)
queueSubsArr[i] = queueSubs[i].ToArray();
result = new SubListResult(
plainSubs.ToArray(),
queueSubsArr);
result = new SubListResult(plainSubs.ToArray(), queueSubsArr);
}
if (_cache != null)
{
_cache[subject] = result;
_cache[subject] = new CachedResult(result, currentGen);
if (_cache.Count > CacheMax)
{
// Sweep: remove entries until at CacheSweep count.
var keys = _cache.Keys.Take(_cache.Count - CacheSweep).ToList();
foreach (var key in keys)
_cache.Remove(key);
@@ -356,119 +378,355 @@ public sealed class SubList : IDisposable
}
}
/// <summary>
/// Adds a subscription to matching cache entries.
/// Assumes write lock is held.
/// </summary>
private void AddToCache(string subject, Subscription sub)
public SubListStats Stats()
{
if (_cache == null)
return;
// If literal subject, we can do a direct lookup.
if (SubjectMatch.IsLiteral(subject))
_lock.EnterReadLock();
uint numSubs, numCache;
ulong inserts, removes;
try
{
if (_cache.TryGetValue(subject, out var r))
{
_cache[subject] = AddSubToResult(r, sub);
}
return;
numSubs = _count;
numCache = (uint)(_cache?.Count ?? 0);
inserts = _inserts;
removes = _removes;
}
finally
{
_lock.ExitReadLock();
}
// Wildcard subscription -- check all cached keys.
var keysToUpdate = new List<(string key, SubListResult result)>();
foreach (var (key, r) in _cache)
var matches = Interlocked.Read(ref _matches);
var cacheHits = Interlocked.Read(ref _cacheHits);
var hitRate = matches > 0 ? (double)cacheHits / matches : 0.0;
uint maxFanout = 0;
long totalFanout = 0;
int cacheEntries = 0;
_lock.EnterReadLock();
try
{
if (SubjectMatch.MatchLiteral(key, subject))
if (_cache != null)
{
keysToUpdate.Add((key, r));
foreach (var (_, entry) in _cache)
{
var r = entry.Result;
var f = r.PlainSubs.Length + r.QueueSubs.Length;
totalFanout += f;
if (f > maxFanout) maxFanout = (uint)f;
cacheEntries++;
}
}
}
foreach (var (key, r) in keysToUpdate)
finally
{
_cache[key] = AddSubToResult(r, sub);
_lock.ExitReadLock();
}
return new SubListStats
{
NumSubs = numSubs,
NumCache = numCache,
NumInserts = inserts,
NumRemoves = removes,
NumMatches = matches,
CacheHitRate = hitRate,
MaxFanout = maxFanout,
AvgFanout = cacheEntries > 0 ? (double)totalFanout / cacheEntries : 0.0,
};
}
/// <summary>
/// Reports whether any subscription (plain or queue) currently matches the given
/// subject. A current cache entry answers immediately; otherwise the trie is
/// probed without materializing a full result set.
/// </summary>
public bool HasInterest(string subject)
{
    var gen = Interlocked.Read(ref _generation);
    _lock.EnterReadLock();
    try
    {
        if (_cache != null && _cache.TryGetValue(subject, out var entry) && entry.Generation == gen)
        {
            var res = entry.Result;
            return res.PlainSubs.Length > 0 || res.QueueSubs.Length > 0;
        }
    }
    finally
    {
        _lock.ExitReadLock();
    }

    var tokens = Tokenize(subject);
    if (tokens == null)
        return false;

    _lock.EnterReadLock();
    try
    {
        return HasInterestLevel(_root, tokens, 0);
    }
    finally
    {
        _lock.ExitReadLock();
    }
}
/// <summary>
/// Removes cache entries that match the given subject.
/// Assumes write lock is held.
/// </summary>
private void RemoveFromCache(string subject)
public (int plainCount, int queueCount) NumInterest(string subject)
{
if (_cache == null)
return;
var tokens = Tokenize(subject);
if (tokens == null) return (0, 0);
// If literal subject, we can do a direct removal.
if (SubjectMatch.IsLiteral(subject))
_lock.EnterReadLock();
try
{
_cache.Remove(subject);
return;
int np = 0, nq = 0;
CountInterestLevel(_root, tokens, 0, ref np, ref nq);
return (np, nq);
}
// Wildcard subscription -- remove all matching cached keys.
var keysToRemove = new List<string>();
foreach (var key in _cache.Keys)
finally
{
if (SubjectMatch.MatchLiteral(key, subject))
{
keysToRemove.Add(key);
}
}
foreach (var key in keysToRemove)
{
_cache.Remove(key);
_lock.ExitReadLock();
}
}
/// <summary>
/// Creates a new result with the given subscription added.
/// </summary>
private static SubListResult AddSubToResult(SubListResult result, Subscription sub)
public void RemoveBatch(IEnumerable<Subscription> subs)
{
if (sub.Queue == null)
_lock.EnterWriteLock();
try
{
var newPlain = new Subscription[result.PlainSubs.Length + 1];
result.PlainSubs.CopyTo(newPlain, 0);
newPlain[^1] = sub;
return new SubListResult(newPlain, result.QueueSubs);
var wasEnabled = _cache != null;
_cache = null;
foreach (var sub in subs)
{
if (RemoveInternal(sub))
_removes++;
}
Interlocked.Increment(ref _generation);
if (wasEnabled)
_cache = new Dictionary<string, CachedResult>(StringComparer.Ordinal);
}
finally
{
_lock.ExitWriteLock();
}
}
/// <summary>Returns a snapshot of every subscription currently stored in the trie.</summary>
public IReadOnlyList<Subscription> All()
{
    var snapshot = new List<Subscription>();
    _lock.EnterReadLock();
    try
    {
        CollectAllSubs(_root, snapshot);
    }
    finally
    {
        _lock.ExitReadLock();
    }
    return snapshot;
}
/// <summary>
/// Finds all subscriptions whose (possibly wildcarded) pattern would match the
/// given subject — the inverse of <see cref="Match"/>. Results are not cached.
/// </summary>
public SubListResult ReverseMatch(string subject)
{
    var tokens = Tokenize(subject);
    if (tokens == null)
        return SubListResult.Empty;

    _lock.EnterReadLock();
    try
    {
        var plain = new List<Subscription>();
        var queues = new List<List<Subscription>>();
        ReverseMatchLevel(_root, tokens, 0, plain, queues);

        if (plain.Count == 0 && queues.Count == 0)
            return SubListResult.Empty;

        var queueArrays = new Subscription[queues.Count][];
        for (int i = 0; i < queues.Count; i++)
            queueArrays[i] = queues[i].ToArray();
        return new SubListResult(plain.ToArray(), queueArrays);
    }
    finally
    {
        _lock.ExitReadLock();
    }
}
// Depth-first interest probe: walks the trie along the subject's literal tokens,
// returning true as soon as any matching node carries a subscription.
// 'level' may become null when a literal token has no continuation.
private static bool HasInterestLevel(TrieLevel? level, string[] tokens, int tokenIndex)
{
    TrieNode? pwc = null;
    TrieNode? node = null;
    for (int i = tokenIndex; i < tokens.Length; i++)
    {
        if (level == null) return false;
        // '>' at this depth matches all remaining tokens.
        if (level.Fwc != null && NodeHasInterest(level.Fwc)) return true;
        pwc = level.Pwc;
        // '*' consumes exactly this token; recurse for the remainder.
        if (pwc != null && HasInterestLevel(pwc.Next, tokens, i + 1)) return true;
        node = null;
        if (level.Nodes.TryGetValue(tokens[i], out var found))
        {
            node = found;
            level = node.Next;
        }
        else
        {
            level = null;
        }
    }
    // All tokens consumed: interest exists if the literal node or the '*' node
    // at the final depth carries subscriptions.
    if (node != null && NodeHasInterest(node)) return true;
    if (pwc != null && NodeHasInterest(pwc)) return true;
    return false;
}
// True when the node directly carries at least one plain or queue subscription.
private static bool NodeHasInterest(TrieNode node)
    => node.PlainSubs.Count > 0 || node.QueueSubs.Count > 0;
// Mirror of HasInterestLevel that tallies, rather than short-circuits on,
// matching subscriptions: np accumulates plain subs, nq accumulates queue subs.
private static void CountInterestLevel(TrieLevel? level, string[] tokens, int tokenIndex,
    ref int np, ref int nq)
{
    TrieNode? pwc = null;
    TrieNode? node = null;
    for (int i = tokenIndex; i < tokens.Length; i++)
    {
        if (level == null) return;
        // '>' at this depth matches all remaining tokens.
        if (level.Fwc != null) AddNodeCounts(level.Fwc, ref np, ref nq);
        pwc = level.Pwc;
        // '*' consumes exactly this token; count matches for the remainder.
        if (pwc != null) CountInterestLevel(pwc.Next, tokens, i + 1, ref np, ref nq);
        node = null;
        if (level.Nodes.TryGetValue(tokens[i], out var found))
        {
            node = found;
            level = node.Next;
        }
        else
        {
            level = null;
        }
    }
    // Count subscriptions on the terminal literal and '*' nodes, if present.
    if (node != null) AddNodeCounts(node, ref np, ref nq);
    if (pwc != null) AddNodeCounts(pwc, ref np, ref nq);
}
// Folds the node's direct subscription counts into the running totals.
private static void AddNodeCounts(TrieNode node, ref int np, ref int nq)
{
    np += node.PlainSubs.Count;
    foreach (var qset in node.QueueSubs.Values)
        nq += qset.Count;
}
// Recursively gathers every subscription (plain and queue) in the subtree
// rooted at 'level' into 'subs'.
private static void CollectAllSubs(TrieLevel level, List<Subscription> subs)
{
    void Gather(TrieNode node)
    {
        foreach (var sub in node.PlainSubs)
            subs.Add(sub);
        foreach (var (_, qset) in node.QueueSubs)
            foreach (var sub in qset)
                subs.Add(sub);
        if (node.Next != null)
            CollectAllSubs(node.Next, subs);
    }

    // Visit literal nodes first, then '*', then '>' — same order as before.
    foreach (var (_, node) in level.Nodes)
        Gather(node);
    if (level.Pwc != null)
        Gather(level.Pwc);
    if (level.Fwc != null)
        Gather(level.Fwc);
}
private static void ReverseMatchLevel(TrieLevel? level, string[] tokens, int tokenIndex,
List<Subscription> plainSubs, List<List<Subscription>> queueSubs)
{
if (level == null || tokenIndex >= tokens.Length)
return;
var token = tokens[tokenIndex];
bool isLast = tokenIndex == tokens.Length - 1;
if (token == ">")
{
CollectAllNodes(level, plainSubs, queueSubs);
return;
}
if (token == "*")
{
foreach (var (_, node) in level.Nodes)
{
if (isLast)
AddNodeToResults(node, plainSubs, queueSubs);
else
ReverseMatchLevel(node.Next, tokens, tokenIndex + 1, plainSubs, queueSubs);
}
}
else
{
// Find existing queue group
var queueSubs = result.QueueSubs;
int slot = -1;
for (int i = 0; i < queueSubs.Length; i++)
if (level.Nodes.TryGetValue(token, out var node))
{
if (queueSubs[i].Length > 0 && queueSubs[i][0].Queue == sub.Queue)
{
slot = i;
break;
}
}
// Deep copy queue subs
var newQueueSubs = new Subscription[queueSubs.Length + (slot < 0 ? 1 : 0)][];
for (int i = 0; i < queueSubs.Length; i++)
{
if (i == slot)
{
var newGroup = new Subscription[queueSubs[i].Length + 1];
queueSubs[i].CopyTo(newGroup, 0);
newGroup[^1] = sub;
newQueueSubs[i] = newGroup;
}
if (isLast)
AddNodeToResults(node, plainSubs, queueSubs);
else
{
newQueueSubs[i] = (Subscription[])queueSubs[i].Clone();
}
}
if (slot < 0)
{
newQueueSubs[^1] = [sub];
ReverseMatchLevel(node.Next, tokens, tokenIndex + 1, plainSubs, queueSubs);
}
}
return new SubListResult(result.PlainSubs, newQueueSubs);
if (level.Pwc != null)
{
if (isLast)
AddNodeToResults(level.Pwc, plainSubs, queueSubs);
else
ReverseMatchLevel(level.Pwc.Next, tokens, tokenIndex + 1, plainSubs, queueSubs);
}
if (level.Fwc != null)
{
AddNodeToResults(level.Fwc, plainSubs, queueSubs);
}
}
// Collects every subscription in the subtree into the result lists; used when a
// '>' token swallows all remaining subject depth during a reverse match.
private static void CollectAllNodes(TrieLevel level, List<Subscription> plainSubs,
    List<List<Subscription>> queueSubs)
{
    void Gather(TrieNode node)
    {
        AddNodeToResults(node, plainSubs, queueSubs);
        if (node.Next != null)
            CollectAllNodes(node.Next, plainSubs, queueSubs);
    }

    // Visit literal nodes first, then '*', then '>' — same order as before.
    foreach (var (_, node) in level.Nodes)
        Gather(node);
    if (level.Pwc != null)
        Gather(level.Pwc);
    if (level.Fwc != null)
        Gather(level.Fwc);
}

View File

@@ -0,0 +1,13 @@
namespace NATS.Server.Subscriptions;
/// <summary>
/// Point-in-time statistics snapshot for a sublist. Sealed: this is a plain
/// data carrier not designed for inheritance.
/// </summary>
public sealed class SubListStats
{
    /// <summary>Number of subscriptions currently stored.</summary>
    public uint NumSubs { get; init; }
    /// <summary>Number of entries currently in the match cache.</summary>
    public uint NumCache { get; init; }
    /// <summary>Total insertions performed since creation.</summary>
    public ulong NumInserts { get; init; }
    /// <summary>Total removals performed since creation.</summary>
    public ulong NumRemoves { get; init; }
    /// <summary>Total match operations performed.</summary>
    public ulong NumMatches { get; init; }
    /// <summary>Fraction of matches served from the cache (0.0–1.0).</summary>
    public double CacheHitRate { get; init; }
    /// <summary>Largest fan-out observed across cached match results.</summary>
    public uint MaxFanout { get; init; }
    /// <summary>Average fan-out across cached match results.</summary>
    public double AvgFanout { get; init; }
}

View File

@@ -113,4 +113,112 @@ public static class SubjectMatch
return li >= literal.Length; // both exhausted
}
/// <summary>Count dot-delimited tokens. Empty string returns 0.</summary>
public static int NumTokens(string subject)
{
    if (string.IsNullOrEmpty(subject))
        return 0;
    // One more token than there are separators.
    var tokens = 1;
    foreach (var ch in subject)
    {
        if (ch == Sep)
            tokens++;
    }
    return tokens;
}
/// <summary>Return the 0-based nth token as a span. Returns empty if out of range.</summary>
public static ReadOnlySpan<char> TokenAt(string subject, int index)
{
    if (string.IsNullOrEmpty(subject))
        return default;
    var rest = subject.AsSpan();
    for (int tok = 0; ; tok++)
    {
        var sep = rest.IndexOf(Sep);
        var current = sep < 0 ? rest : rest[..sep];
        if (tok == index)
            return current;
        if (sep < 0)
            return default; // ran out of tokens before reaching 'index'
        rest = rest[(sep + 1)..];
    }
}
/// <summary>
/// Determines if two subject patterns (possibly containing wildcards) can both
/// match the same literal subject. Reference: Go sublist.go SubjectsCollide.
/// </summary>
public static bool SubjectsCollide(string subj1, string subj2)
{
    if (subj1 == subj2)
        return true;

    var lit1 = IsLiteral(subj1);
    var lit2 = IsLiteral(subj2);
    // Distinct literals never collide; a literal collides with a wildcard
    // pattern only when the pattern matches it.
    if (lit1)
        return !lit2 && MatchLiteral(subj1, subj2);
    if (lit2)
        return MatchLiteral(subj2, subj1);

    // Both contain wildcards — split once to avoid O(n²) TokenAt calls.
    var toks1 = subj1.Split(Sep);
    var toks2 = subj2.Split(Sep);
    var fwc1 = toks1[^1] == ">";
    var fwc2 = toks2[^1] == ">";

    // Without a trailing '>', token counts must line up exactly; a shorter
    // pattern can only reach a longer one's depth if it ends in '>'.
    if (!fwc1 && !fwc2 && toks1.Length != toks2.Length)
        return false;
    if (toks1.Length < toks2.Length && !fwc1)
        return false;
    if (toks2.Length < toks1.Length && !fwc2)
        return false;

    var common = Math.Min(toks1.Length, toks2.Length);
    for (var i = 0; i < common; i++)
    {
        if (!TokensCanMatch(toks1[i], toks2[i]))
            return false;
    }
    return true;
}
// Two pattern tokens can both match the same literal token iff either is a
// wildcard ('*' or '>') or they are identical.
private static bool TokensCanMatch(ReadOnlySpan<char> t1, ReadOnlySpan<char> t2)
{
    static bool IsWild(ReadOnlySpan<char> t) => t.Length == 1 && (t[0] == Pwc || t[0] == Fwc);
    return IsWild(t1) || IsWild(t2) || t1.SequenceEqual(t2);
}
/// <summary>
/// Validates subject. When checkRunes is true, also rejects null bytes.
/// </summary>
public static bool IsValidSubject(string subject, bool checkRunes)
{
    if (!IsValidSubject(subject))
        return false;
    // Basic validation passed; optionally reject embedded NUL characters.
    return !checkRunes || !subject.Contains('\0');
}
}

View File

@@ -32,4 +32,40 @@ public class AccountTests
{
Account.GlobalAccountName.ShouldBe("$G");
}
[Fact]
public void Account_enforces_max_connections()
{
    var account = new Account("test") { MaxConnections = 2 };

    account.AddClient(1).ShouldBeTrue();
    account.AddClient(2).ShouldBeTrue();
    account.AddClient(3).ShouldBeFalse(); // third client exceeds the limit

    account.ClientCount.ShouldBe(2);
}
[Fact]
public void Account_unlimited_connections_when_zero()
{
    // MaxConnections == 0 means no cap is applied.
    var account = new Account("test") { MaxConnections = 0 };

    account.AddClient(1).ShouldBeTrue();
    account.AddClient(2).ShouldBeTrue();
}
[Fact]
public void Account_enforces_max_subscriptions()
{
    var account = new Account("test") { MaxSubscriptions = 2 };

    account.IncrementSubscriptions().ShouldBeTrue();
    account.IncrementSubscriptions().ShouldBeTrue();
    account.IncrementSubscriptions().ShouldBeFalse(); // third exceeds the limit
}
[Fact]
public void Account_decrement_subscriptions()
{
    var account = new Account("test") { MaxSubscriptions = 1 };
    account.IncrementSubscriptions().ShouldBeTrue();

    // Releasing the only slot allows a subsequent increment.
    account.DecrementSubscriptions();

    account.IncrementSubscriptions().ShouldBeTrue();
}
}

View File

@@ -5,10 +5,10 @@ public class ClientClosedReasonTests
[Fact]
public void All_expected_close_reasons_exist()
{
// Verify all 17 enum values exist and are distinct (None + 16 named reasons)
// Verify all 18 enum values exist and are distinct (None + 17 named reasons)
var values = Enum.GetValues<ClientClosedReason>();
values.Length.ShouldBe(17);
values.Distinct().Count().ShouldBe(17);
values.Length.ShouldBe(18);
values.Distinct().Count().ShouldBe(18);
}
[Theory]

View File

@@ -0,0 +1,51 @@
using NATS.Server.Protocol;
namespace NATS.Server.Tests;
/// <summary>Tests for NatsHeaderParser: status lines, descriptions, headers, and malformed input.</summary>
public class NatsHeaderParserTests
{
    [Fact]
    public void Parse_status_line_only()
    {
        var parsed = NatsHeaderParser.Parse("NATS/1.0 503\r\n\r\n"u8);

        parsed.Status.ShouldBe(503);
        parsed.Description.ShouldBeEmpty();
        parsed.Headers.ShouldBeEmpty();
    }

    [Fact]
    public void Parse_status_with_description()
    {
        var parsed = NatsHeaderParser.Parse("NATS/1.0 503 No Responders\r\n\r\n"u8);

        parsed.Status.ShouldBe(503);
        parsed.Description.ShouldBe("No Responders");
    }

    [Fact]
    public void Parse_headers_with_values()
    {
        var parsed = NatsHeaderParser.Parse("NATS/1.0\r\nFoo: bar\r\nBaz: qux\r\n\r\n"u8);

        // No status code present on the version line.
        parsed.Status.ShouldBe(0);
        parsed.Headers["Foo"].ShouldBe(["bar"]);
        parsed.Headers["Baz"].ShouldBe(["qux"]);
    }

    [Fact]
    public void Parse_multi_value_header()
    {
        var parsed = NatsHeaderParser.Parse("NATS/1.0\r\nX-Tag: a\r\nX-Tag: b\r\n\r\n"u8);

        // Repeated header names accumulate values in order.
        parsed.Headers["X-Tag"].ShouldBe(["a", "b"]);
    }

    [Fact]
    public void Parse_invalid_returns_defaults()
    {
        var parsed = NatsHeaderParser.Parse("GARBAGE\r\n\r\n"u8);

        parsed.Status.ShouldBe(-1);
    }
}

View File

@@ -0,0 +1,17 @@
namespace NATS.Server.Tests;
/// <summary>Verifies the default values of a freshly constructed NatsOptions.</summary>
public class NatsOptionsTests
{
    [Fact]
    public void Defaults_are_correct()
    {
        var options = new NatsOptions();

        options.MaxSubs.ShouldBe(0);
        options.MaxSubTokens.ShouldBe(0);
        options.Debug.ShouldBeFalse();
        options.Trace.ShouldBeFalse();
        options.LogFile.ShouldBeNull();
        options.LogSizeLimit.ShouldBe(0L);
        options.Tags.ShouldBeNull();
    }
}

View File

@@ -0,0 +1,52 @@
using NATS.Server.Auth;
namespace NATS.Server.Tests;
/// <summary>Tests for PermissionLruCache: lookup, insertion, eviction order, and LRU promotion.</summary>
public class PermissionLruCacheTests
{
    [Fact]
    public void Get_returns_none_for_unknown_key()
    {
        var cache = new PermissionLruCache(128);

        cache.TryGet("foo", out _).ShouldBeFalse();
    }

    [Fact]
    public void Set_and_get_returns_value()
    {
        var cache = new PermissionLruCache(128);

        cache.Set("foo", true);

        cache.TryGet("foo", out var allowed).ShouldBeTrue();
        allowed.ShouldBeTrue();
    }

    [Fact]
    public void Evicts_oldest_when_full()
    {
        var cache = new PermissionLruCache(3);
        cache.Set("a", true);
        cache.Set("b", true);
        cache.Set("c", true);

        // A fourth entry pushes out the least recently used ("a").
        cache.Set("d", true);

        cache.TryGet("a", out _).ShouldBeFalse();
        cache.TryGet("b", out _).ShouldBeTrue();
        cache.TryGet("d", out _).ShouldBeTrue();
    }

    [Fact]
    public void Get_promotes_to_front()
    {
        var cache = new PermissionLruCache(3);
        cache.Set("a", true);
        cache.Set("b", true);
        cache.Set("c", true);

        // Touching "a" makes "b" the least recently used entry.
        cache.TryGet("a", out _);
        cache.Set("d", true);

        cache.TryGet("a", out _).ShouldBeTrue();
        cache.TryGet("b", out _).ShouldBeFalse();
    }
}

View File

@@ -0,0 +1,51 @@
using NATS.Server.Auth;
namespace NATS.Server.Tests;
/// <summary>Tests for ResponseTracker: reply registration, message caps, expiry, and pruning.</summary>
public class ResponseTrackerTests
{
    [Fact]
    public void Allows_reply_subject_after_registration()
    {
        var sut = new ResponseTracker(maxMsgs: 1, expires: TimeSpan.FromMinutes(5));

        sut.RegisterReply("_INBOX.abc123");

        sut.IsReplyAllowed("_INBOX.abc123").ShouldBeTrue();
    }

    [Fact]
    public void Denies_unknown_reply_subject()
    {
        var sut = new ResponseTracker(maxMsgs: 1, expires: TimeSpan.FromMinutes(5));

        sut.IsReplyAllowed("_INBOX.unknown").ShouldBeFalse();
    }

    [Fact]
    public void Enforces_max_messages()
    {
        var sut = new ResponseTracker(maxMsgs: 2, expires: TimeSpan.FromMinutes(5));
        sut.RegisterReply("_INBOX.abc");

        sut.IsReplyAllowed("_INBOX.abc").ShouldBeTrue();
        sut.IsReplyAllowed("_INBOX.abc").ShouldBeTrue();
        sut.IsReplyAllowed("_INBOX.abc").ShouldBeFalse(); // exceeded
    }

    [Fact]
    public void Enforces_expiry()
    {
        var sut = new ResponseTracker(maxMsgs: 0, expires: TimeSpan.FromMilliseconds(1));
        sut.RegisterReply("_INBOX.abc");

        // NOTE(review): real sleep makes this timing-sensitive; an injectable
        // clock would be preferable if the tracker's API ever allows it.
        Thread.Sleep(50);

        sut.IsReplyAllowed("_INBOX.abc").ShouldBeFalse();
    }

    [Fact]
    public void Prune_removes_expired()
    {
        var sut = new ResponseTracker(maxMsgs: 0, expires: TimeSpan.FromMilliseconds(1));
        sut.RegisterReply("_INBOX.a");
        sut.RegisterReply("_INBOX.b");
        Thread.Sleep(50);

        sut.Prune();

        sut.Count.ShouldBe(0);
    }
}

View File

@@ -156,4 +156,125 @@ public class SubListTests
var r = sl.Match("foo.bar.baz");
r.PlainSubs.Length.ShouldBe(3);
}
[Fact]
public void Stats_returns_correct_values()
{
    var list = new SubList();
    list.Insert(MakeSub("foo.bar", sid: "1"));
    list.Insert(MakeSub("foo.baz", sid: "2"));

    // Second identical match should be served from the cache.
    list.Match("foo.bar");
    list.Match("foo.bar");

    var stats = list.Stats();
    stats.NumSubs.ShouldBe(2u);
    stats.NumInserts.ShouldBe(2ul);
    stats.NumMatches.ShouldBe(2ul);
    stats.CacheHitRate.ShouldBeGreaterThan(0.0);
}
[Fact]
public void HasInterest_returns_true_when_subscribers_exist()
{
    var list = new SubList();
    list.Insert(MakeSub("foo.bar"));

    list.HasInterest("foo.bar").ShouldBeTrue();
    list.HasInterest("foo.baz").ShouldBeFalse();
}
[Fact]
public void HasInterest_with_wildcards()
{
    var list = new SubList();
    list.Insert(MakeSub("foo.*"));

    list.HasInterest("foo.bar").ShouldBeTrue();
    list.HasInterest("bar.baz").ShouldBeFalse();
}
[Fact]
public void NumInterest_counts_subscribers()
{
    var list = new SubList();
    list.Insert(MakeSub("foo.bar", sid: "1"));
    list.Insert(MakeSub("foo.*", sid: "2"));
    list.Insert(MakeSub("foo.bar", queue: "q1", sid: "3"));

    var (plain, queued) = list.NumInterest("foo.bar");

    plain.ShouldBe(2);  // foo.bar + foo.*
    queued.ShouldBe(1); // the q1 member
}
[Fact]
public void RemoveBatch_removes_all()
{
    var list = new SubList();
    var first = MakeSub("foo.bar", sid: "1");
    var second = MakeSub("foo.baz", sid: "2");
    var survivor = MakeSub("bar.qux", sid: "3");
    list.Insert(first);
    list.Insert(second);
    list.Insert(survivor);
    list.Count.ShouldBe(3u);

    list.RemoveBatch([first, second]);

    list.Count.ShouldBe(1u);
    list.Match("foo.bar").PlainSubs.ShouldBeEmpty();
    list.Match("bar.qux").PlainSubs.ShouldHaveSingleItem();
}
[Fact]
public void All_returns_every_subscription()
{
    var list = new SubList();
    var plainSub = MakeSub("foo.bar", sid: "1");
    var wildcardSub = MakeSub("foo.*", sid: "2");
    var queueSub = MakeSub("bar.>", queue: "q", sid: "3");
    list.Insert(plainSub);
    list.Insert(wildcardSub);
    list.Insert(queueSub);

    var all = list.All();

    all.Count.ShouldBe(3);
    all.ShouldContain(plainSub);
    all.ShouldContain(wildcardSub);
    all.ShouldContain(queueSub);
}
[Fact]
public void ReverseMatch_finds_patterns_matching_literal()
{
    var list = new SubList();
    var literal = MakeSub("foo.bar", sid: "1");
    var star = MakeSub("foo.*", sid: "2");
    var full = MakeSub("foo.>", sid: "3");
    var unrelated = MakeSub("bar.baz", sid: "4");
    list.Insert(literal);
    list.Insert(star);
    list.Insert(full);
    list.Insert(unrelated);

    var result = list.ReverseMatch("foo.bar");

    result.PlainSubs.Length.ShouldBe(3); // foo.bar, foo.*, foo.>
    result.PlainSubs.ShouldContain(literal);
    result.PlainSubs.ShouldContain(star);
    result.PlainSubs.ShouldContain(full);
}
[Fact]
public void Generation_ID_invalidates_cache()
{
    var list = new SubList();
    list.Insert(MakeSub("foo.bar", sid: "1"));

    // Prime the match cache with the single-sub result.
    list.Match("foo.bar").PlainSubs.Length.ShouldBe(1);

    // Inserting bumps the generation, making the cached entry stale.
    list.Insert(MakeSub("foo.bar", sid: "2"));

    list.Match("foo.bar").PlainSubs.Length.ShouldBe(2);
}
}

View File

@@ -54,4 +54,50 @@ public class SubjectMatchTests
{
SubjectMatch.MatchLiteral(literal, pattern).ShouldBe(expected);
}
[Theory]
[InlineData("foo.bar.baz", 3)]
[InlineData("foo", 1)]
[InlineData("a.b.c.d.e", 5)]
[InlineData("", 0)]
public void NumTokens(string subject, int expected)
    => SubjectMatch.NumTokens(subject).ShouldBe(expected);
[Theory]
[InlineData("foo.bar.baz", 0, "foo")]
[InlineData("foo.bar.baz", 1, "bar")]
[InlineData("foo.bar.baz", 2, "baz")]
[InlineData("foo", 0, "foo")]
[InlineData("foo.bar.baz", 5, "")]
public void TokenAt(string subject, int index, string expected)
    => SubjectMatch.TokenAt(subject, index).ToString().ShouldBe(expected);
[Theory]
[InlineData("foo.bar", "foo.bar", true)]
[InlineData("foo.bar", "foo.baz", false)]
[InlineData("foo.*", "foo.bar", true)]
[InlineData("foo.*", "foo.>", true)]
[InlineData("foo.>", "foo.bar.baz", true)]
[InlineData(">", "foo.bar", true)]
[InlineData("foo.*", "bar.*", false)]
[InlineData("foo.*.baz", "foo.bar.*", true)]
[InlineData("*.bar", "foo.*", true)]
[InlineData("foo.*", "bar.>", false)]
public void SubjectsCollide(string subj1, string subj2, bool expected)
    => SubjectMatch.SubjectsCollide(subj1, subj2).ShouldBe(expected);
[Theory]
[InlineData("foo\0bar", true, false)]
[InlineData("foo\0bar", false, true)]
[InlineData("foo.bar", true, true)]
public void IsValidSubject_checkRunes(string subject, bool checkRunes, bool expected)
    => SubjectMatch.IsValidSubject(subject, checkRunes).ShouldBe(expected);
}