From 2e2ffee41a8b781f6aa598580a2842339b03a62e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 15:41:45 -0500 Subject: [PATCH] feat(mqtt): add MQTT packet parser, dispatch, and ReadLoop integration Implement Task 2 of MQTT orchestration: - Create MqttParser.cs with loop-based packet parser and dispatch switch - Add MqttReader field to MqttHandler for per-connection parsing state - Wire ReadLoop to call MqttParser for MQTT connections - Implement PINGREQ handler (enqueues PINGRESP) - CONNECT-first enforcement (rejects non-CONNECT as first packet) - Partial packet handling via MqttReader pending buffer - 13 unit tests covering parser, dispatch, partial packets, and edge cases - Stub dispatch entries for CONNECT, PUBLISH, SUB, UNSUB, DISCONNECT (NotImplementedException) --- .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 52 ++++ .../ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs | 10 + .../ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs | 162 ++++++++++ .../Mqtt/MqttParserTests.cs | 279 ++++++++++++++++++ reports/current.md | 2 +- 5 files changed, 504 insertions(+), 1 deletion(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttParserTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index c9c5028..abead62 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -25,6 +25,7 @@ using Microsoft.Extensions.Logging; using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; +using ZB.MOM.NatsNet.Server.Mqtt; using ZB.MOM.NatsNet.Server.Protocol; using ZB.MOM.NatsNet.Server.WebSocket; @@ -1405,8 +1406,59 @@ public sealed partial class ClientConnection internal void ReadLoop(byte[]? pre) { LastIn = DateTime.UtcNow; + + // Process any pre-read bytes first. if (pre is { Length: > 0 }) + { TraceInOp("PRE", pre); + if (IsMqtt()) + { + var preErr = MqttParser.Parse(this, pre, pre.Length); + if (preErr != null) + { + CloseConnection(ClosedState.ParseError); + return; + } + } + } + + // MQTT clients use the MqttParser; NATS clients use ProtocolParser (not yet wired). + if (!IsMqtt()) + return; + + // Main read loop — read from network stream until closed. + var buf = new byte[32768]; // 32 KB read buffer + try + { + while (true) + { + int n; + try + { + n = _nc!.Read(buf, 0, buf.Length); + } + catch + { + break; // Connection closed or errored. + } + + if (n <= 0) + break; // Connection closed. + + LastIn = DateTime.UtcNow; + + var err = MqttParser.Parse(this, buf, n); + if (err != null) + { + CloseConnection(ClosedState.ParseError); + return; + } + } + } + finally + { + CloseConnection(ClosedState.ClientClosed); + } } // ========================================================================= diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs index fa510eb..ae290fc 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs @@ -110,6 +110,16 @@ internal sealed class MqttHandler /// Multiplier accumulator used during multi-byte remaining-length decoding. public int RemLenMult { get; set; } + // ------------------------------------------------------------------ + // Reader + // ------------------------------------------------------------------ + + /// + /// Per-connection MQTT reader for binary packet parsing. + /// Created lazily on first use. + /// + public MqttReader Reader { get; } = new(); + // ------------------------------------------------------------------ // Thread safety // ------------------------------------------------------------------ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs new file mode 100644 index 0000000..5949a8e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs @@ -0,0 +1,162 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/mqtt.go mqttParse() in the NATS server Go source. + +namespace ZB.MOM.NatsNet.Server.Mqtt; + +/// +/// MQTT binary packet parser and dispatch. +/// Reads packets from a byte buffer using and dispatches +/// to the appropriate handler based on packet type. +/// Mirrors Go mqttParse() in server/mqtt.go. +/// +internal static class MqttParser +{ + /// PINGRESP packet bytes: 0xD0 0x00. + private static readonly byte[] PingRespPacket = [MqttPacket.PingResp, 0x00]; + + /// + /// Parses MQTT packets from and dispatches to handlers. + /// Returns null on success, or an exception describing the parse/dispatch error. + /// Handles partial packets by saving state in the client's . + /// Mirrors Go mqttParse(r *mqttReader, c *client, ...). + /// + public static Exception? Parse(ClientConnection c, byte[] buf, int len) + { + var mqtt = c.Mqtt!; + var r = mqtt.Reader; + + // Slice buffer to actual length if needed. + if (len < buf.Length) + { + var tmp = new byte[len]; + Buffer.BlockCopy(buf, 0, tmp, 0, len); + buf = tmp; + } + + r.Reset(buf); + + var connected = (c.Flags & ClientFlags.ConnectReceived) != 0; + Exception? err = null; + + while (err == null && r.HasMore()) + { + r.PacketStart = r.Position; + + // Read packet type + flags byte. + byte b; + try { b = r.ReadByte("packet type"); } + catch (Exception ex) { err = ex; break; } + + var pt = (byte)(b & MqttPacket.Mask); + + // CONNECT must be the first packet. + if (!connected && pt != MqttPacket.Connect) + { + err = new InvalidOperationException( + $"the first packet should be a CONNECT (0x{MqttPacket.Connect:X2}), got 0x{pt:X2}"); + break; + } + + // Read remaining length (variable-length encoding). + int pl; + bool complete; + try + { + (pl, complete) = r.ReadPacketLen(); + } + catch (Exception ex) { err = ex; break; } + + if (!complete) + break; // Partial packet — state saved in reader. + + // Dispatch based on packet type. + switch (pt) + { + case MqttPacket.Pub: + // TODO: Task 4 — MqttParsePub + MqttProcessPub + err = new NotImplementedException("PUBLISH not yet implemented"); + break; + + case MqttPacket.PubAck: + // TODO: Task 5 — process PUBACK + err = new NotImplementedException("PUBACK not yet implemented"); + break; + + case MqttPacket.PubRec: + // TODO: Task 5 — process PUBREC + err = new NotImplementedException("PUBREC not yet implemented"); + break; + + case MqttPacket.PubRel: + // TODO: Task 5 — process PUBREL + err = new NotImplementedException("PUBREL not yet implemented"); + break; + + case MqttPacket.PubComp: + // TODO: Task 5 — process PUBCOMP + err = new NotImplementedException("PUBCOMP not yet implemented"); + break; + + case MqttPacket.Sub: + // TODO: Task 4 — MqttParseSubs + MqttProcessSubs + err = new NotImplementedException("SUBSCRIBE not yet implemented"); + break; + + case MqttPacket.Unsub: + // TODO: Task 4 — MqttParseUnsubs + MqttProcessUnsubs + err = new NotImplementedException("UNSUBSCRIBE not yet implemented"); + break; + + case MqttPacket.Ping: + HandlePingReq(c); + break; + + case MqttPacket.Connect: + if (connected) + { + // Second CONNECT on same connection is a protocol violation. + err = new InvalidOperationException("second CONNECT packet not allowed"); + break; + } + // TODO: Task 3 — MqttParseConnect + MqttProcessConnect + err = new NotImplementedException("CONNECT not yet implemented"); + break; + + case MqttPacket.Disconnect: + // TODO: Task 3 — handle DISCONNECT + err = new NotImplementedException("DISCONNECT not yet implemented"); + break; + + default: + err = new InvalidOperationException($"unknown MQTT packet type: 0x{pt:X2}"); + break; + } + } + + return err; + } + + /// + /// Handles PINGREQ by enqueueing a PINGRESP packet. + /// Mirrors Go mqttEnqueuePingResp(). + /// + private static void HandlePingReq(ClientConnection c) + { + lock (c) + { + c.EnqueueProto(PingRespPacket); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttParserTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttParserTests.cs new file mode 100644 index 0000000..702e92c --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttParserTests.cs @@ -0,0 +1,279 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Mqtt; + +namespace ZB.MOM.NatsNet.Server.Tests.Mqtt; + +/// +/// Unit tests for — validates packet type extraction, +/// remaining length decoding, CONNECT-first enforcement, partial packet handling, +/// and PINGREQ dispatch. +/// +public sealed class MqttParserTests +{ + /// + /// Creates a minimal ClientConnection with MQTT handler for testing. + /// + private static ClientConnection CreateMqttClient() + { + var c = new ClientConnection(ClientKind.Client); + c.InitMqtt(new MqttHandler()); + return c; + } + + // ========================================================================= + // CONNECT-first enforcement + // ========================================================================= + + [Fact] + public void Parse_NonConnectFirst_ShouldReturnError() + { + var c = CreateMqttClient(); + // PINGREQ before CONNECT → error + var buf = new byte[] { MqttPacket.Ping, 0x00 }; + var err = MqttParser.Parse(c, buf, buf.Length); + err.ShouldNotBeNull(); + err.Message.ShouldContain("first packet should be a CONNECT"); + } + + [Fact] + public void Parse_PublishBeforeConnect_ShouldReturnError() + { + var c = CreateMqttClient(); + // PUBLISH QoS 0 before CONNECT + var buf = new byte[] { MqttPacket.Pub, 0x05, 0x00, 0x01, (byte)'t', 0x68, 0x69 }; + var err = MqttParser.Parse(c, buf, buf.Length); + err.ShouldNotBeNull(); + err.Message.ShouldContain("first packet should be a CONNECT"); + } + + [Fact] + public void Parse_ConnectFirst_ShouldNotRejectAsNonConnect() + { + var c = CreateMqttClient(); + // CONNECT packet (minimal): type=0x10, remaining len=0 + // This will hit the "not yet implemented" but NOT the "first packet" error. + var buf = new byte[] { MqttPacket.Connect, 0x00 }; + var err = MqttParser.Parse(c, buf, buf.Length); + err.ShouldNotBeNull(); // Will be NotImplementedException + err.ShouldBeOfType(); + err.Message.ShouldContain("CONNECT not yet implemented"); + } + + [Fact] + public void Parse_SecondConnect_ShouldReturnError() + { + var c = CreateMqttClient(); + // Simulate that CONNECT was already received. + c.Flags |= ClientFlags.ConnectReceived; + var buf = new byte[] { MqttPacket.Connect, 0x00 }; + var err = MqttParser.Parse(c, buf, buf.Length); + err.ShouldNotBeNull(); + err.Message.ShouldContain("second CONNECT"); + } + + // ========================================================================= + // PINGREQ dispatch + // ========================================================================= + + [Fact] + public void Parse_PingReq_ShouldEnqueuePingResp() + { + var c = CreateMqttClient(); + // Mark as connected so ping is accepted. + c.Flags |= ClientFlags.ConnectReceived; + + // Use a memory stream to capture EnqueueProto output. + var ms = new MemoryStream(); + // Set up the connection's network stream. + typeof(ClientConnection) + .GetField("_nc", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)! + .SetValue(c, ms); + + var buf = new byte[] { MqttPacket.Ping, 0x00 }; + var err = MqttParser.Parse(c, buf, buf.Length); + err.ShouldBeNull(); + + // Verify PINGRESP was written. + var written = ms.ToArray(); + written.Length.ShouldBe(2); + written[0].ShouldBe(MqttPacket.PingResp); + written[1].ShouldBe((byte)0x00); + } + + [Fact] + public void Parse_MultiplePings_ShouldSucceed() + { + var c = CreateMqttClient(); + c.Flags |= ClientFlags.ConnectReceived; + + var ms = new MemoryStream(); + typeof(ClientConnection) + .GetField("_nc", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)! + .SetValue(c, ms); + + // Two PINGREQ packets in the same buffer. + var buf = new byte[] { MqttPacket.Ping, 0x00, MqttPacket.Ping, 0x00 }; + var err = MqttParser.Parse(c, buf, buf.Length); + err.ShouldBeNull(); + + // Two PINGRESP packets should have been written. + var written = ms.ToArray(); + written.Length.ShouldBe(4); + written[0].ShouldBe(MqttPacket.PingResp); + written[1].ShouldBe((byte)0x00); + written[2].ShouldBe(MqttPacket.PingResp); + written[3].ShouldBe((byte)0x00); + } + + // ========================================================================= + // Remaining length decoding + // ========================================================================= + + [Fact] + public void Parse_SingleByteRemainingLength_ShouldWork() + { + // SUB packet with remaining length = 5 (single byte < 128) + // After CONNECT is received, a SUB packet should parse the remaining length correctly. + var c = CreateMqttClient(); + c.Flags |= ClientFlags.ConnectReceived; + + // SUBSCRIBE: type=0x82, remaining len=5, then 5 bytes of payload + var buf = new byte[] { 0x82, 0x05, 0x00, 0x01, 0x00, 0x01, 0x74 }; + var err = MqttParser.Parse(c, buf, buf.Length); + // Will hit NotImplementedException for SUBSCRIBE — that's fine, it proves parsing worked. + err.ShouldNotBeNull(); + err.ShouldBeOfType(); + } + + [Fact] + public void Parse_TwoByteRemainingLength_ShouldWork() + { + var c = CreateMqttClient(); + c.Flags |= ClientFlags.ConnectReceived; + + // Remaining length = 200 → encoded as [0xC8, 0x01] + // (200 & 0x7F) | 0x80 = 0xC8, 200 >> 7 = 1 → 0x01 + // type(1) + remlen(2) + payload(200) = 203 bytes total. + var buf = new byte[203]; + buf[0] = MqttPacket.Pub; + buf[1] = 0xC8; + buf[2] = 0x01; + // Remaining 200 bytes are zero (payload). + + var err = MqttParser.Parse(c, buf, buf.Length); + err.ShouldNotBeNull(); + err.ShouldBeOfType(); // PUBLISH not yet implemented + } + + // ========================================================================= + // Partial packet handling + // ========================================================================= + + [Fact] + public void Parse_PartialPacket_ShouldSaveState() + { + var c = CreateMqttClient(); + c.Flags |= ClientFlags.ConnectReceived; + + // Send only the first byte of a PING packet (missing the 0x00 remaining length). + var buf = new byte[] { MqttPacket.Ping }; + var err = MqttParser.Parse(c, buf, buf.Length); + // Should succeed (partial packet saved) — no error because it just stops. + // Actually, the ReadByte for packet type succeeds, then ReadPacketLen has no data, + // so it returns (0, false) — incomplete, state saved. + err.ShouldBeNull(); + + // Now send the remaining length byte. + var buf2 = new byte[] { 0x00 }; + // The reader's pending buffer should have the first byte saved. + var ms = new MemoryStream(); + typeof(ClientConnection) + .GetField("_nc", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)! + .SetValue(c, ms); + + err = MqttParser.Parse(c, buf2, buf2.Length); + err.ShouldBeNull(); + + // PINGRESP should have been written. + var written = ms.ToArray(); + written.Length.ShouldBe(2); + written[0].ShouldBe(MqttPacket.PingResp); + } + + [Fact] + public void Parse_PartialPayload_ShouldSaveState() + { + var c = CreateMqttClient(); + c.Flags |= ClientFlags.ConnectReceived; + + // SUBSCRIBE packet: type=0x82, remaining length=10, but only send 5 payload bytes. + var buf = new byte[] { 0x82, 0x0A, 0x01, 0x02, 0x03, 0x04, 0x05 }; + // remaining length = 10, but only 5 bytes of payload present → partial. + var err = MqttParser.Parse(c, buf, buf.Length); + // Should save state and return null (needs more data). + err.ShouldBeNull(); + } + + // ========================================================================= + // Unknown packet type + // ========================================================================= + + [Fact] + public void Parse_UnknownPacketType_ShouldReturnError() + { + var c = CreateMqttClient(); + c.Flags |= ClientFlags.ConnectReceived; + + // Packet type 0x00 is reserved/invalid. + var buf = new byte[] { 0x00, 0x00 }; + var err = MqttParser.Parse(c, buf, buf.Length); + err.ShouldNotBeNull(); + err.Message.ShouldContain("unknown MQTT packet type"); + } + + // ========================================================================= + // Empty buffer + // ========================================================================= + + [Fact] + public void Parse_EmptyBuffer_ShouldSucceed() + { + var c = CreateMqttClient(); + var buf = Array.Empty(); + var err = MqttParser.Parse(c, buf, 0); + err.ShouldBeNull(); // Nothing to parse. + } + + // ========================================================================= + // Buffer length parameter + // ========================================================================= + + [Fact] + public void Parse_LenSmallerThanBuffer_ShouldOnlyParseLenBytes() + { + var c = CreateMqttClient(); + c.Flags |= ClientFlags.ConnectReceived; + + var ms = new MemoryStream(); + typeof(ClientConnection) + .GetField("_nc", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)! + .SetValue(c, ms); + + // Buffer has two PING packets, but len says only the first one. + var buf = new byte[] { MqttPacket.Ping, 0x00, MqttPacket.Ping, 0x00 }; + var err = MqttParser.Parse(c, buf, 2); // Only first 2 bytes. + err.ShouldBeNull(); + + var written = ms.ToArray(); + written.Length.ShouldBe(2); // Only one PINGRESP. + } +} diff --git a/reports/current.md b/reports/current.md index 7f98db9..a8848f6 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 20:35:42 UTC +Generated: 2026-03-01 20:41:46 UTC ## Modules (12 total)