From 60bb56a90c22f8af1b316a47c57386cb97096ae5 Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Sun, 1 Mar 2026 15:25:01 -0500
Subject: [PATCH] docs: add MQTT server-side orchestration design

Full protocol parity design for MQTT 3.1.1: listener, client creation,
parser/dispatch, all packet handlers (CONNECT through DISCONNECT),
QoS 0/1/2, will messages, retained messages, session persistence, and
JetStream consumer management. Bottom-up implementation strategy.
---
 .../2026-03-01-mqtt-orchestration-design.md   | 362 ++++++++++++++++++
 reports/current.md                            |   2 +-
 2 files changed, 363 insertions(+), 1 deletion(-)
 create mode 100644 docs/plans/2026-03-01-mqtt-orchestration-design.md

diff --git a/docs/plans/2026-03-01-mqtt-orchestration-design.md b/docs/plans/2026-03-01-mqtt-orchestration-design.md
new file mode 100644
index 0000000..5b67731
--- /dev/null
+++ b/docs/plans/2026-03-01-mqtt-orchestration-design.md
@@ -0,0 +1,362 @@
+# MQTT Server-Side Orchestration Design
+
+## Goal
+
+Implement the MQTT server-side orchestration layer — listener, client creation, protocol parsing, packet dispatch, and all 15+ extension methods — achieving full MQTT 3.1.1 protocol parity with Go's `mqtt.go` (~5,800 LOC). The data structures, binary codecs, session manager, JetStream integration, and subject converter are already ported and verified.
+
+## Architecture
+
+Bottom-up implementation with TDD: build from listener through parser to packet handlers to session/JetStream integration. Each layer is independently testable before the next depends on it. Mirror Go's `createMQTTClient()` pattern with a dedicated MQTT client creation path separate from the NATS client path.
+
+## Decisions
+
+| Decision | Choice | Rationale |
+|----------|--------|-----------|
+| Scope | Full protocol parity | All packet types, QoS 0/1/2, will, retained, sessions |
+| Client creation | Dedicated `CreateMqttClient()` | Mirrors Go exactly; MQTT clients skip INFO, set headers=true |
+| Code organization | Split by concern | MqttParser.cs, MqttPacketHandlers.cs, MqttClientExtensions.cs |
+| Strategy | Bottom-up with TDD | Each layer tested before building the next |
+
+## Tech Stack
+
+- .NET 10, C# latest
+- xUnit 3, Shouldly for testing
+- Raw TCP + binary MQTT packets for integration tests (no third-party MQTT library)
+- Existing: MqttReader, MqttWriter, MqttConstants, MqttTypes, MqttAccountSessionManager, MqttJsa
+
+---
+
+## 1. Listener & Client Creation
+
+### StartMqtt() — Replace warning stub
+
+**File**: `Mqtt/MqttHandler.cs` (MqttServerExtensions)
+
+1. Read `opts.Mqtt.Port` and `opts.Mqtt.Host`
+2. Create `TcpListener` on MQTT port (support ephemeral port 0)
+3. Store as `_mqttListener` field on `NatsServer`
+4. Handle TLS if `opts.Mqtt.TlsConfig` is set
+5. Launch `AcceptConnections()` goroutine with `CreateMqttClient` callback
+6. Log: `"Listening for MQTT client connections on {endpoint}"`
+
+### CreateMqttClient(TcpClient tc) — New method
+
+**File**: New `NatsServer.Mqtt.cs` partial class, or extend `NatsServer.Listeners.cs`
+
+1. Create `ClientConnection(ClientKind.Client, this, tc.GetStream())`
+2. Initialize `c.Mqtt = new MqttHandler()` with QoS flags from `opts.Mqtt`:
+   - `RejectQoS2Pub` from `opts.Mqtt.RejectQoS2Pub`
+   - `DowngradeQoS2Sub` from `opts.Mqtt.DowngradeQoS2Sub`
+3. Set `c.Headers = true` (MQTT uses NATS headers for QoS metadata)
+4. Register with `GlobalAccount()`
+5. **Do NOT send INFO** — MQTT clients don't use the NATS INFO line
+6. Start `ReadLoop()` and `WriteLoop()` via `StartGoRoutine()`
+7. Add to server's `_clients` map
+8. Check max connections limit
+
+### NatsServer fields
+
+**File**: `NatsServer.cs`
+
+Add alongside existing listener fields:
+```csharp
+private TcpListener? _mqttListener;
+```
+
+### Shutdown integration
+
+**File**: `NatsServer.Lifecycle.cs`
+
+Add `_mqttListener` to `doneExpected` counting in `Shutdown()`:
+```csharp
+if (_mqttListener != null)
+{
+    doneExpected++;
+    _mqttListener.Stop();
+    _mqttListener = null;
+}
+```
+
+Same for `LameDuckMode()`.
+
+---
+
+## 2. Parser & Dispatch
+
+### MqttParse(byte[] buf) — New method on ClientConnection
+
+**File**: New `Mqtt/MqttParser.cs`
+
+State machine that processes raw bytes from the read loop:
+
+1. **Fixed header**: Read packet type byte (upper 4 bits = type, lower 4 = flags)
+2. **Remaining length**: Variable-length encoding (1-4 bytes, 7 bits per byte, MSB continuation)
+3. **Payload**: Type-specific parsing delegated to per-packet parsers
+
+### Dispatch switch
+
+After extracting packet type and remaining length:
+
+```
+CONNECT    (0x10) → MqttParseConnect() → MqttProcessConnect()
+PUBLISH    (0x30) → MqttParsePub()     → MqttProcessPub()
+PUBACK     (0x40) → MqttParsePubAck()
+PUBREC     (0x50) → MqttParsePubRec()
+PUBREL     (0x62) → MqttProcessPubRel()
+PUBCOMP    (0x70) → MqttParsePubComp()
+SUBSCRIBE  (0x82) → MqttParseSubs()    → MqttProcessSubs()
+UNSUBSCRIBE(0xA2) → MqttParseUnsubs()  → MqttProcessUnsubs()
+PINGREQ    (0xC0) → MqttEnqueuePingResp()
+DISCONNECT (0xE0) → handle cleanup + close
+```
+
+### Key constraints
+
+- CONNECT must be the first packet; any other packet before CONNECT → close connection
+- Partial packets: save state in `MqttHandler.ParseState/RemLen/Buf`, resume on next buffer
+- Invalid packet types → close connection
+
+### ReadLoop integration
+
+**File**: `ClientConnection.cs` (ReadLoop method)
+
+Add MQTT branch:
+```csharp
+if (IsMqtt())
+    err = MqttParse(buf);
+else
+    err = Parse(buf, handler);
+```
+
+### IsMqtt() change
+
+**File**: `ClientConnection.cs`
+
+Change from `return false` to:
+```csharp
+internal bool IsMqtt() => Mqtt != null;
+```
+
+---
+
+## 3. Packet Handlers
+
+### CONNECT (`MqttParseConnect` + `MqttProcessConnect`)
+
+**File**: `Mqtt/MqttPacketHandlers.cs`
+
+**Parse phase** — Extract fields in MQTT 3.1.1 spec order:
+- Protocol name ("MQTT"), protocol level (0x04)
+- Connect flags (clean session, will, will QoS, will retain, password, username)
+- Keep-alive interval (2 bytes, multiply by 1.5 for read deadline)
+- Client ID (auto-generate NUID if empty + clean session)
+- Will topic/message (convert topic to NATS subject via `MqttTopicToNatsPubSubject()`)
+- Username/password
+
+**Process phase** (`MqttProcessConnect` on NatsServer):
+1. Get or create `MqttAccountSessionManager` for client's account
+2. Lock session by client ID hash (prevents concurrent session takeover)
+3. If existing session → take over (disconnect old client)
+4. If clean session → delete old session state
+5. If !clean session → restore subscriptions and pending messages
+6. Set `c.Mqtt.Session`, `c.Mqtt.AccountSessionManager`
+7. Run authentication if `opts.Mqtt.Username/Password` configured
+8. Send CONNACK (return code 0 = accepted)
+9. Deliver retained messages for existing subscriptions
+10. Set keep-alive read deadline on connection
+
+### PUBLISH (`MqttParsePub` + `MqttProcessPub`)
+
+**Parse**: Topic name → convert to NATS subject, packet ID (if QoS>0), payload bytes.
+
+**Process by QoS**:
+- **QoS 0**: `MqttInitiateMsgDelivery()` → done
+- **QoS 1**: Deliver → send PUBACK(pi)
+- **QoS 2**: `MqttStoreQoS2MsgOnce()` → send PUBREC(pi) → [client sends PUBREL] → deliver → send PUBCOMP(pi)
+
+**MqttInitiateMsgDelivery**: Construct NATS message with `Nmqtt-Pub:` header, set `c.Pa` (publish args), call `c.ProcessInboundClientMsg()`.
+
+**Max payload check**: Validate total NATS message size against client's max payload limit.
+
+### SUBSCRIBE (`MqttParseSubs` + `MqttProcessSubs`)
+
+**Parse**: Packet ID, list of (topic filter + QoS) pairs. Convert each filter to NATS subject via `MqttFilterToNatsSubject()`.
+
+**Process per filter**:
+- Create NATS subscription on converted subject
+- If QoS > 0: Create JetStream durable consumer on `$MQTT.msgs.`
+- Track in `MqttSession.Subs` and `MqttSession.Cons`
+- Send SUBACK with granted QoS per filter (downgrade QoS 2 → 1 if configured)
+
+### UNSUBSCRIBE (`MqttParseUnsubs` + `MqttProcessUnsubs`)
+
+Mirror of SUBSCRIBE:
+- Remove NATS subscriptions
+- Delete JetStream consumers
+- Update session state
+- Send UNSUBACK
+
+### PING
+
+Queue PINGRESP (2 bytes: `0xD0 0x00`).
+
+### DISCONNECT
+
+1. Clear will message (graceful disconnect suppresses will)
+2. Call `MqttHandleClosedClient()` for session cleanup
+3. Close connection with `ClosedState.ClientClosed`
+
+### Will Message (`MqttHandleWill`)
+
+On abnormal disconnect (connection drop, not DISCONNECT):
+- Publish will topic/message to NATS with configured QoS and retain flag
+- Only if will was set in CONNECT and not cleared by DISCONNECT
+
+### PUBREL Processing (`MqttProcessPubRel`)
+
+For QoS 2 inbound flow:
+1. Look up stored QoS 2 message by packet ID
+2. Deliver via `MqttInitiateMsgDelivery()`
+3. Send PUBCOMP(pi)
+4. Remove from pending
+
+---
+
+## 4. Session Management & JetStream Integration
+
+### Session Lifecycle
+
+**Creation** (during CONNECT):
+1. `GetOrCreateMqttAccountSessionManager()` — Lazy per-account ASM initialization
+2. `MqttCreateAccountSessionManager()` — Creates ASM with JetStream streams:
+   - `$MQTT.msgs.` — QoS 1/2 message persistence
+   - `$MQTT.sess.` — Session state persistence
+   - `$MQTT.rmsgs.` — Retained messages (MaxMsgsPerSubject=1)
+3. Lock session by client ID hash
+4. Create or restore `MqttSession`
+
+**Persistence** (via `MqttJsa`):
+- Serialize session state to `$MQTT.sess` stream keyed by client ID hash
+- Clean session: delete all session data
+- Reconnect (!clean): reload from JetStream, rebind subscriptions
+
+**Takeover** (same client ID reconnects):
+- Lock old session → disconnect old client → transfer to new client → resume pending
+
+### JetStream Consumer Management
+
+QoS 1/2 subscriptions create durable consumers:
+- Durable name: `_`
+- Filter: `$MQTT.msgs.`
+- Delivery subject: `$MQTT.sub.`
+- Ack policy: Explicit
+- Max ack pending: From account config (default 1024)
+
+### Retained Messages
+
+- PUBLISH with retain flag → store in `$MQTT.rmsgs.` (MaxMsgsPerSubject=1)
+- New SUBSCRIBE → `LoadLastMsgForAsync()` to find matching retained messages
+- PUBLISH with retain + empty payload → delete retained message
+
+### Extension Methods to Implement
+
+| Method | Purpose |
+|--------|---------|
+| `StartMqtt()` | Listener startup |
+| `MqttConfigAuth()` | Wire MQTT auth overrides |
+| `MqttHandleClosedClient()` | Session cleanup on disconnect |
+| `MqttUpdateMaxAckPending()` | Propagate config to consumers |
+| `MqttGetJsaForAccount()` | Get/create JSA for account |
+| `MqttStoreQosMsgForAccountOnNewSubject()` | Store QoS msg on new subject |
+| `GetOrCreateMqttAccountSessionManager()` | Lazy ASM creation |
+| `MqttCreateAccountSessionManager()` | ASM + stream creation |
+| `MqttDetermineReplicas()` | Cluster-aware replica count |
+| `MqttProcessConnect()` | CONNECT handling |
+| `MqttHandleWill()` | Will message delivery |
+| `MqttProcessPub()` | PUBLISH handling |
+| `MqttInitiateMsgDelivery()` | Inject into NATS substrate |
+| `MqttStoreQoS2MsgOnce()` | QoS 2 idempotent storage |
+| `MqttProcessPubRel()` | PUBREL handling |
+| `MqttCheckPubRetainedPerms()` | Retained message permission audit |
+
+---
+
+## 5. New Files
+
+| File | Purpose | Est. LOC |
+|------|---------|----------|
+| `Mqtt/MqttParser.cs` | Parse + dispatch state machine | 400-500 |
+| `Mqtt/MqttPacketHandlers.cs` | CONNECT/PUB/SUB/UNSUB/PING/DISCONNECT processing | 800-1000 |
+| `Mqtt/MqttClientExtensions.cs` | ClientConnection MQTT methods (IsMqtt, enqueue helpers) | 200-300 |
+| `NatsServer.Mqtt.cs` | Server-side MQTT partial (CreateMqttClient, listener) | 300-400 |
+
+**Modified files:**
+- `Mqtt/MqttHandler.cs` — Replace stubs with real implementations
+- `NatsServer.cs` — Add `_mqttListener` field
+- `NatsServer.Lifecycle.cs` — Add MQTT listener to shutdown
+- `ClientConnection.cs` — Change `IsMqtt()`, add `Mqtt` property, ReadLoop branch
+
+**Estimated total**: 2,000-3,000 LOC new code
+
+---
+
+## 6. Testing Strategy
+
+### Unit Tests
+
+**Parser tests** (`Tests/Server/Mqtt/MqttParserTests.cs`):
+- Valid CONNECT packet → all fields extracted
+- CONNECT with will message → will topic/message parsed
+- Partial CONNECT (fragmented) → state saved, resume on next buffer
+- Invalid first packet (not CONNECT) → error
+- PUBLISH QoS 0/1/2 → topic conversion, payload, PI
+- SUBSCRIBE with multiple filters → all converted
+- UNSUBSCRIBE → filters parsed
+- Remaining length edge cases (1-byte, 2-byte, 4-byte encodings)
+
+**Dispatch tests** (`Tests/Server/Mqtt/MqttDispatchTests.cs`):
+- Each packet type dispatches to correct handler
+- CONNECT-first enforcement
+- Invalid packet type → error
+
+**Handler tests** (mocked server/session):
+- CONNECT: clean session, session resumption, client ID generation
+- PUBLISH QoS 0/1/2 flows
+- SUBSCRIBE: subscription + consumer creation
+- UNSUBSCRIBE: cleanup
+- DISCONNECT: will cleared, session cleanup
+- Will delivery on abnormal close
+
+### Integration Tests
+
+70 existing deferred tests in `Mqtt/MqttTests.cs` become runnable progressively.
+
+New boot tests:
+- `MqttBoot_AcceptsConnection_ShouldSucceed`
+- `MqttBoot_StartAndShutdown_ShouldSucceed`
+- End-to-end: client A publishes → client B receives
+
+### Test approach
+
+- Unit tests: mocked dependencies, no server boot
+- Integration tests: `NatsServer.Start()` with `opts.Mqtt.Port = 0` (ephemeral)
+- Raw TCP + binary MQTT packets for protocol tests (no third-party MQTT client)
+
+---
+
+## 7. Implementation Order (Bottom-Up)
+
+1. **Listener + client creation** — `StartMqtt()`, `CreateMqttClient()`, shutdown wiring
+2. **Parser + dispatch** — `MqttParse()`, ReadLoop integration, `IsMqtt()`
+3. **CONNECT** — Parse + process, CONNACK response, auth
+4. **PING** — Simplest packet handler (proves dispatch works)
+5. **DISCONNECT** — Graceful close, will suppression
+6. **PUBLISH QoS 0** — Basic message flow through NATS substrate
+7. **SUBSCRIBE + UNSUBSCRIBE** — NATS subscription management
+8. **PUBLISH QoS 1** — PUBACK flow, JetStream consumer creation
+9. **PUBLISH QoS 2** — PUBREC/PUBREL/PUBCOMP handshake
+10. **Will messages** — Abnormal disconnect handling
+11. **Retained messages** — Store/retrieve/delete retained
+12. **Session persistence** — Clean/dirty session lifecycle
+13. **Session takeover** — Same client ID reconnect
+14. **Config integration** — `MqttConfigAuth`, `MqttUpdateMaxAckPending`, `MqttDetermineReplicas`, `MqttCheckPubRetainedPerms`
diff --git a/reports/current.md b/reports/current.md
index bc49679..9e15cc2 100644
--- a/reports/current.md
+++ b/reports/current.md
@@ -1,6 +1,6 @@
 # NATS .NET Porting Status Report
 
-Generated: 2026-03-01 20:13:49 UTC
+Generated: 2026-03-01 20:25:04 UTC
 
 ## Modules (12 total)