diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs index ae290fc..1b43f32 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttHandler.cs @@ -91,6 +91,16 @@ internal sealed class MqttHandler /// public bool DowngradeQoS2Sub { get; set; } + // ------------------------------------------------------------------ + // QoS 2 in-memory pending store (full JetStream persistence in Task 6) + // ------------------------------------------------------------------ + + /// + /// In-memory store for QoS 2 inbound messages awaiting PUBREL. + /// Keyed by packet identifier. Replaces JetStream $MQTT_qos2in stream. + /// + public Dictionary QoS2Pending { get; } = new(); + // ------------------------------------------------------------------ // Parse state (used by the read-loop MQTT byte-stream parser) // ------------------------------------------------------------------ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttPacketHandlers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttPacketHandlers.cs index 366df7a..0f93b51 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttPacketHandlers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttPacketHandlers.cs @@ -313,21 +313,146 @@ internal static class MqttPacketHandlers /// /// Processes an inbound MQTT PUBLISH packet. - /// QoS 0: routes immediately via NATS pub/sub. QoS 1/2: deferred to Task 5. + /// QoS 0: routes immediately. QoS 1: deliver + PUBACK. QoS 2: store + PUBREC. /// Mirrors Go mqttProcessPub(). /// public static Exception? ProcessPub(ClientConnection c, MqttPublishInfo pp) { - if (pp.Qos == 0) + var payload = pp.Msg ?? []; + + switch (pp.Qos) { - // QoS 0: immediate delivery via internal routing. - var payload = pp.Msg ?? []; - c.ProcessInboundClientMsg(payload); - return null; + case 0: + c.ProcessInboundClientMsg(payload); + return null; + + case 1: + // QoS 1: deliver immediately, then send PUBACK. + c.ProcessInboundClientMsg(payload); + EnqueuePubResponse(c, MqttPacket.PubAck, pp.Pi); + return null; + + case 2: + // QoS 2: store message pending PUBREL, then send PUBREC. + // Full JetStream persistence deferred to Task 6 — uses in-memory store. + lock (c) + { + c.Mqtt!.QoS2Pending[pp.Pi] = pp; + } + EnqueuePubResponse(c, MqttPacket.PubRec, pp.Pi); + return null; + + default: + return new InvalidOperationException($"invalid QoS: {pp.Qos}"); + } + } + + // ========================================================================= + // PUBACK / PUBREC / PUBREL / PUBCOMP handling + // ========================================================================= + + /// + /// Parses a packet-identifier-only packet (PUBACK, PUBREC, PUBREL, PUBCOMP). + /// Returns (pi, error). Mirrors Go mqttParsePIPacket(). + /// + public static (ushort pi, Exception? err) ParsePiPacket(MqttReader r) + { + ushort pi; + try { pi = r.ReadUInt16("packet identifier"); } + catch (Exception ex) { return (0, ex); } + if (pi == 0) + return (0, new InvalidOperationException("packet identifier must not be 0")); + return (pi, null); + } + + /// + /// Processes an inbound PUBREL packet (QoS 2 phase 2). + /// Retrieves the stored QoS 2 message, delivers it, and sends PUBCOMP. + /// Mirrors Go mqttProcessPubRel(). + /// + public static Exception? ProcessPubRel(ClientConnection c, ushort pi) + { + // Always send PUBCOMP, even if message not found (idempotency). + MqttPublishInfo? pp; + lock (c) + { + c.Mqtt!.QoS2Pending.Remove(pi, out pp); } - // QoS 1/2: deferred to Task 5 (JetStream integration). - return new NotImplementedException($"PUBLISH QoS {pp.Qos} not yet implemented"); + // Send PUBCOMP. + EnqueuePubResponse(c, MqttPacket.PubComp, pi); + + // Deliver the stored message if found. + if (pp != null) + { + var payload = pp.Msg ?? []; + c.ProcessInboundClientMsg(payload); + } + + return null; + } + + /// + /// Processes an inbound PUBACK (client acknowledges QoS 1 message from server). + /// Removes from pending tracking. Mirrors Go mqttProcessPubAck(). + /// + public static Exception? ProcessPubAck(ClientConnection c, ushort pi) + { + lock (c) + { + c.Mqtt!.Pending.Remove(pi); + } + return null; + } + + /// + /// Processes an inbound PUBREC (client acknowledges QoS 2 message from server, phase 1). + /// Transitions to PUBREL phase. Mirrors Go mqttProcessPubRec(). + /// + public static Exception? ProcessPubRec(ClientConnection c, ushort pi) + { + // In the full implementation, this would store a PUBREL in JetStream. + // For now, send PUBREL immediately and remove from pending. + lock (c) + { + c.Mqtt!.Pending.Remove(pi); + } + EnqueuePubResponse(c, MqttPacket.PubRel, pi); + return null; + } + + /// + /// Processes an inbound PUBCOMP (client acknowledges PUBREL, QoS 2 complete). + /// Mirrors Go mqttProcessPubComp(). + /// + public static Exception? ProcessPubComp(ClientConnection c, ushort pi) + { + // Final cleanup — remove any tracking for this PI. + lock (c) + { + c.Mqtt!.Pending.Remove(pi); + } + return null; + } + + /// + /// Enqueues a PUBACK/PUBREC/PUBREL/PUBCOMP response packet. + /// All four share the same 4-byte format: [type] [0x02] [PI high] [PI low]. + /// PUBREL has bit 1 set in byte 0 per MQTT spec [3.6.1-1]. + /// Mirrors Go mqttEnqueuePubResponse(). + /// + public static void EnqueuePubResponse(ClientConnection c, byte packetType, ushort pi) + { + var b0 = packetType; + // PUBREL requires fixed header bits 0010 per MQTT spec. + if (packetType == MqttPacket.PubRel) + b0 |= 0x02; + + ReadOnlySpan packet = [b0, 0x02, (byte)(pi >> 8), (byte)(pi & 0xFF)]; + lock (c) + { + c.EnqueueProto(packet); + } } // ========================================================================= diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs index 007fd90..b4038ae 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs @@ -92,24 +92,36 @@ internal static class MqttParser break; case MqttPacket.PubAck: - // TODO: Task 5 — process PUBACK - err = new NotImplementedException("PUBACK not yet implemented"); + { + var (ackPi, ackErr) = MqttPacketHandlers.ParsePiPacket(r); + if (ackErr != null) { err = ackErr; break; } + err = MqttPacketHandlers.ProcessPubAck(c, ackPi); break; + } case MqttPacket.PubRec: - // TODO: Task 5 — process PUBREC - err = new NotImplementedException("PUBREC not yet implemented"); + { + var (recPi, recErr) = MqttPacketHandlers.ParsePiPacket(r); + if (recErr != null) { err = recErr; break; } + err = MqttPacketHandlers.ProcessPubRec(c, recPi); break; + } case MqttPacket.PubRel: - // TODO: Task 5 — process PUBREL - err = new NotImplementedException("PUBREL not yet implemented"); + { + var (relPi, relErr) = MqttPacketHandlers.ParsePiPacket(r); + if (relErr != null) { err = relErr; break; } + err = MqttPacketHandlers.ProcessPubRel(c, relPi); break; + } case MqttPacket.PubComp: - // TODO: Task 5 — process PUBCOMP - err = new NotImplementedException("PUBCOMP not yet implemented"); + { + var (compPi, compErr) = MqttPacketHandlers.ParsePiPacket(r); + if (compErr != null) { err = compErr; break; } + err = MqttPacketHandlers.ProcessPubComp(c, compPi); break; + } case MqttPacket.Sub: var (subPi, subFilters, subErr) = MqttPacketHandlers.ParseSubsOrUnsubs(r, b, pl, isSub: true); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttPubSubTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttPubSubTests.cs index ede803e..9d185bb 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttPubSubTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttPubSubTests.cs @@ -174,14 +174,14 @@ public sealed class MqttPubSubTests } [Fact] - public void Parser_PublishQoS1_ShouldReturnNotImplemented() + public void Parser_PublishQoS1_ShouldSendPubAck() { var c = CreateConnectedMqttClient(); - // PUBLISH QoS 1: type=0x32, topic="t", PI=1, payload="x" + // PUBLISH QoS 1: type=0x32, topic="t", PI=5, payload="x" var data = new List(); data.Add(0x00); data.Add(0x01); data.Add((byte)'t'); // topic - data.Add(0x00); data.Add(0x01); // PI = 1 + data.Add(0x00); data.Add(0x05); // PI = 5 data.Add((byte)'x'); // payload var buf = new List(); @@ -190,8 +190,146 @@ public sealed class MqttPubSubTests buf.AddRange(data); var err = MqttParser.Parse(c, buf.ToArray(), buf.Count); - err.ShouldNotBeNull(); - err.ShouldBeOfType(); + err.ShouldBeNull(); + + // Verify PUBACK: [0x40] [0x02] [PI high] [PI low] + var ms = GetStream(c); + var written = ms.ToArray(); + written.Length.ShouldBe(4); + written[0].ShouldBe(MqttPacket.PubAck); // 0x40 + written[1].ShouldBe((byte)0x02); + written[2].ShouldBe((byte)0x00); // PI high + written[3].ShouldBe((byte)0x05); // PI low + } + + [Fact] + public void Parser_PublishQoS2_ShouldSendPubRec() + { + var c = CreateConnectedMqttClient(); + + // PUBLISH QoS 2: type=0x34, topic="t", PI=10, payload="y" + var data = new List(); + data.Add(0x00); data.Add(0x01); data.Add((byte)'t'); // topic + data.Add(0x00); data.Add(0x0A); // PI = 10 + data.Add((byte)'y'); // payload + + var buf = new List(); + buf.Add((byte)(MqttPacket.Pub | MqttPubFlag.QoS2)); // 0x34 + buf.Add((byte)data.Count); + buf.AddRange(data); + + var err = MqttParser.Parse(c, buf.ToArray(), buf.Count); + err.ShouldBeNull(); + + // Verify PUBREC: [0x50] [0x02] [PI high] [PI low] + var ms = GetStream(c); + var written = ms.ToArray(); + written.Length.ShouldBe(4); + written[0].ShouldBe(MqttPacket.PubRec); // 0x50 + written[1].ShouldBe((byte)0x02); + written[2].ShouldBe((byte)0x00); // PI high + written[3].ShouldBe((byte)0x0A); // PI low + + // Message should be stored in QoS2Pending. + c.Mqtt!.QoS2Pending.ShouldContainKey((ushort)10); + } + + [Fact] + public void Parser_QoS2_FullHandshake_PubRecPubRelPubComp() + { + var c = CreateConnectedMqttClient(); + + // Step 1: PUBLISH QoS 2 → PUBREC + var pubData = new List(); + pubData.Add(0x00); pubData.Add(0x01); pubData.Add((byte)'t'); + pubData.Add(0x00); pubData.Add(0x07); // PI = 7 + pubData.AddRange(Encoding.UTF8.GetBytes("qos2msg")); + + var pubBuf = new List(); + pubBuf.Add((byte)(MqttPacket.Pub | MqttPubFlag.QoS2)); + pubBuf.Add((byte)pubData.Count); + pubBuf.AddRange(pubData); + + var err = MqttParser.Parse(c, pubBuf.ToArray(), pubBuf.Count); + err.ShouldBeNull(); + c.Mqtt!.QoS2Pending.ShouldContainKey((ushort)7); + + // Reset stream to capture only PUBCOMP. + var ms = GetStream(c); + ms.SetLength(0); + + // Step 2: PUBREL from client → PUBCOMP + // PUBREL: [0x62] [0x02] [PI high] [PI low] + var pubrelBuf = new byte[] { 0x62, 0x02, 0x00, 0x07 }; + err = MqttParser.Parse(c, pubrelBuf, pubrelBuf.Length); + err.ShouldBeNull(); + + // Verify PUBCOMP was sent. + var written = ms.ToArray(); + written.Length.ShouldBe(4); + written[0].ShouldBe(MqttPacket.PubComp); // 0x70 + written[1].ShouldBe((byte)0x02); + written[2].ShouldBe((byte)0x00); + written[3].ShouldBe((byte)0x07); + + // Message should be removed from QoS2Pending. + c.Mqtt.QoS2Pending.ShouldNotContainKey((ushort)7); + } + + [Fact] + public void Parser_PubAck_ShouldRemoveFromPending() + { + var c = CreateConnectedMqttClient(); + + // Pre-populate pending. + c.Mqtt!.Pending[(ushort)3] = null; + c.Mqtt.Pending.ShouldContainKey((ushort)3); + + // PUBACK: [0x40] [0x02] [0x00] [0x03] + var buf = new byte[] { 0x40, 0x02, 0x00, 0x03 }; + var err = MqttParser.Parse(c, buf, buf.Length); + err.ShouldBeNull(); + + c.Mqtt.Pending.ShouldNotContainKey((ushort)3); + } + + [Fact] + public void Parser_PubRec_ShouldSendPubRel() + { + var c = CreateConnectedMqttClient(); + + // Pre-populate pending. + c.Mqtt!.Pending[(ushort)9] = null; + + // PUBREC: [0x50] [0x02] [0x00] [0x09] + var buf = new byte[] { 0x50, 0x02, 0x00, 0x09 }; + var err = MqttParser.Parse(c, buf, buf.Length); + err.ShouldBeNull(); + + // Should have sent PUBREL: [0x62] [0x02] [0x00] [0x09] + var ms = GetStream(c); + var written = ms.ToArray(); + written.Length.ShouldBe(4); + written[0].ShouldBe((byte)0x62); // PUBREL with bit 1 set + written[3].ShouldBe((byte)0x09); + + // Pending should be cleared. + c.Mqtt.Pending.ShouldNotContainKey((ushort)9); + } + + [Fact] + public void Parser_PubComp_ShouldRemoveFromPending() + { + var c = CreateConnectedMqttClient(); + + c.Mqtt!.Pending[(ushort)15] = null; + + // PUBCOMP: [0x70] [0x02] [0x00] [0x0F] + var buf = new byte[] { 0x70, 0x02, 0x00, 0x0F }; + var err = MqttParser.Parse(c, buf, buf.Length); + err.ShouldBeNull(); + + c.Mqtt.Pending.ShouldNotContainKey((ushort)15); } // ========================================================================= diff --git a/reports/current.md b/reports/current.md index 42ba672..21756a4 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 21:04:38 UTC +Generated: 2026-03-01 21:08:29 UTC ## Modules (12 total)