natsnet/docs/plans/2026-03-01-mqtt-orchestration-design.md
Joseph Doherty 60bb56a90c docs: add MQTT server-side orchestration design
Full protocol parity design for MQTT 3.1.1: listener, client creation,
parser/dispatch, all packet handlers (CONNECT through DISCONNECT),
QoS 0/1/2, will messages, retained messages, session persistence,
and JetStream consumer management. Bottom-up implementation strategy.
2026-03-01 15:25:01 -05:00


MQTT Server-Side Orchestration Design

Goal

Implement the MQTT server-side orchestration layer — listener, client creation, protocol parsing, packet dispatch, and all 15+ extension methods — achieving full MQTT 3.1.1 protocol parity with Go's mqtt.go (~5,800 LOC). The data structures, binary codecs, session manager, JetStream integration, and subject converter are already ported and verified.

Architecture

Bottom-up implementation with TDD: build from listener through parser to packet handlers to session/JetStream integration. Each layer is independently testable before the next depends on it. Mirror Go's createMQTTClient() pattern with a dedicated MQTT client creation path separate from the NATS client path.

Decisions

| Decision | Choice | Rationale |
|---|---|---|
| Scope | Full protocol parity | All packet types, QoS 0/1/2, will, retained, sessions |
| Client creation | Dedicated CreateMqttClient() | Mirrors Go exactly; MQTT clients skip INFO, set headers=true |
| Code organization | Split by concern | MqttParser.cs, MqttPacketHandlers.cs, MqttClientExtensions.cs |
| Strategy | Bottom-up with TDD | Each layer tested before building the next |

Tech Stack

  • .NET 10, C# latest
  • xUnit 3, Shouldly for testing
  • Raw TCP + binary MQTT packets for integration tests (no third-party MQTT library)
  • Existing: MqttReader, MqttWriter, MqttConstants, MqttTypes, MqttAccountSessionManager, MqttJsa

1. Listener & Client Creation

StartMqtt() — Replace warning stub

File: Mqtt/MqttHandler.cs (MqttServerExtensions)

  1. Read opts.Mqtt.Port and opts.Mqtt.Host
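  2. Create TcpListener on MQTT port (support ephemeral port 0; after binding, write the OS-assigned port back to opts.Mqtt.Port so tests and the startup log see the real port)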
  3. Store as _mqttListener field on NatsServer
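  4. If opts.Mqtt.TlsConfig is set, perform the TLS handshake on each accepted connection (as Go's createMQTTClient does), not on the listener itself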
  5. Launch AcceptConnections() goroutine with CreateMqttClient callback
  6. Log: "Listening for MQTT client connections on {endpoint}"
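Steps 2 and 5 can be sketched with a plain System.Net.Sockets.TcpListener. This is an illustration under assumptions, not the port's code: MqttListenerSketch, Bind, and AcceptLoopAsync are hypothetical names standing in for StartMqtt() and AcceptConnections(), and the host is assumed to be empty or an IP literal. It shows reading the OS-assigned port back from LocalEndpoint after binding to port 0, and an accept loop that exits once the listener is stopped.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical helper, not the port's API: binds the MQTT listener and runs an accept loop.
public static class MqttListenerSketch
{
    // Port 0 asks the OS for an ephemeral port; the bound port is read back from
    // LocalEndpoint so it can be stored (e.g. in opts.Mqtt.Port) for tests to dial.
    // Assumes host is empty or an IP literal; a DNS name would need resolving first.
    public static (TcpListener Listener, int Port) Bind(string host, int port)
    {
        IPAddress address = string.IsNullOrEmpty(host) ? IPAddress.Any : IPAddress.Parse(host);
        var listener = new TcpListener(address, port);
        listener.Start();
        int boundPort = ((IPEndPoint)listener.LocalEndpoint).Port;
        return (listener, boundPort);
    }

    // Hands every accepted socket to onClient (CreateMqttClient in the design).
    // Stopping the listener makes the pending or next accept throw, which ends the loop.
    public static async Task AcceptLoopAsync(TcpListener listener, Action<TcpClient> onClient)
    {
        while (true)
        {
            TcpClient client;
            try
            {
                client = await listener.AcceptTcpClientAsync();
            }
            catch (InvalidOperationException)
            {
                // Listener stopped or disposed (ObjectDisposedException derives from this).
                return;
            }
            catch (SocketException)
            {
                // Pending accept aborted by Stop().
                return;
            }
            onClient(client);
        }
    }
}
```

Stopping the listener from Shutdown() is what unblocks the pending accept, which is why the loop treats those exceptions as a normal exit rather than an error.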

CreateMqttClient(TcpClient tc) — New method

File: New NatsServer.Mqtt.cs partial class, or extend NatsServer.Listeners.cs

  1. Create ClientConnection(ClientKind.Client, this, tc.GetStream())
  2. Initialize c.Mqtt = new MqttHandler() with QoS flags from opts.Mqtt:
    • RejectQoS2Pub from opts.Mqtt.RejectQoS2Pub
    • DowngradeQoS2Sub from opts.Mqtt.DowngradeQoS2Sub
  3. Set c.Headers = true (MQTT uses NATS headers for QoS metadata)
  4. Register with GlobalAccount()
  5. Do NOT send INFO — MQTT clients don't use the NATS INFO line
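  6. Check the max connections limit under the server lock; if the server is already at the limit, reject and close the connection
  7. Add to server's _clients map
  8. Start ReadLoop() and WriteLoop() via StartGoRoutine() only after the client is registered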
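The registration order mirrors Go's client-creation path: the limit is checked under the server lock before the client is added to the map, and the read and write loops start only after a successful add. A minimal sketch of that check-then-add step, using a hypothetical ClientRegistrySketch rather than the port's NatsServer fields (treating a limit of 0 or less as unlimited is an assumption):

```csharp
using System.Collections.Generic;

// Hypothetical model of the registration part of CreateMqttClient (not the port's types).
public sealed class ClientRegistrySketch
{
    private readonly object _mu = new();
    private readonly Dictionary<ulong, string> _clients = new();
    private readonly int _maxConn; // assumption: <= 0 means "no limit"

    public ClientRegistrySketch(int maxConn) => _maxConn = maxConn;

    public int Count
    {
        get { lock (_mu) { return _clients.Count; } }
    }

    // Returns false when the server is already at its limit; the caller then closes
    // the socket without starting ReadLoop()/WriteLoop().
    public bool TryRegister(ulong cid, string clientName)
    {
        lock (_mu)
        {
            // Check before adding, so the map never holds more than _maxConn clients.
            if (_maxConn > 0 && _clients.Count >= _maxConn)
            {
                return false;
            }
            _clients[cid] = clientName;
            return true; // safe to start the read/write loops now
        }
    }
}
```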

NatsServer fields

File: NatsServer.cs

Add alongside existing listener fields:

private TcpListener? _mqttListener;

Shutdown integration

File: NatsServer.Lifecycle.cs

Add _mqttListener to doneExpected counting in Shutdown():

if (_mqttListener != null)
{
    doneExpected++;
    _mqttListener.Stop();
    _mqttListener = null;
}

Same for LameDuckMode().


2. Parser & Dispatch

MqttParse(byte[] buf) — New method on ClientConnection

File: New Mqtt/MqttParser.cs

State machine that processes raw bytes from the read loop:

  1. Fixed header: Read packet type byte (upper 4 bits = type, lower 4 = flags)
  2. Remaining length: Variable-length encoding (1-4 bytes, 7 bits per byte, MSB continuation)
  3. Payload: Type-specific parsing delegated to per-packet parsers
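The remaining-length field from step 2 can be sketched as a small codec. MqttRemainingLength is a hypothetical name (the existing MqttReader/MqttWriter may already cover this). The decoder reports an incomplete field with BytesUsed = 0 so the parser can save its state, and rejects a fifth length byte as malformed. The largest encodable value is 268,435,455 (0xFF 0xFF 0xFF 0x7F).

```csharp
using System;
using System.Collections.Generic;

// Sketch of the MQTT 3.1.1 Remaining Length codec: 1 to 4 bytes, 7 data bits each,
// bit 7 set means "another length byte follows".
public static class MqttRemainingLength
{
    public const int Max = 268_435_455;

    // buf starts at the first length byte (just after the packet-type byte).
    // Returns (value, bytesUsed); bytesUsed == 0 means the field is not complete yet.
    public static (int Value, int BytesUsed) Decode(ReadOnlySpan<byte> buf)
    {
        int value = 0;
        int multiplier = 1;
        for (int i = 0; i < 4; i++)
        {
            if (i >= buf.Length)
            {
                return (0, 0); // wait for more data
            }
            byte b = buf[i];
            value += (b & 0x7F) * multiplier;
            if ((b & 0x80) == 0)
            {
                return (value, i + 1);
            }
            multiplier *= 128;
        }
        throw new FormatException("malformed remaining length: more than 4 bytes");
    }

    public static byte[] Encode(int value)
    {
        if (value < 0 || value > Max) throw new ArgumentOutOfRangeException(nameof(value));
        var bytes = new List<byte>(4);
        do
        {
            byte b = (byte)(value % 128);
            value /= 128;
            if (value > 0) b |= 0x80; // continuation bit
            bytes.Add(b);
        } while (value > 0);
        return bytes.ToArray();
    }
}
```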

Dispatch switch

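After extracting the packet type (upper 4 bits of the first byte) and the remaining length, dispatch on the type. The values below are the full first byte: PUBLISH's lower 4 bits carry DUP/QoS/RETAIN and vary, while PUBREL, SUBSCRIBE, and UNSUBSCRIBE must carry the reserved flags 0010: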

CONNECT    (0x10) → MqttParseConnect() → MqttProcessConnect()
PUBLISH    (0x30) → MqttParsePub() → MqttProcessPub()
PUBACK     (0x40) → MqttParsePubAck()
PUBREC     (0x50) → MqttParsePubRec()
PUBREL     (0x62) → MqttProcessPubRel()
PUBCOMP    (0x70) → MqttParsePubComp()
SUBSCRIBE  (0x82) → MqttParseSubs() → MqttProcessSubs()
UNSUBSCRIBE(0xA2) → MqttParseUnsubs() → MqttProcessUnsubs()
PINGREQ    (0xC0) → MqttEnqueuePingResp()
DISCONNECT (0xE0) → handle cleanup + close
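A sketch of the dispatch decision, assuming strict validation of the reserved flag bits as described in MQTT 3.1.1 section 2.2.2 (whether the Go server enforces every one of these checks is not confirmed here). MqttPacketKind and MqttDispatchSketch are hypothetical names; a real MqttParse would call the per-packet parsers listed above instead of returning an enum.

```csharp
using System;

public enum MqttPacketKind { Connect, Publish, PubAck, PubRec, PubRel, PubComp, Subscribe, Unsubscribe, PingReq, Disconnect }

// Hypothetical dispatch sketch: switch on the high nibble, then validate the low nibble.
public static class MqttDispatchSketch
{
    public static MqttPacketKind Classify(byte first, bool connected)
    {
        int type = first & 0xF0;
        int flags = first & 0x0F;
        if (!connected && type != 0x10)
            throw new InvalidOperationException("first packet must be CONNECT");
        if (connected && type == 0x10)
            throw new InvalidOperationException("a second CONNECT is a protocol violation");

        switch (type)
        {
            case 0x10: Expect(flags, 0x0); return MqttPacketKind.Connect;
            case 0x30:
                // Low nibble = DUP | QoS (2 bits) | RETAIN; QoS 3 is malformed.
                if (((flags >> 1) & 0x03) == 3) throw new FormatException("PUBLISH with QoS 3");
                return MqttPacketKind.Publish;
            case 0x40: Expect(flags, 0x0); return MqttPacketKind.PubAck;
            case 0x50: Expect(flags, 0x0); return MqttPacketKind.PubRec;
            case 0x60: Expect(flags, 0x2); return MqttPacketKind.PubRel;
            case 0x70: Expect(flags, 0x0); return MqttPacketKind.PubComp;
            case 0x80: Expect(flags, 0x2); return MqttPacketKind.Subscribe;
            case 0xA0: Expect(flags, 0x2); return MqttPacketKind.Unsubscribe;
            case 0xC0: Expect(flags, 0x0); return MqttPacketKind.PingReq;
            case 0xE0: Expect(flags, 0x0); return MqttPacketKind.Disconnect;
            default:
                // CONNACK, SUBACK, UNSUBACK, PINGRESP are server-to-client only; 0x00 and 0xF0 are reserved.
                throw new FormatException($"unexpected packet type 0x{type:X2}");
        }
    }

    private static void Expect(int flags, int required)
    {
        if (flags != required) throw new FormatException($"invalid fixed-header flags 0x{flags:X}");
    }
}
```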

Key constraints

  • CONNECT must be the first packet and may appear only once; any other packet before CONNECT, or a second CONNECT, → close connection
  • Partial packets: save state in MqttHandler.ParseState/RemLen/Buf, resume on next buffer
  • Invalid packet types → close connection
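Partial packets can be illustrated with a small accumulator that only releases complete packets. MqttFrameBuffer is a hypothetical name; it keeps raw bytes in a List<byte> for clarity, whereas the design keeps equivalent state in MqttHandler.ParseState/RemLen/Buf and would avoid the extra copying.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: buffers partial reads and yields whole MQTT packets
// (fixed header + body) once all of their bytes have arrived.
public sealed class MqttFrameBuffer
{
    private readonly List<byte> _pending = new();

    // Appends newly read bytes and returns every packet that is now complete.
    // Bytes of a trailing partial packet stay buffered until the next call.
    public List<byte[]> Feed(byte[] data, int count)
    {
        for (int i = 0; i < count; i++)
        {
            _pending.Add(data[i]);
        }
        var packets = new List<byte[]>();
        while (TryTakePacket(out byte[] packet))
        {
            packets.Add(packet);
        }
        return packets;
    }

    private bool TryTakePacket(out byte[] packet)
    {
        packet = Array.Empty<byte>();
        int remainingLength = 0;
        int multiplier = 1;
        int pos = 1; // index of the first remaining-length byte
        while (true)
        {
            if (pos > 4) throw new FormatException("malformed remaining length");
            if (pos >= _pending.Count) return false; // fixed header not complete yet
            byte b = _pending[pos++];
            remainingLength += (b & 0x7F) * multiplier;
            if ((b & 0x80) == 0) break;
            multiplier *= 128;
        }
        int total = pos + remainingLength; // fixed header + body
        if (_pending.Count < total) return false; // body not complete yet
        packet = _pending.GetRange(0, total).ToArray();
        _pending.RemoveRange(0, total);
        return true;
    }
}
```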

ReadLoop integration

File: ClientConnection.cs (ReadLoop method)

Add MQTT branch:

if (IsMqtt())
    err = MqttParse(buf);
else
    err = Parse(buf, handler);

IsMqtt() change

File: ClientConnection.cs

Change from return false to:

internal bool IsMqtt() => Mqtt != null;

3. Packet Handlers

CONNECT (MqttParseConnect + MqttProcessConnect)

File: Mqtt/MqttPacketHandlers.cs

Parse phase — Extract fields in MQTT 3.1.1 spec order:

  • Protocol name ("MQTT"), protocol level (0x04)
  • Connect flags (clean session, will, will QoS, will retain, password, username)
  • Keep-alive interval (2 bytes, multiply by 1.5 for read deadline)
  • Client ID (if empty: auto-generate a NUID when clean session is set; otherwise reject with CONNACK return code 0x02, identifier rejected)
  • Will topic/message (convert topic to NATS subject via MqttTopicToNatsPubSubject())
  • Username/password
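The connect-flags byte and the keep-alive rule can be sketched as follows. Bit positions and the validity rules (reserved bit 0 must be 0, will QoS and will retain require the will flag, a password requires a username) come from the MQTT 3.1.1 spec; the type and method names are hypothetical.

```csharp
using System;

// Decoded CONNECT flags byte (MQTT 3.1.1 section 3.1.2.3). Names are hypothetical.
public readonly record struct MqttConnectFlags(
    bool CleanSession, bool HasWill, byte WillQos, bool WillRetain, bool HasPassword, bool HasUsername);

public static class MqttConnectSketch
{
    public static MqttConnectFlags ParseFlags(byte flags)
    {
        if ((flags & 0x01) != 0) throw new FormatException("reserved CONNECT flag bit 0 must be 0");
        var f = new MqttConnectFlags(
            CleanSession: (flags & 0x02) != 0,
            HasWill: (flags & 0x04) != 0,
            WillQos: (byte)((flags >> 3) & 0x03),
            WillRetain: (flags & 0x20) != 0,
            HasPassword: (flags & 0x40) != 0,
            HasUsername: (flags & 0x80) != 0);
        if (f.WillQos == 3) throw new FormatException("will QoS 3 is invalid");
        if (!f.HasWill && (f.WillQos != 0 || f.WillRetain))
            throw new FormatException("will QoS/retain set without the will flag");
        if (f.HasPassword && !f.HasUsername)
            throw new FormatException("password flag requires the username flag");
        return f;
    }

    // Keep-alive is in seconds; 0 disables the check. The server closes the
    // connection if nothing arrives within 1.5x the keep-alive interval.
    public static TimeSpan? ReadDeadline(ushort keepAliveSeconds) =>
        keepAliveSeconds == 0 ? (TimeSpan?)null : TimeSpan.FromSeconds(keepAliveSeconds * 1.5);
}
```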

Process phase (MqttProcessConnect on NatsServer):

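  1. Authenticate the client first (MQTT overrides apply when opts.Mqtt.Username/Password are configured); on failure send a CONNACK with a non-zero return code (0x04 bad username or password, or 0x05 not authorized) and close, before any session state is touched. The client's account is only known after authentication.
  2. Get or create MqttAccountSessionManager for client's account
  3. Lock session by client ID hash (serializes concurrent CONNECTs and takeovers for the same client ID)
  4. If existing session → take over (disconnect old client)
  5. If clean session → delete old session state
  6. If !clean session → restore subscriptions and pending messages
  7. Set c.Mqtt.Session, c.Mqtt.AccountSessionManager
  8. Send CONNACK (return code 0 = accepted; session-present flag set only when !clean session and a stored session was found)
  9. Deliver retained messages for existing subscriptions
  10. Set keep-alive read deadline on connection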
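A sketch of the CONNACK bytes for step 8 (MQTT 3.1.1 section 3.2), with return-code constants taken from the spec; MqttConnAckSketch is a hypothetical name. The spec requires the session-present flag to be 0 whenever the return code is non-zero, which the encoder enforces.

```csharp
// Hypothetical CONNACK encoder (MQTT 3.1.1 section 3.2).
public static class MqttConnAckSketch
{
    public const byte Accepted = 0x00;
    public const byte UnacceptableProtocolVersion = 0x01;
    public const byte IdentifierRejected = 0x02;
    public const byte ServerUnavailable = 0x03;
    public const byte BadUsernameOrPassword = 0x04;
    public const byte NotAuthorized = 0x05;

    // Fixed header 0x20, remaining length 2, acknowledge flags, return code.
    public static byte[] Encode(bool sessionPresent, byte returnCode)
    {
        // Session present must be 0 for any rejected connection.
        bool sp = sessionPresent && returnCode == Accepted;
        return new byte[] { 0x20, 0x02, (byte)(sp ? 0x01 : 0x00), returnCode };
    }
}
```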

PUBLISH (MqttParsePub + MqttProcessPub)

Parse: Topic name → convert to NATS subject, packet ID (if QoS>0), payload bytes.

Process by QoS:

  • QoS 0: MqttInitiateMsgDelivery() → done
  • QoS 1: Deliver → send PUBACK(pi)
  • QoS 2: MqttStoreQoS2MsgOnce() → send PUBREC(pi) → [client sends PUBREL] → deliver → send PUBCOMP(pi)
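A sketch of the PUBLISH parse step and of the 4-byte acknowledgement packets used by the QoS 1 and QoS 2 flows. It stops at the MQTT topic; conversion via MqttTopicToNatsPubSubject() is left out. MqttPublishPacket and MqttPublishParser are hypothetical names, and the validity checks (no QoS 3, DUP = 0 at QoS 0, no wildcards in topic names, non-zero packet ID) are taken from the MQTT 3.1.1 spec.

```csharp
using System;
using System.Text;

// Parsed PUBLISH; Topic is still the MQTT topic, before MqttTopicToNatsPubSubject().
public sealed record MqttPublishPacket(string Topic, byte Qos, bool Retain, bool Dup, ushort PacketId, byte[] Payload);

public static class MqttPublishParser
{
    // flags: low nibble of the fixed header; body: the Remaining Length bytes.
    public static MqttPublishPacket Parse(byte flags, ReadOnlySpan<byte> body)
    {
        bool dup = (flags & 0x08) != 0;
        byte qos = (byte)((flags >> 1) & 0x03);
        bool retain = (flags & 0x01) != 0;
        if (qos == 3) throw new FormatException("QoS 3 is invalid");
        if (qos == 0 && dup) throw new FormatException("DUP must be 0 for QoS 0");

        if (body.Length < 2) throw new FormatException("truncated topic length");
        int topicLen = (body[0] << 8) | body[1];
        int pos = 2;
        if (body.Length < pos + topicLen) throw new FormatException("truncated topic");
        string topic = Encoding.UTF8.GetString(body.Slice(pos, topicLen));
        pos += topicLen;
        // Topic names must be non-empty and must not contain wildcards.
        if (topic.Length == 0 || topic.IndexOfAny(new[] { '+', '#' }) >= 0)
            throw new FormatException("invalid topic name");

        ushort packetId = 0;
        if (qos > 0)
        {
            if (body.Length < pos + 2) throw new FormatException("missing packet identifier");
            packetId = (ushort)((body[pos] << 8) | body[pos + 1]);
            if (packetId == 0) throw new FormatException("packet identifier must be non-zero");
            pos += 2;
        }
        return new MqttPublishPacket(topic, qos, retain, dup, packetId, body.Slice(pos).ToArray());
    }

    // PUBACK (0x40), PUBREC (0x50) and PUBCOMP (0x70) share this layout; PUBREL uses 0x62.
    public static byte[] Ack(byte firstByte, ushort packetId) =>
        new byte[] { firstByte, 0x02, (byte)(packetId >> 8), (byte)(packetId & 0xFF) };
}
```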

MqttInitiateMsgDelivery: Construct NATS message with Nmqtt-Pub:<qos> header, set c.Pa (publish args), call c.ProcessInboundClientMsg().

Max payload check: Validate the total NATS message size (header block plus payload) against the client's max payload limit, and reject messages that exceed it.
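A sketch of the header construction and size check. The byte layout of the header block (a NATS/1.0 status line, one Nmqtt-Pub:<qos> line, and a terminating blank line) is an assumption based on the description above; the port's exact bytes may differ. MqttNatsMessageSketch is a hypothetical name, and treating maxPayload of 0 or less as unlimited is also an assumption.

```csharp
using System;
using System.Text;

// Hypothetical sketch of assembling the NATS message MqttInitiateMsgDelivery injects.
public static class MqttNatsMessageSketch
{
    // Header block: version line, one header line, blank-line terminator.
    public static byte[] BuildHeader(byte qos) =>
        Encoding.ASCII.GetBytes($"NATS/1.0\r\nNmqtt-Pub:{qos}\r\n\r\n");

    // The limit applies to header + payload, not to the MQTT payload alone.
    public static byte[] Build(byte qos, ReadOnlySpan<byte> payload, int maxPayload, out int headerLength)
    {
        byte[] header = BuildHeader(qos);
        int total = header.Length + payload.Length;
        if (maxPayload > 0 && total > maxPayload)
            throw new InvalidOperationException($"message size {total} exceeds max payload {maxPayload}");
        var message = new byte[total];
        header.CopyTo(message, 0);
        payload.CopyTo(message.AsSpan(header.Length));
        headerLength = header.Length;
        return message;
    }
}
```

With QoS 1 the header block is 25 bytes, so a 5-byte MQTT payload needs a limit of at least 30 bytes.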

SUBSCRIBE (MqttParseSubs + MqttProcessSubs)

Parse: Packet ID, list of (topic filter + QoS) pairs. Convert each filter to NATS subject via MqttFilterToNatsSubject().

Process per filter:

  • Create NATS subscription on converted subject
  • If QoS > 0: Create JetStream durable consumer on $MQTT.msgs.<subject>
  • Track in MqttSession.Subs and MqttSession.Cons
  • Send SUBACK with granted QoS per filter (downgrade QoS 2 → 1 if configured)
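SUBACK construction can be sketched as follows (MQTT 3.1.1 section 3.9): one return code per filter, in request order, with 0x80 marking a filter that could not be subscribed. MqttSubAckSketch is a hypothetical name; the downgrade mirrors the DowngradeQoS2Sub option set in CreateMqttClient().

```csharp
using System;
using System.Collections.Generic;

// Hypothetical SUBACK encoder.
public static class MqttSubAckSketch
{
    public const byte Failure = 0x80; // return code for a filter that could not be subscribed

    // Granted QoS for one filter: the requested QoS, capped at 1 when DowngradeQoS2Sub is on.
    // A requested QoS of 3 makes the SUBSCRIBE malformed, so the connection is closed instead.
    public static byte Grant(byte requestedQos, bool downgradeQoS2Sub)
    {
        if (requestedQos > 2) throw new FormatException("requested QoS must be 0, 1 or 2");
        return downgradeQoS2Sub && requestedQos == 2 ? (byte)1 : requestedQos;
    }

    public static byte[] Encode(ushort packetId, IReadOnlyList<byte> returnCodes)
    {
        var packet = new List<byte> { 0x90 };
        int remaining = 2 + returnCodes.Count; // packet ID + one code per filter
        do
        {
            byte b = (byte)(remaining % 128);
            remaining /= 128;
            if (remaining > 0) b |= 0x80; // continuation bit
            packet.Add(b);
        } while (remaining > 0);
        packet.Add((byte)(packetId >> 8));
        packet.Add((byte)(packetId & 0xFF));
        packet.AddRange(returnCodes);
        return packet.ToArray();
    }
}
```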

UNSUBSCRIBE (MqttParseUnsubs + MqttProcessUnsubs)

Mirror of SUBSCRIBE:

  • Remove NATS subscriptions
  • Delete JetStream consumers
  • Update session state
  • Send UNSUBACK

PING

Queue PINGRESP (2 bytes: 0xD0 0x00).

DISCONNECT

  1. Clear will message (graceful disconnect suppresses will)
  2. Call MqttHandleClosedClient() for session cleanup
  3. Close connection with ClosedState.ClientClosed

Will Message (MqttHandleWill)

On abnormal disconnect (connection drop, not DISCONNECT):

  • Publish will topic/message to NATS with configured QoS and retain flag
  • Only if will was set in CONNECT and not cleared by DISCONNECT
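The will rules reduce to a small piece of per-connection state. This is a hypothetical model (MqttWillSketch is not a port type): DISCONNECT clears the will, and every close path takes it at most once, so a will is published only on abnormal closes and never twice.

```csharp
#nullable enable

// Hypothetical model of will-message bookkeeping for one connection.
public sealed class MqttWillSketch
{
    // Subject is the will topic already converted with MqttTopicToNatsPubSubject().
    public sealed record Will(string Subject, byte Qos, bool Retain, byte[] Message);

    private Will? _will;

    // CONNECT with the will flag stores the will; without it, null.
    public void OnConnect(Will? will) => _will = will;

    // A DISCONNECT packet discards the will: graceful close, nothing published.
    public void OnDisconnectPacket() => _will = null;

    // Every close path calls this; a non-null result must be published with its QoS/retain.
    // Clearing the field guarantees the will is published at most once.
    public Will? TakeWillOnClose()
    {
        Will? will = _will;
        _will = null;
        return will;
    }
}
```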

PUBREL Processing (MqttProcessPubRel)

For QoS 2 inbound flow:

  1. Look up stored QoS 2 message by packet ID
  2. Deliver via MqttInitiateMsgDelivery()
  3. Send PUBCOMP(pi); the spec requires this even when nothing is stored for pi (e.g. a retransmitted PUBREL)
  4. Remove from pending
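The store-once and PUBREL steps can be modeled in memory. MqttQoS2InboundSketch is hypothetical and uses a Dictionary as a stand-in for whatever MqttStoreQoS2MsgOnce() persists; it shows why a retransmitted QoS 2 PUBLISH is not delivered twice and why PUBCOMP is still sent for an unknown packet ID.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of the inbound QoS 2 exchange.
public sealed class MqttQoS2InboundSketch
{
    private readonly Dictionary<ushort, byte[]> _pending = new();
    private readonly Action<byte[]> _deliver;

    public MqttQoS2InboundSketch(Action<byte[]> deliver) => _deliver = deliver;

    public int PendingCount => _pending.Count;

    // PUBLISH QoS 2: keep only the first copy per packet ID, so a DUP retransmission
    // is neither stored twice nor delivered twice. The caller sends PUBREC(pi) either way.
    public bool StoreOnce(ushort packetId, byte[] message) => _pending.TryAdd(packetId, message);

    // PUBREL: deliver and forget the stored message if present; always answer PUBCOMP(pi).
    public byte[] OnPubRel(ushort packetId)
    {
        if (_pending.Remove(packetId, out var message))
        {
            _deliver(message);
        }
        return new byte[] { 0x70, 0x02, (byte)(packetId >> 8), (byte)(packetId & 0xFF) };
    }
}
```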

4. Session Management & JetStream Integration

Session Lifecycle

Creation (during CONNECT):

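  1. GetOrCreateMqttAccountSessionManager(): lazy per-account ASM initialization
  2. MqttCreateAccountSessionManager(): creates the ASM and its JetStream streams in the client's account (stream names cannot contain '.', so the names below are stream names and the dotted forms are their subjects):
    • $MQTT_msgs (subjects $MQTT.msgs.>): QoS 1/2 message persistence
    • $MQTT_sess (subjects $MQTT.sess.>): session state persistence
    • $MQTT_rmsgs (subjects $MQTT.rmsgs.>): retained messages (MaxMsgsPerSubject=1)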
  3. Lock session by client ID hash
  4. Create or restore MqttSession

Persistence (via MqttJsa):

  • Serialize session state to $MQTT.sess stream keyed by client ID hash
  • Clean session: delete all session data
  • Reconnect (!clean): reload from JetStream, rebind subscriptions

Takeover (same client ID reconnects):

  • Lock old session → disconnect old client → transfer to new client → resume pending

JetStream Consumer Management

QoS 1/2 subscriptions create durable consumers:

  • Durable name: <sessionIdHash>_<nuid>
  • Filter: $MQTT.msgs.<converted_subject>
  • Delivery subject: $MQTT.sub.<nuid>
  • Ack policy: Explicit
  • Max ack pending: From account config (default 1024)

Retained Messages

  • PUBLISH with retain flag → store in $MQTT.rmsgs.<subject> (MaxMsgsPerSubject=1)
  • New SUBSCRIBE → LoadLastMsgForAsync() to find matching retained messages
  • PUBLISH with retain + empty payload → delete retained message
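A hypothetical in-memory stand-in for the retained-message behavior: one message per NATS subject (mirroring MaxMsgsPerSubject=1), deletion on an empty payload, and NATS wildcard matching for new subscriptions. It is not the JetStream-backed implementation. An MQTT filter such as a/# also matches a itself, which a plain NATS '>' match does not cover; this sketch does not handle that case.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the retained-message stream.
public sealed class MqttRetainedSketch
{
    private readonly Dictionary<string, byte[]> _bySubject = new();

    // PUBLISH with retain: replace the stored message; an empty payload deletes it.
    public void OnRetainedPublish(string subject, byte[] payload)
    {
        if (payload.Length == 0) _bySubject.Remove(subject);
        else _bySubject[subject] = payload; // at most one message per subject
    }

    // New SUBSCRIBE: subjects of retained messages matching the converted filter.
    public List<string> MatchingSubjects(string filter) =>
        _bySubject.Keys.Where(s => Matches(filter, s)).OrderBy(s => s, StringComparer.Ordinal).ToList();

    // NATS semantics: '*' matches exactly one token, a trailing '>' matches one or more.
    public static bool Matches(string filter, string subject)
    {
        string[] f = filter.Split('.');
        string[] s = subject.Split('.');
        for (int i = 0; i < f.Length; i++)
        {
            if (f[i] == ">") return s.Length > i;
            if (i >= s.Length) return false;
            if (f[i] != "*" && f[i] != s[i]) return false;
        }
        return f.Length == s.Length;
    }
}
```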

Extension Methods to Implement

| Method | Purpose |
|---|---|
| StartMqtt() | Listener startup |
| MqttConfigAuth() | Wire MQTT auth overrides |
| MqttHandleClosedClient() | Session cleanup on disconnect |
| MqttUpdateMaxAckPending() | Propagate config to consumers |
| MqttGetJsaForAccount() | Get/create JSA for account |
| MqttStoreQosMsgForAccountOnNewSubject() | Store QoS msg on new subject |
| GetOrCreateMqttAccountSessionManager() | Lazy ASM creation |
| MqttCreateAccountSessionManager() | ASM + stream creation |
| MqttDetermineReplicas() | Cluster-aware replica count |
| MqttProcessConnect() | CONNECT handling |
| MqttHandleWill() | Will message delivery |
| MqttProcessPub() | PUBLISH handling |
| MqttInitiateMsgDelivery() | Inject into NATS substrate |
| MqttStoreQoS2MsgOnce() | QoS 2 idempotent storage |
| MqttProcessPubRel() | PUBREL handling |
| MqttCheckPubRetainedPerms() | Retained message permission audit |

5. New Files

| File | Purpose | Est. LOC |
|---|---|---|
| Mqtt/MqttParser.cs | Parse + dispatch state machine | 400-500 |
| Mqtt/MqttPacketHandlers.cs | CONNECT/PUB/SUB/UNSUB/PING/DISCONNECT processing | 800-1000 |
| Mqtt/MqttClientExtensions.cs | ClientConnection MQTT methods (IsMqtt, enqueue helpers) | 200-300 |
| NatsServer.Mqtt.cs | Server-side MQTT partial (CreateMqttClient, listener) | 300-400 |

Modified files:

  • Mqtt/MqttHandler.cs — Replace stubs with real implementations
  • NatsServer.cs — Add _mqttListener field
  • NatsServer.Lifecycle.cs — Add MQTT listener to shutdown
  • ClientConnection.cs — Change IsMqtt(), add Mqtt property, ReadLoop branch

Estimated total: 2,000-3,000 LOC new code (the four new files above sum to roughly 1,700-2,200 LOC)


6. Testing Strategy

Unit Tests

Parser tests (Tests/Server/Mqtt/MqttParserTests.cs):

  • Valid CONNECT packet → all fields extracted
  • CONNECT with will message → will topic/message parsed
  • Partial CONNECT (fragmented) → state saved, resume on next buffer
  • Invalid first packet (not CONNECT) → error
  • PUBLISH QoS 0/1/2 → topic conversion, payload, PI
  • SUBSCRIBE with multiple filters → all converted
  • UNSUBSCRIBE → filters parsed
  • Remaining length edge cases (1-, 2-, 3-, and 4-byte encodings, plus a malformed fifth length byte)

Dispatch tests (Tests/Server/Mqtt/MqttDispatchTests.cs):

  • Each packet type dispatches to correct handler
  • CONNECT-first enforcement
  • Invalid packet type → error

Handler tests (mocked server/session):

  • CONNECT: clean session, session resumption, client ID generation
  • PUBLISH QoS 0/1/2 flows
  • SUBSCRIBE: subscription + consumer creation
  • UNSUBSCRIBE: cleanup
  • DISCONNECT: will cleared, session cleanup
  • Will delivery on abnormal close

Integration Tests

70 existing deferred tests in Mqtt/MqttTests.cs become runnable progressively.

New boot tests:

  • MqttBoot_AcceptsConnection_ShouldSucceed
  • MqttBoot_StartAndShutdown_ShouldSucceed
  • End-to-end: client A publishes → client B receives

Test approach

  • Unit tests: mocked dependencies, no server boot
  • Integration tests: NatsServer.Start() with opts.Mqtt.Port = 0 (ephemeral)
  • Raw TCP + binary MQTT packets for protocol tests (no third-party MQTT client)

7. Implementation Order (Bottom-Up)

  1. Listener + client creation: StartMqtt(), CreateMqttClient(), shutdown wiring
  2. Parser + dispatch: MqttParse(), ReadLoop integration, IsMqtt()
  3. CONNECT: Parse + process, CONNACK response, auth
  4. PING: Simplest packet handler (proves dispatch works)
  5. DISCONNECT: Graceful close, will suppression
  6. PUBLISH QoS 0: Basic message flow through NATS substrate
  7. SUBSCRIBE + UNSUBSCRIBE: NATS subscription management
  8. PUBLISH QoS 1: PUBACK flow, JetStream consumer creation
  9. PUBLISH QoS 2: PUBREC/PUBREL/PUBCOMP handshake
  10. Will messages: Abnormal disconnect handling
  11. Retained messages: Store/retrieve/delete retained
  12. Session persistence: Clean/dirty session lifecycle
  13. Session takeover: Same client ID reconnect
  14. Config integration: MqttConfigAuth, MqttUpdateMaxAckPending, MqttDetermineReplicas, MqttCheckPubRetainedPerms