diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs
index c9c5028..abead62 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs
@@ -25,6 +25,7 @@ using Microsoft.Extensions.Logging;
using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
+using ZB.MOM.NatsNet.Server.Mqtt;
using ZB.MOM.NatsNet.Server.Protocol;
using ZB.MOM.NatsNet.Server.WebSocket;
@@ -1405,8 +1406,59 @@ public sealed partial class ClientConnection
internal void ReadLoop(byte[]? pre)
{
LastIn = DateTime.UtcNow;
+
+ // Process any pre-read bytes first.
if (pre is { Length: > 0 })
+ {
TraceInOp("PRE", pre);
+ if (IsMqtt())
+ {
+ var preErr = MqttParser.Parse(this, pre, pre.Length);
+ if (preErr != null)
+ {
+ CloseConnection(ClosedState.ParseError);
+ return;
+ }
+ }
+ }
+
+ // MQTT clients use the MqttParser; NATS clients use ProtocolParser (not yet wired).
+ if (!IsMqtt())
+ return;
+
+ // Main read loop — read from network stream until closed.
+ var buf = new byte[32768]; // 32 KB read buffer
+ try
+ {
+ while (true)
+ {
+ int n;
+ try
+ {
+ n = _nc!.Read(buf, 0, buf.Length);
+ }
+ catch
+ {
+ break; // Connection closed or errored.
+ }
+
+ if (n <= 0)
+ break; // Connection closed.
+
+ LastIn = DateTime.UtcNow;
+
+ var err = MqttParser.Parse(this, buf, n);
+ if (err != null)
+ {
+ CloseConnection(ClosedState.ParseError);
+ return;
+ }
+ }
+ }
+ finally
+ {
+ CloseConnection(ClosedState.ClientClosed);
+ }
}
// =========================================================================
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs
index fa510eb..ae290fc 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs
@@ -110,6 +110,16 @@ internal sealed class MqttHandler
/// Multiplier accumulator used during multi-byte remaining-length decoding.
public int RemLenMult { get; set; }
+ // ------------------------------------------------------------------
+ // Reader
+ // ------------------------------------------------------------------
+
+ ///
+ /// Per-connection MQTT reader for binary packet parsing.
+ /// Created lazily on first use.
+ ///
+ public MqttReader Reader { get; } = new();
+
// ------------------------------------------------------------------
// Thread safety
// ------------------------------------------------------------------
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs
new file mode 100644
index 0000000..5949a8e
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs
@@ -0,0 +1,162 @@
+// Copyright 2020-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/mqtt.go mqttParse() in the NATS server Go source.
+
+namespace ZB.MOM.NatsNet.Server.Mqtt;
+
+///
+/// MQTT binary packet parser and dispatch.
+/// Reads packets from a byte buffer using and dispatches
+/// to the appropriate handler based on packet type.
+/// Mirrors Go mqttParse() in server/mqtt.go.
+///
+internal static class MqttParser
+{
+ /// PINGRESP packet bytes: 0xD0 0x00.
+ private static readonly byte[] PingRespPacket = [MqttPacket.PingResp, 0x00];
+
+ ///
+ /// Parses MQTT packets from and dispatches to handlers.
+ /// Returns null on success, or an exception describing the parse/dispatch error.
+ /// Handles partial packets by saving state in the client's .
+ /// Mirrors Go mqttParse(r *mqttReader, c *client, ...).
+ ///
+ public static Exception? Parse(ClientConnection c, byte[] buf, int len)
+ {
+ var mqtt = c.Mqtt!;
+ var r = mqtt.Reader;
+
+ // Slice buffer to actual length if needed.
+ if (len < buf.Length)
+ {
+ var tmp = new byte[len];
+ Buffer.BlockCopy(buf, 0, tmp, 0, len);
+ buf = tmp;
+ }
+
+ r.Reset(buf);
+
+ var connected = (c.Flags & ClientFlags.ConnectReceived) != 0;
+ Exception? err = null;
+
+ while (err == null && r.HasMore())
+ {
+ r.PacketStart = r.Position;
+
+ // Read packet type + flags byte.
+ byte b;
+ try { b = r.ReadByte("packet type"); }
+ catch (Exception ex) { err = ex; break; }
+
+ var pt = (byte)(b & MqttPacket.Mask);
+
+ // CONNECT must be the first packet.
+ if (!connected && pt != MqttPacket.Connect)
+ {
+ err = new InvalidOperationException(
+ $"the first packet should be a CONNECT (0x{MqttPacket.Connect:X2}), got 0x{pt:X2}");
+ break;
+ }
+
+ // Read remaining length (variable-length encoding).
+ int pl;
+ bool complete;
+ try
+ {
+ (pl, complete) = r.ReadPacketLen();
+ }
+ catch (Exception ex) { err = ex; break; }
+
+ if (!complete)
+ break; // Partial packet — state saved in reader.
+
+ // Dispatch based on packet type.
+ switch (pt)
+ {
+ case MqttPacket.Pub:
+ // TODO: Task 4 — MqttParsePub + MqttProcessPub
+ err = new NotImplementedException("PUBLISH not yet implemented");
+ break;
+
+ case MqttPacket.PubAck:
+ // TODO: Task 5 — process PUBACK
+ err = new NotImplementedException("PUBACK not yet implemented");
+ break;
+
+ case MqttPacket.PubRec:
+ // TODO: Task 5 — process PUBREC
+ err = new NotImplementedException("PUBREC not yet implemented");
+ break;
+
+ case MqttPacket.PubRel:
+ // TODO: Task 5 — process PUBREL
+ err = new NotImplementedException("PUBREL not yet implemented");
+ break;
+
+ case MqttPacket.PubComp:
+ // TODO: Task 5 — process PUBCOMP
+ err = new NotImplementedException("PUBCOMP not yet implemented");
+ break;
+
+ case MqttPacket.Sub:
+ // TODO: Task 4 — MqttParseSubs + MqttProcessSubs
+ err = new NotImplementedException("SUBSCRIBE not yet implemented");
+ break;
+
+ case MqttPacket.Unsub:
+ // TODO: Task 4 — MqttParseUnsubs + MqttProcessUnsubs
+ err = new NotImplementedException("UNSUBSCRIBE not yet implemented");
+ break;
+
+ case MqttPacket.Ping:
+ HandlePingReq(c);
+ break;
+
+ case MqttPacket.Connect:
+ if (connected)
+ {
+ // Second CONNECT on same connection is a protocol violation.
+ err = new InvalidOperationException("second CONNECT packet not allowed");
+ break;
+ }
+ // TODO: Task 3 — MqttParseConnect + MqttProcessConnect
+ err = new NotImplementedException("CONNECT not yet implemented");
+ break;
+
+ case MqttPacket.Disconnect:
+ // TODO: Task 3 — handle DISCONNECT
+ err = new NotImplementedException("DISCONNECT not yet implemented");
+ break;
+
+ default:
+ err = new InvalidOperationException($"unknown MQTT packet type: 0x{pt:X2}");
+ break;
+ }
+ }
+
+ return err;
+ }
+
+ ///
+ /// Handles PINGREQ by enqueueing a PINGRESP packet.
+ /// Mirrors Go mqttEnqueuePingResp().
+ ///
+ private static void HandlePingReq(ClientConnection c)
+ {
+ lock (c)
+ {
+ c.EnqueueProto(PingRespPacket);
+ }
+ }
+}
diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttParserTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttParserTests.cs
new file mode 100644
index 0000000..702e92c
--- /dev/null
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttParserTests.cs
@@ -0,0 +1,279 @@
+// Copyright 2020-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+
+using Shouldly;
+using ZB.MOM.NatsNet.Server;
+using ZB.MOM.NatsNet.Server.Internal;
+using ZB.MOM.NatsNet.Server.Mqtt;
+
+namespace ZB.MOM.NatsNet.Server.Tests.Mqtt;
+
+///
+/// Unit tests for — validates packet type extraction,
+/// remaining length decoding, CONNECT-first enforcement, partial packet handling,
+/// and PINGREQ dispatch.
+///
+public sealed class MqttParserTests
+{
+ ///
+ /// Creates a minimal ClientConnection with MQTT handler for testing.
+ ///
+ private static ClientConnection CreateMqttClient()
+ {
+ var c = new ClientConnection(ClientKind.Client);
+ c.InitMqtt(new MqttHandler());
+ return c;
+ }
+
+ // =========================================================================
+ // CONNECT-first enforcement
+ // =========================================================================
+
+ [Fact]
+ public void Parse_NonConnectFirst_ShouldReturnError()
+ {
+ var c = CreateMqttClient();
+ // PINGREQ before CONNECT → error
+ var buf = new byte[] { MqttPacket.Ping, 0x00 };
+ var err = MqttParser.Parse(c, buf, buf.Length);
+ err.ShouldNotBeNull();
+ err.Message.ShouldContain("first packet should be a CONNECT");
+ }
+
+ [Fact]
+ public void Parse_PublishBeforeConnect_ShouldReturnError()
+ {
+ var c = CreateMqttClient();
+ // PUBLISH QoS 0 before CONNECT
+ var buf = new byte[] { MqttPacket.Pub, 0x05, 0x00, 0x01, (byte)'t', 0x68, 0x69 };
+ var err = MqttParser.Parse(c, buf, buf.Length);
+ err.ShouldNotBeNull();
+ err.Message.ShouldContain("first packet should be a CONNECT");
+ }
+
+ [Fact]
+ public void Parse_ConnectFirst_ShouldNotRejectAsNonConnect()
+ {
+ var c = CreateMqttClient();
+ // CONNECT packet (minimal): type=0x10, remaining len=0
+ // This will hit the "not yet implemented" but NOT the "first packet" error.
+ var buf = new byte[] { MqttPacket.Connect, 0x00 };
+ var err = MqttParser.Parse(c, buf, buf.Length);
+ err.ShouldNotBeNull(); // Will be NotImplementedException
+ err.ShouldBeOfType();
+ err.Message.ShouldContain("CONNECT not yet implemented");
+ }
+
+ [Fact]
+ public void Parse_SecondConnect_ShouldReturnError()
+ {
+ var c = CreateMqttClient();
+ // Simulate that CONNECT was already received.
+ c.Flags |= ClientFlags.ConnectReceived;
+ var buf = new byte[] { MqttPacket.Connect, 0x00 };
+ var err = MqttParser.Parse(c, buf, buf.Length);
+ err.ShouldNotBeNull();
+ err.Message.ShouldContain("second CONNECT");
+ }
+
+ // =========================================================================
+ // PINGREQ dispatch
+ // =========================================================================
+
+ [Fact]
+ public void Parse_PingReq_ShouldEnqueuePingResp()
+ {
+ var c = CreateMqttClient();
+ // Mark as connected so ping is accepted.
+ c.Flags |= ClientFlags.ConnectReceived;
+
+ // Use a memory stream to capture EnqueueProto output.
+ var ms = new MemoryStream();
+ // Set up the connection's network stream.
+ typeof(ClientConnection)
+ .GetField("_nc", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!
+ .SetValue(c, ms);
+
+ var buf = new byte[] { MqttPacket.Ping, 0x00 };
+ var err = MqttParser.Parse(c, buf, buf.Length);
+ err.ShouldBeNull();
+
+ // Verify PINGRESP was written.
+ var written = ms.ToArray();
+ written.Length.ShouldBe(2);
+ written[0].ShouldBe(MqttPacket.PingResp);
+ written[1].ShouldBe((byte)0x00);
+ }
+
+ [Fact]
+ public void Parse_MultiplePings_ShouldSucceed()
+ {
+ var c = CreateMqttClient();
+ c.Flags |= ClientFlags.ConnectReceived;
+
+ var ms = new MemoryStream();
+ typeof(ClientConnection)
+ .GetField("_nc", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!
+ .SetValue(c, ms);
+
+ // Two PINGREQ packets in the same buffer.
+ var buf = new byte[] { MqttPacket.Ping, 0x00, MqttPacket.Ping, 0x00 };
+ var err = MqttParser.Parse(c, buf, buf.Length);
+ err.ShouldBeNull();
+
+ // Two PINGRESP packets should have been written.
+ var written = ms.ToArray();
+ written.Length.ShouldBe(4);
+ written[0].ShouldBe(MqttPacket.PingResp);
+ written[1].ShouldBe((byte)0x00);
+ written[2].ShouldBe(MqttPacket.PingResp);
+ written[3].ShouldBe((byte)0x00);
+ }
+
+ // =========================================================================
+ // Remaining length decoding
+ // =========================================================================
+
+ [Fact]
+ public void Parse_SingleByteRemainingLength_ShouldWork()
+ {
+ // SUB packet with remaining length = 5 (single byte < 128)
+ // After CONNECT is received, a SUB packet should parse the remaining length correctly.
+ var c = CreateMqttClient();
+ c.Flags |= ClientFlags.ConnectReceived;
+
+ // SUBSCRIBE: type=0x82, remaining len=5, then 5 bytes of payload
+ var buf = new byte[] { 0x82, 0x05, 0x00, 0x01, 0x00, 0x01, 0x74 };
+ var err = MqttParser.Parse(c, buf, buf.Length);
+ // Will hit NotImplementedException for SUBSCRIBE — that's fine, it proves parsing worked.
+ err.ShouldNotBeNull();
+ err.ShouldBeOfType();
+ }
+
+ [Fact]
+ public void Parse_TwoByteRemainingLength_ShouldWork()
+ {
+ var c = CreateMqttClient();
+ c.Flags |= ClientFlags.ConnectReceived;
+
+ // Remaining length = 200 → encoded as [0xC8, 0x01]
+ // (200 & 0x7F) | 0x80 = 0xC8, 200 >> 7 = 1 → 0x01
+ // type(1) + remlen(2) + payload(200) = 203 bytes total.
+ var buf = new byte[203];
+ buf[0] = MqttPacket.Pub;
+ buf[1] = 0xC8;
+ buf[2] = 0x01;
+ // Remaining 200 bytes are zero (payload).
+
+ var err = MqttParser.Parse(c, buf, buf.Length);
+ err.ShouldNotBeNull();
+ err.ShouldBeOfType(); // PUBLISH not yet implemented
+ }
+
+ // =========================================================================
+ // Partial packet handling
+ // =========================================================================
+
+ [Fact]
+ public void Parse_PartialPacket_ShouldSaveState()
+ {
+ var c = CreateMqttClient();
+ c.Flags |= ClientFlags.ConnectReceived;
+
+ // Send only the first byte of a PING packet (missing the 0x00 remaining length).
+ var buf = new byte[] { MqttPacket.Ping };
+ var err = MqttParser.Parse(c, buf, buf.Length);
+ // Should succeed (partial packet saved) — no error because it just stops.
+ // Actually, the ReadByte for packet type succeeds, then ReadPacketLen has no data,
+ // so it returns (0, false) — incomplete, state saved.
+ err.ShouldBeNull();
+
+ // Now send the remaining length byte.
+ var buf2 = new byte[] { 0x00 };
+ // The reader's pending buffer should have the first byte saved.
+ var ms = new MemoryStream();
+ typeof(ClientConnection)
+ .GetField("_nc", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!
+ .SetValue(c, ms);
+
+ err = MqttParser.Parse(c, buf2, buf2.Length);
+ err.ShouldBeNull();
+
+ // PINGRESP should have been written.
+ var written = ms.ToArray();
+ written.Length.ShouldBe(2);
+ written[0].ShouldBe(MqttPacket.PingResp);
+ }
+
+ [Fact]
+ public void Parse_PartialPayload_ShouldSaveState()
+ {
+ var c = CreateMqttClient();
+ c.Flags |= ClientFlags.ConnectReceived;
+
+ // SUBSCRIBE packet: type=0x82, remaining length=10, but only send 5 payload bytes.
+ var buf = new byte[] { 0x82, 0x0A, 0x01, 0x02, 0x03, 0x04, 0x05 };
+ // remaining length = 10, but only 5 bytes of payload present → partial.
+ var err = MqttParser.Parse(c, buf, buf.Length);
+ // Should save state and return null (needs more data).
+ err.ShouldBeNull();
+ }
+
+ // =========================================================================
+ // Unknown packet type
+ // =========================================================================
+
+ [Fact]
+ public void Parse_UnknownPacketType_ShouldReturnError()
+ {
+ var c = CreateMqttClient();
+ c.Flags |= ClientFlags.ConnectReceived;
+
+ // Packet type 0x00 is reserved/invalid.
+ var buf = new byte[] { 0x00, 0x00 };
+ var err = MqttParser.Parse(c, buf, buf.Length);
+ err.ShouldNotBeNull();
+ err.Message.ShouldContain("unknown MQTT packet type");
+ }
+
+ // =========================================================================
+ // Empty buffer
+ // =========================================================================
+
+ [Fact]
+ public void Parse_EmptyBuffer_ShouldSucceed()
+ {
+ var c = CreateMqttClient();
+ var buf = Array.Empty();
+ var err = MqttParser.Parse(c, buf, 0);
+ err.ShouldBeNull(); // Nothing to parse.
+ }
+
+ // =========================================================================
+ // Buffer length parameter
+ // =========================================================================
+
+ [Fact]
+ public void Parse_LenSmallerThanBuffer_ShouldOnlyParseLenBytes()
+ {
+ var c = CreateMqttClient();
+ c.Flags |= ClientFlags.ConnectReceived;
+
+ var ms = new MemoryStream();
+ typeof(ClientConnection)
+ .GetField("_nc", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!
+ .SetValue(c, ms);
+
+ // Buffer has two PING packets, but len says only the first one.
+ var buf = new byte[] { MqttPacket.Ping, 0x00, MqttPacket.Ping, 0x00 };
+ var err = MqttParser.Parse(c, buf, 2); // Only first 2 bytes.
+ err.ShouldBeNull();
+
+ var written = ms.ToArray();
+ written.Length.ShouldBe(2); // Only one PINGRESP.
+ }
+}
diff --git a/reports/current.md b/reports/current.md
index 7f98db9..a8848f6 100644
--- a/reports/current.md
+++ b/reports/current.md
@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
-Generated: 2026-03-01 20:35:42 UTC
+Generated: 2026-03-01 20:41:46 UTC
## Modules (12 total)