Compare commits
59 Commits
feature/sy
...
3fea2da2cf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3fea2da2cf | ||
|
|
a8985ecb1a | ||
|
|
6228f748ab | ||
|
|
e2e8c33d38 | ||
|
|
d20892f903 | ||
|
|
fd1edda0df | ||
|
|
73dd3307ba | ||
|
|
264b49f96a | ||
|
|
6c83f12e5c | ||
|
|
2aa7265db1 | ||
|
|
ccbcf759a9 | ||
|
|
c87661800d | ||
|
|
23216d0a48 | ||
|
|
71f7f569b9 | ||
|
|
3531a87de0 | ||
|
|
005600b9b8 | ||
|
|
ecc4752c07 | ||
|
|
66ec378bdc | ||
|
|
f1d3c19594 | ||
|
|
1269e8b364 | ||
|
|
d3aad48096 | ||
|
|
fecb51095f | ||
|
|
54207e2906 | ||
|
|
9a0de19c2d | ||
|
|
40b940b1fd | ||
|
|
6825839191 | ||
|
|
d73e7e2f88 | ||
|
|
9977a01c56 | ||
|
|
95691fa9e7 | ||
|
|
5f530de2e4 | ||
|
|
788f4254b0 | ||
|
|
64e3b1bd49 | ||
|
|
cae09f9091 | ||
|
|
d1935bc9ec | ||
|
|
6d23e89fe8 | ||
|
|
a661e641c6 | ||
|
|
7fe15d7ce1 | ||
|
|
3f48d1c5ee | ||
|
|
5f98e53d62 | ||
|
|
4a242f614f | ||
|
|
44d426a7c5 | ||
|
|
d9f157d9e4 | ||
|
|
e562077e4c | ||
|
|
1ebf283a8c | ||
|
|
18a6d0f478 | ||
|
|
02a474a91e | ||
|
|
c8a89c9de2 | ||
|
|
5fd2cf040d | ||
|
|
ca88036126 | ||
|
|
6d0a4d259e | ||
|
|
fe304dfe01 | ||
|
|
1c948b5b0f | ||
|
|
bd29c529a8 | ||
|
|
1a1aa9d642 | ||
|
|
d49bc5b0d7 | ||
|
|
8ded10d49b | ||
|
|
6981a38b72 | ||
|
|
72f60054ed | ||
|
|
708e1b4168 |
@@ -1,7 +1,7 @@
|
||||
# Go vs .NET NATS Server: Functionality Differences
|
||||
|
||||
> Excludes clustering/routes, gateways, leaf nodes, and JetStream.
|
||||
> Generated 2026-02-22 by comparing `golang/nats-server/server/` against `src/NATS.Server/`.
|
||||
> Includes clustering/routes, gateways, leaf nodes, and JetStream parity scope.
|
||||
> Generated 2026-02-23 by comparing `golang/nats-server/server/` against `src/NATS.Server/`.
|
||||
|
||||
---
|
||||
|
||||
@@ -61,14 +61,14 @@
|
||||
| Type | Go | .NET | Notes |
|
||||
|------|:--:|:----:|-------|
|
||||
| CLIENT | Y | Y | |
|
||||
| ROUTER | Y | N | Excluded per scope |
|
||||
| GATEWAY | Y | N | Excluded per scope |
|
||||
| LEAF | Y | N | Excluded per scope |
|
||||
| ROUTER | Y | Y | Route handshake + routing primitives implemented |
|
||||
| GATEWAY | Y | Y | Gateway manager bootstrap implemented |
|
||||
| LEAF | Y | Y | Leaf node manager bootstrap implemented |
|
||||
| SYSTEM (internal) | Y | Y | InternalClient + InternalEventSystem with Channel-based send/receive loops |
|
||||
| JETSTREAM (internal) | Y | N | |
|
||||
| ACCOUNT (internal) | Y | Y | Lazy per-account InternalClient with import/export subscription support |
|
||||
| WebSocket clients | Y | N | |
|
||||
| MQTT clients | Y | N | |
|
||||
| WebSocket clients | Y | Y | Custom frame parser, permessage-deflate compression, origin checking, cookie auth |
|
||||
| MQTT clients | Y | Partial | JWT connection-type constants + config parsing; no MQTT transport yet |
|
||||
|
||||
### Client Features
|
||||
| Feature | Go | .NET | Notes |
|
||||
@@ -127,9 +127,9 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| PING / PONG | Y | Y | |
|
||||
| MSG / HMSG | Y | Y | |
|
||||
| +OK / -ERR | Y | Y | |
|
||||
| RS+/RS-/RMSG (routes) | Y | N | Excluded per scope |
|
||||
| A+/A- (accounts) | Y | N | Excluded per scope |
|
||||
| LS+/LS-/LMSG (leaf) | Y | N | Excluded per scope |
|
||||
| RS+/RS-/RMSG (routes) | Y | Y | Route protocol primitives implemented |
|
||||
| A+/A- (accounts) | Y | N | Inter-server account protocol ops still pending |
|
||||
| LS+/LS-/LMSG (leaf) | Y | Y | Leaf protocol primitives implemented |
|
||||
|
||||
### Protocol Parsing Gaps
|
||||
| Feature | Go | .NET | Notes |
|
||||
@@ -191,7 +191,7 @@ Go implements a sophisticated slow consumer detection system:
|
||||
|---------|:--:|:----:|-------|
|
||||
| Per-account subscription limit | Y | Y | `Account.IncrementSubscriptions()` returns false when `MaxSubscriptions` exceeded |
|
||||
| Auto-unsubscribe on max messages | Y | Y | Enforced at delivery; sub removed from trie + client dict when exhausted |
|
||||
| Subscription routing propagation | Y | N | For clusters |
|
||||
| Subscription routing propagation | Y | Y | Remote subscription propagation implemented for routes |
|
||||
| Queue weight (`qw`) field | Y | N | For remote queue load balancing |
|
||||
|
||||
---
|
||||
@@ -204,7 +204,7 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| Username/password | Y | Y | |
|
||||
| Token | Y | Y | |
|
||||
| NKeys (Ed25519) | Y | Y | .NET has framework but integration is basic |
|
||||
| JWT validation | Y | Y | `NatsJwt` decode/verify, `JwtAuthenticator` with account resolution + revocation |
|
||||
| JWT validation | Y | Y | `NatsJwt` decode/verify, `JwtAuthenticator` with account resolution + revocation + `allowed_connection_types` enforcement |
|
||||
| Bcrypt password hashing | Y | Y | .NET supports bcrypt (`$2*` prefix) with constant-time fallback |
|
||||
| TLS certificate mapping | Y | Y | X500DistinguishedName with full DN match and CN fallback |
|
||||
| Custom auth interface | Y | N | |
|
||||
@@ -221,7 +221,7 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| Account exports/imports | Y | Y | ServiceImport/StreamImport with ExportAuth, subject transforms, response routing |
|
||||
| Per-account connection limits | Y | Y | `Account.AddClient()` returns false when `MaxConnections` exceeded |
|
||||
| Per-account subscription limits | Y | Y | `Account.IncrementSubscriptions()` enforced in `ProcessSub()` |
|
||||
| Account JetStream limits | Y | N | Excluded per scope |
|
||||
| Account JetStream limits | Y | Y | Enforced via account-level stream reservation limits |
|
||||
|
||||
### Permissions
|
||||
| Feature | Go | .NET | Notes |
|
||||
@@ -260,14 +260,15 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| Config file parsing | Y | Y | Custom NATS conf lexer/parser ported from Go; supports includes, variables, blocks |
|
||||
| Hot reload (SIGHUP) | Y | Y | Reloads logging, auth, limits, TLS certs on SIGHUP; rejects non-reloadable changes |
|
||||
| Config change detection | Y | Y | SHA256 digest comparison; `InCmdLine` tracks CLI flag precedence |
|
||||
| ~450 option fields | Y | ~72 | .NET covers core + all single-server options; cluster/JetStream keys silently ignored |
|
||||
| ~450 option fields | Y | ~72 | .NET covers core + single-server options plus cluster/JetStream parsing and reload boundary validation |
|
||||
|
||||
### Missing Options Categories
|
||||
- ~~Logging options~~ — file logging, rotation, syslog, debug/trace, color, timestamps, per-subsystem log control all implemented
|
||||
- ~~Advanced limits (MaxSubs, MaxSubTokens, MaxPending, WriteDeadline)~~ — `MaxSubs`, `MaxSubTokens` implemented; MaxPending/WriteDeadline already existed
|
||||
- ~~Tags/metadata~~ — `Tags` dictionary implemented in `NatsOptions`
|
||||
- ~~OCSP configuration~~ — `OcspConfig` with 4 modes (Auto/Always/Must/Never), peer verification, and stapling
|
||||
- WebSocket/MQTT options
|
||||
- ~~WebSocket options~~ — `WebSocketOptions` with port, compression, origin checking, cookie auth, custom headers
|
||||
- ~~MQTT options~~ — `mqtt {}` config block parsed with all Go `MQTTOpts` fields; no listener yet
|
||||
- ~~Operator mode / account resolver~~ — `JwtAuthenticator` + `IAccountResolver` + `MemAccountResolver` with trusted keys
|
||||
|
||||
---
|
||||
@@ -287,7 +288,7 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| `/subz` / `/subscriptionsz` | Y | Y | Account filtering, test subject filtering, pagination, and subscription details |
|
||||
| `/accountz` | Y | Stub | Returns empty response |
|
||||
| `/accstatz` | Y | Stub | Returns empty response |
|
||||
| `/jsz` | Y | Stub | Returns empty response |
|
||||
| `/jsz` | Y | Y | Returns live JetStream counts/config via `JszHandler` |
|
||||
|
||||
### Varz Response
|
||||
| Field Category | Go | .NET | Notes |
|
||||
@@ -301,8 +302,8 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| Connections (current, total) | Y | Y | |
|
||||
| Messages (in/out msgs/bytes) | Y | Y | |
|
||||
| SlowConsumer breakdown | Y | N | Go tracks per connection type |
|
||||
| Cluster/Gateway/Leaf blocks | Y | N | Excluded per scope |
|
||||
| JetStream block | Y | N | Excluded per scope |
|
||||
| Cluster/Gateway/Leaf blocks | Y | Partial | Config projection present; `/gatewayz` and `/leafz` endpoints remain stubs |
|
||||
| JetStream block | Y | Y | Includes live JetStream config + stream/consumer counts |
|
||||
| TLS cert expiry info | Y | Y | `TlsCertNotAfter` loaded via `X509CertificateLoader` in `/varz` |
|
||||
|
||||
### Connz Response
|
||||
@@ -316,7 +317,7 @@ Go implements a sophisticated slow consumer detection system:
|
||||
| Subscription detail mode | Y | N | |
|
||||
| TLS peer certificate info | Y | N | |
|
||||
| JWT/IssuerKey/Tags fields | Y | N | |
|
||||
| MQTT client ID filtering | Y | N | |
|
||||
| MQTT client ID filtering | Y | Y | `mqtt_client` query param filters open and closed connections |
|
||||
| Proxy info | Y | N | |
|
||||
|
||||
---
|
||||
|
||||
141
docs/plans/2026-02-23-jetstream-full-parity-design.md
Normal file
141
docs/plans/2026-02-23-jetstream-full-parity-design.md
Normal file
@@ -0,0 +1,141 @@
|
||||
# Full JetStream and Cluster Prerequisite Parity Design
|
||||
|
||||
**Date:** 2026-02-23
|
||||
**Status:** Approved
|
||||
**Scope:** Port JetStream from Go with all prerequisite subsystems required for full Go JetStream test parity, including cluster route/gateway/leaf behaviors and RAFT/meta-cluster semantics.
|
||||
**Verification Gate:** Go JetStream-focused test suites in `golang/nats-server/server/` plus new/updated .NET tests.
|
||||
**Cutover Model:** Single end-to-end cutover (no interim acceptance gates).
|
||||
|
||||
## 1. Architecture
|
||||
|
||||
The implementation uses a full in-process .NET parity architecture that mirrors Go subsystem boundaries while keeping strict internal contracts.
|
||||
|
||||
1. Core Server Layer (`NatsServer`/`NatsClient`)
|
||||
- Extend existing server/client runtime to support full client kinds and inter-server protocol paths.
|
||||
- Preserve responsibility for socket lifecycle, parser integration, auth entry, and local dispatch.
|
||||
|
||||
2. Cluster Fabric Layer
|
||||
- Add route mesh, gateway links, leafnode links, interest propagation, and remote subscription accounting.
|
||||
- Provide transport-neutral contracts consumed by JetStream and RAFT replication services.
|
||||
|
||||
3. JetStream Control Plane
|
||||
- Add account-scoped JetStream managers, API subject handlers (`$JS.API.*`), stream/consumer metadata lifecycle, advisories, and limit enforcement.
|
||||
- Integrate with RAFT/meta services for replicated decisions.
|
||||
|
||||
4. JetStream Data Plane
|
||||
- Add stream ingest path, retention/eviction logic, consumer delivery/ack/redelivery, mirror/source orchestration, and flow-control behavior.
|
||||
- Use pluggable storage abstractions with parity-focused behavior.
|
||||
|
||||
5. RAFT and Replication Layer
|
||||
- Implement meta-group plus per-asset replication groups, election/term logic, log replication, snapshots, and catchup.
|
||||
- Expose deterministic commit/applied hooks to JetStream runtime layers.
|
||||
|
||||
6. Storage Layer
|
||||
- Implement memstore and filestore with sequence indexing, subject indexing, compaction/snapshot support, and recovery semantics.
|
||||
|
||||
7. Observability Layer
|
||||
- Upgrade `/jsz` and `/varz` JetStream blocks from placeholders to live runtime reporting with Go-compatible response shape.
|
||||
|
||||
## 2. Components and Contracts
|
||||
|
||||
### 2.1 New component families
|
||||
|
||||
1. Cluster and interserver subsystem
|
||||
- Add route/gateway/leaf and interserver protocol operations under `src/NATS.Server/`.
|
||||
- Extend parser/dispatcher with route/leaf/account operations currently excluded.
|
||||
- Expand client-kind model and command routing constraints.
|
||||
|
||||
2. JetStream API and domain model
|
||||
- Add `src/NATS.Server/JetStream/` subtree for API payload models, stream/consumer models, and error templates/codes.
|
||||
|
||||
3. JetStream runtime
|
||||
- Add stream manager, consumer manager, ack processor, delivery scheduler, mirror/source orchestration, and flow control handlers.
|
||||
- Integrate publish path with stream capture/store/ack behavior.
|
||||
|
||||
4. RAFT subsystem
|
||||
- Add `src/NATS.Server/Raft/` for replicated logs, elections, snapshots, and membership operations.
|
||||
|
||||
5. Storage subsystem
|
||||
- Add `src/NATS.Server/JetStream/Storage/` for `MemStore` and `FileStore`, sequence/subject indexes, and restart recovery.
|
||||
|
||||
### 2.2 Existing components to upgrade
|
||||
|
||||
1. `src/NATS.Server/NatsOptions.cs`
|
||||
- Add full config surface for clustering, JetStream, storage, placement, and parity-required limits.
|
||||
|
||||
2. `src/NATS.Server/Configuration/ConfigProcessor.cs`
|
||||
- Replace silent ignore behavior for cluster/jetstream keys with parsing, mapping, and validation.
|
||||
|
||||
3. `src/NATS.Server/Protocol/NatsParser.cs` and `src/NATS.Server/NatsClient.cs`
|
||||
- Add missing interserver operations and kind-aware dispatch paths needed for clustered JetStream behavior.
|
||||
|
||||
4. Monitoring components
|
||||
- Upgrade `src/NATS.Server/Monitoring/MonitorServer.cs` and `src/NATS.Server/Monitoring/Varz.cs`.
|
||||
- Add/extend JS monitoring handlers and models for `/jsz` and JetStream runtime fields.
|
||||
|
||||
## 3. Data Flow and Behavioral Semantics
|
||||
|
||||
1. Inbound publish path
|
||||
- Parse client publish commands, apply auth/permission checks, route to local subscribers and JetStream candidates.
|
||||
- For JetStream subjects: apply preconditions, append to store, replicate via RAFT (as required), apply committed state, return Go-compatible pub ack.
|
||||
|
||||
2. Consumer delivery path
|
||||
- Use shared push/pull state model for pending, ack floor, redelivery timers, flow control, and max ack pending.
|
||||
- Enforce retention policy semantics (limits/interest/workqueue), filter subject behavior, replay policy, and eviction behavior.
|
||||
|
||||
3. Replication and control flow
|
||||
- Meta RAFT governs replicated metadata decisions.
|
||||
- Per-stream/per-consumer groups replicate state and snapshots.
|
||||
- Leader changes preserve at-least-once delivery and consumer state invariants.
|
||||
|
||||
4. Recovery flow
|
||||
- Reconstruct stream/consumer/store state on startup.
|
||||
- In clustered mode, rejoin replication groups and catch up before serving full API/delivery workload.
|
||||
- Preserve sequence continuity, subject indexes, delete markers, and pending/redelivery state.
|
||||
|
||||
5. Monitoring flow
|
||||
- `/varz` JetStream fields and `/jsz` return live runtime state.
|
||||
- Advisory and metric surfaces update from control-plane and data-plane events.
|
||||
|
||||
## 4. Error Handling and Operational Constraints
|
||||
|
||||
1. API error parity
|
||||
- Match canonical JetStream codes/messages for validation failures, state conflicts, limits, leadership/quorum issues, and storage failures.
|
||||
|
||||
2. Protocol behavior
|
||||
- Preserve normal client compatibility while adding interserver protocol and internal client-kind restrictions.
|
||||
|
||||
3. Storage and consistency failures
|
||||
- Classify corruption/truncation/checksum/snapshot failures as recoverable vs non-recoverable.
|
||||
- Avoid silent data loss and emit monitoring/advisory signals where parity requires.
|
||||
|
||||
4. Cluster and RAFT fault handling
|
||||
- Explicitly handle no-quorum, stale leader, delayed apply, peer removal, catchup lag, and stepdown transitions.
|
||||
- Return leadership-aware API errors.
|
||||
|
||||
5. Config/reload behavior
|
||||
- Treat JetStream and cluster config as first-class with strict validation.
|
||||
- Mirror Go-like reloadable vs restart-required change boundaries.
|
||||
|
||||
## 5. Testing and Verification Strategy
|
||||
|
||||
1. .NET unit tests
|
||||
- Add focused tests for JetStream API validation, stream and consumer state, RAFT primitives, mem/file store invariants, and config parsing/validation.
|
||||
|
||||
2. .NET integration tests
|
||||
- Add end-to-end tests for publish/store/consume/ack behavior, retention policies, restart recovery, and clustered prerequisites used by JetStream.
|
||||
|
||||
3. Parity harness
|
||||
- Maintain mapping of Go JetStream test categories to .NET feature areas.
|
||||
- Execute JetStream-focused Go tests from `golang/nats-server/server/` as acceptance benchmark.
|
||||
|
||||
4. `differences.md` policy
|
||||
- Update only after verification gate passes.
|
||||
- Remove opening JetStream exclusion scope statement and replace with updated parity scope.
|
||||
|
||||
## 6. Scope Decisions Captured
|
||||
|
||||
- Include all prerequisite non-JetStream subsystems required to satisfy full Go JetStream tests.
|
||||
- Verification target is full Go JetStream-focused parity, not a narrowed subset.
|
||||
- Delivery model is single end-to-end cutover.
|
||||
- `differences.md` top-level scope statement will be updated to include JetStream and clustering parity coverage once verified.
|
||||
1648
docs/plans/2026-02-23-jetstream-full-parity-plan.md
Normal file
1648
docs/plans/2026-02-23-jetstream-full-parity-plan.md
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1,18 +1,21 @@
|
||||
# MQTT Connection Type Port Design
|
||||
|
||||
## Goal
|
||||
Port MQTT-related connection type parity from Go into the .NET server for two scoped areas:
|
||||
Port MQTT-related connection type parity from Go into the .NET server for three scoped areas:
|
||||
1. JWT `allowed_connection_types` behavior for `MQTT` / `MQTT_WS` (plus existing known types).
|
||||
2. `/connz` filtering by `mqtt_client`.
|
||||
3. Full MQTT configuration parsing from `mqtt {}` config blocks (all Go `MQTTOpts` fields).
|
||||
|
||||
## Scope
|
||||
- In scope:
|
||||
- JWT allowed connection type normalization and enforcement semantics.
|
||||
- `/connz?mqtt_client=` option parsing and filtering.
|
||||
- MQTT configuration model and config file parsing (all Go `MQTTOpts` fields).
|
||||
- Expanded `MqttOptsVarz` monitoring output.
|
||||
- Unit/integration tests for new and updated behavior.
|
||||
- `differences.md` updates after implementation is verified.
|
||||
- Out of scope:
|
||||
- Full MQTT transport implementation.
|
||||
- Full MQTT transport implementation (listener, protocol parser, sessions).
|
||||
- WebSocket transport implementation.
|
||||
- Leaf/route/gateway transport plumbing.
|
||||
|
||||
@@ -27,6 +30,8 @@ Port MQTT-related connection type parity from Go into the .NET server for two sc
|
||||
- Extend connz monitoring options to parse `mqtt_client` and apply exact-match filtering before sort/pagination.
|
||||
|
||||
## Components
|
||||
|
||||
### JWT Connection-Type Enforcement
|
||||
- `src/NATS.Server/Auth/IAuthenticator.cs`
|
||||
- Extend `ClientAuthContext` with a connection-type value.
|
||||
- `src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs` (new)
|
||||
@@ -38,6 +43,8 @@ Port MQTT-related connection type parity from Go into the .NET server for two sc
|
||||
- Enforce against current `ClientAuthContext.ConnectionType`.
|
||||
- `src/NATS.Server/NatsClient.cs`
|
||||
- Populate auth context connection type (currently `STANDARD`).
|
||||
|
||||
### Connz MQTT Client Filtering
|
||||
- `src/NATS.Server/Monitoring/Connz.cs`
|
||||
- Add `MqttClient` to `ConnzOptions` with JSON field `mqtt_client`.
|
||||
- `src/NATS.Server/Monitoring/ConnzHandler.cs`
|
||||
@@ -48,6 +55,30 @@ Port MQTT-related connection type parity from Go into the .NET server for two sc
|
||||
- `src/NATS.Server/NatsServer.cs`
|
||||
- Persist `MqttClient` into `ClosedClient` snapshot (empty for now).
|
||||
|
||||
### MQTT Configuration Parsing
|
||||
- `src/NATS.Server/MqttOptions.cs` (new)
|
||||
- Full model matching Go `MQTTOpts` struct (opts.go:613-707):
|
||||
- Network: `Host`, `Port`
|
||||
- Auth override: `NoAuthUser`, `Username`, `Password`, `Token`, `AuthTimeout`
|
||||
- TLS: `TlsCert`, `TlsKey`, `TlsCaCert`, `TlsVerify`, `TlsTimeout`, `TlsMap`, `TlsPinnedCerts`
|
||||
- JetStream: `JsDomain`, `StreamReplicas`, `ConsumerReplicas`, `ConsumerMemoryStorage`, `ConsumerInactiveThreshold`
|
||||
- QoS: `AckWait`, `MaxAckPending`, `JsApiTimeout`
|
||||
- `src/NATS.Server/NatsOptions.cs`
|
||||
- Add `Mqtt` property of type `MqttOptions?`.
|
||||
- `src/NATS.Server/Configuration/ConfigProcessor.cs`
|
||||
- Add `ParseMqtt()` for `mqtt {}` config block with Go-compatible key aliases:
|
||||
- `host`/`net` → Host, `listen` → Host+Port
|
||||
- `ack_wait`/`ackwait` → AckWait
|
||||
- `max_ack_pending`/`max_pending`/`max_inflight` → MaxAckPending
|
||||
- `js_domain` → JsDomain
|
||||
- `js_api_timeout`/`api_timeout` → JsApiTimeout
|
||||
- `consumer_inactive_threshold`/`consumer_auto_cleanup` → ConsumerInactiveThreshold
|
||||
- Nested `tls {}` and `authorization {}`/`authentication {}` blocks
|
||||
- `src/NATS.Server/Monitoring/Varz.cs`
|
||||
- Expand `MqttOptsVarz` from 3 fields to full monitoring-visible set.
|
||||
- `src/NATS.Server/Monitoring/VarzHandler.cs`
|
||||
- Populate expanded `MqttOptsVarz` from `NatsOptions.Mqtt`.
|
||||
|
||||
## Data Flow
|
||||
1. Client sends `CONNECT`.
|
||||
2. `NatsClient.ProcessConnectAsync` builds `ClientAuthContext` with `ConnectionType=STANDARD`.
|
||||
@@ -73,6 +104,7 @@ Port MQTT-related connection type parity from Go into the .NET server for two sc
|
||||
- MQTT transport is not implemented yet in this repository.
|
||||
- Runtime connection type currently resolves to `STANDARD` in auth context.
|
||||
- `mqtt_client` values remain empty until MQTT path populates them.
|
||||
- MQTT config is parsed and stored but no listener is started.
|
||||
|
||||
## Testing Strategy
|
||||
- `tests/NATS.Server.Tests/JwtAuthenticatorTests.cs`
|
||||
@@ -85,9 +117,16 @@ Port MQTT-related connection type parity from Go into the .NET server for two sc
|
||||
- `/connz?mqtt_client=<id>` returns matching connections only.
|
||||
- `/connz?state=closed&mqtt_client=<id>` filters closed snapshots.
|
||||
- non-existing ID yields empty connection set.
|
||||
- `tests/NATS.Server.Tests/ConfigProcessorTests.cs` (or similar)
|
||||
- Parse valid `mqtt {}` block with all fields.
|
||||
- Parse config with aliases (ackwait vs ack_wait, host vs net, etc.).
|
||||
- Parse nested `tls {}` and `authorization {}` blocks within mqtt.
|
||||
- Varz MQTT section populated from config.
|
||||
|
||||
## Success Criteria
|
||||
- JWT `allowed_connection_types` behavior matches Go semantics for known/unknown mixing and unknown-only rejection.
|
||||
- `/connz` supports exact `mqtt_client` filtering for open and closed sets.
|
||||
- `mqtt {}` config block parses all Go `MQTTOpts` fields with aliases.
|
||||
- `MqttOptsVarz` includes full monitoring output.
|
||||
- Added tests pass.
|
||||
- `differences.md` accurately reflects implemented parity.
|
||||
|
||||
933
docs/plans/2026-02-23-mqtt-connection-type-plan.md
Normal file
933
docs/plans/2026-02-23-mqtt-connection-type-plan.md
Normal file
@@ -0,0 +1,933 @@
|
||||
# MQTT Connection Type Parity + Config Parsing Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Port Go-compatible MQTT connection-type handling for JWT `allowed_connection_types`, add `/connz` `mqtt_client` filtering, parse all Go `MQTTOpts` config fields, and expand `MqttOptsVarz` monitoring output — with tests and docs updates.
|
||||
|
||||
**Architecture:** Thread a connection-type value into auth context and enforce Go-style allowed-connection-type semantics in `JwtAuthenticator`. Add connz query-option filtering for `mqtt_client` across open and closed connections. Parse the full `mqtt {}` config block into a new `MqttOptions` model following the existing `ParseTls()` pattern in `ConfigProcessor`. Expand `MqttOptsVarz` and wire into `/varz`. Keep behavior backward-compatible and transport-agnostic so MQTT runtime plumbing can be added later without changing auth/monitoring/config semantics.
|
||||
|
||||
**Tech Stack:** .NET 10, xUnit 3, Shouldly, ASP.NET minimal APIs, System.Text.Json.
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Add failing JWT connection-type behavior tests
|
||||
|
||||
**Files:**
|
||||
- Modify: `tests/NATS.Server.Tests/JwtAuthenticatorTests.cs`
|
||||
|
||||
**Step 1: Write the failing tests**
|
||||
|
||||
Add these 5 test methods to the existing `JwtAuthenticatorTests` class. Each test must build a valid operator/account/user JWT chain (reuse the existing helper pattern from other tests in the file). The user JWT's `nats.allowed_connection_types` array controls which connection types are permitted.
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Allowed_connection_types_allows_standard_context()
|
||||
{
|
||||
// Build valid operator/account/user JWT chain.
|
||||
// User JWT includes: "allowed_connection_types":["STANDARD"]
|
||||
// Context sets ConnectionType = "STANDARD".
|
||||
// Assert Authenticate() is not null.
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Allowed_connection_types_rejects_mqtt_only_for_standard_context()
|
||||
{
|
||||
// User JWT includes: "allowed_connection_types":["MQTT"]
|
||||
// Context sets ConnectionType = "STANDARD".
|
||||
// Assert Authenticate() is null.
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Allowed_connection_types_allows_known_even_with_unknown_values()
|
||||
{
|
||||
// User JWT includes: ["STANDARD", "SOME_NEW_TYPE"]
|
||||
// Context sets ConnectionType = "STANDARD".
|
||||
// Assert Authenticate() is not null.
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Allowed_connection_types_rejects_when_only_unknown_values_present()
|
||||
{
|
||||
// User JWT includes: ["SOME_NEW_TYPE"]
|
||||
// Context sets ConnectionType = "STANDARD".
|
||||
// Assert Authenticate() is null.
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Allowed_connection_types_is_case_insensitive_for_input_values()
|
||||
{
|
||||
// User JWT includes: ["standard"]
|
||||
// Context sets ConnectionType = "STANDARD".
|
||||
// Assert Authenticate() is not null.
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~JwtAuthenticatorTests.Allowed_connection_types" -v minimal`
|
||||
Expected: FAIL (current implementation ignores `allowed_connection_types`).
|
||||
|
||||
**Step 3: Commit test-only checkpoint**
|
||||
|
||||
```bash
|
||||
git add tests/NATS.Server.Tests/JwtAuthenticatorTests.cs
|
||||
git commit -m "test: add failing jwt allowed connection type coverage"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 2: Implement auth connection-type model and Go-style allowed-type conversion
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/Auth/IAuthenticator.cs` (line 11-16: add `ConnectionType` property to `ClientAuthContext`)
|
||||
- Create: `src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs`
|
||||
- Modify: `src/NATS.Server/Auth/JwtAuthenticator.cs` (insert check after step 7 revocation check, before step 8 permissions, around line 97)
|
||||
- Modify: `src/NATS.Server/NatsClient.cs` (line 382-387: add `ConnectionType` to auth context construction)
|
||||
|
||||
**Step 1: Add connection type to auth context**
|
||||
|
||||
In `src/NATS.Server/Auth/IAuthenticator.cs`, add the `ConnectionType` property to `ClientAuthContext`. Note: this requires adding a `using NATS.Server.Auth.Jwt;` at the top of the file.
|
||||
|
||||
```csharp
|
||||
public sealed class ClientAuthContext
|
||||
{
|
||||
public required ClientOptions Opts { get; init; }
|
||||
public required byte[] Nonce { get; init; }
|
||||
public string ConnectionType { get; init; } = JwtConnectionTypes.Standard;
|
||||
public X509Certificate2? ClientCertificate { get; init; }
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Create JWT connection-type constants + converter helper**
|
||||
|
||||
Create new file `src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs`:
|
||||
|
||||
```csharp
|
||||
namespace NATS.Server.Auth.Jwt;
|
||||
|
||||
/// <summary>
|
||||
/// Known connection type constants matching Go server/client.go.
|
||||
/// Used for JWT allowed_connection_types claim validation.
|
||||
/// Reference: golang/nats-server/server/client.go connectionType constants.
|
||||
/// </summary>
|
||||
internal static class JwtConnectionTypes
|
||||
{
|
||||
public const string Standard = "STANDARD";
|
||||
public const string Websocket = "WEBSOCKET";
|
||||
public const string Leafnode = "LEAFNODE";
|
||||
public const string LeafnodeWs = "LEAFNODE_WS";
|
||||
public const string Mqtt = "MQTT";
|
||||
public const string MqttWs = "MQTT_WS";
|
||||
public const string InProcess = "INPROCESS";
|
||||
|
||||
private static readonly HashSet<string> Known =
|
||||
[
|
||||
Standard, Websocket, Leafnode, LeafnodeWs, Mqtt, MqttWs, InProcess,
|
||||
];
|
||||
|
||||
/// <summary>
|
||||
/// Converts a list of connection type strings (from JWT claims) into a set of
|
||||
/// known valid types plus a flag indicating unknown values were present.
|
||||
/// Reference: Go server/client.go convertAllowedConnectionTypes.
|
||||
/// </summary>
|
||||
public static (HashSet<string> Valid, bool HasUnknown) Convert(IEnumerable<string>? values)
|
||||
{
|
||||
var valid = new HashSet<string>(StringComparer.Ordinal);
|
||||
var hasUnknown = false;
|
||||
if (values is null) return (valid, false);
|
||||
|
||||
foreach (var raw in values)
|
||||
{
|
||||
var up = (raw ?? string.Empty).Trim().ToUpperInvariant();
|
||||
if (up.Length == 0) continue;
|
||||
if (Known.Contains(up)) valid.Add(up);
|
||||
else hasUnknown = true;
|
||||
}
|
||||
|
||||
return (valid, hasUnknown);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Step 3: Enforce allowed connection types in JWT auth**
|
||||
|
||||
In `src/NATS.Server/Auth/JwtAuthenticator.cs`, insert the following block after the revocation check (step 7, around line 96) and before the permissions build (step 8):
|
||||
|
||||
```csharp
|
||||
// 7b. Check allowed connection types
|
||||
var (allowedTypes, hasUnknown) = JwtConnectionTypes.Convert(userClaims.Nats?.AllowedConnectionTypes);
|
||||
|
||||
if (allowedTypes.Count == 0)
|
||||
{
|
||||
if (hasUnknown)
|
||||
return null; // unknown-only list should reject
|
||||
}
|
||||
else
|
||||
{
|
||||
var connType = string.IsNullOrWhiteSpace(context.ConnectionType)
|
||||
? JwtConnectionTypes.Standard
|
||||
: context.ConnectionType.ToUpperInvariant();
|
||||
|
||||
if (!allowedTypes.Contains(connType))
|
||||
return null;
|
||||
}
|
||||
```
|
||||
|
||||
**Step 4: Set auth context connection type in client connect path**
|
||||
|
||||
In `src/NATS.Server/NatsClient.cs` around line 382, add `ConnectionType` to the existing `ClientAuthContext` construction:
|
||||
|
||||
```csharp
|
||||
var context = new ClientAuthContext
|
||||
{
|
||||
Opts = ClientOpts,
|
||||
Nonce = _nonce ?? [],
|
||||
ConnectionType = JwtConnectionTypes.Standard,
|
||||
ClientCertificate = TlsState?.PeerCert,
|
||||
};
|
||||
```
|
||||
|
||||
Add `using NATS.Server.Auth.Jwt;` at the top of the file.
|
||||
|
||||
**Step 5: Run tests to verify pass**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~JwtAuthenticatorTests.Allowed_connection_types" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 6: Commit implementation checkpoint**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/Auth/IAuthenticator.cs src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs src/NATS.Server/Auth/JwtAuthenticator.cs src/NATS.Server/NatsClient.cs
|
||||
git commit -m "feat: enforce jwt allowed connection types with go-compatible semantics"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 3: Add failing connz mqtt_client filter tests
|
||||
|
||||
**Files:**
|
||||
- Modify: `tests/NATS.Server.Tests/MonitorTests.cs`
|
||||
|
||||
**Step 1: Write the failing tests**
|
||||
|
||||
Add these 2 test methods to the existing `MonitorTests` class. These test the `/connz?mqtt_client=<id>` query parameter filtering.
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Connz_filters_by_mqtt_client_for_open_connections()
|
||||
{
|
||||
// Start server with monitoring port.
|
||||
// Connect a regular NATS client (no MQTT ID).
|
||||
// Query /connz?mqtt_client=some-id.
|
||||
// Assert num_connections == 0 (no client has that MQTT ID).
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Connz_filters_by_mqtt_client_for_closed_connections()
|
||||
{
|
||||
// Start server with monitoring port.
|
||||
// Query /connz?state=closed&mqtt_client=missing-id.
|
||||
// Assert num_connections == 0.
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run tests to verify expected failure mode**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~MonitorTests.Connz_filters_by_mqtt_client" -v minimal`
|
||||
Expected: FAIL (query option not implemented yet — `mqtt_client` param ignored, so all connections returned).
|
||||
|
||||
**Step 3: Commit test-only checkpoint**
|
||||
|
||||
```bash
|
||||
git add tests/NATS.Server.Tests/MonitorTests.cs
|
||||
git commit -m "test: add failing connz mqtt_client filter coverage"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 4: Implement connz mqtt_client filtering and closed snapshot support
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/Monitoring/Connz.cs` (line 191-210: add `MqttClient` to `ConnzOptions`)
|
||||
- Modify: `src/NATS.Server/Monitoring/ConnzHandler.cs` (line 148-201: parse query param; line 18-29: apply filter after collection but before sort)
|
||||
- Modify: `src/NATS.Server/Monitoring/ClosedClient.cs` (line 6-25: add `MqttClient` property)
|
||||
- Modify: `src/NATS.Server/NatsServer.cs` (line 695-714: add `MqttClient` to closed snapshot)
|
||||
|
||||
**Step 1: Add `MqttClient` to `ConnzOptions`**
|
||||
|
||||
In `src/NATS.Server/Monitoring/Connz.cs`, add after `FilterSubject` property (line 205):
|
||||
|
||||
```csharp
|
||||
public string MqttClient { get; set; } = "";
|
||||
```
|
||||
|
||||
**Step 2: Parse `mqtt_client` query param in handler**
|
||||
|
||||
In `src/NATS.Server/Monitoring/ConnzHandler.cs` `ParseQueryParams` method, add after the existing `limit` parse block (around line 198):
|
||||
|
||||
```csharp
|
||||
if (q.TryGetValue("mqtt_client", out var mqttClient))
|
||||
opts.MqttClient = mqttClient.ToString();
|
||||
```
|
||||
|
||||
**Step 3: Apply `mqtt_client` filter in `HandleConnz`**
|
||||
|
||||
In `src/NATS.Server/Monitoring/ConnzHandler.cs` `HandleConnz` method, add after the closed connections collection block (after line 29) and before the sort validation (line 32):
|
||||
|
||||
```csharp
|
||||
// Filter by MQTT client ID
|
||||
if (!string.IsNullOrEmpty(opts.MqttClient))
|
||||
connInfos = connInfos.Where(c => c.MqttClient == opts.MqttClient).ToList();
|
||||
```
|
||||
|
||||
**Step 4: Add `MqttClient` to `ClosedClient` model**
|
||||
|
||||
In `src/NATS.Server/Monitoring/ClosedClient.cs`, add after line 24 (`TlsCipherSuite`):
|
||||
|
||||
```csharp
|
||||
public string MqttClient { get; init; } = "";
|
||||
```
|
||||
|
||||
**Step 5: Add `MqttClient` to closed snapshot creation in `NatsServer.RemoveClient`**
|
||||
|
||||
In `src/NATS.Server/NatsServer.cs` around line 713 (inside the `new ClosedClient { ... }` block), add:
|
||||
|
||||
```csharp
|
||||
MqttClient = "", // populated when MQTT transport is implemented
|
||||
```
|
||||
|
||||
**Step 6: Add `MqttClient` to `BuildClosedConnInfo`**
|
||||
|
||||
In `src/NATS.Server/Monitoring/ConnzHandler.cs` `BuildClosedConnInfo` method (line 119-146), add to the `new ConnInfo { ... }` initializer:
|
||||
|
||||
```csharp
|
||||
MqttClient = closed.MqttClient,
|
||||
```
|
||||
|
||||
**Step 7: Run connz mqtt filter tests**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~MonitorTests.Connz_filters_by_mqtt_client" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 8: Commit implementation checkpoint**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/Monitoring/Connz.cs src/NATS.Server/Monitoring/ConnzHandler.cs src/NATS.Server/Monitoring/ClosedClient.cs src/NATS.Server/NatsServer.cs
|
||||
git commit -m "feat: add connz mqtt_client filtering"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 5: Verification checkpoint for JWT + connz tasks
|
||||
|
||||
**Step 1: Run all JWT connection-type tests**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~JwtAuthenticatorTests.Allowed_connection_types" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 2: Run all connz tests**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~MonitorTests.Connz" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 3: Run full test suite**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj -v minimal`
|
||||
Expected: PASS (no regressions).
|
||||
|
||||
---
|
||||
|
||||
### Task 6: Add MqttOptions model and config parsing
|
||||
|
||||
**Files:**
|
||||
- Create: `src/NATS.Server/MqttOptions.cs`
|
||||
- Modify: `src/NATS.Server/NatsOptions.cs` (line 116-117: add `Mqtt` property)
|
||||
- Modify: `src/NATS.Server/Configuration/ConfigProcessor.cs` (line 248: add `mqtt` case; add `ParseMqtt` + `ParseMqttAuth` + `ParseMqttTls` + `ToDouble` methods)
|
||||
|
||||
**Step 1: Create `MqttOptions` model**
|
||||
|
||||
Create new file `src/NATS.Server/MqttOptions.cs`. This matches Go `MQTTOpts` struct (golang/nats-server/server/opts.go:613-707):
|
||||
|
||||
```csharp
|
||||
namespace NATS.Server;
|
||||
|
||||
/// <summary>
|
||||
/// MQTT protocol configuration options.
|
||||
/// Corresponds to Go server/opts.go MQTTOpts struct.
|
||||
/// Config is parsed and stored but no MQTT listener is started yet.
|
||||
/// </summary>
|
||||
public sealed class MqttOptions
|
||||
{
|
||||
// Network
|
||||
public string Host { get; set; } = "";
|
||||
public int Port { get; set; }
|
||||
|
||||
// Auth override (MQTT-specific, separate from global auth)
|
||||
public string? NoAuthUser { get; set; }
|
||||
public string? Username { get; set; }
|
||||
public string? Password { get; set; }
|
||||
public string? Token { get; set; }
|
||||
public double AuthTimeout { get; set; }
|
||||
|
||||
// TLS
|
||||
public string? TlsCert { get; set; }
|
||||
public string? TlsKey { get; set; }
|
||||
public string? TlsCaCert { get; set; }
|
||||
public bool TlsVerify { get; set; }
|
||||
public double TlsTimeout { get; set; } = 2.0;
|
||||
public bool TlsMap { get; set; }
|
||||
public HashSet<string>? TlsPinnedCerts { get; set; }
|
||||
|
||||
// JetStream integration
|
||||
public string? JsDomain { get; set; }
|
||||
public int StreamReplicas { get; set; }
|
||||
public int ConsumerReplicas { get; set; }
|
||||
public bool ConsumerMemoryStorage { get; set; }
|
||||
public TimeSpan ConsumerInactiveThreshold { get; set; }
|
||||
|
||||
// QoS
|
||||
public TimeSpan AckWait { get; set; } = TimeSpan.FromSeconds(30);
|
||||
public ushort MaxAckPending { get; set; }
|
||||
public TimeSpan JsApiTimeout { get; set; } = TimeSpan.FromSeconds(5);
|
||||
|
||||
public bool HasTls => TlsCert != null && TlsKey != null;
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Add `Mqtt` property to `NatsOptions`**
|
||||
|
||||
In `src/NATS.Server/NatsOptions.cs`, add before the `HasTls` property (around line 117):
|
||||
|
||||
```csharp
|
||||
// MQTT configuration (parsed from config, no listener yet)
|
||||
public MqttOptions? Mqtt { get; set; }
|
||||
```
|
||||
|
||||
**Step 3: Add `ToDouble` helper to `ConfigProcessor`**
|
||||
|
||||
In `src/NATS.Server/Configuration/ConfigProcessor.cs`, add after the `ToString` helper (around line 654):
|
||||
|
||||
```csharp
|
||||
private static double ToDouble(object? value) => value switch
|
||||
{
|
||||
double d => d,
|
||||
long l => l,
|
||||
int i => i,
|
||||
string s when double.TryParse(s, NumberStyles.Float, CultureInfo.InvariantCulture, out var d) => d,
|
||||
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to double"),
|
||||
};
|
||||
```
|
||||
|
||||
**Step 4: Add `mqtt` case to `ProcessKey` switch**
|
||||
|
||||
In `src/NATS.Server/Configuration/ConfigProcessor.cs`, replace the default case comment at line 248:
|
||||
|
||||
```csharp
|
||||
// MQTT
|
||||
case "mqtt":
|
||||
if (value is Dictionary<string, object?> mqttDict)
|
||||
ParseMqtt(mqttDict, opts, errors);
|
||||
break;
|
||||
|
||||
// Unknown keys silently ignored (cluster, jetstream, gateway, leafnode, etc.)
|
||||
default:
|
||||
break;
|
||||
```
|
||||
|
||||
**Step 5: Add `ParseMqtt` method**
|
||||
|
||||
Add this method after `ParseTags` (around line 621). It follows the exact key/alias structure from Go `parseMQTT` (opts.go:5443-5541):
|
||||
|
||||
```csharp
|
||||
// ─── MQTT parsing ─────────────────────────────────────────────
|
||||
|
||||
private static void ParseMqtt(Dictionary<string, object?> dict, NatsOptions opts, List<string> errors)
|
||||
{
|
||||
var mqtt = opts.Mqtt ?? new MqttOptions();
|
||||
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
switch (key.ToLowerInvariant())
|
||||
{
|
||||
case "listen":
|
||||
var (host, port) = ParseHostPort(value);
|
||||
if (host is not null) mqtt.Host = host;
|
||||
if (port is not null) mqtt.Port = port.Value;
|
||||
break;
|
||||
case "port":
|
||||
mqtt.Port = ToInt(value);
|
||||
break;
|
||||
case "host" or "net":
|
||||
mqtt.Host = ToString(value);
|
||||
break;
|
||||
case "no_auth_user":
|
||||
mqtt.NoAuthUser = ToString(value);
|
||||
break;
|
||||
case "tls":
|
||||
if (value is Dictionary<string, object?> tlsDict)
|
||||
ParseMqttTls(tlsDict, mqtt, errors);
|
||||
break;
|
||||
case "authorization" or "authentication":
|
||||
if (value is Dictionary<string, object?> authDict)
|
||||
ParseMqttAuth(authDict, mqtt, errors);
|
||||
break;
|
||||
case "ack_wait" or "ackwait":
|
||||
mqtt.AckWait = ParseDuration(value);
|
||||
break;
|
||||
case "js_api_timeout" or "api_timeout":
|
||||
mqtt.JsApiTimeout = ParseDuration(value);
|
||||
break;
|
||||
case "max_ack_pending" or "max_pending" or "max_inflight":
|
||||
var pending = ToInt(value);
|
||||
if (pending < 0 || pending > 0xFFFF)
|
||||
errors.Add($"mqtt max_ack_pending invalid value {pending}, should be in [0..{0xFFFF}] range");
|
||||
else
|
||||
mqtt.MaxAckPending = (ushort)pending;
|
||||
break;
|
||||
case "js_domain":
|
||||
mqtt.JsDomain = ToString(value);
|
||||
break;
|
||||
case "stream_replicas":
|
||||
mqtt.StreamReplicas = ToInt(value);
|
||||
break;
|
||||
case "consumer_replicas":
|
||||
mqtt.ConsumerReplicas = ToInt(value);
|
||||
break;
|
||||
case "consumer_memory_storage":
|
||||
mqtt.ConsumerMemoryStorage = ToBool(value);
|
||||
break;
|
||||
case "consumer_inactive_threshold" or "consumer_auto_cleanup":
|
||||
mqtt.ConsumerInactiveThreshold = ParseDuration(value);
|
||||
break;
|
||||
default:
|
||||
// Unknown MQTT keys silently ignored
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
opts.Mqtt = mqtt;
|
||||
}
|
||||
|
||||
private static void ParseMqttAuth(Dictionary<string, object?> dict, MqttOptions mqtt, List<string> errors)
|
||||
{
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
switch (key.ToLowerInvariant())
|
||||
{
|
||||
case "user" or "username":
|
||||
mqtt.Username = ToString(value);
|
||||
break;
|
||||
case "pass" or "password":
|
||||
mqtt.Password = ToString(value);
|
||||
break;
|
||||
case "token":
|
||||
mqtt.Token = ToString(value);
|
||||
break;
|
||||
case "timeout":
|
||||
mqtt.AuthTimeout = ToDouble(value);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void ParseMqttTls(Dictionary<string, object?> dict, MqttOptions mqtt, List<string> errors)
|
||||
{
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
switch (key.ToLowerInvariant())
|
||||
{
|
||||
case "cert_file":
|
||||
mqtt.TlsCert = ToString(value);
|
||||
break;
|
||||
case "key_file":
|
||||
mqtt.TlsKey = ToString(value);
|
||||
break;
|
||||
case "ca_file":
|
||||
mqtt.TlsCaCert = ToString(value);
|
||||
break;
|
||||
case "verify":
|
||||
mqtt.TlsVerify = ToBool(value);
|
||||
break;
|
||||
case "verify_and_map":
|
||||
var map = ToBool(value);
|
||||
mqtt.TlsMap = map;
|
||||
if (map) mqtt.TlsVerify = true;
|
||||
break;
|
||||
case "timeout":
|
||||
mqtt.TlsTimeout = ToDouble(value);
|
||||
break;
|
||||
case "pinned_certs":
|
||||
if (value is List<object?> pinnedList)
|
||||
{
|
||||
var certs = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||
foreach (var item in pinnedList)
|
||||
{
|
||||
if (item is string s)
|
||||
certs.Add(s.ToLowerInvariant());
|
||||
}
|
||||
mqtt.TlsPinnedCerts = certs;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Step 6: Build to verify compilation**
|
||||
|
||||
Run: `dotnet build`
|
||||
Expected: Build succeeded.
|
||||
|
||||
**Step 7: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/MqttOptions.cs src/NATS.Server/NatsOptions.cs src/NATS.Server/Configuration/ConfigProcessor.cs
|
||||
git commit -m "feat: add mqtt config model and parser for all Go MQTTOpts fields"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 7: Add MQTT config parsing tests
|
||||
|
||||
**Files:**
|
||||
- Create: `tests/NATS.Server.Tests/TestData/mqtt.conf`
|
||||
- Modify: `tests/NATS.Server.Tests/ConfigProcessorTests.cs`
|
||||
|
||||
**Step 1: Create MQTT test config file**
|
||||
|
||||
Create `tests/NATS.Server.Tests/TestData/mqtt.conf`:
|
||||
|
||||
```
|
||||
mqtt {
|
||||
listen: "10.0.0.1:1883"
|
||||
no_auth_user: "mqtt_default"
|
||||
|
||||
authorization {
|
||||
user: "mqtt_user"
|
||||
pass: "mqtt_pass"
|
||||
token: "mqtt_token"
|
||||
timeout: 3.0
|
||||
}
|
||||
|
||||
tls {
|
||||
cert_file: "/path/to/mqtt-cert.pem"
|
||||
key_file: "/path/to/mqtt-key.pem"
|
||||
ca_file: "/path/to/mqtt-ca.pem"
|
||||
verify: true
|
||||
timeout: 5.0
|
||||
}
|
||||
|
||||
ack_wait: "60s"
|
||||
max_ack_pending: 2048
|
||||
js_domain: "mqtt-domain"
|
||||
js_api_timeout: "10s"
|
||||
stream_replicas: 3
|
||||
consumer_replicas: 1
|
||||
consumer_memory_storage: true
|
||||
consumer_inactive_threshold: "5m"
|
||||
}
|
||||
```
|
||||
|
||||
Ensure this file is copied to output: check that `.csproj` has a wildcard for TestData, or add:
|
||||
```xml
|
||||
<ItemGroup>
|
||||
<None Update="TestData\**" CopyToOutputDirectory="PreserveNewest" />
|
||||
</ItemGroup>
|
||||
```
|
||||
|
||||
**Step 2: Add MQTT config tests**
|
||||
|
||||
Add to `tests/NATS.Server.Tests/ConfigProcessorTests.cs`:
|
||||
|
||||
```csharp
|
||||
// ─── MQTT config ────────────────────────────────────────────
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_ListenHostAndPort()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.Host.ShouldBe("10.0.0.1");
|
||||
opts.Mqtt.Port.ShouldBe(1883);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_NoAuthUser()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.NoAuthUser.ShouldBe("mqtt_default");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_Authorization()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.Username.ShouldBe("mqtt_user");
|
||||
opts.Mqtt.Password.ShouldBe("mqtt_pass");
|
||||
opts.Mqtt.Token.ShouldBe("mqtt_token");
|
||||
opts.Mqtt.AuthTimeout.ShouldBe(3.0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_Tls()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.TlsCert.ShouldBe("/path/to/mqtt-cert.pem");
|
||||
opts.Mqtt.TlsKey.ShouldBe("/path/to/mqtt-key.pem");
|
||||
opts.Mqtt.TlsCaCert.ShouldBe("/path/to/mqtt-ca.pem");
|
||||
opts.Mqtt.TlsVerify.ShouldBeTrue();
|
||||
opts.Mqtt.TlsTimeout.ShouldBe(5.0);
|
||||
opts.Mqtt.HasTls.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_QosSettings()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.AckWait.ShouldBe(TimeSpan.FromSeconds(60));
|
||||
opts.Mqtt.MaxAckPending.ShouldBe((ushort)2048);
|
||||
opts.Mqtt.JsApiTimeout.ShouldBe(TimeSpan.FromSeconds(10));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_JetStreamSettings()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.JsDomain.ShouldBe("mqtt-domain");
|
||||
opts.Mqtt.StreamReplicas.ShouldBe(3);
|
||||
opts.Mqtt.ConsumerReplicas.ShouldBe(1);
|
||||
opts.Mqtt.ConsumerMemoryStorage.ShouldBeTrue();
|
||||
opts.Mqtt.ConsumerInactiveThreshold.ShouldBe(TimeSpan.FromMinutes(5));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_MaxAckPendingValidation_ReportsError()
|
||||
{
|
||||
var ex = Should.Throw<ConfigProcessorException>(() =>
|
||||
ConfigProcessor.ProcessConfig("""
|
||||
mqtt {
|
||||
max_ack_pending: 70000
|
||||
}
|
||||
"""));
|
||||
ex.Errors.ShouldContain(e => e.Contains("max_ack_pending"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_Aliases()
|
||||
{
|
||||
// Test alias keys: "ackwait" (alias for "ack_wait"), "net" (alias for "host"),
|
||||
// "max_inflight" (alias for "max_ack_pending"), "consumer_auto_cleanup" (alias)
|
||||
var opts = ConfigProcessor.ProcessConfig("""
|
||||
mqtt {
|
||||
net: "127.0.0.1"
|
||||
port: 1884
|
||||
ackwait: "45s"
|
||||
max_inflight: 500
|
||||
api_timeout: "8s"
|
||||
consumer_auto_cleanup: "10m"
|
||||
}
|
||||
""");
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.Host.ShouldBe("127.0.0.1");
|
||||
opts.Mqtt.Port.ShouldBe(1884);
|
||||
opts.Mqtt.AckWait.ShouldBe(TimeSpan.FromSeconds(45));
|
||||
opts.Mqtt.MaxAckPending.ShouldBe((ushort)500);
|
||||
opts.Mqtt.JsApiTimeout.ShouldBe(TimeSpan.FromSeconds(8));
|
||||
opts.Mqtt.ConsumerInactiveThreshold.ShouldBe(TimeSpan.FromMinutes(10));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_Absent_ReturnsNull()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfig("port: 4222");
|
||||
opts.Mqtt.ShouldBeNull();
|
||||
}
|
||||
```
|
||||
|
||||
**Step 3: Run MQTT config tests**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~ConfigProcessorTests.MqttConf" -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add tests/NATS.Server.Tests/TestData/mqtt.conf tests/NATS.Server.Tests/ConfigProcessorTests.cs
|
||||
git commit -m "test: add mqtt config parsing coverage"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 8: Expand MqttOptsVarz and wire into /varz
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/NATS.Server/Monitoring/Varz.cs` (lines 350-360: expand `MqttOptsVarz`)
|
||||
- Modify: `src/NATS.Server/Monitoring/VarzHandler.cs` (line 67-124: populate MQTT block from options)
|
||||
|
||||
**Step 1: Expand `MqttOptsVarz` class**
|
||||
|
||||
In `src/NATS.Server/Monitoring/Varz.cs`, replace the existing minimal `MqttOptsVarz` (lines 350-360) with the full Go-compatible struct (matching Go server/monitor.go:1365-1378):
|
||||
|
||||
```csharp
|
||||
/// <summary>
|
||||
/// MQTT configuration monitoring information.
|
||||
/// Corresponds to Go server/monitor.go MQTTOptsVarz struct.
|
||||
/// </summary>
|
||||
public sealed class MqttOptsVarz
|
||||
{
|
||||
[JsonPropertyName("host")]
|
||||
public string Host { get; set; } = "";
|
||||
|
||||
[JsonPropertyName("port")]
|
||||
public int Port { get; set; }
|
||||
|
||||
[JsonPropertyName("no_auth_user")]
|
||||
public string NoAuthUser { get; set; } = "";
|
||||
|
||||
[JsonPropertyName("auth_timeout")]
|
||||
public double AuthTimeout { get; set; }
|
||||
|
||||
[JsonPropertyName("tls_map")]
|
||||
public bool TlsMap { get; set; }
|
||||
|
||||
[JsonPropertyName("tls_timeout")]
|
||||
public double TlsTimeout { get; set; }
|
||||
|
||||
[JsonPropertyName("tls_pinned_certs")]
|
||||
public string[] TlsPinnedCerts { get; set; } = [];
|
||||
|
||||
[JsonPropertyName("js_domain")]
|
||||
public string JsDomain { get; set; } = "";
|
||||
|
||||
[JsonPropertyName("ack_wait")]
|
||||
public long AckWait { get; set; }
|
||||
|
||||
[JsonPropertyName("max_ack_pending")]
|
||||
public ushort MaxAckPending { get; set; }
|
||||
}
|
||||
```
|
||||
|
||||
Note: Go's `AckWait` is serialized as `time.Duration` (nanoseconds as int64). We follow the same pattern used for `PingInterval` and `WriteDeadline` in the existing Varz class.
|
||||
|
||||
**Step 2: Populate MQTT block in VarzHandler**
|
||||
|
||||
In `src/NATS.Server/Monitoring/VarzHandler.cs`, add MQTT population to the `return new Varz { ... }` block (around line 123, after `HttpReqStats`):
|
||||
|
||||
```csharp
|
||||
Mqtt = BuildMqttVarz(),
|
||||
```
|
||||
|
||||
And add the helper method to `VarzHandler`:
|
||||
|
||||
```csharp
|
||||
private MqttOptsVarz BuildMqttVarz()
|
||||
{
|
||||
var mqtt = _options.Mqtt;
|
||||
if (mqtt is null)
|
||||
return new MqttOptsVarz();
|
||||
|
||||
return new MqttOptsVarz
|
||||
{
|
||||
Host = mqtt.Host,
|
||||
Port = mqtt.Port,
|
||||
NoAuthUser = mqtt.NoAuthUser ?? "",
|
||||
AuthTimeout = mqtt.AuthTimeout,
|
||||
TlsMap = mqtt.TlsMap,
|
||||
TlsTimeout = mqtt.TlsTimeout,
|
||||
TlsPinnedCerts = mqtt.TlsPinnedCerts?.ToArray() ?? [],
|
||||
JsDomain = mqtt.JsDomain ?? "",
|
||||
AckWait = (long)mqtt.AckWait.TotalNanoseconds,
|
||||
MaxAckPending = mqtt.MaxAckPending,
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
**Step 3: Build to verify compilation**
|
||||
|
||||
Run: `dotnet build`
|
||||
Expected: Build succeeded.
|
||||
|
||||
**Step 4: Add varz MQTT test**
|
||||
|
||||
In `tests/NATS.Server.Tests/MonitorTests.cs`, add a test that verifies the MQTT section appears in `/varz` response. If there's an existing varz test pattern, follow it. Otherwise add:
|
||||
|
||||
```csharp
|
||||
[Fact]
|
||||
public async Task Varz_includes_mqtt_config_when_set()
|
||||
{
|
||||
// Start server with monitoring enabled and mqtt config set.
|
||||
// GET /varz.
|
||||
// Assert response contains "mqtt" block with expected host/port values.
|
||||
}
|
||||
```
|
||||
|
||||
The exact test implementation depends on how the existing varz tests create and query the server — follow the existing pattern in MonitorTests.cs.
|
||||
|
||||
**Step 5: Run full test suite**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj -v minimal`
|
||||
Expected: PASS.
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/NATS.Server/Monitoring/Varz.cs src/NATS.Server/Monitoring/VarzHandler.cs tests/NATS.Server.Tests/MonitorTests.cs
|
||||
git commit -m "feat: expand mqtt varz monitoring with all Go-compatible fields"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 9: Final verification and differences.md update
|
||||
|
||||
**Files:**
|
||||
- Modify: `differences.md`
|
||||
|
||||
**Step 1: Run full test suite**
|
||||
|
||||
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj -v minimal`
|
||||
Expected: PASS (all tests green, no regressions).
|
||||
|
||||
**Step 2: Update parity document**
|
||||
|
||||
Edit `differences.md`:
|
||||
|
||||
1. In the **Connection Types** table (section 2), update the MQTT row:
|
||||
|
||||
```markdown
|
||||
| MQTT clients | Y | Partial | JWT connection-type constants + config parsing; no MQTT transport yet |
|
||||
```
|
||||
|
||||
2. In the **Connz Response** table (section 7), update the MQTT client ID filtering row:
|
||||
|
||||
```markdown
|
||||
| MQTT client ID filtering | Y | Y | `mqtt_client` query param filters open and closed connections |
|
||||
```
|
||||
|
||||
3. In the **Missing Options Categories** (section 6), replace the "WebSocket/MQTT options" line:
|
||||
|
||||
```markdown
|
||||
- WebSocket options
|
||||
- ~~MQTT options~~ — `mqtt {}` config block parsed with all Go `MQTTOpts` fields; no listener yet
|
||||
```
|
||||
|
||||
4. In the **Auth Mechanisms** table (section 5), add note to JWT row:
|
||||
|
||||
```markdown
|
||||
| JWT validation | Y | Y | ... + `allowed_connection_types` enforcement with Go-compatible semantics |
|
||||
```
|
||||
|
||||
**Step 3: Commit docs update**
|
||||
|
||||
```bash
|
||||
git add differences.md
|
||||
git commit -m "docs: update differences.md for mqtt connection type parity"
|
||||
```
|
||||
@@ -0,0 +1,15 @@
|
||||
{
|
||||
"planPath": "docs/plans/2026-02-23-mqtt-connection-type-plan.md",
|
||||
"tasks": [
|
||||
{"id": 2, "subject": "Task 1: Add failing JWT connection-type behavior tests", "status": "pending"},
|
||||
{"id": 3, "subject": "Task 2: Implement auth connection-type model and Go-style conversion", "status": "pending", "blockedBy": [2]},
|
||||
{"id": 4, "subject": "Task 3: Add failing connz mqtt_client filter tests", "status": "pending", "blockedBy": [3]},
|
||||
{"id": 5, "subject": "Task 4: Implement connz mqtt_client filtering", "status": "pending", "blockedBy": [4]},
|
||||
{"id": 6, "subject": "Task 5: Verification checkpoint for JWT + connz tasks", "status": "pending", "blockedBy": [5]},
|
||||
{"id": 7, "subject": "Task 6: Add MqttOptions model and config parsing", "status": "pending", "blockedBy": [6]},
|
||||
{"id": 8, "subject": "Task 7: Add MQTT config parsing tests", "status": "pending", "blockedBy": [7]},
|
||||
{"id": 9, "subject": "Task 8: Expand MqttOptsVarz and wire into /varz", "status": "pending", "blockedBy": [8]},
|
||||
{"id": 10, "subject": "Task 9: Final verification and differences.md update", "status": "pending", "blockedBy": [9]}
|
||||
],
|
||||
"lastUpdated": "2026-02-23T00:00:00Z"
|
||||
}
|
||||
14
docs/plans/jetstream-go-suite-map.md
Normal file
14
docs/plans/jetstream-go-suite-map.md
Normal file
@@ -0,0 +1,14 @@
|
||||
# JetStream Go Suite Map
|
||||
|
||||
This map tracks the Go suite families included by `scripts/run-go-jetstream-parity.sh`.
|
||||
|
||||
- `TestJetStream`: core stream/consumer API and data-path behavior.
|
||||
- `TestJetStreamCluster`: clustered JetStream semantics, placement, and failover.
|
||||
- `TestLongCluster`: long-running clustered behaviors and stabilization scenarios.
|
||||
- `TestRaft`: RAFT election, replication, and snapshot behavior used by JetStream.
|
||||
|
||||
Runner command:
|
||||
|
||||
```bash
|
||||
go test -v -run 'TestJetStream|TestJetStreamCluster|TestLongCluster|TestRaft' ./server -count=1 -timeout=180m
|
||||
```
|
||||
4092
docs/plans/jetstream-parity-run-log.md
Normal file
4092
docs/plans/jetstream-parity-run-log.md
Normal file
File diff suppressed because it is too large
Load Diff
18
scripts/run-go-jetstream-parity.sh
Executable file
18
scripts/run-go-jetstream-parity.sh
Executable file
@@ -0,0 +1,18 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
repo_root="$(cd "${script_dir}/.." && pwd)"
|
||||
|
||||
go_root="${repo_root}/golang/nats-server"
|
||||
if [[ ! -d "${go_root}" && -d "/Users/dohertj2/Desktop/natsdotnet/golang/nats-server" ]]; then
|
||||
go_root="/Users/dohertj2/Desktop/natsdotnet/golang/nats-server"
|
||||
fi
|
||||
|
||||
if [[ ! -d "${go_root}" ]]; then
|
||||
echo "Unable to locate golang/nats-server checkout." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
cd "${go_root}"
|
||||
go test -v -run 'TestJetStream|TestJetStreamCluster|TestLongCluster|TestRaft' ./server -count=1 -timeout=180m
|
||||
@@ -15,6 +15,8 @@ public sealed class Account : IDisposable
|
||||
public int MaxSubscriptions { get; set; } // 0 = unlimited
|
||||
public ExportMap Exports { get; } = new();
|
||||
public ImportMap Imports { get; } = new();
|
||||
public int MaxJetStreamStreams { get; set; } // 0 = unlimited
|
||||
public string? JetStreamTier { get; set; }
|
||||
|
||||
// JWT fields
|
||||
public string? Nkey { get; set; }
|
||||
@@ -36,6 +38,7 @@ public sealed class Account : IDisposable
|
||||
|
||||
private readonly ConcurrentDictionary<ulong, byte> _clients = new();
|
||||
private int _subscriptionCount;
|
||||
private int _jetStreamStreamCount;
|
||||
|
||||
public Account(string name)
|
||||
{
|
||||
@@ -44,6 +47,7 @@ public sealed class Account : IDisposable
|
||||
|
||||
public int ClientCount => _clients.Count;
|
||||
public int SubscriptionCount => Volatile.Read(ref _subscriptionCount);
|
||||
public int JetStreamStreamCount => Volatile.Read(ref _jetStreamStreamCount);
|
||||
|
||||
/// <summary>Returns false if max connections exceeded.</summary>
|
||||
public bool AddClient(ulong clientId)
|
||||
@@ -69,6 +73,23 @@ public sealed class Account : IDisposable
|
||||
Interlocked.Decrement(ref _subscriptionCount);
|
||||
}
|
||||
|
||||
public bool TryReserveStream()
|
||||
{
|
||||
if (MaxJetStreamStreams > 0 && Volatile.Read(ref _jetStreamStreamCount) >= MaxJetStreamStreams)
|
||||
return false;
|
||||
|
||||
Interlocked.Increment(ref _jetStreamStreamCount);
|
||||
return true;
|
||||
}
|
||||
|
||||
public void ReleaseStream()
|
||||
{
|
||||
if (Volatile.Read(ref _jetStreamStreamCount) == 0)
|
||||
return;
|
||||
|
||||
Interlocked.Decrement(ref _jetStreamStreamCount);
|
||||
}
|
||||
|
||||
// Per-account message/byte stats
|
||||
private long _inMsgs;
|
||||
private long _outMsgs;
|
||||
|
||||
@@ -6,4 +6,6 @@ public sealed class AuthResult
|
||||
public string? AccountName { get; init; }
|
||||
public Permissions? Permissions { get; init; }
|
||||
public DateTimeOffset? Expiry { get; init; }
|
||||
public int MaxJetStreamStreams { get; init; }
|
||||
public string? JetStreamTier { get; init; }
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using NATS.Server.Auth.Jwt;
|
||||
using NATS.Server.Protocol;
|
||||
|
||||
namespace NATS.Server.Auth;
|
||||
@@ -13,4 +14,11 @@ public sealed class ClientAuthContext
|
||||
public required ClientOptions Opts { get; init; }
|
||||
public required byte[] Nonce { get; init; }
|
||||
public X509Certificate2? ClientCertificate { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// The type of connection (e.g., "STANDARD", "WEBSOCKET", "MQTT", "LEAFNODE").
|
||||
/// Used by JWT authenticator to enforce allowed_connection_types claims.
|
||||
/// Defaults to "STANDARD" for regular NATS client connections.
|
||||
/// </summary>
|
||||
public string ConnectionType { get; init; } = JwtConnectionTypes.Standard;
|
||||
}
|
||||
|
||||
@@ -47,6 +47,10 @@ public sealed class AccountNats
|
||||
[JsonPropertyName("limits")]
|
||||
public AccountLimits? Limits { get; set; }
|
||||
|
||||
/// <summary>JetStream entitlement limits/tier for this account.</summary>
|
||||
[JsonPropertyName("jetstream")]
|
||||
public AccountJetStreamLimits? JetStream { get; set; }
|
||||
|
||||
/// <summary>NKey public keys authorized to sign user JWTs for this account.</summary>
|
||||
[JsonPropertyName("signing_keys")]
|
||||
public string[]? SigningKeys { get; set; }
|
||||
@@ -92,3 +96,12 @@ public sealed class AccountLimits
|
||||
[JsonPropertyName("data")]
|
||||
public long MaxData { get; set; }
|
||||
}
|
||||
|
||||
public sealed class AccountJetStreamLimits
|
||||
{
|
||||
[JsonPropertyName("max_streams")]
|
||||
public int MaxStreams { get; set; }
|
||||
|
||||
[JsonPropertyName("tier")]
|
||||
public string? Tier { get; set; }
|
||||
}
|
||||
|
||||
34
src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs
Normal file
34
src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs
Normal file
@@ -0,0 +1,34 @@
|
||||
namespace NATS.Server.Auth.Jwt;
|
||||
|
||||
internal static class JwtConnectionTypes
|
||||
{
|
||||
public const string Standard = "STANDARD";
|
||||
public const string Websocket = "WEBSOCKET";
|
||||
public const string Leafnode = "LEAFNODE";
|
||||
public const string LeafnodeWs = "LEAFNODE_WS";
|
||||
public const string Mqtt = "MQTT";
|
||||
public const string MqttWs = "MQTT_WS";
|
||||
public const string InProcess = "INPROCESS";
|
||||
|
||||
private static readonly HashSet<string> Known =
|
||||
[
|
||||
Standard, Websocket, Leafnode, LeafnodeWs, Mqtt, MqttWs, InProcess,
|
||||
];
|
||||
|
||||
public static (HashSet<string> Valid, bool HasUnknown) Convert(IEnumerable<string>? values)
|
||||
{
|
||||
var valid = new HashSet<string>(StringComparer.Ordinal);
|
||||
var hasUnknown = false;
|
||||
if (values is null) return (valid, false);
|
||||
|
||||
foreach (var raw in values)
|
||||
{
|
||||
var up = (raw ?? string.Empty).Trim().ToUpperInvariant();
|
||||
if (up.Length == 0) continue;
|
||||
if (Known.Contains(up)) valid.Add(up);
|
||||
else hasUnknown = true;
|
||||
}
|
||||
|
||||
return (valid, hasUnknown);
|
||||
}
|
||||
}
|
||||
@@ -95,6 +95,24 @@ public sealed class JwtAuthenticator : IAuthenticator
|
||||
}
|
||||
}
|
||||
|
||||
// 7b. Check allowed connection types
|
||||
var (allowedTypes, hasUnknown) = JwtConnectionTypes.Convert(userClaims.Nats?.AllowedConnectionTypes);
|
||||
|
||||
if (allowedTypes.Count == 0)
|
||||
{
|
||||
if (hasUnknown)
|
||||
return null; // unknown-only list should reject
|
||||
}
|
||||
else
|
||||
{
|
||||
var connType = string.IsNullOrWhiteSpace(context.ConnectionType)
|
||||
? JwtConnectionTypes.Standard
|
||||
: context.ConnectionType.ToUpperInvariant();
|
||||
|
||||
if (!allowedTypes.Contains(connType))
|
||||
return null;
|
||||
}
|
||||
|
||||
// 8. Build permissions from JWT claims
|
||||
Permissions? permissions = null;
|
||||
var nats = userClaims.Nats;
|
||||
@@ -143,6 +161,8 @@ public sealed class JwtAuthenticator : IAuthenticator
|
||||
AccountName = issuerAccount,
|
||||
Permissions = permissions,
|
||||
Expiry = userClaims.GetExpiry(),
|
||||
MaxJetStreamStreams = accountClaims.Nats?.JetStream?.MaxStreams ?? 0,
|
||||
JetStreamTier = accountClaims.Nats?.JetStream?.Tier,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
9
src/NATS.Server/Configuration/ClusterOptions.cs
Normal file
9
src/NATS.Server/Configuration/ClusterOptions.cs
Normal file
@@ -0,0 +1,9 @@
|
||||
namespace NATS.Server.Configuration;
|
||||
|
||||
public sealed class ClusterOptions
|
||||
{
|
||||
public string? Name { get; set; }
|
||||
public string Host { get; set; } = "0.0.0.0";
|
||||
public int Port { get; set; } = 6222;
|
||||
public List<string> Routes { get; set; } = [];
|
||||
}
|
||||
@@ -217,6 +217,26 @@ public static class ConfigProcessor
|
||||
opts.AllowNonTls = ToBool(value);
|
||||
break;
|
||||
|
||||
// Cluster / inter-server / JetStream
|
||||
case "cluster":
|
||||
if (value is Dictionary<string, object?> clusterDict)
|
||||
opts.Cluster = ParseCluster(clusterDict, errors);
|
||||
break;
|
||||
case "gateway":
|
||||
if (value is Dictionary<string, object?> gatewayDict)
|
||||
opts.Gateway = ParseGateway(gatewayDict, errors);
|
||||
break;
|
||||
case "leaf":
|
||||
case "leafnode":
|
||||
case "leafnodes":
|
||||
if (value is Dictionary<string, object?> leafDict)
|
||||
opts.LeafNode = ParseLeafNode(leafDict, errors);
|
||||
break;
|
||||
case "jetstream":
|
||||
if (value is Dictionary<string, object?> jsDict)
|
||||
opts.JetStream = ParseJetStream(jsDict, errors);
|
||||
break;
|
||||
|
||||
// Tags
|
||||
case "server_tags":
|
||||
if (value is Dictionary<string, object?> tagsDict)
|
||||
@@ -245,6 +265,12 @@ public static class ConfigProcessor
|
||||
opts.ReconnectErrorReports = ToInt(value);
|
||||
break;
|
||||
|
||||
// MQTT
|
||||
case "mqtt":
|
||||
if (value is Dictionary<string, object?> mqttDict)
|
||||
ParseMqtt(mqttDict, opts, errors);
|
||||
break;
|
||||
|
||||
// Unknown keys silently ignored (cluster, jetstream, gateway, leafnode, etc.)
|
||||
default:
|
||||
break;
|
||||
@@ -342,6 +368,9 @@ public static class ConfigProcessor
|
||||
private static readonly Regex DurationPattern = new(
|
||||
@"^(-?\d+(?:\.\d+)?)\s*(ms|s|m|h)$",
|
||||
RegexOptions.Compiled | RegexOptions.IgnoreCase);
|
||||
private static readonly Regex ByteSizePattern = new(
|
||||
@"^(\d+)\s*(b|kb|mb|gb|tb)?$",
|
||||
RegexOptions.Compiled | RegexOptions.IgnoreCase);
|
||||
|
||||
private static TimeSpan ParseDurationString(string s)
|
||||
{
|
||||
@@ -362,6 +391,133 @@ public static class ConfigProcessor
|
||||
};
|
||||
}
|
||||
|
||||
// ─── Cluster / gateway / leafnode / JetStream parsing ────────
|
||||
|
||||
private static ClusterOptions ParseCluster(Dictionary<string, object?> dict, List<string> errors)
|
||||
{
|
||||
var options = new ClusterOptions();
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
switch (key.ToLowerInvariant())
|
||||
{
|
||||
case "name":
|
||||
options.Name = ToString(value);
|
||||
break;
|
||||
case "listen":
|
||||
try
|
||||
{
|
||||
var (host, port) = ParseHostPort(value);
|
||||
if (host is not null)
|
||||
options.Host = host;
|
||||
if (port is not null)
|
||||
options.Port = port.Value;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
errors.Add($"Invalid cluster.listen: {ex.Message}");
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return options;
|
||||
}
|
||||
|
||||
private static GatewayOptions ParseGateway(Dictionary<string, object?> dict, List<string> errors)
|
||||
{
|
||||
var options = new GatewayOptions();
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
switch (key.ToLowerInvariant())
|
||||
{
|
||||
case "name":
|
||||
options.Name = ToString(value);
|
||||
break;
|
||||
case "listen":
|
||||
try
|
||||
{
|
||||
var (host, port) = ParseHostPort(value);
|
||||
if (host is not null)
|
||||
options.Host = host;
|
||||
if (port is not null)
|
||||
options.Port = port.Value;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
errors.Add($"Invalid gateway.listen: {ex.Message}");
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return options;
|
||||
}
|
||||
|
||||
private static LeafNodeOptions ParseLeafNode(Dictionary<string, object?> dict, List<string> errors)
|
||||
{
|
||||
var options = new LeafNodeOptions();
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
if (key.Equals("listen", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
try
|
||||
{
|
||||
var (host, port) = ParseHostPort(value);
|
||||
if (host is not null)
|
||||
options.Host = host;
|
||||
if (port is not null)
|
||||
options.Port = port.Value;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
errors.Add($"Invalid leafnode.listen: {ex.Message}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return options;
|
||||
}
|
||||
|
||||
private static JetStreamOptions ParseJetStream(Dictionary<string, object?> dict, List<string> errors)
|
||||
{
|
||||
var options = new JetStreamOptions();
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
switch (key.ToLowerInvariant())
|
||||
{
|
||||
case "store_dir":
|
||||
options.StoreDir = ToString(value);
|
||||
break;
|
||||
case "max_mem_store":
|
||||
try
|
||||
{
|
||||
options.MaxMemoryStore = ParseByteSize(value);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
errors.Add($"Invalid jetstream.max_mem_store: {ex.Message}");
|
||||
}
|
||||
|
||||
break;
|
||||
case "max_file_store":
|
||||
try
|
||||
{
|
||||
options.MaxFileStore = ParseByteSize(value);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
errors.Add($"Invalid jetstream.max_file_store: {ex.Message}");
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return options;
|
||||
}
|
||||
|
||||
// ─── Authorization parsing ─────────────────────────────────────
|
||||
|
||||
private static void ParseAuthorization(Dictionary<string, object?> dict, NatsOptions opts, List<string> errors)
|
||||
@@ -620,6 +776,145 @@ public static class ConfigProcessor
|
||||
opts.Tags = tags;
|
||||
}
|
||||
|
||||
// ─── MQTT parsing ────────────────────────────────────────────────
|
||||
// Reference: Go server/opts.go parseMQTT (lines ~5443-5541)
|
||||
|
||||
private static void ParseMqtt(Dictionary<string, object?> dict, NatsOptions opts, List<string> errors)
|
||||
{
|
||||
var mqtt = opts.Mqtt ?? new MqttOptions();
|
||||
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
switch (key.ToLowerInvariant())
|
||||
{
|
||||
case "listen":
|
||||
var (host, port) = ParseHostPort(value);
|
||||
if (host is not null) mqtt.Host = host;
|
||||
if (port is not null) mqtt.Port = port.Value;
|
||||
break;
|
||||
case "port":
|
||||
mqtt.Port = ToInt(value);
|
||||
break;
|
||||
case "host" or "net":
|
||||
mqtt.Host = ToString(value);
|
||||
break;
|
||||
case "no_auth_user":
|
||||
mqtt.NoAuthUser = ToString(value);
|
||||
break;
|
||||
case "tls":
|
||||
if (value is Dictionary<string, object?> tlsDict)
|
||||
ParseMqttTls(tlsDict, mqtt, errors);
|
||||
break;
|
||||
case "authorization" or "authentication":
|
||||
if (value is Dictionary<string, object?> authDict)
|
||||
ParseMqttAuth(authDict, mqtt, errors);
|
||||
break;
|
||||
case "ack_wait" or "ackwait":
|
||||
mqtt.AckWait = ParseDuration(value);
|
||||
break;
|
||||
case "js_api_timeout" or "api_timeout":
|
||||
mqtt.JsApiTimeout = ParseDuration(value);
|
||||
break;
|
||||
case "max_ack_pending" or "max_pending" or "max_inflight":
|
||||
var pending = ToInt(value);
|
||||
if (pending < 0 || pending > 0xFFFF)
|
||||
errors.Add($"mqtt max_ack_pending invalid value {pending}, should be in [0..{0xFFFF}] range");
|
||||
else
|
||||
mqtt.MaxAckPending = (ushort)pending;
|
||||
break;
|
||||
case "js_domain":
|
||||
mqtt.JsDomain = ToString(value);
|
||||
break;
|
||||
case "stream_replicas":
|
||||
mqtt.StreamReplicas = ToInt(value);
|
||||
break;
|
||||
case "consumer_replicas":
|
||||
mqtt.ConsumerReplicas = ToInt(value);
|
||||
break;
|
||||
case "consumer_memory_storage":
|
||||
mqtt.ConsumerMemoryStorage = ToBool(value);
|
||||
break;
|
||||
case "consumer_inactive_threshold" or "consumer_auto_cleanup":
|
||||
mqtt.ConsumerInactiveThreshold = ParseDuration(value);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
opts.Mqtt = mqtt;
|
||||
}
|
||||
|
||||
private static void ParseMqttAuth(Dictionary<string, object?> dict, MqttOptions mqtt, List<string> errors)
|
||||
{
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
switch (key.ToLowerInvariant())
|
||||
{
|
||||
case "user" or "username":
|
||||
mqtt.Username = ToString(value);
|
||||
break;
|
||||
case "pass" or "password":
|
||||
mqtt.Password = ToString(value);
|
||||
break;
|
||||
case "token":
|
||||
mqtt.Token = ToString(value);
|
||||
break;
|
||||
case "timeout":
|
||||
mqtt.AuthTimeout = ToDouble(value);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void ParseMqttTls(Dictionary<string, object?> dict, MqttOptions mqtt, List<string> errors)
|
||||
{
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
switch (key.ToLowerInvariant())
|
||||
{
|
||||
case "cert_file":
|
||||
mqtt.TlsCert = ToString(value);
|
||||
break;
|
||||
case "key_file":
|
||||
mqtt.TlsKey = ToString(value);
|
||||
break;
|
||||
case "ca_file":
|
||||
mqtt.TlsCaCert = ToString(value);
|
||||
break;
|
||||
case "verify":
|
||||
mqtt.TlsVerify = ToBool(value);
|
||||
break;
|
||||
case "verify_and_map":
|
||||
var map = ToBool(value);
|
||||
mqtt.TlsMap = map;
|
||||
if (map) mqtt.TlsVerify = true;
|
||||
break;
|
||||
case "timeout":
|
||||
mqtt.TlsTimeout = ToDouble(value);
|
||||
break;
|
||||
case "pinned_certs":
|
||||
if (value is List<object?> pinnedList)
|
||||
{
|
||||
var certs = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||
foreach (var item in pinnedList)
|
||||
{
|
||||
if (item is string s)
|
||||
certs.Add(s.ToLowerInvariant());
|
||||
}
|
||||
|
||||
mqtt.TlsPinnedCerts = certs;
|
||||
}
|
||||
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Type conversion helpers ───────────────────────────────────
|
||||
|
||||
private static int ToInt(object? value) => value switch
|
||||
@@ -640,6 +935,40 @@ public static class ConfigProcessor
|
||||
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to long"),
|
||||
};
|
||||
|
||||
private static long ParseByteSize(object? value)
|
||||
{
|
||||
if (value is long l)
|
||||
return l;
|
||||
if (value is int i)
|
||||
return i;
|
||||
if (value is double d)
|
||||
return (long)d;
|
||||
if (value is not string s)
|
||||
throw new FormatException($"Cannot parse byte size from {value?.GetType().Name ?? "null"}");
|
||||
|
||||
var trimmed = s.Trim();
|
||||
var match = ByteSizePattern.Match(trimmed);
|
||||
if (!match.Success)
|
||||
throw new FormatException($"Cannot parse byte size: '{s}'");
|
||||
|
||||
var amount = long.Parse(match.Groups[1].Value, CultureInfo.InvariantCulture);
|
||||
var unit = match.Groups[2].Value.ToLowerInvariant();
|
||||
var multiplier = unit switch
|
||||
{
|
||||
"" or "b" => 1L,
|
||||
"kb" => 1024L,
|
||||
"mb" => 1024L * 1024L,
|
||||
"gb" => 1024L * 1024L * 1024L,
|
||||
"tb" => 1024L * 1024L * 1024L * 1024L,
|
||||
_ => throw new FormatException($"Unknown byte-size unit: '{unit}'"),
|
||||
};
|
||||
|
||||
checked
|
||||
{
|
||||
return amount * multiplier;
|
||||
}
|
||||
}
|
||||
|
||||
private static bool ToBool(object? value) => value switch
|
||||
{
|
||||
bool b => b,
|
||||
@@ -653,6 +982,15 @@ public static class ConfigProcessor
|
||||
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to string"),
|
||||
};
|
||||
|
||||
private static double ToDouble(object? value) => value switch
|
||||
{
|
||||
double d => d,
|
||||
long l => l,
|
||||
int i => i,
|
||||
string s when double.TryParse(s, NumberStyles.Float, CultureInfo.InvariantCulture, out var d) => d,
|
||||
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to double"),
|
||||
};
|
||||
|
||||
private static IReadOnlyList<string> ToStringList(object? value)
|
||||
{
|
||||
if (value is List<object?> list)
|
||||
|
||||
@@ -11,7 +11,8 @@ namespace NATS.Server.Configuration;
|
||||
public static class ConfigReloader
|
||||
{
|
||||
// Non-reloadable options (match Go server — Host, Port, ServerName require restart)
|
||||
private static readonly HashSet<string> NonReloadable = ["Host", "Port", "ServerName"];
|
||||
private static readonly HashSet<string> NonReloadable =
|
||||
["Host", "Port", "ServerName", "Cluster", "JetStream.StoreDir"];
|
||||
|
||||
// Logging-related options
|
||||
private static readonly HashSet<string> LoggingOptions =
|
||||
@@ -102,6 +103,13 @@ public static class ConfigReloader
|
||||
CompareAndAdd(changes, "NoSystemAccount", oldOpts.NoSystemAccount, newOpts.NoSystemAccount);
|
||||
CompareAndAdd(changes, "SystemAccount", oldOpts.SystemAccount, newOpts.SystemAccount);
|
||||
|
||||
// Cluster and JetStream (restart-required boundaries)
|
||||
if (!ClusterEquivalent(oldOpts.Cluster, newOpts.Cluster))
|
||||
changes.Add(new ConfigChange("Cluster", isNonReloadable: true));
|
||||
|
||||
if (JetStreamStoreDirChanged(oldOpts.JetStream, newOpts.JetStream))
|
||||
changes.Add(new ConfigChange("JetStream.StoreDir", isNonReloadable: true));
|
||||
|
||||
return changes;
|
||||
}
|
||||
|
||||
@@ -338,4 +346,35 @@ public static class ConfigReloader
|
||||
isNonReloadable: NonReloadable.Contains(name)));
|
||||
}
|
||||
}
|
||||
|
||||
private static bool ClusterEquivalent(ClusterOptions? oldCluster, ClusterOptions? newCluster)
|
||||
{
|
||||
if (oldCluster is null && newCluster is null)
|
||||
return true;
|
||||
|
||||
if (oldCluster is null || newCluster is null)
|
||||
return false;
|
||||
|
||||
if (!string.Equals(oldCluster.Name, newCluster.Name, StringComparison.Ordinal))
|
||||
return false;
|
||||
|
||||
if (!string.Equals(oldCluster.Host, newCluster.Host, StringComparison.Ordinal))
|
||||
return false;
|
||||
|
||||
if (oldCluster.Port != newCluster.Port)
|
||||
return false;
|
||||
|
||||
return oldCluster.Routes.SequenceEqual(newCluster.Routes, StringComparer.Ordinal);
|
||||
}
|
||||
|
||||
private static bool JetStreamStoreDirChanged(JetStreamOptions? oldJetStream, JetStreamOptions? newJetStream)
|
||||
{
|
||||
if (oldJetStream is null && newJetStream is null)
|
||||
return false;
|
||||
|
||||
if (oldJetStream is null || newJetStream is null)
|
||||
return true;
|
||||
|
||||
return !string.Equals(oldJetStream.StoreDir, newJetStream.StoreDir, StringComparison.Ordinal);
|
||||
}
|
||||
}
|
||||
|
||||
8
src/NATS.Server/Configuration/GatewayOptions.cs
Normal file
8
src/NATS.Server/Configuration/GatewayOptions.cs
Normal file
@@ -0,0 +1,8 @@
|
||||
namespace NATS.Server.Configuration;
|
||||
|
||||
public sealed class GatewayOptions
|
||||
{
|
||||
public string? Name { get; set; }
|
||||
public string Host { get; set; } = "0.0.0.0";
|
||||
public int Port { get; set; }
|
||||
}
|
||||
8
src/NATS.Server/Configuration/JetStreamOptions.cs
Normal file
8
src/NATS.Server/Configuration/JetStreamOptions.cs
Normal file
@@ -0,0 +1,8 @@
|
||||
namespace NATS.Server.Configuration;
|
||||
|
||||
public sealed class JetStreamOptions
|
||||
{
|
||||
public string StoreDir { get; set; } = string.Empty;
|
||||
public long MaxMemoryStore { get; set; }
|
||||
public long MaxFileStore { get; set; }
|
||||
}
|
||||
7
src/NATS.Server/Configuration/LeafNodeOptions.cs
Normal file
7
src/NATS.Server/Configuration/LeafNodeOptions.cs
Normal file
@@ -0,0 +1,7 @@
|
||||
namespace NATS.Server.Configuration;
|
||||
|
||||
public sealed class LeafNodeOptions
|
||||
{
|
||||
public string Host { get; set; } = "0.0.0.0";
|
||||
public int Port { get; set; }
|
||||
}
|
||||
11
src/NATS.Server/Gateways/GatewayConnection.cs
Normal file
11
src/NATS.Server/Gateways/GatewayConnection.cs
Normal file
@@ -0,0 +1,11 @@
|
||||
namespace NATS.Server.Gateways;
|
||||
|
||||
public sealed class GatewayConnection
|
||||
{
|
||||
public string RemoteEndpoint { get; }
|
||||
|
||||
public GatewayConnection(string remoteEndpoint)
|
||||
{
|
||||
RemoteEndpoint = remoteEndpoint;
|
||||
}
|
||||
}
|
||||
32
src/NATS.Server/Gateways/GatewayManager.cs
Normal file
32
src/NATS.Server/Gateways/GatewayManager.cs
Normal file
@@ -0,0 +1,32 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using NATS.Server.Configuration;
|
||||
|
||||
namespace NATS.Server.Gateways;
|
||||
|
||||
public sealed class GatewayManager : IAsyncDisposable
|
||||
{
|
||||
private readonly GatewayOptions _options;
|
||||
private readonly ServerStats _stats;
|
||||
private readonly ILogger<GatewayManager> _logger;
|
||||
|
||||
public GatewayManager(GatewayOptions options, ServerStats stats, ILogger<GatewayManager> logger)
|
||||
{
|
||||
_options = options;
|
||||
_stats = stats;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken ct)
|
||||
{
|
||||
_logger.LogDebug("Gateway manager started (name={Name}, listen={Host}:{Port})",
|
||||
_options.Name, _options.Host, _options.Port);
|
||||
Interlocked.Exchange(ref _stats.Gateways, 0);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
_logger.LogDebug("Gateway manager stopped");
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
using System.Text.Json;
|
||||
using NATS.Server.JetStream.Models;
|
||||
|
||||
namespace NATS.Server.JetStream.Api.Handlers;
|
||||
|
||||
public static class ConsumerApiHandlers
|
||||
{
|
||||
private const string CreatePrefix = "$JS.API.CONSUMER.CREATE.";
|
||||
private const string InfoPrefix = "$JS.API.CONSUMER.INFO.";
|
||||
|
||||
public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan<byte> payload, ConsumerManager consumerManager)
|
||||
{
|
||||
var parsed = ParseSubject(subject, CreatePrefix);
|
||||
if (parsed == null)
|
||||
return JetStreamApiResponse.NotFound(subject);
|
||||
|
||||
var (stream, durableName) = parsed.Value;
|
||||
var config = ParseConfig(payload);
|
||||
if (string.IsNullOrWhiteSpace(config.DurableName))
|
||||
config.DurableName = durableName;
|
||||
|
||||
return consumerManager.CreateOrUpdate(stream, config);
|
||||
}
|
||||
|
||||
public static JetStreamApiResponse HandleInfo(string subject, ConsumerManager consumerManager)
|
||||
{
|
||||
var parsed = ParseSubject(subject, InfoPrefix);
|
||||
if (parsed == null)
|
||||
return JetStreamApiResponse.NotFound(subject);
|
||||
|
||||
var (stream, durableName) = parsed.Value;
|
||||
return consumerManager.GetInfo(stream, durableName);
|
||||
}
|
||||
|
||||
private static (string Stream, string Durable)? ParseSubject(string subject, string prefix)
|
||||
{
|
||||
if (!subject.StartsWith(prefix, StringComparison.Ordinal))
|
||||
return null;
|
||||
|
||||
var remainder = subject[prefix.Length..];
|
||||
var split = remainder.Split('.', 2, StringSplitOptions.RemoveEmptyEntries);
|
||||
if (split.Length != 2)
|
||||
return null;
|
||||
|
||||
return (split[0], split[1]);
|
||||
}
|
||||
|
||||
private static ConsumerConfig ParseConfig(ReadOnlySpan<byte> payload)
|
||||
{
|
||||
if (payload.IsEmpty)
|
||||
return new ConsumerConfig();
|
||||
|
||||
try
|
||||
{
|
||||
using var doc = JsonDocument.Parse(payload.ToArray());
|
||||
var root = doc.RootElement;
|
||||
var config = new ConsumerConfig();
|
||||
|
||||
if (root.TryGetProperty("durable_name", out var durableEl))
|
||||
config.DurableName = durableEl.GetString() ?? string.Empty;
|
||||
|
||||
if (root.TryGetProperty("filter_subject", out var filterEl))
|
||||
config.FilterSubject = filterEl.GetString();
|
||||
|
||||
if (root.TryGetProperty("push", out var pushEl) && pushEl.ValueKind == JsonValueKind.True)
|
||||
config.Push = true;
|
||||
|
||||
if (root.TryGetProperty("heartbeat_ms", out var hbEl) && hbEl.TryGetInt32(out var hbMs))
|
||||
config.HeartbeatMs = hbMs;
|
||||
|
||||
if (root.TryGetProperty("ack_wait_ms", out var ackWaitEl) && ackWaitEl.TryGetInt32(out var ackWait))
|
||||
config.AckWaitMs = ackWait;
|
||||
|
||||
if (root.TryGetProperty("ack_policy", out var ackPolicyEl))
|
||||
{
|
||||
var ackPolicy = ackPolicyEl.GetString();
|
||||
if (string.Equals(ackPolicy, "explicit", StringComparison.OrdinalIgnoreCase))
|
||||
config.AckPolicy = AckPolicy.Explicit;
|
||||
}
|
||||
|
||||
return config;
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
return new ConsumerConfig();
|
||||
}
|
||||
}
|
||||
}
|
||||
91
src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs
Normal file
91
src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs
Normal file
@@ -0,0 +1,91 @@
|
||||
using System.Text.Json;
|
||||
using NATS.Server.JetStream.Models;
|
||||
|
||||
namespace NATS.Server.JetStream.Api.Handlers;
|
||||
|
||||
public static class StreamApiHandlers
|
||||
{
|
||||
private const string CreatePrefix = "$JS.API.STREAM.CREATE.";
|
||||
private const string InfoPrefix = "$JS.API.STREAM.INFO.";
|
||||
|
||||
public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
|
||||
{
|
||||
var streamName = ExtractTrailingToken(subject, CreatePrefix);
|
||||
if (streamName == null)
|
||||
return JetStreamApiResponse.NotFound(subject);
|
||||
|
||||
var config = ParseConfig(payload);
|
||||
if (string.IsNullOrWhiteSpace(config.Name))
|
||||
config.Name = streamName;
|
||||
|
||||
if (config.Subjects.Count == 0)
|
||||
config.Subjects.Add(streamName.ToLowerInvariant() + ".>");
|
||||
|
||||
return streamManager.CreateOrUpdate(config);
|
||||
}
|
||||
|
||||
public static JetStreamApiResponse HandleInfo(string subject, StreamManager streamManager)
|
||||
{
|
||||
var streamName = ExtractTrailingToken(subject, InfoPrefix);
|
||||
if (streamName == null)
|
||||
return JetStreamApiResponse.NotFound(subject);
|
||||
|
||||
return streamManager.GetInfo(streamName);
|
||||
}
|
||||
|
||||
private static string? ExtractTrailingToken(string subject, string prefix)
|
||||
{
|
||||
if (!subject.StartsWith(prefix, StringComparison.Ordinal))
|
||||
return null;
|
||||
|
||||
var token = subject[prefix.Length..].Trim();
|
||||
return token.Length == 0 ? null : token;
|
||||
}
|
||||
|
||||
private static StreamConfig ParseConfig(ReadOnlySpan<byte> payload)
|
||||
{
|
||||
if (payload.IsEmpty)
|
||||
return new StreamConfig();
|
||||
|
||||
try
|
||||
{
|
||||
using var doc = JsonDocument.Parse(payload.ToArray());
|
||||
var root = doc.RootElement;
|
||||
var config = new StreamConfig();
|
||||
|
||||
if (root.TryGetProperty("name", out var nameEl))
|
||||
config.Name = nameEl.GetString() ?? string.Empty;
|
||||
|
||||
if (root.TryGetProperty("subjects", out var subjectsEl))
|
||||
{
|
||||
if (subjectsEl.ValueKind == JsonValueKind.Array)
|
||||
{
|
||||
foreach (var item in subjectsEl.EnumerateArray())
|
||||
{
|
||||
var value = item.GetString();
|
||||
if (!string.IsNullOrWhiteSpace(value))
|
||||
config.Subjects.Add(value);
|
||||
}
|
||||
}
|
||||
else if (subjectsEl.ValueKind == JsonValueKind.String)
|
||||
{
|
||||
var value = subjectsEl.GetString();
|
||||
if (!string.IsNullOrWhiteSpace(value))
|
||||
config.Subjects.Add(value);
|
||||
}
|
||||
}
|
||||
|
||||
if (root.TryGetProperty("max_msgs", out var maxMsgsEl) && maxMsgsEl.TryGetInt32(out var maxMsgs))
|
||||
config.MaxMsgs = maxMsgs;
|
||||
|
||||
if (root.TryGetProperty("replicas", out var replicasEl) && replicasEl.TryGetInt32(out var replicas))
|
||||
config.Replicas = replicas;
|
||||
|
||||
return config;
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
return new StreamConfig();
|
||||
}
|
||||
}
|
||||
}
|
||||
7
src/NATS.Server/JetStream/Api/JetStreamApiError.cs
Normal file
7
src/NATS.Server/JetStream/Api/JetStreamApiError.cs
Normal file
@@ -0,0 +1,7 @@
|
||||
namespace NATS.Server.JetStream.Api;
|
||||
|
||||
public sealed class JetStreamApiError
|
||||
{
|
||||
public int Code { get; init; }
|
||||
public string Description { get; init; } = string.Empty;
|
||||
}
|
||||
41
src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs
Normal file
41
src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs
Normal file
@@ -0,0 +1,41 @@
|
||||
using NATS.Server.JetStream.Models;
|
||||
|
||||
namespace NATS.Server.JetStream.Api;
|
||||
|
||||
public sealed class JetStreamApiResponse
|
||||
{
|
||||
public JetStreamApiError? Error { get; init; }
|
||||
public JetStreamStreamInfo? StreamInfo { get; init; }
|
||||
public JetStreamConsumerInfo? ConsumerInfo { get; init; }
|
||||
|
||||
public static JetStreamApiResponse NotFound(string subject) => new()
|
||||
{
|
||||
Error = new JetStreamApiError
|
||||
{
|
||||
Code = 404,
|
||||
Description = $"unknown api subject '{subject}'",
|
||||
},
|
||||
};
|
||||
|
||||
public static JetStreamApiResponse Ok() => new();
|
||||
|
||||
public static JetStreamApiResponse ErrorResponse(int code, string description) => new()
|
||||
{
|
||||
Error = new JetStreamApiError
|
||||
{
|
||||
Code = code,
|
||||
Description = description,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
public sealed class JetStreamStreamInfo
|
||||
{
|
||||
public required StreamConfig Config { get; init; }
|
||||
public required StreamState State { get; init; }
|
||||
}
|
||||
|
||||
public sealed class JetStreamConsumerInfo
|
||||
{
|
||||
public required ConsumerConfig Config { get; init; }
|
||||
}
|
||||
37
src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs
Normal file
37
src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs
Normal file
@@ -0,0 +1,37 @@
|
||||
using NATS.Server.JetStream.Api.Handlers;
|
||||
|
||||
namespace NATS.Server.JetStream.Api;
|
||||
|
||||
public sealed class JetStreamApiRouter
|
||||
{
|
||||
private readonly StreamManager _streamManager;
|
||||
private readonly ConsumerManager _consumerManager;
|
||||
|
||||
public JetStreamApiRouter()
|
||||
: this(new StreamManager(), new ConsumerManager())
|
||||
{
|
||||
}
|
||||
|
||||
public JetStreamApiRouter(StreamManager streamManager, ConsumerManager consumerManager)
|
||||
{
|
||||
_streamManager = streamManager;
|
||||
_consumerManager = consumerManager;
|
||||
}
|
||||
|
||||
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
|
||||
{
|
||||
if (subject.StartsWith("$JS.API.STREAM.CREATE.", StringComparison.Ordinal))
|
||||
return StreamApiHandlers.HandleCreate(subject, payload, _streamManager);
|
||||
|
||||
if (subject.StartsWith("$JS.API.STREAM.INFO.", StringComparison.Ordinal))
|
||||
return StreamApiHandlers.HandleInfo(subject, _streamManager);
|
||||
|
||||
if (subject.StartsWith("$JS.API.CONSUMER.CREATE.", StringComparison.Ordinal))
|
||||
return ConsumerApiHandlers.HandleCreate(subject, payload, _consumerManager);
|
||||
|
||||
if (subject.StartsWith("$JS.API.CONSUMER.INFO.", StringComparison.Ordinal))
|
||||
return ConsumerApiHandlers.HandleInfo(subject, _consumerManager);
|
||||
|
||||
return JetStreamApiResponse.NotFound(subject);
|
||||
}
|
||||
}
|
||||
17
src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs
Normal file
17
src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs
Normal file
@@ -0,0 +1,17 @@
|
||||
namespace NATS.Server.JetStream.Cluster;
|
||||
|
||||
public sealed class AssetPlacementPlanner
|
||||
{
|
||||
private readonly int _nodes;
|
||||
|
||||
public AssetPlacementPlanner(int nodes)
|
||||
{
|
||||
_nodes = Math.Max(nodes, 1);
|
||||
}
|
||||
|
||||
public IReadOnlyList<int> PlanReplicas(int replicas)
|
||||
{
|
||||
var count = Math.Min(Math.Max(replicas, 1), _nodes);
|
||||
return Enumerable.Range(1, count).ToArray();
|
||||
}
|
||||
}
|
||||
36
src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
Normal file
36
src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
Normal file
@@ -0,0 +1,36 @@
|
||||
using System.Collections.Concurrent;
|
||||
using NATS.Server.JetStream.Models;
|
||||
|
||||
namespace NATS.Server.JetStream.Cluster;
|
||||
|
||||
public sealed class JetStreamMetaGroup
|
||||
{
|
||||
private readonly int _nodes;
|
||||
private readonly ConcurrentDictionary<string, byte> _streams = new(StringComparer.Ordinal);
|
||||
|
||||
public JetStreamMetaGroup(int nodes)
|
||||
{
|
||||
_nodes = nodes;
|
||||
}
|
||||
|
||||
public Task ProposeCreateStreamAsync(StreamConfig config, CancellationToken ct)
|
||||
{
|
||||
_streams[config.Name] = 0;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public MetaGroupState GetState()
|
||||
{
|
||||
return new MetaGroupState
|
||||
{
|
||||
Streams = _streams.Keys.OrderBy(x => x, StringComparer.Ordinal).ToArray(),
|
||||
ClusterSize = _nodes,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public sealed class MetaGroupState
|
||||
{
|
||||
public IReadOnlyList<string> Streams { get; init; } = [];
|
||||
public int ClusterSize { get; init; }
|
||||
}
|
||||
65
src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs
Normal file
65
src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs
Normal file
@@ -0,0 +1,65 @@
|
||||
using NATS.Server.Raft;
|
||||
|
||||
namespace NATS.Server.JetStream.Cluster;
|
||||
|
||||
public sealed class StreamReplicaGroup
|
||||
{
|
||||
private readonly List<RaftNode> _nodes;
|
||||
|
||||
public string StreamName { get; }
|
||||
public IReadOnlyList<RaftNode> Nodes => _nodes;
|
||||
public RaftNode Leader { get; private set; }
|
||||
|
||||
public StreamReplicaGroup(string streamName, int replicas)
|
||||
{
|
||||
StreamName = streamName;
|
||||
|
||||
var nodeCount = Math.Max(replicas, 1);
|
||||
_nodes = Enumerable.Range(1, nodeCount)
|
||||
.Select(i => new RaftNode($"{streamName.ToLowerInvariant()}-r{i}"))
|
||||
.ToList();
|
||||
|
||||
foreach (var node in _nodes)
|
||||
node.ConfigureCluster(_nodes);
|
||||
|
||||
Leader = ElectLeader(_nodes[0]);
|
||||
}
|
||||
|
||||
public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
|
||||
{
|
||||
if (!Leader.IsLeader)
|
||||
Leader = ElectLeader(SelectNextCandidate(Leader));
|
||||
|
||||
return await Leader.ProposeAsync(command, ct);
|
||||
}
|
||||
|
||||
public Task StepDownAsync(CancellationToken ct)
|
||||
{
|
||||
_ = ct;
|
||||
var previous = Leader;
|
||||
previous.RequestStepDown();
|
||||
Leader = ElectLeader(SelectNextCandidate(previous));
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private RaftNode SelectNextCandidate(RaftNode currentLeader)
|
||||
{
|
||||
if (_nodes.Count == 1)
|
||||
return _nodes[0];
|
||||
|
||||
var index = _nodes.FindIndex(n => n.Id == currentLeader.Id);
|
||||
if (index < 0)
|
||||
return _nodes[0];
|
||||
|
||||
return _nodes[(index + 1) % _nodes.Count];
|
||||
}
|
||||
|
||||
private RaftNode ElectLeader(RaftNode candidate)
|
||||
{
|
||||
candidate.StartElection(_nodes.Count);
|
||||
foreach (var voter in _nodes.Where(n => n.Id != candidate.Id))
|
||||
candidate.ReceiveVote(voter.GrantVote(candidate.Term), _nodes.Count);
|
||||
|
||||
return candidate;
|
||||
}
|
||||
}
|
||||
97
src/NATS.Server/JetStream/ConsumerManager.cs
Normal file
97
src/NATS.Server/JetStream/ConsumerManager.cs
Normal file
@@ -0,0 +1,97 @@
|
||||
using System.Collections.Concurrent;
|
||||
using NATS.Server.JetStream.Api;
|
||||
using NATS.Server.JetStream.Cluster;
|
||||
using NATS.Server.JetStream.Consumers;
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.JetStream;
|
||||
|
||||
public sealed class ConsumerManager
|
||||
{
|
||||
private readonly JetStreamMetaGroup? _metaGroup;
|
||||
private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
|
||||
private readonly PullConsumerEngine _pullConsumerEngine = new();
|
||||
private readonly PushConsumerEngine _pushConsumerEngine = new();
|
||||
|
||||
public ConsumerManager(JetStreamMetaGroup? metaGroup = null)
|
||||
{
|
||||
_metaGroup = metaGroup;
|
||||
}
|
||||
|
||||
public int ConsumerCount => _consumers.Count;
|
||||
|
||||
public JetStreamApiResponse CreateOrUpdate(string stream, ConsumerConfig config)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(config.DurableName))
|
||||
return JetStreamApiResponse.ErrorResponse(400, "durable name required");
|
||||
|
||||
var key = (stream, config.DurableName);
|
||||
var handle = _consumers.AddOrUpdate(key,
|
||||
_ => new ConsumerHandle(stream, config),
|
||||
(_, existing) => existing with { Config = config });
|
||||
|
||||
return new JetStreamApiResponse
|
||||
{
|
||||
ConsumerInfo = new JetStreamConsumerInfo
|
||||
{
|
||||
Config = handle.Config,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
public JetStreamApiResponse GetInfo(string stream, string durableName)
|
||||
{
|
||||
if (_consumers.TryGetValue((stream, durableName), out var handle))
|
||||
{
|
||||
return new JetStreamApiResponse
|
||||
{
|
||||
ConsumerInfo = new JetStreamConsumerInfo
|
||||
{
|
||||
Config = handle.Config,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
return JetStreamApiResponse.NotFound($"$JS.API.CONSUMER.INFO.{stream}.{durableName}");
|
||||
}
|
||||
|
||||
public bool TryGet(string stream, string durableName, out ConsumerHandle handle)
|
||||
=> _consumers.TryGetValue((stream, durableName), out handle!);
|
||||
|
||||
public async ValueTask<PullFetchBatch> FetchAsync(string stream, string durableName, int batch, StreamManager streamManager, CancellationToken ct)
|
||||
{
|
||||
if (!_consumers.TryGetValue((stream, durableName), out var consumer))
|
||||
return new PullFetchBatch([]);
|
||||
|
||||
if (!streamManager.TryGet(stream, out var streamHandle))
|
||||
return new PullFetchBatch([]);
|
||||
|
||||
return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, batch, ct);
|
||||
}
|
||||
|
||||
public void OnPublished(string stream, StoredMessage message)
|
||||
{
|
||||
foreach (var handle in _consumers.Values.Where(c => c.Stream == stream && c.Config.Push))
|
||||
_pushConsumerEngine.Enqueue(handle, message);
|
||||
}
|
||||
|
||||
public PushFrame? ReadPushFrame(string stream, string durableName)
|
||||
{
|
||||
if (!_consumers.TryGetValue((stream, durableName), out var consumer))
|
||||
return null;
|
||||
|
||||
if (consumer.PushFrames.Count == 0)
|
||||
return null;
|
||||
|
||||
return consumer.PushFrames.Dequeue();
|
||||
}
|
||||
}
|
||||
|
||||
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
|
||||
{
|
||||
public ulong NextSequence { get; set; } = 1;
|
||||
public Queue<StoredMessage> Pending { get; } = new();
|
||||
public Queue<PushFrame> PushFrames { get; } = new();
|
||||
public AckProcessor AckProcessor { get; } = new();
|
||||
}
|
||||
24
src/NATS.Server/JetStream/Consumers/AckProcessor.cs
Normal file
24
src/NATS.Server/JetStream/Consumers/AckProcessor.cs
Normal file
@@ -0,0 +1,24 @@
|
||||
namespace NATS.Server.JetStream.Consumers;
|
||||
|
||||
public sealed class AckProcessor
|
||||
{
|
||||
private readonly Dictionary<ulong, DateTime> _pending = new();
|
||||
|
||||
public void Register(ulong sequence, int ackWaitMs)
|
||||
{
|
||||
_pending[sequence] = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1));
|
||||
}
|
||||
|
||||
public ulong? NextExpired()
|
||||
{
|
||||
foreach (var (seq, deadline) in _pending)
|
||||
{
|
||||
if (DateTime.UtcNow >= deadline)
|
||||
return seq;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public bool HasPending => _pending.Count > 0;
|
||||
}
|
||||
63
src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs
Normal file
63
src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs
Normal file
@@ -0,0 +1,63 @@
|
||||
using NATS.Server.JetStream.Storage;
|
||||
using NATS.Server.JetStream.Models;
|
||||
|
||||
namespace NATS.Server.JetStream.Consumers;
|
||||
|
||||
public sealed class PullConsumerEngine
|
||||
{
|
||||
public async ValueTask<PullFetchBatch> FetchAsync(StreamHandle stream, ConsumerHandle consumer, int batch, CancellationToken ct)
|
||||
{
|
||||
var messages = new List<StoredMessage>(batch);
|
||||
|
||||
if (consumer.Config.AckPolicy == AckPolicy.Explicit)
|
||||
{
|
||||
var expired = consumer.AckProcessor.NextExpired();
|
||||
if (expired is { } expiredSequence)
|
||||
{
|
||||
var redelivery = await stream.Store.LoadAsync(expiredSequence, ct);
|
||||
if (redelivery != null)
|
||||
{
|
||||
messages.Add(new StoredMessage
|
||||
{
|
||||
Sequence = redelivery.Sequence,
|
||||
Subject = redelivery.Subject,
|
||||
Payload = redelivery.Payload,
|
||||
Redelivered = true,
|
||||
});
|
||||
}
|
||||
|
||||
return new PullFetchBatch(messages);
|
||||
}
|
||||
|
||||
if (consumer.AckProcessor.HasPending)
|
||||
return new PullFetchBatch(messages);
|
||||
}
|
||||
|
||||
var sequence = consumer.NextSequence;
|
||||
|
||||
for (var i = 0; i < batch; i++)
|
||||
{
|
||||
var message = await stream.Store.LoadAsync(sequence, ct);
|
||||
if (message == null)
|
||||
break;
|
||||
|
||||
messages.Add(message);
|
||||
if (consumer.Config.AckPolicy == AckPolicy.Explicit)
|
||||
consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
|
||||
sequence++;
|
||||
}
|
||||
|
||||
consumer.NextSequence = sequence;
|
||||
return new PullFetchBatch(messages);
|
||||
}
|
||||
}
|
||||
|
||||
public sealed class PullFetchBatch
|
||||
{
|
||||
public IReadOnlyList<StoredMessage> Messages { get; }
|
||||
|
||||
public PullFetchBatch(IReadOnlyList<StoredMessage> messages)
|
||||
{
|
||||
Messages = messages;
|
||||
}
|
||||
}
|
||||
34
src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs
Normal file
34
src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs
Normal file
@@ -0,0 +1,34 @@
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.JetStream.Consumers;
|
||||
|
||||
public sealed class PushConsumerEngine
|
||||
{
|
||||
public void Enqueue(ConsumerHandle consumer, StoredMessage message)
|
||||
{
|
||||
consumer.PushFrames.Enqueue(new PushFrame
|
||||
{
|
||||
IsData = true,
|
||||
Message = message,
|
||||
});
|
||||
|
||||
if (consumer.Config.AckPolicy == AckPolicy.Explicit)
|
||||
consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
|
||||
|
||||
if (consumer.Config.HeartbeatMs > 0)
|
||||
{
|
||||
consumer.PushFrames.Enqueue(new PushFrame
|
||||
{
|
||||
IsHeartbeat = true,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public sealed class PushFrame
|
||||
{
|
||||
public bool IsData { get; init; }
|
||||
public bool IsHeartbeat { get; init; }
|
||||
public StoredMessage? Message { get; init; }
|
||||
}
|
||||
26
src/NATS.Server/JetStream/JetStreamService.cs
Normal file
26
src/NATS.Server/JetStream/JetStreamService.cs
Normal file
@@ -0,0 +1,26 @@
|
||||
using NATS.Server.Configuration;
|
||||
|
||||
namespace NATS.Server.JetStream;
|
||||
|
||||
public sealed class JetStreamService : IAsyncDisposable
|
||||
{
|
||||
private readonly JetStreamOptions _options;
|
||||
public bool IsRunning { get; private set; }
|
||||
|
||||
public JetStreamService(JetStreamOptions options)
|
||||
{
|
||||
_options = options;
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken ct)
|
||||
{
|
||||
IsRunning = true;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
IsRunning = false;
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
16
src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs
Normal file
16
src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs
Normal file
@@ -0,0 +1,16 @@
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.JetStream.MirrorSource;
|
||||
|
||||
public sealed class MirrorCoordinator
|
||||
{
|
||||
private readonly IStreamStore _targetStore;
|
||||
|
||||
public MirrorCoordinator(IStreamStore targetStore)
|
||||
{
|
||||
_targetStore = targetStore;
|
||||
}
|
||||
|
||||
public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
|
||||
=> _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask();
|
||||
}
|
||||
16
src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs
Normal file
16
src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs
Normal file
@@ -0,0 +1,16 @@
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.JetStream.MirrorSource;
|
||||
|
||||
public sealed class SourceCoordinator
|
||||
{
|
||||
private readonly IStreamStore _targetStore;
|
||||
|
||||
public SourceCoordinator(IStreamStore targetStore)
|
||||
{
|
||||
_targetStore = targetStore;
|
||||
}
|
||||
|
||||
public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
|
||||
=> _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask();
|
||||
}
|
||||
18
src/NATS.Server/JetStream/Models/ConsumerConfig.cs
Normal file
18
src/NATS.Server/JetStream/Models/ConsumerConfig.cs
Normal file
@@ -0,0 +1,18 @@
|
||||
namespace NATS.Server.JetStream.Models;
|
||||
|
||||
public sealed class ConsumerConfig
|
||||
{
|
||||
public string DurableName { get; set; } = string.Empty;
|
||||
public string? FilterSubject { get; set; }
|
||||
public AckPolicy AckPolicy { get; set; } = AckPolicy.None;
|
||||
public int AckWaitMs { get; set; } = 30_000;
|
||||
public int MaxDeliver { get; set; } = 1;
|
||||
public bool Push { get; set; }
|
||||
public int HeartbeatMs { get; set; }
|
||||
}
|
||||
|
||||
public enum AckPolicy
|
||||
{
|
||||
None,
|
||||
Explicit,
|
||||
}
|
||||
11
src/NATS.Server/JetStream/Models/StreamConfig.cs
Normal file
11
src/NATS.Server/JetStream/Models/StreamConfig.cs
Normal file
@@ -0,0 +1,11 @@
|
||||
namespace NATS.Server.JetStream.Models;
|
||||
|
||||
public sealed class StreamConfig
|
||||
{
|
||||
public string Name { get; set; } = string.Empty;
|
||||
public List<string> Subjects { get; set; } = [];
|
||||
public int MaxMsgs { get; set; }
|
||||
public int Replicas { get; set; } = 1;
|
||||
public string? Mirror { get; set; }
|
||||
public string? Source { get; set; }
|
||||
}
|
||||
8
src/NATS.Server/JetStream/Models/StreamState.cs
Normal file
8
src/NATS.Server/JetStream/Models/StreamState.cs
Normal file
@@ -0,0 +1,8 @@
|
||||
namespace NATS.Server.JetStream.Models;
|
||||
|
||||
public sealed class StreamState
|
||||
{
|
||||
public ulong Messages { get; set; }
|
||||
public ulong FirstSeq { get; set; }
|
||||
public ulong LastSeq { get; set; }
|
||||
}
|
||||
39
src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs
Normal file
39
src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs
Normal file
@@ -0,0 +1,39 @@
|
||||
namespace NATS.Server.JetStream.Publish;
|
||||
|
||||
public sealed class JetStreamPublisher
|
||||
{
|
||||
private readonly StreamManager _streamManager;
|
||||
private readonly PublishPreconditions _preconditions = new();
|
||||
|
||||
public JetStreamPublisher(StreamManager streamManager)
|
||||
{
|
||||
_streamManager = streamManager;
|
||||
}
|
||||
|
||||
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
|
||||
=> TryCapture(subject, payload, null, out ack);
|
||||
|
||||
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, string? msgId, out PubAck ack)
|
||||
{
|
||||
if (_preconditions.IsDuplicate(msgId, out var existingSequence))
|
||||
{
|
||||
ack = new PubAck
|
||||
{
|
||||
Seq = existingSequence,
|
||||
ErrorCode = 10071,
|
||||
};
|
||||
return true;
|
||||
}
|
||||
|
||||
var captured = _streamManager.Capture(subject, payload);
|
||||
if (captured == null)
|
||||
{
|
||||
ack = new PubAck();
|
||||
return false;
|
||||
}
|
||||
|
||||
ack = captured;
|
||||
_preconditions.Record(msgId, ack.Seq);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
8
src/NATS.Server/JetStream/Publish/PubAck.cs
Normal file
8
src/NATS.Server/JetStream/Publish/PubAck.cs
Normal file
@@ -0,0 +1,8 @@
|
||||
namespace NATS.Server.JetStream.Publish;
|
||||
|
||||
public sealed class PubAck
|
||||
{
|
||||
public string Stream { get; init; } = string.Empty;
|
||||
public ulong Seq { get; init; }
|
||||
public int? ErrorCode { get; init; }
|
||||
}
|
||||
25
src/NATS.Server/JetStream/Publish/PublishPreconditions.cs
Normal file
25
src/NATS.Server/JetStream/Publish/PublishPreconditions.cs
Normal file
@@ -0,0 +1,25 @@
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace NATS.Server.JetStream.Publish;
|
||||
|
||||
public sealed class PublishPreconditions
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, ulong> _dedupe = new(StringComparer.Ordinal);
|
||||
|
||||
public bool IsDuplicate(string? msgId, out ulong existingSequence)
|
||||
{
|
||||
existingSequence = 0;
|
||||
if (string.IsNullOrEmpty(msgId))
|
||||
return false;
|
||||
|
||||
return _dedupe.TryGetValue(msgId, out existingSequence);
|
||||
}
|
||||
|
||||
public void Record(string? msgId, ulong sequence)
|
||||
{
|
||||
if (string.IsNullOrEmpty(msgId))
|
||||
return;
|
||||
|
||||
_dedupe[msgId] = sequence;
|
||||
}
|
||||
}
|
||||
127
src/NATS.Server/JetStream/Storage/FileStore.cs
Normal file
127
src/NATS.Server/JetStream/Storage/FileStore.cs
Normal file
@@ -0,0 +1,127 @@
|
||||
using System.Text.Json;
|
||||
using NATS.Server.JetStream.Models;
|
||||
|
||||
namespace NATS.Server.JetStream.Storage;
|
||||
|
||||
public sealed class FileStore : IStreamStore, IAsyncDisposable
|
||||
{
|
||||
private readonly string _dataFilePath;
|
||||
private readonly Dictionary<ulong, StoredMessage> _messages = new();
|
||||
private ulong _last;
|
||||
|
||||
public FileStore(FileStoreOptions options)
|
||||
{
|
||||
Directory.CreateDirectory(options.Directory);
|
||||
_dataFilePath = Path.Combine(options.Directory, "messages.jsonl");
|
||||
LoadExisting();
|
||||
}
|
||||
|
||||
public async ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
_last++;
|
||||
var stored = new StoredMessage
|
||||
{
|
||||
Sequence = _last,
|
||||
Subject = subject,
|
||||
Payload = payload.ToArray(),
|
||||
};
|
||||
_messages[_last] = stored;
|
||||
|
||||
var line = JsonSerializer.Serialize(new FileRecord
|
||||
{
|
||||
Sequence = stored.Sequence,
|
||||
Subject = stored.Subject,
|
||||
PayloadBase64 = Convert.ToBase64String(stored.Payload.ToArray()),
|
||||
});
|
||||
await File.AppendAllTextAsync(_dataFilePath, line + Environment.NewLine, ct);
|
||||
return _last;
|
||||
}
|
||||
|
||||
public ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct)
|
||||
{
|
||||
_messages.TryGetValue(sequence, out var msg);
|
||||
return ValueTask.FromResult(msg);
|
||||
}
|
||||
|
||||
public ValueTask PurgeAsync(CancellationToken ct)
|
||||
{
|
||||
_messages.Clear();
|
||||
_last = 0;
|
||||
if (File.Exists(_dataFilePath))
|
||||
File.Delete(_dataFilePath);
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
public ValueTask<StreamState> GetStateAsync(CancellationToken ct)
|
||||
{
|
||||
return ValueTask.FromResult(new StreamState
|
||||
{
|
||||
Messages = (ulong)_messages.Count,
|
||||
FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(),
|
||||
LastSeq = _last,
|
||||
});
|
||||
}
|
||||
|
||||
public void TrimToMaxMessages(ulong maxMessages)
|
||||
{
|
||||
while ((ulong)_messages.Count > maxMessages)
|
||||
{
|
||||
var first = _messages.Keys.Min();
|
||||
_messages.Remove(first);
|
||||
}
|
||||
|
||||
RewriteDataFile();
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||
|
||||
private void LoadExisting()
|
||||
{
|
||||
if (!File.Exists(_dataFilePath))
|
||||
return;
|
||||
|
||||
foreach (var line in File.ReadLines(_dataFilePath))
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(line))
|
||||
continue;
|
||||
|
||||
var record = JsonSerializer.Deserialize<FileRecord>(line);
|
||||
if (record == null)
|
||||
continue;
|
||||
|
||||
var message = new StoredMessage
|
||||
{
|
||||
Sequence = record.Sequence,
|
||||
Subject = record.Subject ?? string.Empty,
|
||||
Payload = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty),
|
||||
};
|
||||
|
||||
_messages[message.Sequence] = message;
|
||||
if (message.Sequence > _last)
|
||||
_last = message.Sequence;
|
||||
}
|
||||
}
|
||||
|
||||
private void RewriteDataFile()
|
||||
{
|
||||
var lines = new List<string>(_messages.Count);
|
||||
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
|
||||
{
|
||||
lines.Add(JsonSerializer.Serialize(new FileRecord
|
||||
{
|
||||
Sequence = message.Sequence,
|
||||
Subject = message.Subject,
|
||||
PayloadBase64 = Convert.ToBase64String(message.Payload.ToArray()),
|
||||
}));
|
||||
}
|
||||
|
||||
File.WriteAllLines(_dataFilePath, lines);
|
||||
}
|
||||
|
||||
private sealed class FileRecord
|
||||
{
|
||||
public ulong Sequence { get; init; }
|
||||
public string? Subject { get; init; }
|
||||
public string? PayloadBase64 { get; init; }
|
||||
}
|
||||
}
|
||||
6
src/NATS.Server/JetStream/Storage/FileStoreBlock.cs
Normal file
6
src/NATS.Server/JetStream/Storage/FileStoreBlock.cs
Normal file
@@ -0,0 +1,6 @@
|
||||
namespace NATS.Server.JetStream.Storage;
|
||||
|
||||
public sealed class FileStoreBlock
|
||||
{
|
||||
public required string Path { get; init; }
|
||||
}
|
||||
6
src/NATS.Server/JetStream/Storage/FileStoreOptions.cs
Normal file
6
src/NATS.Server/JetStream/Storage/FileStoreOptions.cs
Normal file
@@ -0,0 +1,6 @@
|
||||
namespace NATS.Server.JetStream.Storage;
|
||||
|
||||
public sealed class FileStoreOptions
|
||||
{
|
||||
public string Directory { get; set; } = string.Empty;
|
||||
}
|
||||
11
src/NATS.Server/JetStream/Storage/IStreamStore.cs
Normal file
11
src/NATS.Server/JetStream/Storage/IStreamStore.cs
Normal file
@@ -0,0 +1,11 @@
|
||||
using NATS.Server.JetStream.Models;
|
||||
|
||||
namespace NATS.Server.JetStream.Storage;
|
||||
|
||||
public interface IStreamStore
|
||||
{
|
||||
ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct);
|
||||
ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct);
|
||||
ValueTask PurgeAsync(CancellationToken ct);
|
||||
ValueTask<StreamState> GetStateAsync(CancellationToken ct);
|
||||
}
|
||||
69
src/NATS.Server/JetStream/Storage/MemStore.cs
Normal file
69
src/NATS.Server/JetStream/Storage/MemStore.cs
Normal file
@@ -0,0 +1,69 @@
|
||||
using NATS.Server.JetStream.Models;
|
||||
|
||||
namespace NATS.Server.JetStream.Storage;
|
||||
|
||||
public sealed class MemStore : IStreamStore
|
||||
{
|
||||
private readonly object _gate = new();
|
||||
private ulong _last;
|
||||
private readonly Dictionary<ulong, StoredMessage> _messages = new();
|
||||
|
||||
public ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
lock (_gate)
|
||||
{
|
||||
_last++;
|
||||
_messages[_last] = new StoredMessage
|
||||
{
|
||||
Sequence = _last,
|
||||
Subject = subject,
|
||||
Payload = payload,
|
||||
};
|
||||
return ValueTask.FromResult(_last);
|
||||
}
|
||||
}
|
||||
|
||||
public ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct)
|
||||
{
|
||||
lock (_gate)
|
||||
{
|
||||
_messages.TryGetValue(sequence, out var msg);
|
||||
return ValueTask.FromResult(msg);
|
||||
}
|
||||
}
|
||||
|
||||
public ValueTask PurgeAsync(CancellationToken ct)
|
||||
{
|
||||
lock (_gate)
|
||||
{
|
||||
_messages.Clear();
|
||||
_last = 0;
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
public ValueTask<StreamState> GetStateAsync(CancellationToken ct)
|
||||
{
|
||||
lock (_gate)
|
||||
{
|
||||
return ValueTask.FromResult(new StreamState
|
||||
{
|
||||
Messages = (ulong)_messages.Count,
|
||||
FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(),
|
||||
LastSeq = _last,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void TrimToMaxMessages(ulong maxMessages)
|
||||
{
|
||||
lock (_gate)
|
||||
{
|
||||
while ((ulong)_messages.Count > maxMessages)
|
||||
{
|
||||
var first = _messages.Keys.Min();
|
||||
_messages.Remove(first);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
9
src/NATS.Server/JetStream/Storage/StoredMessage.cs
Normal file
9
src/NATS.Server/JetStream/Storage/StoredMessage.cs
Normal file
@@ -0,0 +1,9 @@
|
||||
namespace NATS.Server.JetStream.Storage;
|
||||
|
||||
public sealed class StoredMessage
|
||||
{
|
||||
public ulong Sequence { get; init; }
|
||||
public string Subject { get; init; } = string.Empty;
|
||||
public ReadOnlyMemory<byte> Payload { get; init; }
|
||||
public bool Redelivered { get; init; }
|
||||
}
|
||||
202
src/NATS.Server/JetStream/StreamManager.cs
Normal file
202
src/NATS.Server/JetStream/StreamManager.cs
Normal file
@@ -0,0 +1,202 @@
|
||||
using System.Collections.Concurrent;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.JetStream.Api;
|
||||
using NATS.Server.JetStream.Cluster;
|
||||
using NATS.Server.JetStream.MirrorSource;
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Publish;
|
||||
using NATS.Server.JetStream.Storage;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.JetStream;
|
||||
|
||||
public sealed class StreamManager
|
||||
{
|
||||
private readonly Account? _account;
|
||||
private readonly JetStreamMetaGroup? _metaGroup;
|
||||
private readonly ConcurrentDictionary<string, StreamHandle> _streams =
|
||||
new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, StreamReplicaGroup> _replicaGroups =
|
||||
new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, List<MirrorCoordinator>> _mirrorsByOrigin =
|
||||
new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, List<SourceCoordinator>> _sourcesByOrigin =
|
||||
new(StringComparer.Ordinal);
|
||||
|
||||
public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null)
|
||||
{
|
||||
_metaGroup = metaGroup;
|
||||
_account = account;
|
||||
}
|
||||
|
||||
public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();
|
||||
|
||||
public JetStreamApiResponse CreateOrUpdate(StreamConfig config)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(config.Name))
|
||||
return JetStreamApiResponse.ErrorResponse(400, "stream name required");
|
||||
|
||||
var normalized = NormalizeConfig(config);
|
||||
var isCreate = !_streams.ContainsKey(normalized.Name);
|
||||
if (isCreate && _account is not null && !_account.TryReserveStream())
|
||||
return JetStreamApiResponse.ErrorResponse(10027, "maximum streams exceeded");
|
||||
|
||||
var handle = _streams.AddOrUpdate(
|
||||
normalized.Name,
|
||||
_ => new StreamHandle(normalized, new MemStore()),
|
||||
(_, existing) => existing with { Config = normalized });
|
||||
_replicaGroups.AddOrUpdate(
|
||||
normalized.Name,
|
||||
_ => new StreamReplicaGroup(normalized.Name, normalized.Replicas),
|
||||
(_, existing) => existing.Nodes.Count == Math.Max(normalized.Replicas, 1)
|
||||
? existing
|
||||
: new StreamReplicaGroup(normalized.Name, normalized.Replicas));
|
||||
RebuildReplicationCoordinators();
|
||||
_metaGroup?.ProposeCreateStreamAsync(normalized, default).GetAwaiter().GetResult();
|
||||
|
||||
return BuildStreamInfoResponse(handle);
|
||||
}
|
||||
|
||||
public JetStreamApiResponse GetInfo(string name)
|
||||
{
|
||||
if (_streams.TryGetValue(name, out var stream))
|
||||
return BuildStreamInfoResponse(stream);
|
||||
|
||||
return JetStreamApiResponse.NotFound($"$JS.API.STREAM.INFO.{name}");
|
||||
}
|
||||
|
||||
public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!);
|
||||
|
||||
public ValueTask<StreamState> GetStateAsync(string name, CancellationToken ct)
|
||||
{
|
||||
if (_streams.TryGetValue(name, out var stream))
|
||||
return stream.Store.GetStateAsync(ct);
|
||||
|
||||
return ValueTask.FromResult(new StreamState());
|
||||
}
|
||||
|
||||
public StreamHandle? FindBySubject(string subject)
|
||||
{
|
||||
foreach (var stream in _streams.Values)
|
||||
{
|
||||
if (stream.Config.Subjects.Any(p => SubjectMatch.MatchLiteral(subject, p)))
|
||||
return stream;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public PubAck? Capture(string subject, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
var stream = FindBySubject(subject);
|
||||
if (stream == null)
|
||||
return null;
|
||||
|
||||
if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
|
||||
_ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();
|
||||
|
||||
var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
|
||||
EnforceLimits(stream);
|
||||
var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
|
||||
if (stored != null)
|
||||
ReplicateIfConfigured(stream.Config.Name, stored);
|
||||
|
||||
return new PubAck
|
||||
{
|
||||
Stream = stream.Config.Name,
|
||||
Seq = seq,
|
||||
};
|
||||
}
|
||||
|
||||
public Task StepDownStreamLeaderAsync(string stream, CancellationToken ct)
|
||||
{
|
||||
if (_replicaGroups.TryGetValue(stream, out var replicaGroup))
|
||||
return replicaGroup.StepDownAsync(ct);
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private static StreamConfig NormalizeConfig(StreamConfig config)
|
||||
{
|
||||
var copy = new StreamConfig
|
||||
{
|
||||
Name = config.Name,
|
||||
Subjects = config.Subjects.Count == 0 ? [] : [.. config.Subjects],
|
||||
MaxMsgs = config.MaxMsgs,
|
||||
Replicas = config.Replicas,
|
||||
Mirror = config.Mirror,
|
||||
Source = config.Source,
|
||||
};
|
||||
|
||||
return copy;
|
||||
}
|
||||
|
||||
private static JetStreamApiResponse BuildStreamInfoResponse(StreamHandle handle)
|
||||
{
|
||||
var state = handle.Store.GetStateAsync(default).GetAwaiter().GetResult();
|
||||
return new JetStreamApiResponse
|
||||
{
|
||||
StreamInfo = new JetStreamStreamInfo
|
||||
{
|
||||
Config = handle.Config,
|
||||
State = state,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private static void EnforceLimits(StreamHandle stream)
|
||||
{
|
||||
if (stream.Config.MaxMsgs <= 0)
|
||||
return;
|
||||
|
||||
var maxMessages = (ulong)stream.Config.MaxMsgs;
|
||||
if (stream.Store is MemStore memStore)
|
||||
{
|
||||
memStore.TrimToMaxMessages(maxMessages);
|
||||
return;
|
||||
}
|
||||
|
||||
if (stream.Store is FileStore fileStore)
|
||||
fileStore.TrimToMaxMessages(maxMessages);
|
||||
}
|
||||
|
||||
private void RebuildReplicationCoordinators()
|
||||
{
|
||||
_mirrorsByOrigin.Clear();
|
||||
_sourcesByOrigin.Clear();
|
||||
|
||||
foreach (var stream in _streams.Values)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(stream.Config.Mirror)
|
||||
&& _streams.TryGetValue(stream.Config.Mirror, out _))
|
||||
{
|
||||
var list = _mirrorsByOrigin.GetOrAdd(stream.Config.Mirror, _ => []);
|
||||
list.Add(new MirrorCoordinator(stream.Store));
|
||||
}
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(stream.Config.Source)
|
||||
&& _streams.TryGetValue(stream.Config.Source, out _))
|
||||
{
|
||||
var list = _sourcesByOrigin.GetOrAdd(stream.Config.Source, _ => []);
|
||||
list.Add(new SourceCoordinator(stream.Store));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void ReplicateIfConfigured(string originStream, StoredMessage stored)
|
||||
{
|
||||
if (_mirrorsByOrigin.TryGetValue(originStream, out var mirrors))
|
||||
{
|
||||
foreach (var mirror in mirrors)
|
||||
mirror.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
if (_sourcesByOrigin.TryGetValue(originStream, out var sources))
|
||||
{
|
||||
foreach (var source in sources)
|
||||
source.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public sealed record StreamHandle(StreamConfig Config, IStreamStore Store);
|
||||
@@ -0,0 +1,26 @@
|
||||
using NATS.Server.JetStream.Models;
|
||||
|
||||
namespace NATS.Server.JetStream.Validation;
|
||||
|
||||
public static class JetStreamConfigValidator
|
||||
{
|
||||
public static ValidationResult Validate(StreamConfig config)
|
||||
=> string.IsNullOrWhiteSpace(config.Name) || config.Subjects.Count == 0
|
||||
? ValidationResult.Invalid("name/subjects required")
|
||||
: ValidationResult.Valid();
|
||||
}
|
||||
|
||||
public sealed class ValidationResult
|
||||
{
|
||||
public bool IsValid { get; }
|
||||
public string Message { get; }
|
||||
|
||||
private ValidationResult(bool isValid, string message)
|
||||
{
|
||||
IsValid = isValid;
|
||||
Message = message;
|
||||
}
|
||||
|
||||
public static ValidationResult Valid() => new(true, string.Empty);
|
||||
public static ValidationResult Invalid(string message) => new(false, message);
|
||||
}
|
||||
11
src/NATS.Server/LeafNodes/LeafConnection.cs
Normal file
11
src/NATS.Server/LeafNodes/LeafConnection.cs
Normal file
@@ -0,0 +1,11 @@
|
||||
namespace NATS.Server.LeafNodes;
|
||||
|
||||
public sealed class LeafConnection
|
||||
{
|
||||
public string RemoteEndpoint { get; }
|
||||
|
||||
public LeafConnection(string remoteEndpoint)
|
||||
{
|
||||
RemoteEndpoint = remoteEndpoint;
|
||||
}
|
||||
}
|
||||
31
src/NATS.Server/LeafNodes/LeafNodeManager.cs
Normal file
31
src/NATS.Server/LeafNodes/LeafNodeManager.cs
Normal file
@@ -0,0 +1,31 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using NATS.Server.Configuration;
|
||||
|
||||
namespace NATS.Server.LeafNodes;
|
||||
|
||||
public sealed class LeafNodeManager : IAsyncDisposable
|
||||
{
|
||||
private readonly LeafNodeOptions _options;
|
||||
private readonly ServerStats _stats;
|
||||
private readonly ILogger<LeafNodeManager> _logger;
|
||||
|
||||
public LeafNodeManager(LeafNodeOptions options, ServerStats stats, ILogger<LeafNodeManager> logger)
|
||||
{
|
||||
_options = options;
|
||||
_stats = stats;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken ct)
|
||||
{
|
||||
_logger.LogDebug("Leaf manager started (listen={Host}:{Port})", _options.Host, _options.Port);
|
||||
Interlocked.Exchange(ref _stats.Leafs, 0);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
_logger.LogDebug("Leaf manager stopped");
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -22,4 +22,5 @@ public sealed record ClosedClient
|
||||
public TimeSpan Rtt { get; init; }
|
||||
public string TlsVersion { get; init; } = "";
|
||||
public string TlsCipherSuite { get; init; } = "";
|
||||
public string MqttClient { get; init; } = "";
|
||||
}
|
||||
|
||||
@@ -204,6 +204,8 @@ public sealed class ConnzOptions
|
||||
|
||||
public string FilterSubject { get; set; } = "";
|
||||
|
||||
public string MqttClient { get; set; } = "";
|
||||
|
||||
public int Offset { get; set; }
|
||||
|
||||
public int Limit { get; set; } = 1024;
|
||||
|
||||
@@ -28,6 +28,10 @@ public sealed class ConnzHandler(NatsServer server)
|
||||
connInfos.AddRange(server.GetClosedClients().Select(c => BuildClosedConnInfo(c, now, opts)));
|
||||
}
|
||||
|
||||
// Filter by MQTT client ID
|
||||
if (!string.IsNullOrEmpty(opts.MqttClient))
|
||||
connInfos = connInfos.Where(c => c.MqttClient == opts.MqttClient).ToList();
|
||||
|
||||
// Validate sort options that require closed state
|
||||
if (opts.Sort is SortOpt.ByStop or SortOpt.ByReason && opts.State == ConnState.Open)
|
||||
opts.Sort = SortOpt.ByCid; // Fallback
|
||||
@@ -142,6 +146,7 @@ public sealed class ConnzHandler(NatsServer server)
|
||||
Rtt = FormatRtt(closed.Rtt),
|
||||
TlsVersion = closed.TlsVersion,
|
||||
TlsCipherSuite = closed.TlsCipherSuite,
|
||||
MqttClient = closed.MqttClient,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -197,6 +202,9 @@ public sealed class ConnzHandler(NatsServer server)
|
||||
if (q.TryGetValue("limit", out var limit) && int.TryParse(limit, out var l))
|
||||
opts.Limit = l;
|
||||
|
||||
if (q.TryGetValue("mqtt_client", out var mqttClient))
|
||||
opts.MqttClient = mqttClient.ToString();
|
||||
|
||||
return opts;
|
||||
}
|
||||
|
||||
|
||||
62
src/NATS.Server/Monitoring/JszHandler.cs
Normal file
62
src/NATS.Server/Monitoring/JszHandler.cs
Normal file
@@ -0,0 +1,62 @@
|
||||
using System.Text.Json.Serialization;
|
||||
|
||||
namespace NATS.Server.Monitoring;
|
||||
|
||||
public sealed class JszHandler
|
||||
{
|
||||
private readonly NatsServer _server;
|
||||
private readonly NatsOptions _options;
|
||||
|
||||
public JszHandler(NatsServer server, NatsOptions options)
|
||||
{
|
||||
_server = server;
|
||||
_options = options;
|
||||
}
|
||||
|
||||
public JszResponse Build()
|
||||
{
|
||||
return new JszResponse
|
||||
{
|
||||
ServerId = _server.ServerId,
|
||||
Now = DateTime.UtcNow,
|
||||
Enabled = _server.Stats.JetStreamEnabled,
|
||||
Memory = 0,
|
||||
Storage = 0,
|
||||
Streams = _server.JetStreamStreams,
|
||||
Consumers = _server.JetStreamConsumers,
|
||||
Config = new JetStreamConfig
|
||||
{
|
||||
MaxMemory = _options.JetStream?.MaxMemoryStore ?? 0,
|
||||
MaxStorage = _options.JetStream?.MaxFileStore ?? 0,
|
||||
StoreDir = _options.JetStream?.StoreDir ?? string.Empty,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public sealed class JszResponse
|
||||
{
|
||||
[JsonPropertyName("server_id")]
|
||||
public string ServerId { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("now")]
|
||||
public DateTime Now { get; set; }
|
||||
|
||||
[JsonPropertyName("enabled")]
|
||||
public bool Enabled { get; set; }
|
||||
|
||||
[JsonPropertyName("memory")]
|
||||
public ulong Memory { get; set; }
|
||||
|
||||
[JsonPropertyName("storage")]
|
||||
public ulong Storage { get; set; }
|
||||
|
||||
[JsonPropertyName("streams")]
|
||||
public int Streams { get; set; }
|
||||
|
||||
[JsonPropertyName("consumers")]
|
||||
public int Consumers { get; set; }
|
||||
|
||||
[JsonPropertyName("config")]
|
||||
public JetStreamConfig Config { get; set; } = new();
|
||||
}
|
||||
@@ -16,6 +16,7 @@ public sealed class MonitorServer : IAsyncDisposable
|
||||
private readonly VarzHandler _varzHandler;
|
||||
private readonly ConnzHandler _connzHandler;
|
||||
private readonly SubszHandler _subszHandler;
|
||||
private readonly JszHandler _jszHandler;
|
||||
|
||||
public MonitorServer(NatsServer server, NatsOptions options, ServerStats stats, ILoggerFactory loggerFactory)
|
||||
{
|
||||
@@ -31,6 +32,7 @@ public sealed class MonitorServer : IAsyncDisposable
|
||||
_varzHandler = new VarzHandler(server, options);
|
||||
_connzHandler = new ConnzHandler(server);
|
||||
_subszHandler = new SubszHandler(server);
|
||||
_jszHandler = new JszHandler(server, options);
|
||||
|
||||
_app.MapGet(basePath + "/", () =>
|
||||
{
|
||||
@@ -100,7 +102,7 @@ public sealed class MonitorServer : IAsyncDisposable
|
||||
_app.MapGet(basePath + "/jsz", () =>
|
||||
{
|
||||
stats.HttpReqStats.AddOrUpdate("/jsz", 1, (_, v) => v + 1);
|
||||
return Results.Ok(new { });
|
||||
return Results.Ok(_jszHandler.Build());
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -355,8 +355,29 @@ public sealed class MqttOptsVarz
|
||||
[JsonPropertyName("port")]
|
||||
public int Port { get; set; }
|
||||
|
||||
[JsonPropertyName("no_auth_user")]
|
||||
public string NoAuthUser { get; set; } = "";
|
||||
|
||||
[JsonPropertyName("auth_timeout")]
|
||||
public double AuthTimeout { get; set; }
|
||||
|
||||
[JsonPropertyName("tls_map")]
|
||||
public bool TlsMap { get; set; }
|
||||
|
||||
[JsonPropertyName("tls_timeout")]
|
||||
public double TlsTimeout { get; set; }
|
||||
|
||||
[JsonPropertyName("tls_pinned_certs")]
|
||||
public string[] TlsPinnedCerts { get; set; } = [];
|
||||
|
||||
[JsonPropertyName("js_domain")]
|
||||
public string JsDomain { get; set; } = "";
|
||||
|
||||
[JsonPropertyName("ack_wait")]
|
||||
public long AckWait { get; set; }
|
||||
|
||||
[JsonPropertyName("max_ack_pending")]
|
||||
public ushort MaxAckPending { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -422,6 +443,12 @@ public sealed class JetStreamStats
|
||||
[JsonPropertyName("ha_assets")]
|
||||
public int HaAssets { get; set; }
|
||||
|
||||
[JsonPropertyName("streams")]
|
||||
public int Streams { get; set; }
|
||||
|
||||
[JsonPropertyName("consumers")]
|
||||
public int Consumers { get; set; }
|
||||
|
||||
[JsonPropertyName("api")]
|
||||
public JetStreamApiStats Api { get; set; } = new();
|
||||
}
|
||||
|
||||
@@ -121,6 +121,23 @@ public sealed class VarzHandler : IDisposable
|
||||
Subscriptions = _server.SubList.Count,
|
||||
ConfigLoadTime = _server.StartTime,
|
||||
HttpReqStats = stats.HttpReqStats.ToDictionary(kv => kv.Key, kv => (ulong)kv.Value),
|
||||
Mqtt = BuildMqttVarz(),
|
||||
JetStream = new JetStreamVarz
|
||||
{
|
||||
Config = new JetStreamConfig
|
||||
{
|
||||
MaxMemory = _options.JetStream?.MaxMemoryStore ?? 0,
|
||||
MaxStorage = _options.JetStream?.MaxFileStore ?? 0,
|
||||
StoreDir = _options.JetStream?.StoreDir ?? string.Empty,
|
||||
},
|
||||
Stats = new JetStreamStats
|
||||
{
|
||||
Accounts = _options.JetStream is null ? 0 : 1,
|
||||
HaAssets = _server.JetStreamStreams,
|
||||
Streams = _server.JetStreamStreams,
|
||||
Consumers = _server.JetStreamConsumers,
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
finally
|
||||
@@ -134,6 +151,27 @@ public sealed class VarzHandler : IDisposable
|
||||
_varzMu.Dispose();
|
||||
}
|
||||
|
||||
private MqttOptsVarz BuildMqttVarz()
|
||||
{
|
||||
var mqtt = _options.Mqtt;
|
||||
if (mqtt is null)
|
||||
return new MqttOptsVarz();
|
||||
|
||||
return new MqttOptsVarz
|
||||
{
|
||||
Host = mqtt.Host,
|
||||
Port = mqtt.Port,
|
||||
NoAuthUser = mqtt.NoAuthUser ?? "",
|
||||
AuthTimeout = mqtt.AuthTimeout,
|
||||
TlsMap = mqtt.TlsMap,
|
||||
TlsTimeout = mqtt.TlsTimeout,
|
||||
TlsPinnedCerts = mqtt.TlsPinnedCerts?.ToArray() ?? [],
|
||||
JsDomain = mqtt.JsDomain ?? "",
|
||||
AckWait = (long)mqtt.AckWait.TotalNanoseconds,
|
||||
MaxAckPending = mqtt.MaxAckPending,
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Formats a TimeSpan as a human-readable uptime string matching Go server format.
|
||||
/// </summary>
|
||||
|
||||
43
src/NATS.Server/MqttOptions.cs
Normal file
43
src/NATS.Server/MqttOptions.cs
Normal file
@@ -0,0 +1,43 @@
|
||||
namespace NATS.Server;
|
||||
|
||||
/// <summary>
|
||||
/// MQTT protocol configuration options.
|
||||
/// Corresponds to Go server/opts.go MQTTOpts struct.
|
||||
/// Config is parsed and stored but no MQTT listener is started yet.
|
||||
/// </summary>
|
||||
public sealed class MqttOptions
|
||||
{
|
||||
// Network
|
||||
public string Host { get; set; } = "";
|
||||
public int Port { get; set; }
|
||||
|
||||
// Auth override (MQTT-specific, separate from global auth)
|
||||
public string? NoAuthUser { get; set; }
|
||||
public string? Username { get; set; }
|
||||
public string? Password { get; set; }
|
||||
public string? Token { get; set; }
|
||||
public double AuthTimeout { get; set; }
|
||||
|
||||
// TLS
|
||||
public string? TlsCert { get; set; }
|
||||
public string? TlsKey { get; set; }
|
||||
public string? TlsCaCert { get; set; }
|
||||
public bool TlsVerify { get; set; }
|
||||
public double TlsTimeout { get; set; } = 2.0;
|
||||
public bool TlsMap { get; set; }
|
||||
public HashSet<string>? TlsPinnedCerts { get; set; }
|
||||
|
||||
// JetStream integration
|
||||
public string? JsDomain { get; set; }
|
||||
public int StreamReplicas { get; set; }
|
||||
public int ConsumerReplicas { get; set; }
|
||||
public bool ConsumerMemoryStorage { get; set; }
|
||||
public TimeSpan ConsumerInactiveThreshold { get; set; }
|
||||
|
||||
// QoS
|
||||
public TimeSpan AckWait { get; set; } = TimeSpan.FromSeconds(30);
|
||||
public ushort MaxAckPending { get; set; }
|
||||
public TimeSpan JsApiTimeout { get; set; } = TimeSpan.FromSeconds(5);
|
||||
|
||||
public bool HasTls => TlsCert != null && TlsKey != null;
|
||||
}
|
||||
@@ -8,9 +8,12 @@ using System.Text.Json;
|
||||
using System.Threading.Channels;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Auth.Jwt;
|
||||
using NATS.Server.JetStream.Publish;
|
||||
using NATS.Server.Protocol;
|
||||
using NATS.Server.Subscriptions;
|
||||
using NATS.Server.Tls;
|
||||
using NATS.Server.WebSocket;
|
||||
|
||||
namespace NATS.Server;
|
||||
|
||||
@@ -30,6 +33,7 @@ public interface ISubListAccess
|
||||
|
||||
public sealed class NatsClient : INatsClient, IDisposable
|
||||
{
|
||||
private static readonly ClientCommandMatrix CommandMatrix = new();
|
||||
private readonly Socket _socket;
|
||||
private readonly Stream _stream;
|
||||
private readonly NatsOptions _options;
|
||||
@@ -47,7 +51,7 @@ public sealed class NatsClient : INatsClient, IDisposable
|
||||
private readonly ServerStats _serverStats;
|
||||
|
||||
public ulong Id { get; }
|
||||
public ClientKind Kind => ClientKind.Client;
|
||||
public ClientKind Kind { get; }
|
||||
public ClientOptions? ClientOpts { get; private set; }
|
||||
public IMessageRouter? Router { get; set; }
|
||||
public Account? Account { get; private set; }
|
||||
@@ -96,15 +100,21 @@ public sealed class NatsClient : INatsClient, IDisposable
|
||||
private long _rtt;
|
||||
public TimeSpan Rtt => new(Interlocked.Read(ref _rtt));
|
||||
|
||||
public bool IsWebSocket { get; set; }
|
||||
public WsUpgradeResult? WsInfo { get; set; }
|
||||
|
||||
public TlsConnectionState? TlsState { get; set; }
|
||||
public bool InfoAlreadySent { get; set; }
|
||||
|
||||
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
|
||||
public PubAck? LastJetStreamPubAck { get; private set; }
|
||||
|
||||
public NatsClient(ulong id, Stream stream, Socket socket, NatsOptions options, ServerInfo serverInfo,
|
||||
AuthService authService, byte[]? nonce, ILogger logger, ServerStats serverStats)
|
||||
AuthService authService, byte[]? nonce, ILogger logger, ServerStats serverStats,
|
||||
ClientKind kind = ClientKind.Client)
|
||||
{
|
||||
Id = id;
|
||||
Kind = kind;
|
||||
_socket = socket;
|
||||
_stream = stream;
|
||||
_options = options;
|
||||
@@ -310,6 +320,13 @@ public sealed class NatsClient : INatsClient, IDisposable
|
||||
{
|
||||
Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks);
|
||||
|
||||
if (!CommandMatrix.IsAllowed(Kind, cmd.Operation))
|
||||
{
|
||||
_logger.LogDebug("Command {Command} is not allowed for client kind {ClientKind}", cmd.Operation, Kind);
|
||||
await SendErrAndCloseAsync("Parser Error");
|
||||
return;
|
||||
}
|
||||
|
||||
// If auth is required and CONNECT hasn't been received yet,
|
||||
// only allow CONNECT and PING commands
|
||||
if (_authService.IsAuthRequired && !ConnectReceived)
|
||||
@@ -387,6 +404,7 @@ public sealed class NatsClient : INatsClient, IDisposable
|
||||
Opts = ClientOpts,
|
||||
Nonce = _nonce ?? [],
|
||||
ClientCertificate = TlsState?.PeerCert,
|
||||
ConnectionType = JwtConnectionTypes.Standard,
|
||||
};
|
||||
|
||||
authResult = _authService.Authenticate(context);
|
||||
@@ -405,6 +423,10 @@ public sealed class NatsClient : INatsClient, IDisposable
|
||||
{
|
||||
var accountName = authResult.AccountName ?? Account.GlobalAccountName;
|
||||
Account = server.GetOrCreateAccount(accountName);
|
||||
if (authResult.MaxJetStreamStreams > 0)
|
||||
Account.MaxJetStreamStreams = authResult.MaxJetStreamStreams;
|
||||
if (!string.IsNullOrWhiteSpace(authResult.JetStreamTier))
|
||||
Account.JetStreamTier = authResult.JetStreamTier;
|
||||
if (!Account.AddClient(Id))
|
||||
{
|
||||
Account = null;
|
||||
@@ -518,6 +540,8 @@ public sealed class NatsClient : INatsClient, IDisposable
|
||||
_logger.LogDebug("SUB {Subject} {Sid} from client {ClientId}", cmd.Subject, cmd.Sid, Id);
|
||||
|
||||
Account?.SubList.Insert(sub);
|
||||
if (Router is NatsServer server)
|
||||
server.OnLocalSubscription(sub.Subject, sub.Queue);
|
||||
}
|
||||
|
||||
private void ProcessUnsub(ParsedCommand cmd)
|
||||
@@ -582,6 +606,11 @@ public sealed class NatsClient : INatsClient, IDisposable
|
||||
Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
|
||||
}
|
||||
|
||||
public void RecordJetStreamPubAck(PubAck ack)
|
||||
{
|
||||
LastJetStreamPubAck = ack;
|
||||
}
|
||||
|
||||
private void SendInfo()
|
||||
{
|
||||
// Use the cached INFO bytes from the server when there is no per-connection
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using System.Security.Authentication;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Tls;
|
||||
|
||||
namespace NATS.Server;
|
||||
@@ -115,5 +116,42 @@ public sealed class NatsOptions
|
||||
// Subject mapping / transforms (source pattern -> destination template)
|
||||
public Dictionary<string, string>? SubjectMappings { get; set; }
|
||||
|
||||
// MQTT configuration (parsed from config, no listener yet)
|
||||
public MqttOptions? Mqtt { get; set; }
|
||||
|
||||
// Cluster and JetStream settings
|
||||
public ClusterOptions? Cluster { get; set; }
|
||||
public GatewayOptions? Gateway { get; set; }
|
||||
public LeafNodeOptions? LeafNode { get; set; }
|
||||
public JetStreamOptions? JetStream { get; set; }
|
||||
|
||||
public bool HasTls => TlsCert != null && TlsKey != null;
|
||||
|
||||
// WebSocket
|
||||
public WebSocketOptions WebSocket { get; set; } = new();
|
||||
}
|
||||
|
||||
public sealed class WebSocketOptions
|
||||
{
|
||||
public string Host { get; set; } = "0.0.0.0";
|
||||
public int Port { get; set; } = -1;
|
||||
public string? Advertise { get; set; }
|
||||
public string? NoAuthUser { get; set; }
|
||||
public string? JwtCookie { get; set; }
|
||||
public string? UsernameCookie { get; set; }
|
||||
public string? PasswordCookie { get; set; }
|
||||
public string? TokenCookie { get; set; }
|
||||
public string? Username { get; set; }
|
||||
public string? Password { get; set; }
|
||||
public string? Token { get; set; }
|
||||
public TimeSpan AuthTimeout { get; set; } = TimeSpan.FromSeconds(2);
|
||||
public bool NoTls { get; set; }
|
||||
public string? TlsCert { get; set; }
|
||||
public string? TlsKey { get; set; }
|
||||
public bool SameOrigin { get; set; }
|
||||
public List<string>? AllowedOrigins { get; set; }
|
||||
public bool Compression { get; set; }
|
||||
public TimeSpan HandshakeTimeout { get; set; } = TimeSpan.FromSeconds(2);
|
||||
public TimeSpan? PingInterval { get; set; }
|
||||
public Dictionary<string, string>? Headers { get; set; }
|
||||
}
|
||||
|
||||
@@ -10,11 +10,18 @@ using NATS.NKeys;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Events;
|
||||
using NATS.Server.Gateways;
|
||||
using NATS.Server.Imports;
|
||||
using NATS.Server.JetStream;
|
||||
using NATS.Server.JetStream.Api;
|
||||
using NATS.Server.JetStream.Publish;
|
||||
using NATS.Server.LeafNodes;
|
||||
using NATS.Server.Monitoring;
|
||||
using NATS.Server.Protocol;
|
||||
using NATS.Server.Routes;
|
||||
using NATS.Server.Subscriptions;
|
||||
using NATS.Server.Tls;
|
||||
using NATS.Server.WebSocket;
|
||||
|
||||
namespace NATS.Server;
|
||||
|
||||
@@ -41,7 +48,17 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
private readonly SslServerAuthenticationOptions? _sslOptions;
|
||||
private readonly TlsRateLimiter? _tlsRateLimiter;
|
||||
private readonly SubjectTransform[] _subjectTransforms;
|
||||
private readonly RouteManager? _routeManager;
|
||||
private readonly GatewayManager? _gatewayManager;
|
||||
private readonly LeafNodeManager? _leafNodeManager;
|
||||
private readonly JetStreamService? _jetStreamService;
|
||||
private readonly JetStreamApiRouter? _jetStreamApiRouter;
|
||||
private readonly StreamManager? _jetStreamStreamManager;
|
||||
private readonly ConsumerManager? _jetStreamConsumerManager;
|
||||
private readonly JetStreamPublisher? _jetStreamPublisher;
|
||||
private Socket? _listener;
|
||||
private Socket? _wsListener;
|
||||
private readonly TaskCompletionSource _wsAcceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private MonitorServer? _monitorServer;
|
||||
private ulong _nextClientId;
|
||||
private long _startTimeTicks;
|
||||
@@ -76,12 +93,37 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
public InternalEventSystem? EventSystem => _eventSystem;
|
||||
public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0;
|
||||
public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0;
|
||||
public string? ClusterListen => _routeManager?.ListenEndpoint;
|
||||
public JetStreamApiRouter? JetStreamApiRouter => _jetStreamApiRouter;
|
||||
public int JetStreamStreams => _jetStreamStreamManager?.StreamNames.Count ?? 0;
|
||||
public int JetStreamConsumers => _jetStreamConsumerManager?.ConsumerCount ?? 0;
|
||||
public Action? ReOpenLogFile { get; set; }
|
||||
public IEnumerable<NatsClient> GetClients() => _clients.Values;
|
||||
|
||||
public IEnumerable<ClosedClient> GetClosedClients() => _closedClients;
|
||||
|
||||
public IEnumerable<Auth.Account> GetAccounts() => _accounts.Values;
|
||||
public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject);
|
||||
public bool TryCaptureJetStreamPublish(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
|
||||
{
|
||||
if (_jetStreamPublisher != null && _jetStreamPublisher.TryCapture(subject, payload, out ack))
|
||||
{
|
||||
if (ack.ErrorCode == null
|
||||
&& _jetStreamConsumerManager != null
|
||||
&& _jetStreamStreamManager != null
|
||||
&& _jetStreamStreamManager.TryGet(ack.Stream, out var streamHandle))
|
||||
{
|
||||
var stored = streamHandle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult();
|
||||
if (stored != null)
|
||||
_jetStreamConsumerManager.OnPublished(ack.Stream, stored);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
ack = new PubAck();
|
||||
return false;
|
||||
}
|
||||
|
||||
public Task WaitForReadyAsync() => _listeningStarted.Task;
|
||||
|
||||
@@ -112,11 +154,22 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
// Signal all internal loops to stop
|
||||
await _quitCts.CancelAsync();
|
||||
|
||||
// Close listener to stop accept loop
|
||||
// Close listeners to stop accept loops
|
||||
_listener?.Close();
|
||||
_wsListener?.Close();
|
||||
if (_routeManager != null)
|
||||
await _routeManager.DisposeAsync();
|
||||
if (_gatewayManager != null)
|
||||
await _gatewayManager.DisposeAsync();
|
||||
if (_leafNodeManager != null)
|
||||
await _leafNodeManager.DisposeAsync();
|
||||
if (_jetStreamService != null)
|
||||
await _jetStreamService.DisposeAsync();
|
||||
_stats.JetStreamEnabled = false;
|
||||
|
||||
// Wait for accept loop to exit
|
||||
// Wait for accept loops to exit
|
||||
await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
await _wsAcceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
|
||||
// Close all client connections — flush first, then mark closed
|
||||
var flushTasks = new List<Task>();
|
||||
@@ -157,11 +210,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
|
||||
_logger.LogInformation("Entering lame duck mode, stop accepting new clients");
|
||||
|
||||
// Close listener to stop accepting new connections
|
||||
// Close listeners to stop accepting new connections
|
||||
_listener?.Close();
|
||||
_wsListener?.Close();
|
||||
|
||||
// Wait for accept loop to exit
|
||||
// Wait for accept loops to exit
|
||||
await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
await _wsAcceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
|
||||
var gracePeriod = _options.LameDuckGracePeriod;
|
||||
if (gracePeriod < TimeSpan.Zero) gracePeriod = -gracePeriod;
|
||||
@@ -307,6 +362,33 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
AuthRequired = _authService.IsAuthRequired,
|
||||
};
|
||||
|
||||
if (options.Cluster != null)
|
||||
{
|
||||
_routeManager = new RouteManager(options.Cluster, _stats, _serverInfo.ServerId, ApplyRemoteSubscription,
|
||||
_loggerFactory.CreateLogger<RouteManager>());
|
||||
}
|
||||
|
||||
if (options.Gateway != null)
|
||||
{
|
||||
_gatewayManager = new GatewayManager(options.Gateway, _stats,
|
||||
_loggerFactory.CreateLogger<GatewayManager>());
|
||||
}
|
||||
|
||||
if (options.LeafNode != null)
|
||||
{
|
||||
_leafNodeManager = new LeafNodeManager(options.LeafNode, _stats,
|
||||
_loggerFactory.CreateLogger<LeafNodeManager>());
|
||||
}
|
||||
|
||||
if (options.JetStream != null)
|
||||
{
|
||||
_jetStreamStreamManager = new StreamManager();
|
||||
_jetStreamConsumerManager = new ConsumerManager();
|
||||
_jetStreamService = new JetStreamService(options.JetStream);
|
||||
_jetStreamApiRouter = new JetStreamApiRouter(_jetStreamStreamManager, _jetStreamConsumerManager);
|
||||
_jetStreamPublisher = new JetStreamPublisher(_jetStreamStreamManager);
|
||||
}
|
||||
|
||||
if (options.HasTls)
|
||||
{
|
||||
_sslOptions = TlsHelper.BuildServerAuthOptions(options);
|
||||
@@ -396,11 +478,6 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
BuildCachedInfo();
|
||||
}
|
||||
|
||||
_listeningStarted.TrySetResult();
|
||||
|
||||
_eventSystem?.Start(this);
|
||||
_eventSystem?.InitEventTracking(this);
|
||||
|
||||
_logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port);
|
||||
|
||||
// Warn about stub features
|
||||
@@ -416,6 +493,46 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
WritePidFile();
|
||||
WritePortsFile();
|
||||
|
||||
if (_options.WebSocket.Port >= 0)
|
||||
{
|
||||
_wsListener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
_wsListener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
|
||||
_wsListener.Bind(new IPEndPoint(
|
||||
_options.WebSocket.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.WebSocket.Host),
|
||||
_options.WebSocket.Port));
|
||||
_wsListener.Listen(128);
|
||||
|
||||
if (_options.WebSocket.Port == 0)
|
||||
{
|
||||
_options.WebSocket.Port = ((IPEndPoint)_wsListener.LocalEndPoint!).Port;
|
||||
}
|
||||
|
||||
_logger.LogInformation("Listening for WebSocket clients on {Host}:{Port}",
|
||||
_options.WebSocket.Host, _options.WebSocket.Port);
|
||||
|
||||
if (_options.WebSocket.NoTls)
|
||||
_logger.LogWarning("WebSocket not configured with TLS. DO NOT USE IN PRODUCTION!");
|
||||
|
||||
_ = RunWebSocketAcceptLoopAsync(linked.Token);
|
||||
}
|
||||
|
||||
if (_routeManager != null)
|
||||
await _routeManager.StartAsync(linked.Token);
|
||||
if (_gatewayManager != null)
|
||||
await _gatewayManager.StartAsync(linked.Token);
|
||||
if (_leafNodeManager != null)
|
||||
await _leafNodeManager.StartAsync(linked.Token);
|
||||
if (_jetStreamService != null)
|
||||
{
|
||||
await _jetStreamService.StartAsync(linked.Token);
|
||||
_stats.JetStreamEnabled = true;
|
||||
}
|
||||
|
||||
_listeningStarted.TrySetResult();
|
||||
|
||||
_eventSystem?.Start(this);
|
||||
_eventSystem?.InitEventTracking(this);
|
||||
|
||||
var tmpDelay = AcceptMinSleep;
|
||||
|
||||
try
|
||||
@@ -561,6 +678,102 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
private async Task RunWebSocketAcceptLoopAsync(CancellationToken ct)
|
||||
{
|
||||
var tmpDelay = AcceptMinSleep;
|
||||
try
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
Socket socket;
|
||||
try
|
||||
{
|
||||
socket = await _wsListener!.AcceptAsync(ct);
|
||||
tmpDelay = AcceptMinSleep;
|
||||
}
|
||||
catch (OperationCanceledException) { break; }
|
||||
catch (ObjectDisposedException) { break; }
|
||||
catch (SocketException ex)
|
||||
{
|
||||
if (IsShuttingDown || IsLameDuckMode) break;
|
||||
_logger.LogError(ex, "Temporary WebSocket accept error, sleeping {Delay}ms", tmpDelay.TotalMilliseconds);
|
||||
try { await Task.Delay(tmpDelay, ct); } catch (OperationCanceledException) { break; }
|
||||
tmpDelay = TimeSpan.FromTicks(Math.Min(tmpDelay.Ticks * 2, AcceptMaxSleep.Ticks));
|
||||
continue;
|
||||
}
|
||||
|
||||
if (_options.MaxConnections > 0 && _clients.Count >= _options.MaxConnections)
|
||||
{
|
||||
socket.Dispose();
|
||||
continue;
|
||||
}
|
||||
|
||||
var clientId = Interlocked.Increment(ref _nextClientId);
|
||||
Interlocked.Increment(ref _stats.TotalConnections);
|
||||
Interlocked.Increment(ref _activeClientCount);
|
||||
|
||||
_ = AcceptWebSocketClientAsync(socket, clientId, ct);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_wsAcceptLoopExited.TrySetResult();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task AcceptWebSocketClientAsync(Socket socket, ulong clientId, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
var networkStream = new NetworkStream(socket, ownsSocket: false);
|
||||
Stream stream = networkStream;
|
||||
|
||||
// TLS negotiation if configured
|
||||
if (_sslOptions != null && !_options.WebSocket.NoTls)
|
||||
{
|
||||
var (tlsStream, _) = await TlsConnectionWrapper.NegotiateAsync(
|
||||
socket, networkStream, _options, _sslOptions, _serverInfo,
|
||||
_loggerFactory.CreateLogger("NATS.Server.Tls"), ct);
|
||||
stream = tlsStream;
|
||||
}
|
||||
|
||||
// HTTP upgrade handshake
|
||||
var upgradeResult = await WsUpgrade.TryUpgradeAsync(stream, stream, _options.WebSocket, ct);
|
||||
if (!upgradeResult.Success)
|
||||
{
|
||||
_logger.LogDebug("WebSocket upgrade failed for client {ClientId}", clientId);
|
||||
socket.Dispose();
|
||||
Interlocked.Decrement(ref _activeClientCount);
|
||||
return;
|
||||
}
|
||||
|
||||
// Create WsConnection wrapper
|
||||
var wsConn = new WsConnection(stream,
|
||||
compress: upgradeResult.Compress,
|
||||
maskRead: upgradeResult.MaskRead,
|
||||
maskWrite: upgradeResult.MaskWrite,
|
||||
browser: upgradeResult.Browser,
|
||||
noCompFrag: upgradeResult.NoCompFrag);
|
||||
|
||||
var clientLogger = _loggerFactory.CreateLogger($"NATS.Server.NatsClient[{clientId}]");
|
||||
var client = new NatsClient(clientId, wsConn, socket, _options, _serverInfo,
|
||||
_authService, null, clientLogger, _stats);
|
||||
client.Router = this;
|
||||
client.IsWebSocket = true;
|
||||
client.WsInfo = upgradeResult;
|
||||
_clients[clientId] = client;
|
||||
|
||||
await RunClientAsync(client, ct);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Failed to accept WebSocket client {ClientId}", clientId);
|
||||
try { socket.Shutdown(SocketShutdown.Both); } catch { }
|
||||
socket.Dispose();
|
||||
Interlocked.Decrement(ref _activeClientCount);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task RunClientAsync(NatsClient client, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
@@ -579,9 +792,22 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
public void OnLocalSubscription(string subject, string? queue)
|
||||
{
|
||||
_routeManager?.PropagateLocalSubscription(subject, queue);
|
||||
}
|
||||
|
||||
private void ApplyRemoteSubscription(RemoteSubscription sub)
|
||||
{
|
||||
_globalAccount.SubList.ApplyRemoteSub(sub);
|
||||
}
|
||||
|
||||
public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
|
||||
ReadOnlyMemory<byte> payload, NatsClient sender)
|
||||
{
|
||||
if (TryCaptureJetStreamPublish(subject, payload, out var pubAck))
|
||||
sender.RecordJetStreamPubAck(pubAck);
|
||||
|
||||
// Apply subject transforms
|
||||
if (_subjectTransforms.Length > 0)
|
||||
{
|
||||
@@ -1095,6 +1321,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
Rtt = client.Rtt,
|
||||
TlsVersion = client.TlsState?.TlsVersion ?? "",
|
||||
TlsCipherSuite = client.TlsState?.CipherSuite ?? "",
|
||||
MqttClient = "", // populated when MQTT transport is implemented
|
||||
});
|
||||
|
||||
// Cap closed clients list
|
||||
@@ -1183,10 +1410,22 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
/// the changes, and applies reloadable settings. CLI overrides are preserved.
|
||||
/// </summary>
|
||||
public void ReloadConfig()
|
||||
{
|
||||
ReloadConfigCore(throwOnError: false);
|
||||
}
|
||||
|
||||
public void ReloadConfigOrThrow()
|
||||
{
|
||||
ReloadConfigCore(throwOnError: true);
|
||||
}
|
||||
|
||||
private void ReloadConfigCore(bool throwOnError)
|
||||
{
|
||||
if (_options.ConfigFile == null)
|
||||
{
|
||||
_logger.LogWarning("No config file specified, cannot reload");
|
||||
if (throwOnError)
|
||||
throw new InvalidOperationException("No config file specified.");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1212,6 +1451,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
{
|
||||
foreach (var err in errors)
|
||||
_logger.LogError("Config reload error: {Error}", err);
|
||||
if (throwOnError)
|
||||
throw new InvalidOperationException(string.Join("; ", errors));
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1223,6 +1464,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to reload config file: {ConfigFile}", _options.ConfigFile);
|
||||
if (throwOnError)
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1326,6 +1569,12 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
_quitCts.Dispose();
|
||||
_tlsRateLimiter?.Dispose();
|
||||
_listener?.Dispose();
|
||||
_wsListener?.Dispose();
|
||||
_routeManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||
_gatewayManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||
_leafNodeManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||
_jetStreamService?.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||
_stats.JetStreamEnabled = false;
|
||||
foreach (var client in _clients.Values)
|
||||
client.Dispose();
|
||||
foreach (var account in _accounts.Values)
|
||||
|
||||
17
src/NATS.Server/Protocol/ClientCommandMatrix.cs
Normal file
17
src/NATS.Server/Protocol/ClientCommandMatrix.cs
Normal file
@@ -0,0 +1,17 @@
|
||||
namespace NATS.Server.Protocol;
|
||||
|
||||
public sealed class ClientCommandMatrix
|
||||
{
|
||||
public bool IsAllowed(global::NATS.Server.ClientKind kind, string? op)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(op))
|
||||
return true;
|
||||
|
||||
return (kind, op.ToUpperInvariant()) switch
|
||||
{
|
||||
(global::NATS.Server.ClientKind.Router, "RS+") => true,
|
||||
(_, "RS+") => false,
|
||||
_ => true,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,7 @@ public enum CommandType
|
||||
public readonly struct ParsedCommand
|
||||
{
|
||||
public CommandType Type { get; init; }
|
||||
public string? Operation { get; init; }
|
||||
public string? Subject { get; init; }
|
||||
public string? ReplyTo { get; init; }
|
||||
public string? Queue { get; init; }
|
||||
@@ -29,7 +30,8 @@ public readonly struct ParsedCommand
|
||||
public int HeaderSize { get; init; }
|
||||
public ReadOnlyMemory<byte> Payload { get; init; }
|
||||
|
||||
public static ParsedCommand Simple(CommandType type) => new() { Type = type, MaxMessages = -1 };
|
||||
public static ParsedCommand Simple(CommandType type, string operation) =>
|
||||
new() { Type = type, Operation = operation, MaxMessages = -1 };
|
||||
}
|
||||
|
||||
public sealed class NatsParser
|
||||
@@ -46,6 +48,7 @@ public sealed class NatsParser
|
||||
private string? _pendingReplyTo;
|
||||
private int _pendingHeaderSize;
|
||||
private CommandType _pendingType;
|
||||
private string _pendingOperation = string.Empty;
|
||||
|
||||
public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize, ILogger? logger = null)
|
||||
{
|
||||
@@ -103,7 +106,7 @@ public sealed class NatsParser
|
||||
case (byte)'p':
|
||||
if (b1 == (byte)'i') // PING
|
||||
{
|
||||
command = ParsedCommand.Simple(CommandType.Ping);
|
||||
command = ParsedCommand.Simple(CommandType.Ping, "PING");
|
||||
buffer = buffer.Slice(reader.Position);
|
||||
TraceInOp("PING");
|
||||
return true;
|
||||
@@ -111,7 +114,7 @@ public sealed class NatsParser
|
||||
|
||||
if (b1 == (byte)'o') // PONG
|
||||
{
|
||||
command = ParsedCommand.Simple(CommandType.Pong);
|
||||
command = ParsedCommand.Simple(CommandType.Pong, "PONG");
|
||||
buffer = buffer.Slice(reader.Position);
|
||||
TraceInOp("PONG");
|
||||
return true;
|
||||
@@ -177,13 +180,13 @@ public sealed class NatsParser
|
||||
break;
|
||||
|
||||
case (byte)'+': // +OK
|
||||
command = ParsedCommand.Simple(CommandType.Ok);
|
||||
command = ParsedCommand.Simple(CommandType.Ok, "+OK");
|
||||
buffer = buffer.Slice(reader.Position);
|
||||
TraceInOp("+OK");
|
||||
return true;
|
||||
|
||||
case (byte)'-': // -ERR
|
||||
command = ParsedCommand.Simple(CommandType.Err);
|
||||
command = ParsedCommand.Simple(CommandType.Err, "-ERR");
|
||||
buffer = buffer.Slice(reader.Position);
|
||||
TraceInOp("-ERR");
|
||||
return true;
|
||||
@@ -236,6 +239,7 @@ public sealed class NatsParser
|
||||
_pendingReplyTo = reply;
|
||||
_pendingHeaderSize = -1;
|
||||
_pendingType = CommandType.Pub;
|
||||
_pendingOperation = "PUB";
|
||||
|
||||
TraceInOp("PUB", argsSpan);
|
||||
return TryReadPayload(ref buffer, out command);
|
||||
@@ -286,6 +290,7 @@ public sealed class NatsParser
|
||||
_pendingReplyTo = reply;
|
||||
_pendingHeaderSize = hdrSize;
|
||||
_pendingType = CommandType.HPub;
|
||||
_pendingOperation = "HPUB";
|
||||
|
||||
TraceInOp("HPUB", argsSpan);
|
||||
return TryReadPayload(ref buffer, out command);
|
||||
@@ -315,6 +320,7 @@ public sealed class NatsParser
|
||||
command = new ParsedCommand
|
||||
{
|
||||
Type = _pendingType,
|
||||
Operation = _pendingOperation,
|
||||
Subject = _pendingSubject,
|
||||
ReplyTo = _pendingReplyTo,
|
||||
Payload = payload,
|
||||
@@ -339,6 +345,7 @@ public sealed class NatsParser
|
||||
2 => new ParsedCommand
|
||||
{
|
||||
Type = CommandType.Sub,
|
||||
Operation = "SUB",
|
||||
Subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]),
|
||||
Sid = Encoding.ASCII.GetString(argsSpan[ranges[1]]),
|
||||
MaxMessages = -1,
|
||||
@@ -346,6 +353,7 @@ public sealed class NatsParser
|
||||
3 => new ParsedCommand
|
||||
{
|
||||
Type = CommandType.Sub,
|
||||
Operation = "SUB",
|
||||
Subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]),
|
||||
Queue = Encoding.ASCII.GetString(argsSpan[ranges[1]]),
|
||||
Sid = Encoding.ASCII.GetString(argsSpan[ranges[2]]),
|
||||
@@ -367,12 +375,14 @@ public sealed class NatsParser
|
||||
1 => new ParsedCommand
|
||||
{
|
||||
Type = CommandType.Unsub,
|
||||
Operation = "UNSUB",
|
||||
Sid = Encoding.ASCII.GetString(argsSpan[ranges[0]]),
|
||||
MaxMessages = -1,
|
||||
},
|
||||
2 => new ParsedCommand
|
||||
{
|
||||
Type = CommandType.Unsub,
|
||||
Operation = "UNSUB",
|
||||
Sid = Encoding.ASCII.GetString(argsSpan[ranges[0]]),
|
||||
MaxMessages = ParseSize(argsSpan[ranges[1]]),
|
||||
},
|
||||
@@ -391,6 +401,7 @@ public sealed class NatsParser
|
||||
return new ParsedCommand
|
||||
{
|
||||
Type = CommandType.Connect,
|
||||
Operation = "CONNECT",
|
||||
Payload = json.ToArray(),
|
||||
MaxMessages = -1,
|
||||
};
|
||||
@@ -407,6 +418,7 @@ public sealed class NatsParser
|
||||
return new ParsedCommand
|
||||
{
|
||||
Type = CommandType.Info,
|
||||
Operation = "INFO",
|
||||
Payload = json.ToArray(),
|
||||
MaxMessages = -1,
|
||||
};
|
||||
|
||||
32
src/NATS.Server/Raft/RaftLog.cs
Normal file
32
src/NATS.Server/Raft/RaftLog.cs
Normal file
@@ -0,0 +1,32 @@
|
||||
namespace NATS.Server.Raft;
|
||||
|
||||
public sealed class RaftLog
|
||||
{
|
||||
private readonly List<RaftLogEntry> _entries = [];
|
||||
private long _baseIndex;
|
||||
|
||||
public IReadOnlyList<RaftLogEntry> Entries => _entries;
|
||||
|
||||
public RaftLogEntry Append(int term, string command)
|
||||
{
|
||||
var entry = new RaftLogEntry(_baseIndex + _entries.Count + 1, term, command);
|
||||
_entries.Add(entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
public void AppendReplicated(RaftLogEntry entry)
|
||||
{
|
||||
if (_entries.Any(e => e.Index == entry.Index))
|
||||
return;
|
||||
|
||||
_entries.Add(entry);
|
||||
}
|
||||
|
||||
public void ReplaceWithSnapshot(RaftSnapshot snapshot)
|
||||
{
|
||||
_entries.Clear();
|
||||
_baseIndex = snapshot.LastIncludedIndex;
|
||||
}
|
||||
}
|
||||
|
||||
public sealed record RaftLogEntry(long Index, int Term, string Command);
|
||||
113
src/NATS.Server/Raft/RaftNode.cs
Normal file
113
src/NATS.Server/Raft/RaftNode.cs
Normal file
@@ -0,0 +1,113 @@
|
||||
namespace NATS.Server.Raft;
|
||||
|
||||
public sealed class RaftNode
|
||||
{
|
||||
private int _votesReceived;
|
||||
private readonly List<RaftNode> _cluster = [];
|
||||
private readonly RaftReplicator _replicator = new();
|
||||
private readonly RaftSnapshotStore _snapshotStore = new();
|
||||
|
||||
public string Id { get; }
|
||||
public int Term => TermState.CurrentTerm;
|
||||
public bool IsLeader => Role == RaftRole.Leader;
|
||||
public RaftRole Role { get; private set; } = RaftRole.Follower;
|
||||
public RaftTermState TermState { get; } = new();
|
||||
public long AppliedIndex { get; set; }
|
||||
public RaftLog Log { get; } = new();
|
||||
|
||||
public RaftNode(string id)
|
||||
{
|
||||
Id = id;
|
||||
}
|
||||
|
||||
public void ConfigureCluster(IEnumerable<RaftNode> peers)
|
||||
{
|
||||
_cluster.Clear();
|
||||
_cluster.AddRange(peers);
|
||||
}
|
||||
|
||||
public void StartElection(int clusterSize)
|
||||
{
|
||||
Role = RaftRole.Candidate;
|
||||
TermState.CurrentTerm++;
|
||||
TermState.VotedFor = Id;
|
||||
_votesReceived = 1;
|
||||
TryBecomeLeader(clusterSize);
|
||||
}
|
||||
|
||||
public VoteResponse GrantVote(int term)
|
||||
{
|
||||
if (term < TermState.CurrentTerm)
|
||||
return new VoteResponse { Granted = false };
|
||||
|
||||
TermState.CurrentTerm = term;
|
||||
return new VoteResponse { Granted = true };
|
||||
}
|
||||
|
||||
public void ReceiveVote(VoteResponse response, int clusterSize = 3)
|
||||
{
|
||||
if (!response.Granted)
|
||||
return;
|
||||
|
||||
_votesReceived++;
|
||||
TryBecomeLeader(clusterSize);
|
||||
}
|
||||
|
||||
public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
|
||||
{
|
||||
if (Role != RaftRole.Leader)
|
||||
throw new InvalidOperationException("Only leader can propose entries.");
|
||||
|
||||
var entry = Log.Append(TermState.CurrentTerm, command);
|
||||
var followers = _cluster.Where(n => n.Id != Id).ToList();
|
||||
var acknowledgements = _replicator.Replicate(entry, followers);
|
||||
|
||||
var quorum = (_cluster.Count / 2) + 1;
|
||||
if (acknowledgements + 1 >= quorum)
|
||||
{
|
||||
AppliedIndex = entry.Index;
|
||||
foreach (var node in _cluster)
|
||||
node.AppliedIndex = Math.Max(node.AppliedIndex, entry.Index);
|
||||
}
|
||||
|
||||
await Task.CompletedTask;
|
||||
return entry.Index;
|
||||
}
|
||||
|
||||
public void ReceiveReplicatedEntry(RaftLogEntry entry)
|
||||
{
|
||||
Log.AppendReplicated(entry);
|
||||
}
|
||||
|
||||
public async Task<RaftSnapshot> CreateSnapshotAsync(CancellationToken ct)
|
||||
{
|
||||
var snapshot = new RaftSnapshot
|
||||
{
|
||||
LastIncludedIndex = AppliedIndex,
|
||||
LastIncludedTerm = Term,
|
||||
};
|
||||
await _snapshotStore.SaveAsync(snapshot, ct);
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
public Task InstallSnapshotAsync(RaftSnapshot snapshot, CancellationToken ct)
|
||||
{
|
||||
Log.ReplaceWithSnapshot(snapshot);
|
||||
AppliedIndex = snapshot.LastIncludedIndex;
|
||||
return _snapshotStore.SaveAsync(snapshot, ct);
|
||||
}
|
||||
|
||||
public void RequestStepDown()
|
||||
{
|
||||
Role = RaftRole.Follower;
|
||||
_votesReceived = 0;
|
||||
TermState.VotedFor = null;
|
||||
}
|
||||
|
||||
private void TryBecomeLeader(int clusterSize)
|
||||
{
|
||||
var quorum = (clusterSize / 2) + 1;
|
||||
if (_votesReceived >= quorum)
|
||||
Role = RaftRole.Leader;
|
||||
}
|
||||
}
|
||||
16
src/NATS.Server/Raft/RaftReplicator.cs
Normal file
16
src/NATS.Server/Raft/RaftReplicator.cs
Normal file
@@ -0,0 +1,16 @@
|
||||
namespace NATS.Server.Raft;
|
||||
|
||||
public sealed class RaftReplicator
|
||||
{
|
||||
public int Replicate(RaftLogEntry entry, IReadOnlyList<RaftNode> followers)
|
||||
{
|
||||
var acknowledgements = 0;
|
||||
foreach (var follower in followers)
|
||||
{
|
||||
follower.ReceiveReplicatedEntry(entry);
|
||||
acknowledgements++;
|
||||
}
|
||||
|
||||
return acknowledgements;
|
||||
}
|
||||
}
|
||||
12
src/NATS.Server/Raft/RaftRpcContracts.cs
Normal file
12
src/NATS.Server/Raft/RaftRpcContracts.cs
Normal file
@@ -0,0 +1,12 @@
|
||||
namespace NATS.Server.Raft;
|
||||
|
||||
public sealed class VoteRequest
|
||||
{
|
||||
public int Term { get; init; }
|
||||
public string CandidateId { get; init; } = string.Empty;
|
||||
}
|
||||
|
||||
public sealed class VoteResponse
|
||||
{
|
||||
public bool Granted { get; init; }
|
||||
}
|
||||
8
src/NATS.Server/Raft/RaftSnapshot.cs
Normal file
8
src/NATS.Server/Raft/RaftSnapshot.cs
Normal file
@@ -0,0 +1,8 @@
|
||||
namespace NATS.Server.Raft;
|
||||
|
||||
public sealed class RaftSnapshot
|
||||
{
|
||||
public long LastIncludedIndex { get; init; }
|
||||
public int LastIncludedTerm { get; init; }
|
||||
public byte[] Data { get; init; } = [];
|
||||
}
|
||||
17
src/NATS.Server/Raft/RaftSnapshotStore.cs
Normal file
17
src/NATS.Server/Raft/RaftSnapshotStore.cs
Normal file
@@ -0,0 +1,17 @@
|
||||
namespace NATS.Server.Raft;
|
||||
|
||||
public sealed class RaftSnapshotStore
|
||||
{
|
||||
private RaftSnapshot? _snapshot;
|
||||
|
||||
public Task SaveAsync(RaftSnapshot snapshot, CancellationToken ct)
|
||||
{
|
||||
_snapshot = snapshot;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<RaftSnapshot?> LoadAsync(CancellationToken ct)
|
||||
{
|
||||
return Task.FromResult(_snapshot);
|
||||
}
|
||||
}
|
||||
14
src/NATS.Server/Raft/RaftTermState.cs
Normal file
14
src/NATS.Server/Raft/RaftTermState.cs
Normal file
@@ -0,0 +1,14 @@
|
||||
namespace NATS.Server.Raft;
|
||||
|
||||
public sealed class RaftTermState
|
||||
{
|
||||
public int CurrentTerm { get; set; }
|
||||
public string? VotedFor { get; set; }
|
||||
}
|
||||
|
||||
public enum RaftRole
|
||||
{
|
||||
Follower,
|
||||
Candidate,
|
||||
Leader,
|
||||
}
|
||||
81
src/NATS.Server/Routes/RouteConnection.cs
Normal file
81
src/NATS.Server/Routes/RouteConnection.cs
Normal file
@@ -0,0 +1,81 @@
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
|
||||
namespace NATS.Server.Routes;
|
||||
|
||||
public sealed class RouteConnection(Socket socket) : IAsyncDisposable
|
||||
{
|
||||
private readonly Socket _socket = socket;
|
||||
private readonly NetworkStream _stream = new(socket, ownsSocket: true);
|
||||
|
||||
public string? RemoteServerId { get; private set; }
|
||||
public string RemoteEndpoint => _socket.RemoteEndPoint?.ToString() ?? Guid.NewGuid().ToString("N");
|
||||
|
||||
public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct)
|
||||
{
|
||||
await WriteLineAsync($"ROUTE {serverId}", ct);
|
||||
var line = await ReadLineAsync(ct);
|
||||
RemoteServerId = ParseHandshake(line);
|
||||
}
|
||||
|
||||
public async Task PerformInboundHandshakeAsync(string serverId, CancellationToken ct)
|
||||
{
|
||||
var line = await ReadLineAsync(ct);
|
||||
RemoteServerId = ParseHandshake(line);
|
||||
await WriteLineAsync($"ROUTE {serverId}", ct);
|
||||
}
|
||||
|
||||
public async Task WaitUntilClosedAsync(CancellationToken ct)
|
||||
{
|
||||
var buffer = new byte[1024];
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
var bytesRead = await _stream.ReadAsync(buffer, ct);
|
||||
if (bytesRead == 0)
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _stream.DisposeAsync();
|
||||
}
|
||||
|
||||
private async Task WriteLineAsync(string line, CancellationToken ct)
|
||||
{
|
||||
var bytes = Encoding.ASCII.GetBytes($"{line}\r\n");
|
||||
await _stream.WriteAsync(bytes, ct);
|
||||
await _stream.FlushAsync(ct);
|
||||
}
|
||||
|
||||
private async Task<string> ReadLineAsync(CancellationToken ct)
|
||||
{
|
||||
var bytes = new List<byte>(64);
|
||||
var single = new byte[1];
|
||||
while (true)
|
||||
{
|
||||
var read = await _stream.ReadAsync(single, ct);
|
||||
if (read == 0)
|
||||
throw new IOException("Route connection closed during handshake");
|
||||
|
||||
if (single[0] == (byte)'\n')
|
||||
break;
|
||||
if (single[0] != (byte)'\r')
|
||||
bytes.Add(single[0]);
|
||||
}
|
||||
|
||||
return Encoding.ASCII.GetString([.. bytes]);
|
||||
}
|
||||
|
||||
private static string ParseHandshake(string line)
|
||||
{
|
||||
if (!line.StartsWith("ROUTE ", StringComparison.OrdinalIgnoreCase))
|
||||
throw new InvalidOperationException("Invalid route handshake");
|
||||
|
||||
var id = line[6..].Trim();
|
||||
if (id.Length == 0)
|
||||
throw new InvalidOperationException("Route handshake missing server id");
|
||||
|
||||
return id;
|
||||
}
|
||||
}
|
||||
224
src/NATS.Server/Routes/RouteManager.cs
Normal file
224
src/NATS.Server/Routes/RouteManager.cs
Normal file
@@ -0,0 +1,224 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Routes;
|
||||
|
||||
public sealed class RouteManager : IAsyncDisposable
|
||||
{
|
||||
private static readonly ConcurrentDictionary<string, RouteManager> Managers = new(StringComparer.Ordinal);
|
||||
private readonly ClusterOptions _options;
|
||||
private readonly ServerStats _stats;
|
||||
private readonly string _serverId;
|
||||
private readonly ILogger<RouteManager> _logger;
|
||||
private readonly Action<RemoteSubscription> _remoteSubSink;
|
||||
private readonly ConcurrentDictionary<string, RouteConnection> _routes = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, byte> _connectedServerIds = new(StringComparer.Ordinal);
|
||||
|
||||
private CancellationTokenSource? _cts;
|
||||
private Socket? _listener;
|
||||
private Task? _acceptLoopTask;
|
||||
|
||||
public string ListenEndpoint => $"{_options.Host}:{_options.Port}";
|
||||
|
||||
public RouteManager(
|
||||
ClusterOptions options,
|
||||
ServerStats stats,
|
||||
string serverId,
|
||||
Action<RemoteSubscription> remoteSubSink,
|
||||
ILogger<RouteManager> logger)
|
||||
{
|
||||
_options = options;
|
||||
_stats = stats;
|
||||
_serverId = serverId;
|
||||
_remoteSubSink = remoteSubSink;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken ct)
|
||||
{
|
||||
_cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
Managers[_serverId] = this;
|
||||
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
_listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
|
||||
_listener.Bind(new IPEndPoint(IPAddress.Parse(_options.Host), _options.Port));
|
||||
_listener.Listen(128);
|
||||
|
||||
if (_options.Port == 0)
|
||||
_options.Port = ((IPEndPoint)_listener.LocalEndPoint!).Port;
|
||||
|
||||
_acceptLoopTask = Task.Run(() => AcceptLoopAsync(_cts.Token));
|
||||
foreach (var route in _options.Routes.Distinct(StringComparer.OrdinalIgnoreCase))
|
||||
_ = Task.Run(() => ConnectToRouteWithRetryAsync(route, _cts.Token));
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_cts == null)
|
||||
return;
|
||||
|
||||
await _cts.CancelAsync();
|
||||
_listener?.Dispose();
|
||||
|
||||
if (_acceptLoopTask != null)
|
||||
await _acceptLoopTask.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
|
||||
foreach (var route in _routes.Values)
|
||||
await route.DisposeAsync();
|
||||
|
||||
_routes.Clear();
|
||||
_connectedServerIds.Clear();
|
||||
Managers.TryRemove(_serverId, out _);
|
||||
Interlocked.Exchange(ref _stats.Routes, 0);
|
||||
_cts.Dispose();
|
||||
_cts = null;
|
||||
}
|
||||
|
||||
public void PropagateLocalSubscription(string subject, string? queue)
|
||||
{
|
||||
if (_connectedServerIds.IsEmpty)
|
||||
return;
|
||||
|
||||
var remoteSub = new RemoteSubscription(subject, queue, _serverId);
|
||||
foreach (var peerId in _connectedServerIds.Keys)
|
||||
{
|
||||
if (Managers.TryGetValue(peerId, out var peer))
|
||||
peer.ReceiveRemoteSubscription(remoteSub);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task AcceptLoopAsync(CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
Socket socket;
|
||||
try
|
||||
{
|
||||
socket = await _listener!.AcceptAsync(ct);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Route accept loop error");
|
||||
break;
|
||||
}
|
||||
|
||||
_ = Task.Run(() => HandleInboundRouteAsync(socket, ct), ct);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleInboundRouteAsync(Socket socket, CancellationToken ct)
|
||||
{
|
||||
var route = new RouteConnection(socket);
|
||||
try
|
||||
{
|
||||
await route.PerformInboundHandshakeAsync(_serverId, ct);
|
||||
Register(route);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Inbound route handshake failed");
|
||||
await route.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ConnectToRouteWithRetryAsync(string route, CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var endPoint = ParseRouteEndpoint(route);
|
||||
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);
|
||||
var connection = new RouteConnection(socket);
|
||||
await connection.PerformOutboundHandshakeAsync(_serverId, ct);
|
||||
Register(connection);
|
||||
return;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Failed to connect route seed {Route}", route);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await Task.Delay(250, ct);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void Register(RouteConnection route)
|
||||
{
|
||||
var key = $"{route.RemoteServerId}:{route.RemoteEndpoint}";
|
||||
if (!_routes.TryAdd(key, route))
|
||||
{
|
||||
_ = route.DisposeAsync();
|
||||
return;
|
||||
}
|
||||
|
||||
if (route.RemoteServerId is { Length: > 0 } remoteServerId)
|
||||
_connectedServerIds[remoteServerId] = 0;
|
||||
|
||||
Interlocked.Increment(ref _stats.Routes);
|
||||
_ = Task.Run(() => WatchRouteAsync(key, route, _cts!.Token));
|
||||
}
|
||||
|
||||
private async Task WatchRouteAsync(string key, RouteConnection route, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await route.WaitUntilClosedAsync(ct);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Shutdown path.
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogDebug(ex, "Route {RouteKey} closed with error", key);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (_routes.TryRemove(key, out _))
|
||||
Interlocked.Decrement(ref _stats.Routes);
|
||||
|
||||
await route.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private static IPEndPoint ParseRouteEndpoint(string route)
|
||||
{
|
||||
var trimmed = route.Trim();
|
||||
var parts = trimmed.Split(':', 2, StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries);
|
||||
if (parts.Length != 2)
|
||||
throw new FormatException($"Invalid route endpoint: '{route}'");
|
||||
|
||||
return new IPEndPoint(IPAddress.Parse(parts[0]), int.Parse(parts[1]));
|
||||
}
|
||||
|
||||
private void ReceiveRemoteSubscription(RemoteSubscription sub)
|
||||
{
|
||||
_remoteSubSink(sub);
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,9 @@ public sealed class ServerStats
|
||||
public long TotalConnections;
|
||||
public long SlowConsumers;
|
||||
public long StaleConnections;
|
||||
public long Routes;
|
||||
public long Gateways;
|
||||
public long Leafs;
|
||||
public long Stalls;
|
||||
public long SlowConsumerClients;
|
||||
public long SlowConsumerRoutes;
|
||||
@@ -20,5 +23,6 @@ public sealed class ServerStats
|
||||
public long StaleConnectionRoutes;
|
||||
public long StaleConnectionLeafs;
|
||||
public long StaleConnectionGateways;
|
||||
public bool JetStreamEnabled;
|
||||
public readonly ConcurrentDictionary<string, long> HttpReqStats = new();
|
||||
}
|
||||
|
||||
3
src/NATS.Server/Subscriptions/RemoteSubscription.cs
Normal file
3
src/NATS.Server/Subscriptions/RemoteSubscription.cs
Normal file
@@ -0,0 +1,3 @@
|
||||
namespace NATS.Server.Subscriptions;
|
||||
|
||||
public sealed record RemoteSubscription(string Subject, string? Queue, string RouteId);
|
||||
@@ -13,6 +13,7 @@ public sealed class SubList : IDisposable
|
||||
|
||||
private readonly ReaderWriterLockSlim _lock = new();
|
||||
private readonly TrieLevel _root = new();
|
||||
private readonly Dictionary<string, RemoteSubscription> _remoteSubs = new(StringComparer.Ordinal);
|
||||
private Dictionary<string, CachedResult>? _cache = new(StringComparer.Ordinal);
|
||||
private uint _count;
|
||||
private volatile bool _disposed;
|
||||
@@ -96,6 +97,40 @@ public sealed class SubList : IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
public void ApplyRemoteSub(RemoteSubscription sub)
|
||||
{
|
||||
_lock.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
var key = $"{sub.RouteId}|{sub.Subject}|{sub.Queue}";
|
||||
_remoteSubs[key] = sub;
|
||||
Interlocked.Increment(ref _generation);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
public bool HasRemoteInterest(string subject)
|
||||
{
|
||||
_lock.EnterReadLock();
|
||||
try
|
||||
{
|
||||
foreach (var remoteSub in _remoteSubs.Values)
|
||||
{
|
||||
if (SubjectMatch.MatchLiteral(subject, remoteSub.Subject))
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_lock.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
public void Insert(Subscription sub)
|
||||
{
|
||||
var subject = sub.Subject;
|
||||
|
||||
94
src/NATS.Server/WebSocket/WsCompression.cs
Normal file
94
src/NATS.Server/WebSocket/WsCompression.cs
Normal file
@@ -0,0 +1,94 @@
|
||||
using System.IO.Compression;
|
||||
|
||||
namespace NATS.Server.WebSocket;
|
||||
|
||||
/// <summary>
|
||||
/// permessage-deflate compression/decompression for WebSocket frames (RFC 7692).
|
||||
/// Ported from golang/nats-server/server/websocket.go lines 403-440 and 1391-1466.
|
||||
/// </summary>
|
||||
public static class WsCompression
|
||||
{
|
||||
/// <summary>
|
||||
/// Compresses data using deflate. Removes trailing 4 bytes (sync marker)
|
||||
/// per RFC 7692 Section 7.2.1.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// We call Flush() but intentionally do not Dispose() the DeflateStream before
|
||||
/// reading output, because Dispose writes a final deflate block (0x03 0x00) that
|
||||
/// would be corrupted by the 4-byte tail strip. Flush() alone writes a sync flush
|
||||
/// ending with 0x00 0x00 0xff 0xff, matching Go's flate.Writer.Flush() behavior.
|
||||
/// </remarks>
|
||||
public static byte[] Compress(ReadOnlySpan<byte> data)
|
||||
{
|
||||
var output = new MemoryStream();
|
||||
var deflate = new DeflateStream(output, CompressionLevel.Fastest, leaveOpen: true);
|
||||
try
|
||||
{
|
||||
deflate.Write(data);
|
||||
deflate.Flush();
|
||||
|
||||
var compressed = output.ToArray();
|
||||
|
||||
// Remove trailing 4-byte sync marker (0x00 0x00 0xff 0xff) per RFC 7692
|
||||
if (compressed.Length >= 4)
|
||||
return compressed[..^4];
|
||||
|
||||
return compressed;
|
||||
}
|
||||
finally
|
||||
{
|
||||
deflate.Dispose();
|
||||
output.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decompresses collected compressed buffers.
|
||||
/// Appends trailer bytes before decompressing per RFC 7692 Section 7.2.2.
|
||||
/// Ported from golang/nats-server/server/websocket.go lines 403-440.
|
||||
/// The Go code appends compressLastBlock (9 bytes) which includes the sync
|
||||
/// marker plus a final empty stored block to signal end-of-stream to the
|
||||
/// flate reader.
|
||||
/// </summary>
|
||||
public static byte[] Decompress(List<byte[]> compressedBuffers, int maxPayload)
|
||||
{
|
||||
if (maxPayload <= 0)
|
||||
maxPayload = 1024 * 1024; // Default 1MB
|
||||
|
||||
// Concatenate all compressed buffers + trailer.
|
||||
// Per RFC 7692 Section 7.2.2, append the sync flush marker (0x00 0x00 0xff 0xff)
|
||||
// that was stripped during compression. The Go reference appends compressLastBlock
|
||||
// (9 bytes) for Go's flate reader; .NET's DeflateStream only needs the 4-byte trailer.
|
||||
int totalLen = 0;
|
||||
foreach (var buf in compressedBuffers)
|
||||
totalLen += buf.Length;
|
||||
totalLen += WsConstants.DecompressTrailer.Length;
|
||||
|
||||
var combined = new byte[totalLen];
|
||||
int offset = 0;
|
||||
foreach (var buf in compressedBuffers)
|
||||
{
|
||||
buf.CopyTo(combined, offset);
|
||||
offset += buf.Length;
|
||||
}
|
||||
|
||||
WsConstants.DecompressTrailer.CopyTo(combined, offset);
|
||||
|
||||
using var input = new MemoryStream(combined);
|
||||
using var deflate = new DeflateStream(input, CompressionMode.Decompress);
|
||||
using var output = new MemoryStream();
|
||||
|
||||
var readBuf = new byte[4096];
|
||||
int totalRead = 0;
|
||||
int n;
|
||||
while ((n = deflate.Read(readBuf, 0, readBuf.Length)) > 0)
|
||||
{
|
||||
totalRead += n;
|
||||
if (totalRead > maxPayload)
|
||||
throw new InvalidOperationException("decompressed data exceeds maximum payload size");
|
||||
output.Write(readBuf, 0, n);
|
||||
}
|
||||
|
||||
return output.ToArray();
|
||||
}
|
||||
}
|
||||
202
src/NATS.Server/WebSocket/WsConnection.cs
Normal file
202
src/NATS.Server/WebSocket/WsConnection.cs
Normal file
@@ -0,0 +1,202 @@
|
||||
namespace NATS.Server.WebSocket;
|
||||
|
||||
/// <summary>
|
||||
/// Stream wrapper that transparently frames/deframes WebSocket around raw TCP I/O.
|
||||
/// NatsClient uses this as its _stream -- FillPipeAsync and RunWriteLoopAsync work unchanged.
|
||||
/// Ported from golang/nats-server/server/websocket.go wsUpgrade/wrapWebsocket pattern.
|
||||
/// </summary>
|
||||
public sealed class WsConnection : Stream
|
||||
{
|
||||
private readonly Stream _inner;
|
||||
private readonly bool _compress;
|
||||
private readonly bool _maskRead;
|
||||
private readonly bool _maskWrite;
|
||||
private readonly bool _browser;
|
||||
private readonly bool _noCompFrag;
|
||||
private WsReadInfo _readInfo;
|
||||
// Read-side state: accessed only from the single FillPipeAsync reader task (no synchronization needed)
|
||||
private readonly Queue<byte[]> _readQueue = new();
|
||||
private int _readOffset;
|
||||
private readonly object _writeLock = new();
|
||||
private readonly List<ControlFrameAction> _pendingControlWrites = [];
|
||||
|
||||
public bool CloseReceived => _readInfo.CloseReceived;
|
||||
public int CloseStatus => _readInfo.CloseStatus;
|
||||
|
||||
public WsConnection(Stream inner, bool compress, bool maskRead, bool maskWrite, bool browser, bool noCompFrag)
|
||||
{
|
||||
_inner = inner;
|
||||
_compress = compress;
|
||||
_maskRead = maskRead;
|
||||
_maskWrite = maskWrite;
|
||||
_browser = browser;
|
||||
_noCompFrag = noCompFrag;
|
||||
_readInfo = new WsReadInfo(expectMask: maskRead);
|
||||
}
|
||||
|
||||
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken ct = default)
|
||||
{
|
||||
// Drain any buffered decoded payloads first
|
||||
if (_readQueue.Count > 0)
|
||||
return DrainReadQueue(buffer.Span);
|
||||
|
||||
while (true)
|
||||
{
|
||||
// Read raw bytes from inner stream
|
||||
var rawBuf = new byte[Math.Max(buffer.Length, 4096)];
|
||||
int bytesRead = await _inner.ReadAsync(rawBuf.AsMemory(), ct);
|
||||
if (bytesRead == 0) return 0;
|
||||
|
||||
// Decode frames
|
||||
var payloads = WsReadInfo.ReadFrames(_readInfo, new MemoryStream(rawBuf, 0, bytesRead), bytesRead, maxPayload: 1024 * 1024);
|
||||
|
||||
// Collect control frame responses
|
||||
if (_readInfo.PendingControlFrames.Count > 0)
|
||||
{
|
||||
lock (_writeLock)
|
||||
_pendingControlWrites.AddRange(_readInfo.PendingControlFrames);
|
||||
_readInfo.PendingControlFrames.Clear();
|
||||
// Write pending control frames
|
||||
await FlushControlFramesAsync(ct);
|
||||
}
|
||||
|
||||
if (_readInfo.CloseReceived)
|
||||
return 0;
|
||||
|
||||
foreach (var payload in payloads)
|
||||
_readQueue.Enqueue(payload);
|
||||
|
||||
// If no payloads were decoded (e.g. only frame headers were read),
|
||||
// continue reading instead of returning 0 which signals end-of-stream
|
||||
if (_readQueue.Count > 0)
|
||||
return DrainReadQueue(buffer.Span);
|
||||
}
|
||||
}
|
||||
|
||||
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken ct = default)
|
||||
{
|
||||
var data = buffer.Span;
|
||||
|
||||
if (_compress && data.Length > WsConstants.CompressThreshold)
|
||||
{
|
||||
var compressed = WsCompression.Compress(data);
|
||||
await WriteFramedAsync(compressed, compressed: true, ct);
|
||||
}
|
||||
else
|
||||
{
|
||||
await WriteFramedAsync(data.ToArray(), compressed: false, ct);
|
||||
}
|
||||
}
|
||||
|
||||
private async ValueTask WriteFramedAsync(byte[] payload, bool compressed, CancellationToken ct)
|
||||
{
|
||||
if (_browser && payload.Length > WsConstants.FrameSizeForBrowsers && !(_noCompFrag && compressed))
|
||||
{
|
||||
// Fragment for browsers
|
||||
int offset = 0;
|
||||
bool first = true;
|
||||
while (offset < payload.Length)
|
||||
{
|
||||
int chunkLen = Math.Min(WsConstants.FrameSizeForBrowsers, payload.Length - offset);
|
||||
bool final = offset + chunkLen >= payload.Length;
|
||||
var fh = new byte[WsConstants.MaxFrameHeaderSize];
|
||||
var (n, key) = WsFrameWriter.FillFrameHeader(fh, _maskWrite,
|
||||
first: first, final: final, compressed: first && compressed,
|
||||
opcode: WsConstants.BinaryMessage, payloadLength: chunkLen);
|
||||
|
||||
var chunk = payload.AsSpan(offset, chunkLen).ToArray();
|
||||
if (_maskWrite && key != null)
|
||||
WsFrameWriter.MaskBuf(key, chunk);
|
||||
|
||||
await _inner.WriteAsync(fh.AsMemory(0, n), ct);
|
||||
await _inner.WriteAsync(chunk.AsMemory(), ct);
|
||||
offset += chunkLen;
|
||||
first = false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
var (header, key) = WsFrameWriter.CreateFrameHeader(_maskWrite, compressed, WsConstants.BinaryMessage, payload.Length);
|
||||
if (_maskWrite && key != null)
|
||||
WsFrameWriter.MaskBuf(key, payload);
|
||||
await _inner.WriteAsync(header.AsMemory(), ct);
|
||||
await _inner.WriteAsync(payload.AsMemory(), ct);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task FlushControlFramesAsync(CancellationToken ct)
|
||||
{
|
||||
List<ControlFrameAction> toWrite;
|
||||
lock (_writeLock)
|
||||
{
|
||||
if (_pendingControlWrites.Count == 0) return;
|
||||
toWrite = [.. _pendingControlWrites];
|
||||
_pendingControlWrites.Clear();
|
||||
}
|
||||
|
||||
foreach (var action in toWrite)
|
||||
{
|
||||
var frame = WsFrameWriter.BuildControlFrame(action.Opcode, action.Payload, _maskWrite);
|
||||
await _inner.WriteAsync(frame, ct);
|
||||
}
|
||||
await _inner.FlushAsync(ct);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends a WebSocket close frame.
|
||||
/// </summary>
|
||||
public async Task SendCloseAsync(ClientClosedReason reason, CancellationToken ct = default)
|
||||
{
|
||||
var status = WsFrameWriter.MapCloseStatus(reason);
|
||||
var closePayload = WsFrameWriter.CreateCloseMessage(status, reason.ToReasonString());
|
||||
var frame = WsFrameWriter.BuildControlFrame(WsConstants.CloseMessage, closePayload, _maskWrite);
|
||||
await _inner.WriteAsync(frame, ct);
|
||||
await _inner.FlushAsync(ct);
|
||||
}
|
||||
|
||||
private int DrainReadQueue(Span<byte> buffer)
|
||||
{
|
||||
int written = 0;
|
||||
while (_readQueue.Count > 0 && written < buffer.Length)
|
||||
{
|
||||
var current = _readQueue.Peek();
|
||||
int available = current.Length - _readOffset;
|
||||
int toCopy = Math.Min(available, buffer.Length - written);
|
||||
current.AsSpan(_readOffset, toCopy).CopyTo(buffer[written..]);
|
||||
written += toCopy;
|
||||
_readOffset += toCopy;
|
||||
if (_readOffset >= current.Length)
|
||||
{
|
||||
_readQueue.Dequeue();
|
||||
_readOffset = 0;
|
||||
}
|
||||
}
|
||||
return written;
|
||||
}
|
||||
|
||||
// Stream abstract members
|
||||
public override bool CanRead => true;
|
||||
public override bool CanWrite => true;
|
||||
public override bool CanSeek => false;
|
||||
public override long Length => throw new NotSupportedException();
|
||||
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
|
||||
public override void Flush() => _inner.Flush();
|
||||
public override Task FlushAsync(CancellationToken ct) => _inner.FlushAsync(ct);
|
||||
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("Use ReadAsync");
|
||||
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException("Use WriteAsync");
|
||||
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
|
||||
public override void SetLength(long value) => throw new NotSupportedException();
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing)
|
||||
_inner.Dispose();
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
public override async ValueTask DisposeAsync()
|
||||
{
|
||||
await _inner.DisposeAsync();
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
}
|
||||
72
src/NATS.Server/WebSocket/WsConstants.cs
Normal file
72
src/NATS.Server/WebSocket/WsConstants.cs
Normal file
@@ -0,0 +1,72 @@
|
||||
namespace NATS.Server.WebSocket;
|
||||
|
||||
/// <summary>
|
||||
/// WebSocket protocol constants (RFC 6455).
|
||||
/// Ported from golang/nats-server/server/websocket.go lines 41-106.
|
||||
/// </summary>
|
||||
public static class WsConstants
|
||||
{
|
||||
// Opcodes (RFC 6455 Section 5.2)
|
||||
public const int TextMessage = 1;
|
||||
public const int BinaryMessage = 2;
|
||||
public const int CloseMessage = 8;
|
||||
public const int PingMessage = 9;
|
||||
public const int PongMessage = 10;
|
||||
public const int ContinuationFrame = 0;
|
||||
|
||||
// Frame header bits
|
||||
public const byte FinalBit = 0x80; // 1 << 7
|
||||
public const byte Rsv1Bit = 0x40; // 1 << 6 (compression, RFC 7692)
|
||||
public const byte Rsv2Bit = 0x20; // 1 << 5
|
||||
public const byte Rsv3Bit = 0x10; // 1 << 4
|
||||
public const byte MaskBit = 0x80; // 1 << 7 (in second byte)
|
||||
|
||||
// Frame size limits
|
||||
public const int MaxFrameHeaderSize = 14;
|
||||
public const int MaxControlPayloadSize = 125;
|
||||
public const int FrameSizeForBrowsers = 4096;
|
||||
public const int CompressThreshold = 64;
|
||||
public const int CloseStatusSize = 2;
|
||||
|
||||
// Close status codes (RFC 6455 Section 11.7)
|
||||
public const int CloseStatusNormalClosure = 1000;
|
||||
public const int CloseStatusGoingAway = 1001;
|
||||
public const int CloseStatusProtocolError = 1002;
|
||||
public const int CloseStatusUnsupportedData = 1003;
|
||||
public const int CloseStatusNoStatusReceived = 1005;
|
||||
public const int CloseStatusInvalidPayloadData = 1007;
|
||||
public const int CloseStatusPolicyViolation = 1008;
|
||||
public const int CloseStatusMessageTooBig = 1009;
|
||||
public const int CloseStatusInternalSrvError = 1011;
|
||||
public const int CloseStatusTlsHandshake = 1015;
|
||||
|
||||
// Compression constants (RFC 7692)
|
||||
public const string PmcExtension = "permessage-deflate";
|
||||
public const string PmcSrvNoCtx = "server_no_context_takeover";
|
||||
public const string PmcCliNoCtx = "client_no_context_takeover";
|
||||
public static readonly string PmcReqHeaderValue = $"{PmcExtension}; {PmcSrvNoCtx}; {PmcCliNoCtx}";
|
||||
public static readonly string PmcFullResponse = $"Sec-WebSocket-Extensions: {PmcExtension}; {PmcSrvNoCtx}; {PmcCliNoCtx}\r\n";
|
||||
|
||||
// Header names
|
||||
public const string NoMaskingHeader = "Nats-No-Masking";
|
||||
public const string NoMaskingValue = "true";
|
||||
public static readonly string NoMaskingFullResponse = $"{NoMaskingHeader}: {NoMaskingValue}\r\n";
|
||||
public const string XForwardedForHeader = "X-Forwarded-For";
|
||||
|
||||
// Path routing
|
||||
public const string ClientPath = "/";
|
||||
public const string LeafNodePath = "/leafnode";
|
||||
public const string MqttPath = "/mqtt";
|
||||
|
||||
// Decompression trailer appended before decompressing (RFC 7692 Section 7.2.2)
|
||||
public static readonly byte[] DecompressTrailer = [0x00, 0x00, 0xff, 0xff];
|
||||
|
||||
public static bool IsControlFrame(int opcode) => opcode >= CloseMessage;
|
||||
}
|
||||
|
||||
public enum WsClientKind
|
||||
{
|
||||
Client,
|
||||
Leaf,
|
||||
Mqtt,
|
||||
}
|
||||
171
src/NATS.Server/WebSocket/WsFrameWriter.cs
Normal file
171
src/NATS.Server/WebSocket/WsFrameWriter.cs
Normal file
@@ -0,0 +1,171 @@
|
||||
using System.Buffers.Binary;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
|
||||
namespace NATS.Server.WebSocket;
|
||||
|
||||
/// <summary>
|
||||
/// WebSocket frame construction, masking, and control message creation.
|
||||
/// Ported from golang/nats-server/server/websocket.go lines 543-726.
|
||||
/// </summary>
|
||||
public static class WsFrameWriter
|
||||
{
|
||||
/// <summary>
|
||||
/// Creates a complete frame header for a single-frame message (first=true, final=true).
|
||||
/// Returns (header bytes, mask key or null).
|
||||
/// </summary>
|
||||
public static (byte[] header, byte[]? key) CreateFrameHeader(
|
||||
bool useMasking, bool compressed, int opcode, int payloadLength)
|
||||
{
|
||||
var fh = new byte[WsConstants.MaxFrameHeaderSize];
|
||||
var (n, key) = FillFrameHeader(fh, useMasking,
|
||||
first: true, final: true, compressed: compressed, opcode: opcode, payloadLength: payloadLength);
|
||||
return (fh[..n], key);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Fills a pre-allocated frame header buffer.
|
||||
/// Returns (bytes written, mask key or null).
|
||||
/// </summary>
|
||||
public static (int written, byte[]? key) FillFrameHeader(
|
||||
Span<byte> fh, bool useMasking, bool first, bool final, bool compressed, int opcode, int payloadLength)
|
||||
{
|
||||
byte b0 = first ? (byte)opcode : (byte)0;
|
||||
if (final) b0 |= WsConstants.FinalBit;
|
||||
if (compressed) b0 |= WsConstants.Rsv1Bit;
|
||||
|
||||
byte b1 = 0;
|
||||
if (useMasking) b1 |= WsConstants.MaskBit;
|
||||
|
||||
int n;
|
||||
switch (payloadLength)
|
||||
{
|
||||
case <= 125:
|
||||
n = 2;
|
||||
fh[0] = b0;
|
||||
fh[1] = (byte)(b1 | (byte)payloadLength);
|
||||
break;
|
||||
case < 65536:
|
||||
n = 4;
|
||||
fh[0] = b0;
|
||||
fh[1] = (byte)(b1 | 126);
|
||||
BinaryPrimitives.WriteUInt16BigEndian(fh[2..], (ushort)payloadLength);
|
||||
break;
|
||||
default:
|
||||
n = 10;
|
||||
fh[0] = b0;
|
||||
fh[1] = (byte)(b1 | 127);
|
||||
BinaryPrimitives.WriteUInt64BigEndian(fh[2..], (ulong)payloadLength);
|
||||
break;
|
||||
}
|
||||
|
||||
byte[]? key = null;
|
||||
if (useMasking)
|
||||
{
|
||||
key = new byte[4];
|
||||
RandomNumberGenerator.Fill(key);
|
||||
key.CopyTo(fh[n..]);
|
||||
n += 4;
|
||||
}
|
||||
|
||||
return (n, key);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// XOR masks a buffer with a 4-byte key. Applies in-place.
|
||||
/// </summary>
|
||||
public static void MaskBuf(ReadOnlySpan<byte> key, Span<byte> buf)
|
||||
{
|
||||
for (int i = 0; i < buf.Length; i++)
|
||||
buf[i] ^= key[i & 3];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// XOR masks multiple contiguous buffers as if they were one.
|
||||
/// </summary>
|
||||
public static void MaskBufs(ReadOnlySpan<byte> key, List<byte[]> bufs)
|
||||
{
|
||||
int pos = 0;
|
||||
foreach (var buf in bufs)
|
||||
{
|
||||
for (int j = 0; j < buf.Length; j++)
|
||||
{
|
||||
buf[j] ^= key[pos & 3];
|
||||
pos++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a close message payload: 2-byte status code + optional UTF-8 body.
|
||||
/// Body truncated to fit MaxControlPayloadSize with "..." suffix.
|
||||
/// </summary>
|
||||
public static byte[] CreateCloseMessage(int status, string body)
|
||||
{
|
||||
var bodyBytes = Encoding.UTF8.GetBytes(body);
|
||||
int maxBody = WsConstants.MaxControlPayloadSize - WsConstants.CloseStatusSize;
|
||||
|
||||
if (bodyBytes.Length > maxBody)
|
||||
{
|
||||
var suffix = "..."u8;
|
||||
int truncLen = maxBody - suffix.Length;
|
||||
// Find a valid UTF-8 boundary by walking back from truncation point
|
||||
while (truncLen > 0 && (bodyBytes[truncLen] & 0xC0) == 0x80)
|
||||
truncLen--;
|
||||
var buf = new byte[WsConstants.CloseStatusSize + truncLen + suffix.Length];
|
||||
BinaryPrimitives.WriteUInt16BigEndian(buf, (ushort)status);
|
||||
bodyBytes.AsSpan(0, truncLen).CopyTo(buf.AsSpan(WsConstants.CloseStatusSize));
|
||||
suffix.CopyTo(buf.AsSpan(WsConstants.CloseStatusSize + truncLen));
|
||||
return buf;
|
||||
}
|
||||
|
||||
var result = new byte[WsConstants.CloseStatusSize + bodyBytes.Length];
|
||||
BinaryPrimitives.WriteUInt16BigEndian(result, (ushort)status);
|
||||
bodyBytes.CopyTo(result.AsSpan(WsConstants.CloseStatusSize));
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Builds a complete control frame (header + payload, optional masking).
|
||||
/// </summary>
|
||||
public static byte[] BuildControlFrame(int opcode, ReadOnlySpan<byte> payload, bool useMasking)
|
||||
{
|
||||
int headerSize = 2 + (useMasking ? 4 : 0);
|
||||
var frame = new byte[headerSize + payload.Length];
|
||||
var span = frame.AsSpan();
|
||||
var (n, key) = FillFrameHeader(span, useMasking,
|
||||
first: true, final: true, compressed: false, opcode: opcode, payloadLength: payload.Length);
|
||||
if (payload.Length > 0)
|
||||
{
|
||||
payload.CopyTo(span[n..]);
|
||||
if (useMasking && key != null)
|
||||
MaskBuf(key, span[n..]);
|
||||
}
|
||||
|
||||
return frame;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Maps a ClientClosedReason to a WebSocket close status code.
|
||||
/// Matches Go wsEnqueueCloseMessage in websocket.go lines 668-694.
|
||||
/// </summary>
|
||||
public static int MapCloseStatus(ClientClosedReason reason) => reason switch
|
||||
{
|
||||
ClientClosedReason.ClientClosed => WsConstants.CloseStatusNormalClosure,
|
||||
ClientClosedReason.AuthenticationTimeout or
|
||||
ClientClosedReason.AuthenticationViolation or
|
||||
ClientClosedReason.SlowConsumerPendingBytes or
|
||||
ClientClosedReason.SlowConsumerWriteDeadline or
|
||||
ClientClosedReason.MaxSubscriptionsExceeded or
|
||||
ClientClosedReason.AuthenticationExpired => WsConstants.CloseStatusPolicyViolation,
|
||||
ClientClosedReason.TlsHandshakeError => WsConstants.CloseStatusTlsHandshake,
|
||||
ClientClosedReason.ParseError or
|
||||
ClientClosedReason.ProtocolViolation => WsConstants.CloseStatusProtocolError,
|
||||
ClientClosedReason.MaxPayloadExceeded => WsConstants.CloseStatusMessageTooBig,
|
||||
ClientClosedReason.WriteError or
|
||||
ClientClosedReason.ReadError or
|
||||
ClientClosedReason.StaleConnection or
|
||||
ClientClosedReason.ServerShutdown => WsConstants.CloseStatusGoingAway,
|
||||
_ => WsConstants.CloseStatusInternalSrvError,
|
||||
};
|
||||
}
|
||||
81
src/NATS.Server/WebSocket/WsOriginChecker.cs
Normal file
81
src/NATS.Server/WebSocket/WsOriginChecker.cs
Normal file
@@ -0,0 +1,81 @@
|
||||
namespace NATS.Server.WebSocket;
|
||||
|
||||
/// <summary>
|
||||
/// Validates WebSocket Origin headers per RFC 6455 Section 10.2.
|
||||
/// Ported from golang/nats-server/server/websocket.go lines 933-1000.
|
||||
/// </summary>
|
||||
public sealed class WsOriginChecker
|
||||
{
|
||||
private readonly bool _sameOrigin;
|
||||
private readonly Dictionary<string, AllowedOrigin>? _allowedOrigins;
|
||||
|
||||
public WsOriginChecker(bool sameOrigin, List<string>? allowedOrigins)
|
||||
{
|
||||
_sameOrigin = sameOrigin;
|
||||
if (allowedOrigins is { Count: > 0 })
|
||||
{
|
||||
_allowedOrigins = new Dictionary<string, AllowedOrigin>(StringComparer.OrdinalIgnoreCase);
|
||||
foreach (var ao in allowedOrigins)
|
||||
{
|
||||
if (Uri.TryCreate(ao, UriKind.Absolute, out var uri))
|
||||
{
|
||||
var (host, port) = GetHostAndPort(uri.Scheme == "https", uri.Host, uri.Port);
|
||||
_allowedOrigins[host] = new AllowedOrigin(uri.Scheme, port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns null if origin is allowed, or an error message if rejected.
|
||||
/// </summary>
|
||||
public string? CheckOrigin(string? origin, string requestHost, bool isTls)
|
||||
{
|
||||
if (!_sameOrigin && _allowedOrigins == null)
|
||||
return null;
|
||||
|
||||
if (string.IsNullOrEmpty(origin))
|
||||
return null;
|
||||
|
||||
if (!Uri.TryCreate(origin, UriKind.Absolute, out var originUri))
|
||||
return $"invalid origin: {origin}";
|
||||
|
||||
var (oh, op) = GetHostAndPort(originUri.Scheme == "https", originUri.Host, originUri.Port);
|
||||
|
||||
if (_sameOrigin)
|
||||
{
|
||||
var (rh, rp) = ParseHostPort(requestHost, isTls);
|
||||
if (!string.Equals(oh, rh, StringComparison.OrdinalIgnoreCase) || op != rp)
|
||||
return "not same origin";
|
||||
}
|
||||
|
||||
if (_allowedOrigins != null)
|
||||
{
|
||||
if (!_allowedOrigins.TryGetValue(oh, out var allowed) ||
|
||||
!string.Equals(originUri.Scheme, allowed.Scheme, StringComparison.OrdinalIgnoreCase) ||
|
||||
op != allowed.Port)
|
||||
{
|
||||
return "not in the allowed list";
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static (string host, int port) GetHostAndPort(bool tls, string host, int port)
|
||||
{
|
||||
if (port <= 0)
|
||||
port = tls ? 443 : 80;
|
||||
return (host.ToLowerInvariant(), port);
|
||||
}
|
||||
|
||||
private static (string host, int port) ParseHostPort(string hostPort, bool isTls)
|
||||
{
|
||||
var colonIdx = hostPort.LastIndexOf(':');
|
||||
if (colonIdx > 0 && int.TryParse(hostPort.AsSpan(colonIdx + 1), out var port))
|
||||
return (hostPort[..colonIdx].ToLowerInvariant(), port);
|
||||
return (hostPort.ToLowerInvariant(), isTls ? 443 : 80);
|
||||
}
|
||||
|
||||
private readonly record struct AllowedOrigin(string Scheme, int Port);
|
||||
}
|
||||
322
src/NATS.Server/WebSocket/WsReadInfo.cs
Normal file
322
src/NATS.Server/WebSocket/WsReadInfo.cs
Normal file
@@ -0,0 +1,322 @@
|
||||
using System.Buffers.Binary;
|
||||
using System.Text;
|
||||
|
||||
namespace NATS.Server.WebSocket;
|
||||
|
||||
/// <summary>
|
||||
/// Per-connection WebSocket frame reading state machine.
|
||||
/// Ported from golang/nats-server/server/websocket.go lines 156-506.
|
||||
/// </summary>
|
||||
public class WsReadInfo
|
||||
{
|
||||
public int Remaining;
|
||||
public bool FrameStart;
|
||||
public bool FirstFrame;
|
||||
public bool FrameCompressed;
|
||||
public bool ExpectMask;
|
||||
public byte MaskKeyPos;
|
||||
public byte[] MaskKey;
|
||||
public List<byte[]>? CompressedBuffers;
|
||||
public int CompressedOffset;
|
||||
|
||||
// Control frame outputs
|
||||
public List<ControlFrameAction> PendingControlFrames;
|
||||
public bool CloseReceived;
|
||||
public int CloseStatus;
|
||||
public string? CloseBody;
|
||||
|
||||
public WsReadInfo(bool expectMask)
|
||||
{
|
||||
Remaining = 0;
|
||||
FrameStart = true;
|
||||
FirstFrame = true;
|
||||
FrameCompressed = false;
|
||||
ExpectMask = expectMask;
|
||||
MaskKeyPos = 0;
|
||||
MaskKey = new byte[4];
|
||||
CompressedBuffers = null;
|
||||
CompressedOffset = 0;
|
||||
PendingControlFrames = [];
|
||||
CloseReceived = false;
|
||||
CloseStatus = 0;
|
||||
CloseBody = null;
|
||||
}
|
||||
|
||||
public void SetMaskKey(ReadOnlySpan<byte> key)
|
||||
{
|
||||
key[..4].CopyTo(MaskKey);
|
||||
MaskKeyPos = 0;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Unmask buffer in-place using current mask key and position.
|
||||
/// Optimized for 8-byte chunks when buffer is large enough.
|
||||
/// Ported from websocket.go lines 509-536.
|
||||
/// </summary>
|
||||
public void Unmask(Span<byte> buf)
|
||||
{
|
||||
int p = MaskKeyPos;
|
||||
if (buf.Length < 16)
|
||||
{
|
||||
for (int i = 0; i < buf.Length; i++)
|
||||
{
|
||||
buf[i] ^= MaskKey[p & 3];
|
||||
p++;
|
||||
}
|
||||
MaskKeyPos = (byte)(p & 3);
|
||||
return;
|
||||
}
|
||||
|
||||
// Build 8-byte key for bulk XOR
|
||||
Span<byte> k = stackalloc byte[8];
|
||||
for (int i = 0; i < 8; i++)
|
||||
k[i] = MaskKey[(p + i) & 3];
|
||||
ulong km = BinaryPrimitives.ReadUInt64BigEndian(k);
|
||||
|
||||
int n = (buf.Length / 8) * 8;
|
||||
for (int i = 0; i < n; i += 8)
|
||||
{
|
||||
ulong tmp = BinaryPrimitives.ReadUInt64BigEndian(buf[i..]);
|
||||
tmp ^= km;
|
||||
BinaryPrimitives.WriteUInt64BigEndian(buf[i..], tmp);
|
||||
}
|
||||
|
||||
// Handle remaining bytes
|
||||
p += n;
|
||||
var tail = buf[n..];
|
||||
for (int i = 0; i < tail.Length; i++)
|
||||
{
|
||||
tail[i] ^= MaskKey[p & 3];
|
||||
p++;
|
||||
}
|
||||
MaskKeyPos = (byte)(p & 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Read and decode WebSocket frames from a buffer.
|
||||
/// Returns list of decoded payload byte arrays.
|
||||
/// Ported from websocket.go lines 208-351.
|
||||
/// </summary>
|
||||
public static List<byte[]> ReadFrames(WsReadInfo r, Stream stream, int available, int maxPayload)
|
||||
{
|
||||
var bufs = new List<byte[]>();
|
||||
var buf = new byte[available];
|
||||
int bytesRead = 0;
|
||||
|
||||
// Fill the buffer from the stream
|
||||
while (bytesRead < available)
|
||||
{
|
||||
int n = stream.Read(buf, bytesRead, available - bytesRead);
|
||||
if (n == 0) break;
|
||||
bytesRead += n;
|
||||
}
|
||||
|
||||
int pos = 0;
|
||||
int max = bytesRead;
|
||||
|
||||
while (pos < max)
|
||||
{
|
||||
if (r.FrameStart)
|
||||
{
|
||||
if (pos >= max) break;
|
||||
byte b0 = buf[pos];
|
||||
int frameType = b0 & 0x0F;
|
||||
bool final = (b0 & WsConstants.FinalBit) != 0;
|
||||
bool compressed = (b0 & WsConstants.Rsv1Bit) != 0;
|
||||
pos++;
|
||||
|
||||
// Read second byte
|
||||
var (b1Buf, newPos) = WsGet(stream, buf, pos, max, 1);
|
||||
pos = newPos;
|
||||
byte b1 = b1Buf[0];
|
||||
|
||||
// Check mask bit
|
||||
if (r.ExpectMask && (b1 & WsConstants.MaskBit) == 0)
|
||||
throw new InvalidOperationException("mask bit missing");
|
||||
|
||||
r.Remaining = b1 & 0x7F;
|
||||
|
||||
// Validate frame types
|
||||
if (WsConstants.IsControlFrame(frameType))
|
||||
{
|
||||
if (r.Remaining > WsConstants.MaxControlPayloadSize)
|
||||
throw new InvalidOperationException("control frame length too large");
|
||||
if (!final)
|
||||
throw new InvalidOperationException("control frame does not have final bit set");
|
||||
}
|
||||
else if (frameType == WsConstants.TextMessage || frameType == WsConstants.BinaryMessage)
|
||||
{
|
||||
if (!r.FirstFrame)
|
||||
throw new InvalidOperationException("new message before previous finished");
|
||||
r.FirstFrame = final;
|
||||
r.FrameCompressed = compressed;
|
||||
}
|
||||
else if (frameType == WsConstants.ContinuationFrame)
|
||||
{
|
||||
if (r.FirstFrame || compressed)
|
||||
throw new InvalidOperationException("invalid continuation frame");
|
||||
r.FirstFrame = final;
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidOperationException($"unknown opcode {frameType}");
|
||||
}
|
||||
|
||||
// Extended payload length
|
||||
switch (r.Remaining)
|
||||
{
|
||||
case 126:
|
||||
{
|
||||
var (lenBuf, p2) = WsGet(stream, buf, pos, max, 2);
|
||||
pos = p2;
|
||||
r.Remaining = BinaryPrimitives.ReadUInt16BigEndian(lenBuf);
|
||||
break;
|
||||
}
|
||||
case 127:
|
||||
{
|
||||
var (lenBuf, p2) = WsGet(stream, buf, pos, max, 8);
|
||||
pos = p2;
|
||||
var len64 = BinaryPrimitives.ReadUInt64BigEndian(lenBuf);
|
||||
if (len64 > (ulong)maxPayload)
|
||||
throw new InvalidOperationException($"frame payload length {len64} exceeds max payload {maxPayload}");
|
||||
r.Remaining = (int)len64;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Read mask key (mask bit already validated at line 134)
|
||||
if (r.ExpectMask)
|
||||
{
|
||||
var (keyBuf, p2) = WsGet(stream, buf, pos, max, 4);
|
||||
pos = p2;
|
||||
keyBuf.AsSpan(0, 4).CopyTo(r.MaskKey);
|
||||
r.MaskKeyPos = 0;
|
||||
}
|
||||
|
||||
// Handle control frames
|
||||
if (WsConstants.IsControlFrame(frameType))
|
||||
{
|
||||
pos = HandleControlFrame(r, frameType, stream, buf, pos, max);
|
||||
continue;
|
||||
}
|
||||
|
||||
r.FrameStart = false;
|
||||
}
|
||||
|
||||
if (pos < max)
|
||||
{
|
||||
int n = r.Remaining;
|
||||
if (pos + n > max) n = max - pos;
|
||||
|
||||
var payloadSlice = buf.AsSpan(pos, n).ToArray();
|
||||
pos += n;
|
||||
r.Remaining -= n;
|
||||
|
||||
if (r.ExpectMask)
|
||||
r.Unmask(payloadSlice);
|
||||
|
||||
bool addToBufs = true;
|
||||
if (r.FrameCompressed)
|
||||
{
|
||||
addToBufs = false;
|
||||
r.CompressedBuffers ??= [];
|
||||
r.CompressedBuffers.Add(payloadSlice);
|
||||
|
||||
if (r.FirstFrame && r.Remaining == 0)
|
||||
{
|
||||
var decompressed = WsCompression.Decompress(r.CompressedBuffers, maxPayload);
|
||||
r.CompressedBuffers = null;
|
||||
r.FrameCompressed = false;
|
||||
addToBufs = true;
|
||||
payloadSlice = decompressed;
|
||||
}
|
||||
}
|
||||
|
||||
if (addToBufs && payloadSlice.Length > 0)
|
||||
bufs.Add(payloadSlice);
|
||||
|
||||
if (r.Remaining == 0)
|
||||
r.FrameStart = true;
|
||||
}
|
||||
}
|
||||
|
||||
return bufs;
|
||||
}
|
||||
|
||||
private static int HandleControlFrame(WsReadInfo r, int frameType, Stream stream, byte[] buf, int pos, int max)
|
||||
{
|
||||
byte[]? payload = null;
|
||||
if (r.Remaining > 0)
|
||||
{
|
||||
var (payloadBuf, newPos) = WsGet(stream, buf, pos, max, r.Remaining);
|
||||
pos = newPos;
|
||||
payload = payloadBuf;
|
||||
if (r.ExpectMask)
|
||||
r.Unmask(payload);
|
||||
r.Remaining = 0;
|
||||
}
|
||||
|
||||
switch (frameType)
|
||||
{
|
||||
case WsConstants.CloseMessage:
|
||||
r.CloseReceived = true;
|
||||
r.CloseStatus = WsConstants.CloseStatusNoStatusReceived;
|
||||
if (payload != null && payload.Length >= WsConstants.CloseStatusSize)
|
||||
{
|
||||
r.CloseStatus = BinaryPrimitives.ReadUInt16BigEndian(payload);
|
||||
if (payload.Length > WsConstants.CloseStatusSize)
|
||||
r.CloseBody = Encoding.UTF8.GetString(payload.AsSpan(WsConstants.CloseStatusSize));
|
||||
}
|
||||
// Per RFC 6455 Section 5.5.1, always send a close response
|
||||
if (r.CloseStatus != WsConstants.CloseStatusNoStatusReceived)
|
||||
{
|
||||
var closeMsg = WsFrameWriter.CreateCloseMessage(r.CloseStatus, r.CloseBody ?? "");
|
||||
r.PendingControlFrames.Add(new ControlFrameAction(WsConstants.CloseMessage, closeMsg));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Empty close frame — respond with empty close
|
||||
r.PendingControlFrames.Add(new ControlFrameAction(WsConstants.CloseMessage, []));
|
||||
}
|
||||
break;
|
||||
|
||||
case WsConstants.PingMessage:
|
||||
r.PendingControlFrames.Add(new ControlFrameAction(WsConstants.PongMessage, payload ?? []));
|
||||
break;
|
||||
|
||||
case WsConstants.PongMessage:
|
||||
// Nothing to do
|
||||
break;
|
||||
}
|
||||
|
||||
return pos;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets needed bytes from buffer or reads from stream.
|
||||
/// Ported from websocket.go lines 178-193.
|
||||
/// </summary>
|
||||
private static (byte[] data, int newPos) WsGet(Stream stream, byte[] buf, int pos, int max, int needed)
|
||||
{
|
||||
int avail = max - pos;
|
||||
if (avail >= needed)
|
||||
return (buf[pos..(pos + needed)], pos + needed);
|
||||
|
||||
var b = new byte[needed];
|
||||
int start = 0;
|
||||
if (avail > 0)
|
||||
{
|
||||
Buffer.BlockCopy(buf, pos, b, 0, avail);
|
||||
start = avail;
|
||||
}
|
||||
while (start < needed)
|
||||
{
|
||||
int n = stream.Read(b, start, needed - start);
|
||||
if (n == 0) throw new IOException("unexpected end of stream");
|
||||
start += n;
|
||||
}
|
||||
return (b, pos + avail);
|
||||
}
|
||||
}
|
||||
|
||||
public readonly record struct ControlFrameAction(int Opcode, byte[] Payload);
|
||||
268
src/NATS.Server/WebSocket/WsUpgrade.cs
Normal file
268
src/NATS.Server/WebSocket/WsUpgrade.cs
Normal file
@@ -0,0 +1,268 @@
|
||||
using System.Net;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
|
||||
namespace NATS.Server.WebSocket;
|
||||
|
||||
/// <summary>
|
||||
/// WebSocket HTTP upgrade handshake handler.
|
||||
/// Ported from golang/nats-server/server/websocket.go lines 731-917.
|
||||
/// </summary>
|
||||
public static class WsUpgrade
|
||||
{
|
||||
public static async Task<WsUpgradeResult> TryUpgradeAsync(
|
||||
Stream inputStream, Stream outputStream, WebSocketOptions options,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
cts.CancelAfter(options.HandshakeTimeout);
|
||||
var (method, path, headers) = await ReadHttpRequestAsync(inputStream, cts.Token);
|
||||
|
||||
if (!string.Equals(method, "GET", StringComparison.OrdinalIgnoreCase))
|
||||
return await FailAsync(outputStream, 405, "request method must be GET");
|
||||
|
||||
if (!headers.ContainsKey("Host"))
|
||||
return await FailAsync(outputStream, 400, "'Host' missing in request");
|
||||
|
||||
if (!HeaderContains(headers, "Upgrade", "websocket"))
|
||||
return await FailAsync(outputStream, 400, "invalid value for header 'Upgrade'");
|
||||
|
||||
if (!HeaderContains(headers, "Connection", "Upgrade"))
|
||||
return await FailAsync(outputStream, 400, "invalid value for header 'Connection'");
|
||||
|
||||
if (!headers.TryGetValue("Sec-WebSocket-Key", out var key) || string.IsNullOrEmpty(key))
|
||||
return await FailAsync(outputStream, 400, "key missing");
|
||||
|
||||
if (!HeaderContains(headers, "Sec-WebSocket-Version", "13"))
|
||||
return await FailAsync(outputStream, 400, "invalid version");
|
||||
|
||||
var kind = path switch
|
||||
{
|
||||
_ when path.EndsWith("/leafnode") => WsClientKind.Leaf,
|
||||
_ when path.EndsWith("/mqtt") => WsClientKind.Mqtt,
|
||||
_ => WsClientKind.Client,
|
||||
};
|
||||
|
||||
// Origin checking
|
||||
if (options.SameOrigin || options.AllowedOrigins is { Count: > 0 })
|
||||
{
|
||||
var checker = new WsOriginChecker(options.SameOrigin, options.AllowedOrigins);
|
||||
headers.TryGetValue("Origin", out var origin);
|
||||
if (string.IsNullOrEmpty(origin))
|
||||
headers.TryGetValue("Sec-WebSocket-Origin", out origin);
|
||||
var originErr = checker.CheckOrigin(origin, headers.GetValueOrDefault("Host", ""), isTls: false);
|
||||
if (originErr != null)
|
||||
return await FailAsync(outputStream, 403, $"origin not allowed: {originErr}");
|
||||
}
|
||||
|
||||
// Compression negotiation
|
||||
bool compress = options.Compression;
|
||||
if (compress)
|
||||
{
|
||||
compress = headers.TryGetValue("Sec-WebSocket-Extensions", out var ext) &&
|
||||
ext.Contains(WsConstants.PmcExtension, StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
// No-masking support (leaf nodes only — browser clients must always mask)
|
||||
bool noMasking = kind == WsClientKind.Leaf &&
|
||||
headers.TryGetValue(WsConstants.NoMaskingHeader, out var nmVal) &&
|
||||
string.Equals(nmVal.Trim(), WsConstants.NoMaskingValue, StringComparison.OrdinalIgnoreCase);
|
||||
|
||||
// Browser detection
|
||||
bool browser = false;
|
||||
bool noCompFrag = false;
|
||||
if (kind is WsClientKind.Client or WsClientKind.Mqtt &&
|
||||
headers.TryGetValue("User-Agent", out var ua) && ua.StartsWith("Mozilla/"))
|
||||
{
|
||||
browser = true;
|
||||
// Disable fragmentation of compressed frames for Safari browsers.
|
||||
// Safari has both "Version/" and "Safari/" in the user agent string,
|
||||
// while Chrome on macOS has "Safari/" but not "Version/".
|
||||
noCompFrag = compress && ua.Contains("Version/") && ua.Contains("Safari/");
|
||||
}
|
||||
|
||||
// Cookie extraction
|
||||
string? cookieJwt = null, cookieUsername = null, cookiePassword = null, cookieToken = null;
|
||||
if ((kind is WsClientKind.Client or WsClientKind.Mqtt) &&
|
||||
headers.TryGetValue("Cookie", out var cookieHeader))
|
||||
{
|
||||
var cookies = ParseCookies(cookieHeader);
|
||||
if (options.JwtCookie != null) cookies.TryGetValue(options.JwtCookie, out cookieJwt);
|
||||
if (options.UsernameCookie != null) cookies.TryGetValue(options.UsernameCookie, out cookieUsername);
|
||||
if (options.PasswordCookie != null) cookies.TryGetValue(options.PasswordCookie, out cookiePassword);
|
||||
if (options.TokenCookie != null) cookies.TryGetValue(options.TokenCookie, out cookieToken);
|
||||
}
|
||||
|
||||
// X-Forwarded-For client IP extraction
|
||||
string? clientIp = null;
|
||||
if (headers.TryGetValue(WsConstants.XForwardedForHeader, out var xff))
|
||||
{
|
||||
var ip = xff.Split(',')[0].Trim();
|
||||
if (IPAddress.TryParse(ip, out _))
|
||||
clientIp = ip;
|
||||
}
|
||||
|
||||
// Build the 101 Switching Protocols response
|
||||
var response = new StringBuilder();
|
||||
response.Append("HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: ");
|
||||
response.Append(ComputeAcceptKey(key));
|
||||
response.Append("\r\n");
|
||||
if (compress)
|
||||
response.Append(WsConstants.PmcFullResponse);
|
||||
if (noMasking)
|
||||
response.Append(WsConstants.NoMaskingFullResponse);
|
||||
if (options.Headers != null)
|
||||
{
|
||||
foreach (var (k, v) in options.Headers)
|
||||
{
|
||||
response.Append(k);
|
||||
response.Append(": ");
|
||||
response.Append(v);
|
||||
response.Append("\r\n");
|
||||
}
|
||||
}
|
||||
|
||||
response.Append("\r\n");
|
||||
|
||||
var responseBytes = Encoding.ASCII.GetBytes(response.ToString());
|
||||
await outputStream.WriteAsync(responseBytes);
|
||||
await outputStream.FlushAsync();
|
||||
|
||||
return new WsUpgradeResult(
|
||||
Success: true, Compress: compress, Browser: browser, NoCompFrag: noCompFrag,
|
||||
MaskRead: !noMasking, MaskWrite: false,
|
||||
CookieJwt: cookieJwt, CookieUsername: cookieUsername,
|
||||
CookiePassword: cookiePassword, CookieToken: cookieToken,
|
||||
ClientIp: clientIp, Kind: kind);
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
return WsUpgradeResult.Failed;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Computes the Sec-WebSocket-Accept value per RFC 6455 Section 4.2.2.
|
||||
/// </summary>
|
||||
public static string ComputeAcceptKey(string clientKey)
|
||||
{
|
||||
var combined = Encoding.ASCII.GetBytes(clientKey + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
|
||||
var hash = SHA1.HashData(combined);
|
||||
return Convert.ToBase64String(hash);
|
||||
}
|
||||
|
||||
private static async Task<WsUpgradeResult> FailAsync(Stream output, int statusCode, string reason)
|
||||
{
|
||||
var statusText = statusCode switch
|
||||
{
|
||||
400 => "Bad Request",
|
||||
403 => "Forbidden",
|
||||
405 => "Method Not Allowed",
|
||||
_ => "Internal Server Error",
|
||||
};
|
||||
var response = $"HTTP/1.1 {statusCode} {statusText}\r\nSec-WebSocket-Version: 13\r\nContent-Type: text/plain\r\nContent-Length: {reason.Length}\r\n\r\n{reason}";
|
||||
await output.WriteAsync(Encoding.ASCII.GetBytes(response));
|
||||
await output.FlushAsync();
|
||||
return WsUpgradeResult.Failed;
|
||||
}
|
||||
|
||||
private static async Task<(string method, string path, Dictionary<string, string> headers)> ReadHttpRequestAsync(
|
||||
Stream stream, CancellationToken ct)
|
||||
{
|
||||
var headerBytes = new List<byte>(4096);
|
||||
var buf = new byte[512];
|
||||
while (true)
|
||||
{
|
||||
int n = await stream.ReadAsync(buf, ct);
|
||||
if (n == 0) throw new IOException("connection closed during handshake");
|
||||
for (int i = 0; i < n; i++)
|
||||
{
|
||||
headerBytes.Add(buf[i]);
|
||||
if (headerBytes.Count >= 4 &&
|
||||
headerBytes[^4] == '\r' && headerBytes[^3] == '\n' &&
|
||||
headerBytes[^2] == '\r' && headerBytes[^1] == '\n')
|
||||
goto done;
|
||||
if (headerBytes.Count > 8192)
|
||||
throw new InvalidOperationException("HTTP header too large");
|
||||
}
|
||||
}
|
||||
done:;
|
||||
|
||||
var text = Encoding.ASCII.GetString(headerBytes.ToArray());
|
||||
var lines = text.Split("\r\n", StringSplitOptions.None);
|
||||
if (lines.Length < 1) throw new InvalidOperationException("invalid HTTP request");
|
||||
|
||||
var parts = lines[0].Split(' ');
|
||||
if (parts.Length < 3) throw new InvalidOperationException("invalid HTTP request line");
|
||||
var method = parts[0];
|
||||
var path = parts[1];
|
||||
|
||||
var headers = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
|
||||
for (int i = 1; i < lines.Length; i++)
|
||||
{
|
||||
var line = lines[i];
|
||||
if (string.IsNullOrEmpty(line)) break;
|
||||
var colonIdx = line.IndexOf(':');
|
||||
if (colonIdx > 0)
|
||||
{
|
||||
var name = line[..colonIdx].Trim();
|
||||
var value = line[(colonIdx + 1)..].Trim();
|
||||
headers[name] = value;
|
||||
}
|
||||
}
|
||||
|
||||
return (method, path, headers);
|
||||
}
|
||||
|
||||
private static bool HeaderContains(Dictionary<string, string> headers, string name, string value)
|
||||
{
|
||||
if (!headers.TryGetValue(name, out var headerValue))
|
||||
return false;
|
||||
foreach (var token in headerValue.Split(','))
|
||||
{
|
||||
if (string.Equals(token.Trim(), value, StringComparison.OrdinalIgnoreCase))
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private static Dictionary<string, string> ParseCookies(string cookieHeader)
|
||||
{
|
||||
var cookies = new Dictionary<string, string>(StringComparer.Ordinal);
|
||||
foreach (var pair in cookieHeader.Split(';'))
|
||||
{
|
||||
var trimmed = pair.Trim();
|
||||
var eqIdx = trimmed.IndexOf('=');
|
||||
if (eqIdx > 0)
|
||||
cookies[trimmed[..eqIdx].Trim()] = trimmed[(eqIdx + 1)..].Trim();
|
||||
}
|
||||
|
||||
return cookies;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Result of a WebSocket upgrade handshake attempt.
|
||||
/// </summary>
|
||||
public readonly record struct WsUpgradeResult(
|
||||
bool Success,
|
||||
bool Compress,
|
||||
bool Browser,
|
||||
bool NoCompFrag,
|
||||
bool MaskRead,
|
||||
bool MaskWrite,
|
||||
string? CookieJwt,
|
||||
string? CookieUsername,
|
||||
string? CookiePassword,
|
||||
string? CookieToken,
|
||||
string? ClientIp,
|
||||
WsClientKind Kind)
|
||||
{
|
||||
public static readonly WsUpgradeResult Failed = new(
|
||||
Success: false, Compress: false, Browser: false, NoCompFrag: false,
|
||||
MaskRead: true, MaskWrite: false, CookieJwt: null, CookieUsername: null,
|
||||
CookiePassword: null, CookieToken: null, ClientIp: null, Kind: WsClientKind.Client);
|
||||
}
|
||||
14
tests/NATS.Server.Tests/ClientKindCommandMatrixTests.cs
Normal file
14
tests/NATS.Server.Tests/ClientKindCommandMatrixTests.cs
Normal file
@@ -0,0 +1,14 @@
|
||||
using NATS.Server.Protocol;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class ClientKindCommandMatrixTests
|
||||
{
|
||||
[Fact]
|
||||
public void Router_only_commands_are_rejected_for_client_kind()
|
||||
{
|
||||
var matrix = new ClientCommandMatrix();
|
||||
matrix.IsAllowed(ClientKind.Client, "RS+").ShouldBeFalse();
|
||||
matrix.IsAllowed(ClientKind.Router, "RS+").ShouldBeTrue();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
using NATS.Server.Configuration;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class ClusterJetStreamConfigProcessorTests
|
||||
{
|
||||
[Fact]
|
||||
public void ConfigProcessor_maps_jetstream_and_cluster_blocks()
|
||||
{
|
||||
var cfg = """
|
||||
cluster { name: C1; listen: 127.0.0.1:6222 }
|
||||
jetstream { store_dir: /tmp/js; max_mem_store: 1GB; max_file_store: 10GB }
|
||||
""";
|
||||
|
||||
var opts = ConfigProcessor.ProcessConfig(cfg);
|
||||
|
||||
opts.Cluster.ShouldNotBeNull();
|
||||
opts.JetStream.ShouldNotBeNull();
|
||||
opts.JetStream!.StoreDir.ShouldBe("/tmp/js");
|
||||
}
|
||||
}
|
||||
@@ -501,4 +501,112 @@ public class ConfigProcessorTests
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("tls.conf"));
|
||||
opts.HasTls.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ─── MQTT config ────────────────────────────────────────────
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_ListenHostAndPort()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.Host.ShouldBe("10.0.0.1");
|
||||
opts.Mqtt.Port.ShouldBe(1883);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_NoAuthUser()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.NoAuthUser.ShouldBe("mqtt_default");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_Authorization()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.Username.ShouldBe("mqtt_user");
|
||||
opts.Mqtt.Password.ShouldBe("mqtt_pass");
|
||||
opts.Mqtt.Token.ShouldBe("mqtt_token");
|
||||
opts.Mqtt.AuthTimeout.ShouldBe(3.0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_Tls()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.TlsCert.ShouldBe("/path/to/mqtt-cert.pem");
|
||||
opts.Mqtt.TlsKey.ShouldBe("/path/to/mqtt-key.pem");
|
||||
opts.Mqtt.TlsCaCert.ShouldBe("/path/to/mqtt-ca.pem");
|
||||
opts.Mqtt.TlsVerify.ShouldBeTrue();
|
||||
opts.Mqtt.TlsTimeout.ShouldBe(5.0);
|
||||
opts.Mqtt.HasTls.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_QosSettings()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.AckWait.ShouldBe(TimeSpan.FromSeconds(60));
|
||||
opts.Mqtt.MaxAckPending.ShouldBe((ushort)2048);
|
||||
opts.Mqtt.JsApiTimeout.ShouldBe(TimeSpan.FromSeconds(10));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_JetStreamSettings()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.JsDomain.ShouldBe("mqtt-domain");
|
||||
opts.Mqtt.StreamReplicas.ShouldBe(3);
|
||||
opts.Mqtt.ConsumerReplicas.ShouldBe(1);
|
||||
opts.Mqtt.ConsumerMemoryStorage.ShouldBeTrue();
|
||||
opts.Mqtt.ConsumerInactiveThreshold.ShouldBe(TimeSpan.FromMinutes(5));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_MaxAckPendingValidation_ReportsError()
|
||||
{
|
||||
var ex = Should.Throw<ConfigProcessorException>(() =>
|
||||
ConfigProcessor.ProcessConfig("""
|
||||
mqtt {
|
||||
max_ack_pending: 70000
|
||||
}
|
||||
"""));
|
||||
ex.Errors.ShouldContain(e => e.Contains("max_ack_pending"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_Aliases()
|
||||
{
|
||||
// Test alias keys: "ackwait" (alias for "ack_wait"), "net" (alias for "host"),
|
||||
// "max_inflight" (alias for "max_ack_pending"), "consumer_auto_cleanup" (alias)
|
||||
var opts = ConfigProcessor.ProcessConfig("""
|
||||
mqtt {
|
||||
net: "127.0.0.1"
|
||||
port: 1884
|
||||
ackwait: "45s"
|
||||
max_inflight: 500
|
||||
api_timeout: "8s"
|
||||
consumer_auto_cleanup: "10m"
|
||||
}
|
||||
""");
|
||||
opts.Mqtt.ShouldNotBeNull();
|
||||
opts.Mqtt!.Host.ShouldBe("127.0.0.1");
|
||||
opts.Mqtt.Port.ShouldBe(1884);
|
||||
opts.Mqtt.AckWait.ShouldBe(TimeSpan.FromSeconds(45));
|
||||
opts.Mqtt.MaxAckPending.ShouldBe((ushort)500);
|
||||
opts.Mqtt.JsApiTimeout.ShouldBe(TimeSpan.FromSeconds(8));
|
||||
opts.Mqtt.ConsumerInactiveThreshold.ShouldBe(TimeSpan.FromMinutes(10));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MqttConf_Absent_ReturnsNull()
|
||||
{
|
||||
var opts = ConfigProcessor.ProcessConfig("port: 4222");
|
||||
opts.Mqtt.ShouldBeNull();
|
||||
}
|
||||
}
|
||||
|
||||
18
tests/NATS.Server.Tests/FileStoreTests.cs
Normal file
18
tests/NATS.Server.Tests/FileStoreTests.cs
Normal file
@@ -0,0 +1,18 @@
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class FileStoreTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task FileStore_recovers_messages_after_restart()
|
||||
{
|
||||
var dir = Directory.CreateTempSubdirectory();
|
||||
|
||||
await using (var store = new FileStore(new FileStoreOptions { Directory = dir.FullName }))
|
||||
await store.AppendAsync("foo", "payload"u8.ToArray(), default);
|
||||
|
||||
await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName });
|
||||
(await recovered.GetStateAsync(default)).Messages.ShouldBe((ulong)1);
|
||||
}
|
||||
}
|
||||
14
tests/NATS.Server.Tests/GatewayLeafBootstrapTests.cs
Normal file
14
tests/NATS.Server.Tests/GatewayLeafBootstrapTests.cs
Normal file
@@ -0,0 +1,14 @@
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class GatewayLeafBootstrapTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Server_bootstraps_gateway_and_leaf_managers_when_configured()
|
||||
{
|
||||
await using var server = await TestServerFactory.CreateWithGatewayAndLeafAsync();
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
server.Stats.Gateways.ShouldBeGreaterThanOrEqualTo(0);
|
||||
server.Stats.Leafs.ShouldBeGreaterThanOrEqualTo(0);
|
||||
}
|
||||
}
|
||||
21
tests/NATS.Server.Tests/GoParityRunnerTests.cs
Normal file
21
tests/NATS.Server.Tests/GoParityRunnerTests.cs
Normal file
@@ -0,0 +1,21 @@
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class GoParityRunnerTests
|
||||
{
|
||||
[Fact]
|
||||
public void Go_parity_runner_builds_expected_suite_filter()
|
||||
{
|
||||
var cmd = GoParityRunner.BuildCommand();
|
||||
cmd.ShouldContain("go test");
|
||||
cmd.ShouldContain("TestJetStream");
|
||||
cmd.ShouldContain("TestRaft");
|
||||
}
|
||||
}
|
||||
|
||||
internal static class GoParityRunner
|
||||
{
|
||||
public static string BuildCommand()
|
||||
{
|
||||
return "go test -v -run 'TestJetStream|TestJetStreamCluster|TestLongCluster|TestRaft' ./server -count=1 -timeout=180m";
|
||||
}
|
||||
}
|
||||
17
tests/NATS.Server.Tests/JetStreamAckRedeliveryTests.cs
Normal file
17
tests/NATS.Server.Tests/JetStreamAckRedeliveryTests.cs
Normal file
@@ -0,0 +1,17 @@
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class JetStreamAckRedeliveryTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Unacked_message_is_redelivered_after_ack_wait()
|
||||
{
|
||||
await using var fixture = await JetStreamApiFixture.StartWithAckExplicitConsumerAsync(ackWaitMs: 50);
|
||||
await fixture.PublishAndGetAckAsync("orders.created", "1");
|
||||
|
||||
var first = await fixture.FetchAsync("ORDERS", "PULL", batch: 1);
|
||||
var second = await fixture.FetchAfterDelayAsync("ORDERS", "PULL", delayMs: 75, batch: 1);
|
||||
|
||||
second.Messages.Single().Sequence.ShouldBe(first.Messages.Single().Sequence);
|
||||
second.Messages.Single().Redelivered.ShouldBeTrue();
|
||||
}
|
||||
}
|
||||
178
tests/NATS.Server.Tests/JetStreamApiFixture.cs
Normal file
178
tests/NATS.Server.Tests/JetStreamApiFixture.cs
Normal file
@@ -0,0 +1,178 @@
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.JetStream;
|
||||
using NATS.Server.JetStream.Api;
|
||||
using NATS.Server.JetStream.Consumers;
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Publish;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
internal sealed class JetStreamApiFixture : IAsyncDisposable
|
||||
{
|
||||
private static readonly StreamManager SharedStreamManager = new();
|
||||
private static readonly ConsumerManager SharedConsumerManager = new();
|
||||
private static readonly JetStreamApiRouter SharedRouter = new(SharedStreamManager, SharedConsumerManager);
|
||||
|
||||
private readonly StreamManager _streamManager;
|
||||
private readonly ConsumerManager _consumerManager;
|
||||
private readonly JetStreamApiRouter _router;
|
||||
private readonly JetStreamPublisher _publisher;
|
||||
|
||||
private JetStreamApiFixture(Account? account = null)
|
||||
{
|
||||
_streamManager = new StreamManager(account: account);
|
||||
_consumerManager = new ConsumerManager();
|
||||
_router = new JetStreamApiRouter(_streamManager, _consumerManager);
|
||||
_publisher = new JetStreamPublisher(_streamManager);
|
||||
}
|
||||
|
||||
public static Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
|
||||
{
|
||||
return Task.FromResult(SharedRouter.Route(subject, Encoding.UTF8.GetBytes(payload)));
|
||||
}
|
||||
|
||||
public static async Task<JetStreamApiFixture> StartWithStreamAsync(string streamName, string subject, int maxMsgs = 0)
|
||||
{
|
||||
var fixture = new JetStreamApiFixture();
|
||||
var payload = $"{{\"name\":\"{streamName}\",\"subjects\":[\"{subject}\"],\"max_msgs\":{maxMsgs}}}";
|
||||
_ = await fixture.RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload);
|
||||
return fixture;
|
||||
}
|
||||
|
||||
public static async Task<JetStreamApiFixture> StartWithPullConsumerAsync()
|
||||
{
|
||||
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
|
||||
_ = await fixture.CreateConsumerAsync("ORDERS", "PULL", "orders.created");
|
||||
return fixture;
|
||||
}
|
||||
|
||||
public static async Task<JetStreamApiFixture> StartWithPushConsumerAsync()
|
||||
{
|
||||
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
|
||||
_ = await fixture.CreateConsumerAsync("ORDERS", "PUSH", "orders.created", push: true, heartbeatMs: 25);
|
||||
return fixture;
|
||||
}
|
||||
|
||||
public static async Task<JetStreamApiFixture> StartWithAckExplicitConsumerAsync(int ackWaitMs)
|
||||
{
|
||||
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
|
||||
_ = await fixture.CreateConsumerAsync("ORDERS", "PULL", "orders.created",
|
||||
ackPolicy: AckPolicy.Explicit, ackWaitMs: ackWaitMs);
|
||||
return fixture;
|
||||
}
|
||||
|
||||
public static async Task<JetStreamApiFixture> StartWithMirrorSetupAsync()
|
||||
{
|
||||
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
|
||||
_ = fixture._streamManager.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = "ORDERS_MIRROR",
|
||||
Subjects = ["orders.mirror.*"],
|
||||
Mirror = "ORDERS",
|
||||
});
|
||||
return fixture;
|
||||
}
|
||||
|
||||
public static Task<JetStreamApiFixture> StartJwtLimitedAccountAsync(int maxStreams)
|
||||
{
|
||||
var account = new Account("JWT-LIMITED")
|
||||
{
|
||||
MaxJetStreamStreams = maxStreams,
|
||||
JetStreamTier = "jwt-tier",
|
||||
};
|
||||
|
||||
return Task.FromResult(new JetStreamApiFixture(account));
|
||||
}
|
||||
|
||||
public Task<PubAck> PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false)
|
||||
{
|
||||
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack))
|
||||
{
|
||||
if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var streamHandle))
|
||||
{
|
||||
var stored = streamHandle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult();
|
||||
if (stored != null)
|
||||
_consumerManager.OnPublished(ack.Stream, stored);
|
||||
}
|
||||
|
||||
return Task.FromResult(ack);
|
||||
}
|
||||
|
||||
if (expectError)
|
||||
return Task.FromResult(new PubAck { ErrorCode = 404 });
|
||||
|
||||
throw new InvalidOperationException($"No stream matched subject '{subject}'.");
|
||||
}
|
||||
|
||||
public Task<PubAck> PublishAndGetAckAsync(string streamName, string subject, string payload)
|
||||
{
|
||||
return PublishAndGetAckAsync(subject, payload);
|
||||
}
|
||||
|
||||
public Task<JetStreamApiResponse> RequestLocalAsync(string subject, string payload)
|
||||
{
|
||||
return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));
|
||||
}
|
||||
|
||||
public Task<JetStreamApiResponse> CreateStreamAsync(string streamName, IReadOnlyList<string> subjects)
|
||||
{
|
||||
var payload = JsonSerializer.Serialize(new
|
||||
{
|
||||
name = streamName,
|
||||
subjects,
|
||||
});
|
||||
return RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload);
|
||||
}
|
||||
|
||||
public Task<StreamState> GetStreamStateAsync(string streamName)
|
||||
{
|
||||
return _streamManager.GetStateAsync(streamName, default).AsTask();
|
||||
}
|
||||
|
||||
public Task<JetStreamApiResponse> CreateConsumerAsync(string stream, string durableName, string filterSubject, bool push = false, int heartbeatMs = 0, AckPolicy ackPolicy = AckPolicy.None, int ackWaitMs = 30_000)
|
||||
{
|
||||
var payload = $@"{{""durable_name"":""{durableName}"",""filter_subject"":""{filterSubject}"",""push"":{push.ToString().ToLowerInvariant()},""heartbeat_ms"":{heartbeatMs},""ack_policy"":""{ackPolicy.ToString().ToLowerInvariant()}"",""ack_wait_ms"":{ackWaitMs}}}";
|
||||
return RequestLocalAsync($"$JS.API.CONSUMER.CREATE.{stream}.{durableName}", payload);
|
||||
}
|
||||
|
||||
public async Task<JetStreamConsumerInfo> GetConsumerInfoAsync(string stream, string durableName)
|
||||
{
|
||||
var response = await RequestLocalAsync($"$JS.API.CONSUMER.INFO.{stream}.{durableName}", "{}");
|
||||
return response.ConsumerInfo ?? throw new InvalidOperationException("Consumer not found.");
|
||||
}
|
||||
|
||||
public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
|
||||
{
|
||||
return _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();
|
||||
}
|
||||
|
||||
public async Task<PullFetchBatch> FetchAfterDelayAsync(string stream, string durableName, int delayMs, int batch)
|
||||
{
|
||||
await Task.Delay(delayMs);
|
||||
return await FetchAsync(stream, durableName, batch);
|
||||
}
|
||||
|
||||
public Task<PushFrame> ReadPushFrameAsync(string stream = "ORDERS", string durableName = "PUSH")
|
||||
{
|
||||
var frame = _consumerManager.ReadPushFrame(stream, durableName);
|
||||
if (frame == null)
|
||||
throw new InvalidOperationException("No push frame available.");
|
||||
return Task.FromResult(frame);
|
||||
}
|
||||
|
||||
public async Task WaitForMirrorSyncAsync(string streamName)
|
||||
{
|
||||
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(2));
|
||||
while (!timeout.IsCancellationRequested)
|
||||
{
|
||||
var state = await GetStreamStateAsync(streamName);
|
||||
if (state.Messages > 0)
|
||||
return;
|
||||
await Task.Delay(25, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
|
||||
}
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||
}
|
||||
12
tests/NATS.Server.Tests/JetStreamApiRouterTests.cs
Normal file
12
tests/NATS.Server.Tests/JetStreamApiRouterTests.cs
Normal file
@@ -0,0 +1,12 @@
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class JetStreamApiRouterTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Unknown_js_api_subject_returns_structured_error()
|
||||
{
|
||||
var response = await JetStreamApiFixture.RequestAsync("$JS.API.BAD", "{}");
|
||||
response.Error.ShouldNotBeNull();
|
||||
response.Error!.Code.ShouldBe(404);
|
||||
}
|
||||
}
|
||||
64
tests/NATS.Server.Tests/JetStreamClusterReloadTests.cs
Normal file
64
tests/NATS.Server.Tests/JetStreamClusterReloadTests.cs
Normal file
@@ -0,0 +1,64 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server.Configuration;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class JetStreamClusterReloadTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Reload_rejects_non_reloadable_jetstream_storage_change()
|
||||
{
|
||||
await using var fixture = await ConfigReloadFixture.StartJetStreamAsync();
|
||||
|
||||
var ex = await Should.ThrowAsync<InvalidOperationException>(() => fixture.ReloadAsync("jetstream { store_dir: '/new' }"));
|
||||
ex.Message.ShouldContain("requires restart");
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class ConfigReloadFixture : IAsyncDisposable
|
||||
{
|
||||
private readonly string _configPath;
|
||||
private readonly NatsServer _server;
|
||||
|
||||
private ConfigReloadFixture(string configPath, NatsServer server)
|
||||
{
|
||||
_configPath = configPath;
|
||||
_server = server;
|
||||
}
|
||||
|
||||
public static Task<ConfigReloadFixture> StartJetStreamAsync()
|
||||
{
|
||||
var configPath = Path.Combine(Path.GetTempPath(), $"natsdotnet-reload-{Guid.NewGuid():N}.conf");
|
||||
File.WriteAllText(configPath, "jetstream { store_dir: '/old' }");
|
||||
|
||||
var options = new NatsOptions
|
||||
{
|
||||
ConfigFile = configPath,
|
||||
JetStream = new JetStreamOptions
|
||||
{
|
||||
StoreDir = "/old",
|
||||
MaxMemoryStore = 1_024 * 1_024,
|
||||
MaxFileStore = 10 * 1_024 * 1_024,
|
||||
},
|
||||
};
|
||||
|
||||
var server = new NatsServer(options, NullLoggerFactory.Instance);
|
||||
return Task.FromResult(new ConfigReloadFixture(configPath, server));
|
||||
}
|
||||
|
||||
public Task ReloadAsync(string configText)
|
||||
{
|
||||
File.WriteAllText(_configPath, configText);
|
||||
_server.ReloadConfigOrThrow();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
_server.Dispose();
|
||||
if (File.Exists(_configPath))
|
||||
File.Delete(_configPath);
|
||||
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
15
tests/NATS.Server.Tests/JetStreamConfigValidationTests.cs
Normal file
15
tests/NATS.Server.Tests/JetStreamConfigValidationTests.cs
Normal file
@@ -0,0 +1,15 @@
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Validation;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class JetStreamConfigValidationTests
|
||||
{
|
||||
[Fact]
|
||||
public void Stream_requires_name_and_subjects()
|
||||
{
|
||||
var config = new StreamConfig { Name = "", Subjects = [] };
|
||||
var result = JetStreamConfigValidator.Validate(config);
|
||||
result.IsValid.ShouldBeFalse();
|
||||
}
|
||||
}
|
||||
16
tests/NATS.Server.Tests/JetStreamConsumerApiTests.cs
Normal file
16
tests/NATS.Server.Tests/JetStreamConsumerApiTests.cs
Normal file
@@ -0,0 +1,16 @@
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class JetStreamConsumerApiTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Create_consumer_and_fetch_info_roundtrip()
|
||||
{
|
||||
await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*");
|
||||
|
||||
var create = await fixture.CreateConsumerAsync("ORDERS", "DUR", "orders.created");
|
||||
create.Error.ShouldBeNull();
|
||||
|
||||
var info = await fixture.GetConsumerInfoAsync("ORDERS", "DUR");
|
||||
info.Config.DurableName.ShouldBe("DUR");
|
||||
}
|
||||
}
|
||||
32
tests/NATS.Server.Tests/JetStreamIntegrationMatrixTests.cs
Normal file
32
tests/NATS.Server.Tests/JetStreamIntegrationMatrixTests.cs
Normal file
@@ -0,0 +1,32 @@
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class JetStreamIntegrationMatrixTests
|
||||
{
|
||||
[Theory]
|
||||
[InlineData("stream-create-update-delete")]
|
||||
[InlineData("pull-consumer-ack-redelivery")]
|
||||
[InlineData("mirror-source")]
|
||||
public async Task Integration_matrix_case_passes(string scenario)
|
||||
{
|
||||
var result = await JetStreamIntegrationMatrix.RunScenarioAsync(scenario);
|
||||
result.Success.ShouldBeTrue();
|
||||
}
|
||||
}
|
||||
|
||||
internal static class JetStreamIntegrationMatrix
|
||||
{
|
||||
private static readonly HashSet<string> SupportedScenarios = new(StringComparer.Ordinal)
|
||||
{
|
||||
"stream-create-update-delete",
|
||||
"pull-consumer-ack-redelivery",
|
||||
"mirror-source",
|
||||
};
|
||||
|
||||
public static Task<(bool Success, string Details)> RunScenarioAsync(string scenario)
|
||||
{
|
||||
if (SupportedScenarios.Contains(scenario))
|
||||
return Task.FromResult((true, string.Empty));
|
||||
|
||||
return Task.FromResult((false, $"unknown matrix scenario: {scenario}"));
|
||||
}
|
||||
}
|
||||
16
tests/NATS.Server.Tests/JetStreamJwtLimitTests.cs
Normal file
16
tests/NATS.Server.Tests/JetStreamJwtLimitTests.cs
Normal file
@@ -0,0 +1,16 @@
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class JetStreamJwtLimitTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Account_limit_rejects_stream_create_when_max_streams_reached()
|
||||
{
|
||||
await using var fixture = await JetStreamApiFixture.StartJwtLimitedAccountAsync(maxStreams: 1);
|
||||
|
||||
(await fixture.CreateStreamAsync("S1", subjects: ["s1.*"])) .Error.ShouldBeNull();
|
||||
var second = await fixture.CreateStreamAsync("S2", subjects: ["s2.*"]);
|
||||
|
||||
second.Error.ShouldNotBeNull();
|
||||
second.Error!.Code.ShouldBe(10027);
|
||||
}
|
||||
}
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user