# MQTT Server-Side Orchestration Design

## Goal

Implement the MQTT server-side orchestration layer — listener, client creation, protocol parsing, packet dispatch, and all 15+ extension methods — achieving full MQTT 3.1.1 protocol parity with Go's `mqtt.go` (~5,800 LOC). The data structures, binary codecs, session manager, JetStream integration, and subject converter are already ported and verified.

## Architecture

Bottom-up implementation with TDD: build from listener through parser to packet handlers to session/JetStream integration. Each layer is independently testable before the next depends on it. Mirror Go's `createMQTTClient()` pattern with a dedicated MQTT client creation path separate from the NATS client path.

## Decisions

| Decision | Choice | Rationale |
|----------|--------|-----------|
| Scope | Full protocol parity | All packet types, QoS 0/1/2, will, retained, sessions |
| Client creation | Dedicated `CreateMqttClient()` | Mirrors Go exactly; MQTT clients skip INFO, set headers=true |
| Code organization | Split by concern | MqttParser.cs, MqttPacketHandlers.cs, MqttClientExtensions.cs |
| Strategy | Bottom-up with TDD | Each layer tested before building the next |

## Tech Stack

- .NET 10, C# latest
- xUnit 3, Shouldly for testing
- Raw TCP + binary MQTT packets for integration tests (no third-party MQTT library)
- Existing: MqttReader, MqttWriter, MqttConstants, MqttTypes, MqttAccountSessionManager, MqttJsa

---

## 1. Listener & Client Creation

### StartMqtt() — Replace warning stub

**File**: `Mqtt/MqttHandler.cs` (MqttServerExtensions)

1. Read `opts.Mqtt.Port` and `opts.Mqtt.Host`
2. Create `TcpListener` on MQTT port (support ephemeral port 0)
3. Store as `_mqttListener` field on `NatsServer`
4. Handle TLS if `opts.Mqtt.TlsConfig` is set
5. Launch `AcceptConnections()` goroutine with `CreateMqttClient` callback
6. Log: `"Listening for MQTT client connections on {endpoint}"`

### CreateMqttClient(TcpClient tc) — New method

**File**: New `NatsServer.Mqtt.cs` partial class, or extend `NatsServer.Listeners.cs`

1. Create `ClientConnection(ClientKind.Client, this, tc.GetStream())`
2. Initialize `c.Mqtt = new MqttHandler()` with QoS flags from `opts.Mqtt`:
   - `RejectQoS2Pub` from `opts.Mqtt.RejectQoS2Pub`
   - `DowngradeQoS2Sub` from `opts.Mqtt.DowngradeQoS2Sub`
3. Set `c.Headers = true` (MQTT uses NATS headers for QoS metadata)
4. Register with `GlobalAccount()`
5. **Do NOT send INFO** — MQTT clients don't use the NATS INFO line
6. Start `ReadLoop()` and `WriteLoop()` via `StartGoRoutine()`
7. Add to server's `_clients` map
8. Check max connections limit

### NatsServer fields

**File**: `NatsServer.cs`

Add alongside existing listener fields:

```csharp
private TcpListener? _mqttListener;
```

### Shutdown integration

**File**: `NatsServer.Lifecycle.cs`

Add `_mqttListener` to `doneExpected` counting in `Shutdown()`:

```csharp
if (_mqttListener != null)
{
    doneExpected++;
    _mqttListener.Stop();
    _mqttListener = null;
}
```

Same for `LameDuckMode()`.

---

## 2. Parser & Dispatch

### MqttParse(byte[] buf) — New method on ClientConnection

**File**: New `Mqtt/MqttParser.cs`

State machine that processes raw bytes from the read loop:

1. **Fixed header**: Read packet type byte (upper 4 bits = type, lower 4 = flags)
2. **Remaining length**: Variable-length encoding (1-4 bytes, 7 bits per byte, MSB continuation)
3. **Payload**: Type-specific parsing delegated to per-packet parsers

### Dispatch switch

After extracting packet type and remaining length:

```
CONNECT     (0x10) → MqttParseConnect() → MqttProcessConnect()
PUBLISH     (0x30) → MqttParsePub()     → MqttProcessPub()
PUBACK      (0x40) → MqttParsePubAck()
PUBREC      (0x50) → MqttParsePubRec()
PUBREL      (0x62) → MqttProcessPubRel()
PUBCOMP     (0x70) → MqttParsePubComp()
SUBSCRIBE   (0x82) → MqttParseSubs()    → MqttProcessSubs()
UNSUBSCRIBE (0xA2) → MqttParseUnsubs()  → MqttProcessUnsubs()
PINGREQ     (0xC0) → MqttEnqueuePingResp()
DISCONNECT  (0xE0) → handle cleanup + close
```

### Key constraints

- CONNECT must be the first packet; any other packet before CONNECT → close connection
- Partial packets: save state in `MqttHandler.ParseState/RemLen/Buf`, resume on next buffer
- Invalid packet types → close connection

### ReadLoop integration

**File**: `ClientConnection.cs` (ReadLoop method)

Add MQTT branch:

```csharp
if (IsMqtt())
    err = MqttParse(buf);
else
    err = Parse(buf, handler);
```

### IsMqtt() change

**File**: `ClientConnection.cs`

Change from `return false` to:

```csharp
internal bool IsMqtt() => Mqtt != null;
```

---

## 3. Packet Handlers

### CONNECT (`MqttParseConnect` + `MqttProcessConnect`)

**File**: `Mqtt/MqttPacketHandlers.cs`

**Parse phase** — Extract fields in MQTT 3.1.1 spec order:

- Protocol name ("MQTT"), protocol level (0x04)
- Connect flags (clean session, will, will QoS, will retain, password, username)
- Keep-alive interval (2 bytes, multiply by 1.5 for read deadline)
- Client ID (auto-generate NUID if empty + clean session)
- Will topic/message (convert topic to NATS subject via `MqttTopicToNatsPubSubject()`)
- Username/password

**Process phase** (`MqttProcessConnect` on NatsServer):

1. Get or create `MqttAccountSessionManager` for client's account
2. Lock session by client ID hash (prevents concurrent session takeover)
3. If existing session → take over (disconnect old client)
4. If clean session → delete old session state
5. If !clean session → restore subscriptions and pending messages
6. Set `c.Mqtt.Session`, `c.Mqtt.AccountSessionManager`
7. Run authentication if `opts.Mqtt.Username/Password` configured
8. Send CONNACK (return code 0 = accepted)
9. Deliver retained messages for existing subscriptions
10. Set keep-alive read deadline on connection

### PUBLISH (`MqttParsePub` + `MqttProcessPub`)

**Parse**: Topic name → convert to NATS subject, packet ID (if QoS>0), payload bytes.

**Process by QoS**:

- **QoS 0**: `MqttInitiateMsgDelivery()` → done
- **QoS 1**: Deliver → send PUBACK(pi)
- **QoS 2**: `MqttStoreQoS2MsgOnce()` → send PUBREC(pi) → [client sends PUBREL] → deliver → send PUBCOMP(pi)

**MqttInitiateMsgDelivery**: Construct NATS message with `Nmqtt-Pub:` header, set `c.Pa` (publish args), call `c.ProcessInboundClientMsg()`.

**Max payload check**: Validate total NATS message size against client's max payload limit.

### SUBSCRIBE (`MqttParseSubs` + `MqttProcessSubs`)

**Parse**: Packet ID, list of (topic filter + QoS) pairs. Convert each filter to NATS subject via `MqttFilterToNatsSubject()`.

**Process per filter**:

- Create NATS subscription on converted subject
- If QoS > 0: Create JetStream durable consumer on `$MQTT.msgs.`
- Track in `MqttSession.Subs` and `MqttSession.Cons`
- Send SUBACK with granted QoS per filter (downgrade QoS 2 → 1 if configured)

### UNSUBSCRIBE (`MqttParseUnsubs` + `MqttProcessUnsubs`)

Mirror of SUBSCRIBE:

- Remove NATS subscriptions
- Delete JetStream consumers
- Update session state
- Send UNSUBACK

### PING

Queue PINGRESP (2 bytes: `0xD0 0x00`).

### DISCONNECT

1. Clear will message (graceful disconnect suppresses will)
2. Call `MqttHandleClosedClient()` for session cleanup
3. Close connection with `ClosedState.ClientClosed`

### Will Message (`MqttHandleWill`)

On abnormal disconnect (connection drop, not DISCONNECT):

- Publish will topic/message to NATS with configured QoS and retain flag
- Only if will was set in CONNECT and not cleared by DISCONNECT

### PUBREL Processing (`MqttProcessPubRel`)

For QoS 2 inbound flow:

1. Look up stored QoS 2 message by packet ID
2. Deliver via `MqttInitiateMsgDelivery()`
3. Send PUBCOMP(pi)
4. Remove from pending

---

## 4. Session Management & JetStream Integration

### Session Lifecycle

**Creation** (during CONNECT):

1. `GetOrCreateMqttAccountSessionManager()` — Lazy per-account ASM initialization
2. `MqttCreateAccountSessionManager()` — Creates ASM with JetStream streams:
   - `$MQTT.msgs.` — QoS 1/2 message persistence
   - `$MQTT.sess.` — Session state persistence
   - `$MQTT.rmsgs.` — Retained messages (MaxMsgsPerSubject=1)
3. Lock session by client ID hash
4. Create or restore `MqttSession`

**Persistence** (via `MqttJsa`):

- Serialize session state to `$MQTT.sess` stream keyed by client ID hash
- Clean session: delete all session data
- Reconnect (!clean): reload from JetStream, rebind subscriptions

**Takeover** (same client ID reconnects):

- Lock old session → disconnect old client → transfer to new client → resume pending

### JetStream Consumer Management

QoS 1/2 subscriptions create durable consumers:

- Durable name: `_`
- Filter: `$MQTT.msgs.`
- Delivery subject: `$MQTT.sub.`
- Ack policy: Explicit
- Max ack pending: From account config (default 1024)

### Retained Messages

- PUBLISH with retain flag → store in `$MQTT.rmsgs.` (MaxMsgsPerSubject=1)
- New SUBSCRIBE → `LoadLastMsgForAsync()` to find matching retained messages
- PUBLISH with retain + empty payload → delete retained message

### Extension Methods to Implement

| Method | Purpose |
|--------|---------|
| `StartMqtt()` | Listener startup |
| `MqttConfigAuth()` | Wire MQTT auth overrides |
| `MqttHandleClosedClient()` | Session cleanup on disconnect |
| `MqttUpdateMaxAckPending()` | Propagate config to consumers |
| `MqttGetJsaForAccount()` | Get/create JSA for account |
| `MqttStoreQosMsgForAccountOnNewSubject()` | Store QoS msg on new subject |
| `GetOrCreateMqttAccountSessionManager()` | Lazy ASM creation |
| `MqttCreateAccountSessionManager()` | ASM + stream creation |
| `MqttDetermineReplicas()` | Cluster-aware replica count |
| `MqttProcessConnect()` | CONNECT handling |
| `MqttHandleWill()` | Will message delivery |
| `MqttProcessPub()` | PUBLISH handling |
| `MqttInitiateMsgDelivery()` | Inject into NATS substrate |
| `MqttStoreQoS2MsgOnce()` | QoS 2 idempotent storage |
| `MqttProcessPubRel()` | PUBREL handling |
| `MqttCheckPubRetainedPerms()` | Retained message permission audit |

---

## 5. New Files

| File | Purpose | Est. LOC |
|------|---------|----------|
| `Mqtt/MqttParser.cs` | Parse + dispatch state machine | 400-500 |
| `Mqtt/MqttPacketHandlers.cs` | CONNECT/PUB/SUB/UNSUB/PING/DISCONNECT processing | 800-1000 |
| `Mqtt/MqttClientExtensions.cs` | ClientConnection MQTT methods (IsMqtt, enqueue helpers) | 200-300 |
| `NatsServer.Mqtt.cs` | Server-side MQTT partial (CreateMqttClient, listener) | 300-400 |

**Modified files:**

- `Mqtt/MqttHandler.cs` — Replace stubs with real implementations
- `NatsServer.cs` — Add `_mqttListener` field
- `NatsServer.Lifecycle.cs` — Add MQTT listener to shutdown
- `ClientConnection.cs` — Change `IsMqtt()`, add `Mqtt` property, ReadLoop branch

**Estimated total**: 2,000-3,000 LOC new code

---

## 6. Testing Strategy

### Unit Tests

**Parser tests** (`Tests/Server/Mqtt/MqttParserTests.cs`):

- Valid CONNECT packet → all fields extracted
- CONNECT with will message → will topic/message parsed
- Partial CONNECT (fragmented) → state saved, resume on next buffer
- Invalid first packet (not CONNECT) → error
- PUBLISH QoS 0/1/2 → topic conversion, payload, PI
- SUBSCRIBE with multiple filters → all converted
- UNSUBSCRIBE → filters parsed
- Remaining length edge cases (1-byte, 2-byte, 4-byte encodings)

**Dispatch tests** (`Tests/Server/Mqtt/MqttDispatchTests.cs`):

- Each packet type dispatches to correct handler
- CONNECT-first enforcement
- Invalid packet type → error

**Handler tests** (mocked server/session):

- CONNECT: clean session, session resumption, client ID generation
- PUBLISH QoS 0/1/2 flows
- SUBSCRIBE: subscription + consumer creation
- UNSUBSCRIBE: cleanup
- DISCONNECT: will cleared, session cleanup
- Will delivery on abnormal close

### Integration Tests

70 existing deferred tests in `Mqtt/MqttTests.cs` become runnable progressively. New boot tests:

- `MqttBoot_AcceptsConnection_ShouldSucceed`
- `MqttBoot_StartAndShutdown_ShouldSucceed`
- End-to-end: client A publishes → client B receives

### Test approach

- Unit tests: mocked dependencies, no server boot
- Integration tests: `NatsServer.Start()` with `opts.Mqtt.Port = 0` (ephemeral)
- Raw TCP + binary MQTT packets for protocol tests (no third-party MQTT client)

---

## 7. Implementation Order (Bottom-Up)

1. **Listener + client creation** — `StartMqtt()`, `CreateMqttClient()`, shutdown wiring
2. **Parser + dispatch** — `MqttParse()`, ReadLoop integration, `IsMqtt()`
3. **CONNECT** — Parse + process, CONNACK response, auth
4. **PING** — Simplest packet handler (proves dispatch works)
5. **DISCONNECT** — Graceful close, will suppression
6. **PUBLISH QoS 0** — Basic message flow through NATS substrate
7. **SUBSCRIBE + UNSUBSCRIBE** — NATS subscription management
8. **PUBLISH QoS 1** — PUBACK flow, JetStream consumer creation
9. **PUBLISH QoS 2** — PUBREC/PUBREL/PUBCOMP handshake
10. **Will messages** — Abnormal disconnect handling
11. **Retained messages** — Store/retrieve/delete retained
12. **Session persistence** — Clean/dirty session lifecycle
13. **Session takeover** — Same client ID reconnect
14. **Config integration** — `MqttConfigAuth`, `MqttUpdateMaxAckPending`, `MqttDetermineReplicas`, `MqttCheckPubRetainedPerms`
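
As a companion to the fixed-header steps in section 2 (type/flags split, then the 1-4 byte remaining length), the decoding could look roughly like the sketch below. `MqttFixedHeaderSketch`, `DecodeStatus`, and both method names are hypothetical and are not part of the existing `MqttReader`. The `NeedMoreData` result is one assumed way to signal the partial-packet case so the caller can save state in `MqttHandler` and resume on the next buffer.

```csharp
using System;

// Hypothetical status type for illustration only.
public enum DecodeStatus { Complete, NeedMoreData, Malformed }

// Hypothetical helper; the real parser may structure this differently.
public static class MqttFixedHeaderSketch
{
    // Splits the first fixed-header byte: upper 4 bits = packet type,
    // lower 4 bits = flags. For example 0x82 (SUBSCRIBE) yields
    // type 0x80 and flags 0x02.
    public static (byte Type, byte Flags) SplitTypeByte(byte b) =>
        ((byte)(b & 0xF0), (byte)(b & 0x0F));

    // Decodes the MQTT 3.1.1 remaining-length field starting at buf[0].
    // Each byte carries 7 value bits (least significant group first);
    // the MSB signals that another length byte follows. At most 4 bytes.
    public static DecodeStatus TryDecodeRemainingLength(
        ReadOnlySpan<byte> buf, out int value, out int consumed)
    {
        value = 0;
        consumed = 0;
        int shift = 0;

        for (int i = 0; i < 4; i++)
        {
            if (i >= buf.Length)
            {
                // Buffer ended mid-field: caller should keep state and retry.
                value = 0;
                return DecodeStatus.NeedMoreData;
            }

            byte b = buf[i];
            value |= (b & 0x7F) << shift;
            shift += 7;

            if ((b & 0x80) == 0)
            {
                consumed = i + 1;
                return DecodeStatus.Complete;
            }
        }

        // Continuation bit still set on the 4th byte: protocol violation.
        value = 0;
        return DecodeStatus.Malformed;
    }
}
```

For instance, the two bytes `0xC1 0x02` decode to 321 with `consumed == 2`, a lone `0x80` yields `NeedMoreData`, and four bytes of `0xFF` yield `Malformed`, which maps onto the "close connection" rule for invalid input.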