feat(mqtt): implement QoS 1 PUBACK and QoS 2 PUBREC/PUBREL/PUBCOMP handlers

QoS 1: deliver message then send PUBACK. QoS 2: store in-memory
pending PUBREL, send PUBREC; on PUBREL deliver and send PUBCOMP.
Wire all four PI-packet dispatches (PUBACK/PUBREC/PUBREL/PUBCOMP)
in parser. Add QoS2Pending dictionary to MqttHandler for in-memory
QoS 2 tracking. 6 new tests for QoS 1/2 flows including full
QoS 2 handshake.
This commit is contained in:
Joseph Doherty
2026-03-01 16:08:28 -05:00
parent 715367b9ea
commit b465d095e3
5 changed files with 307 additions and 22 deletions

View File

@@ -91,6 +91,16 @@ internal sealed class MqttHandler
/// </summary>
public bool DowngradeQoS2Sub { get; set; }
// ------------------------------------------------------------------
// QoS 2 in-memory pending store (full JetStream persistence in Task 6)
// ------------------------------------------------------------------
/// <summary>
/// In-memory store for QoS 2 inbound messages awaiting PUBREL.
/// Keyed by packet identifier. Replaces JetStream $MQTT_qos2in stream.
/// </summary>
public Dictionary<ushort, MqttPublishInfo> QoS2Pending { get; } = new();
// ------------------------------------------------------------------
// Parse state (used by the read-loop MQTT byte-stream parser)
// ------------------------------------------------------------------

View File

@@ -313,21 +313,146 @@ internal static class MqttPacketHandlers
/// <summary>
/// Processes an inbound MQTT PUBLISH packet.
/// QoS 0: routes immediately via NATS pub/sub. QoS 1/2: deferred to Task 5.
/// QoS 0: routes immediately. QoS 1: deliver + PUBACK. QoS 2: store + PUBREC.
/// Mirrors Go <c>mqttProcessPub()</c>.
/// </summary>
public static Exception? ProcessPub(ClientConnection c, MqttPublishInfo pp)
{
if (pp.Qos == 0)
var payload = pp.Msg ?? [];
switch (pp.Qos)
{
case 0:
c.ProcessInboundClientMsg(payload);
return null;
case 1:
// QoS 1: deliver immediately, then send PUBACK.
c.ProcessInboundClientMsg(payload);
EnqueuePubResponse(c, MqttPacket.PubAck, pp.Pi);
return null;
case 2:
// QoS 2: store message pending PUBREL, then send PUBREC.
// Full JetStream persistence deferred to Task 6 — uses in-memory store.
lock (c)
{
c.Mqtt!.QoS2Pending[pp.Pi] = pp;
}
EnqueuePubResponse(c, MqttPacket.PubRec, pp.Pi);
return null;
default:
return new InvalidOperationException($"invalid QoS: {pp.Qos}");
}
}
// =========================================================================
// PUBACK / PUBREC / PUBREL / PUBCOMP handling
// =========================================================================
/// <summary>
/// Parses a packet-identifier-only packet (PUBACK, PUBREC, PUBREL, PUBCOMP).
/// Returns (pi, error). Mirrors Go <c>mqttParsePIPacket()</c>.
/// </summary>
public static (ushort pi, Exception? err) ParsePiPacket(MqttReader r)
{
ushort pi;
try { pi = r.ReadUInt16("packet identifier"); }
catch (Exception ex) { return (0, ex); }
if (pi == 0)
return (0, new InvalidOperationException("packet identifier must not be 0"));
return (pi, null);
}
/// <summary>
/// Processes an inbound PUBREL packet (QoS 2 phase 2).
/// Retrieves the stored QoS 2 message, delivers it, and sends PUBCOMP.
/// Mirrors Go <c>mqttProcessPubRel()</c>.
/// </summary>
public static Exception? ProcessPubRel(ClientConnection c, ushort pi)
{
// Always send PUBCOMP, even if message not found (idempotency).
MqttPublishInfo? pp;
lock (c)
{
c.Mqtt!.QoS2Pending.Remove(pi, out pp);
}
// Send PUBCOMP.
EnqueuePubResponse(c, MqttPacket.PubComp, pi);
// Deliver the stored message if found.
if (pp != null)
{
// QoS 0: immediate delivery via internal routing.
var payload = pp.Msg ?? [];
c.ProcessInboundClientMsg(payload);
}
return null;
}
// QoS 1/2: deferred to Task 5 (JetStream integration).
return new NotImplementedException($"PUBLISH QoS {pp.Qos} not yet implemented");
/// <summary>
/// Processes an inbound PUBACK (client acknowledges QoS 1 message from server).
/// Removes from pending tracking. Mirrors Go <c>mqttProcessPubAck()</c>.
/// </summary>
public static Exception? ProcessPubAck(ClientConnection c, ushort pi)
{
lock (c)
{
c.Mqtt!.Pending.Remove(pi);
}
return null;
}
/// <summary>
/// Processes an inbound PUBREC (client acknowledges QoS 2 message from server, phase 1).
/// Transitions to PUBREL phase. Mirrors Go <c>mqttProcessPubRec()</c>.
/// </summary>
public static Exception? ProcessPubRec(ClientConnection c, ushort pi)
{
// In the full implementation, this would store a PUBREL in JetStream.
// For now, send PUBREL immediately and remove from pending.
lock (c)
{
c.Mqtt!.Pending.Remove(pi);
}
EnqueuePubResponse(c, MqttPacket.PubRel, pi);
return null;
}
/// <summary>
/// Processes an inbound PUBCOMP (client acknowledges PUBREL, QoS 2 complete).
/// Mirrors Go <c>mqttProcessPubComp()</c>.
/// </summary>
public static Exception? ProcessPubComp(ClientConnection c, ushort pi)
{
// Final cleanup — remove any tracking for this PI.
lock (c)
{
c.Mqtt!.Pending.Remove(pi);
}
return null;
}
/// <summary>
/// Enqueues a PUBACK/PUBREC/PUBREL/PUBCOMP response packet.
/// All four share the same 4-byte format: [type] [0x02] [PI high] [PI low].
/// PUBREL has bit 1 set in byte 0 per MQTT spec [3.6.1-1].
/// Mirrors Go <c>mqttEnqueuePubResponse()</c>.
/// </summary>
public static void EnqueuePubResponse(ClientConnection c, byte packetType, ushort pi)
{
var b0 = packetType;
// PUBREL requires fixed header bits 0010 per MQTT spec.
if (packetType == MqttPacket.PubRel)
b0 |= 0x02;
ReadOnlySpan<byte> packet = [b0, 0x02, (byte)(pi >> 8), (byte)(pi & 0xFF)];
lock (c)
{
c.EnqueueProto(packet);
}
}
// =========================================================================

View File

@@ -92,24 +92,36 @@ internal static class MqttParser
break;
case MqttPacket.PubAck:
// TODO: Task 5 — process PUBACK
err = new NotImplementedException("PUBACK not yet implemented");
{
var (ackPi, ackErr) = MqttPacketHandlers.ParsePiPacket(r);
if (ackErr != null) { err = ackErr; break; }
err = MqttPacketHandlers.ProcessPubAck(c, ackPi);
break;
}
case MqttPacket.PubRec:
// TODO: Task 5 — process PUBREC
err = new NotImplementedException("PUBREC not yet implemented");
{
var (recPi, recErr) = MqttPacketHandlers.ParsePiPacket(r);
if (recErr != null) { err = recErr; break; }
err = MqttPacketHandlers.ProcessPubRec(c, recPi);
break;
}
case MqttPacket.PubRel:
// TODO: Task 5 — process PUBREL
err = new NotImplementedException("PUBREL not yet implemented");
{
var (relPi, relErr) = MqttPacketHandlers.ParsePiPacket(r);
if (relErr != null) { err = relErr; break; }
err = MqttPacketHandlers.ProcessPubRel(c, relPi);
break;
}
case MqttPacket.PubComp:
// TODO: Task 5 — process PUBCOMP
err = new NotImplementedException("PUBCOMP not yet implemented");
{
var (compPi, compErr) = MqttPacketHandlers.ParsePiPacket(r);
if (compErr != null) { err = compErr; break; }
err = MqttPacketHandlers.ProcessPubComp(c, compPi);
break;
}
case MqttPacket.Sub:
var (subPi, subFilters, subErr) = MqttPacketHandlers.ParseSubsOrUnsubs(r, b, pl, isSub: true);

View File

@@ -174,14 +174,14 @@ public sealed class MqttPubSubTests
}
[Fact]
public void Parser_PublishQoS1_ShouldReturnNotImplemented()
public void Parser_PublishQoS1_ShouldSendPubAck()
{
var c = CreateConnectedMqttClient();
// PUBLISH QoS 1: type=0x32, topic="t", PI=1, payload="x"
// PUBLISH QoS 1: type=0x32, topic="t", PI=5, payload="x"
var data = new List<byte>();
data.Add(0x00); data.Add(0x01); data.Add((byte)'t'); // topic
data.Add(0x00); data.Add(0x01); // PI = 1
data.Add(0x00); data.Add(0x05); // PI = 5
data.Add((byte)'x'); // payload
var buf = new List<byte>();
@@ -190,8 +190,146 @@ public sealed class MqttPubSubTests
buf.AddRange(data);
var err = MqttParser.Parse(c, buf.ToArray(), buf.Count);
err.ShouldNotBeNull();
err.ShouldBeOfType<NotImplementedException>();
err.ShouldBeNull();
// Verify PUBACK: [0x40] [0x02] [PI high] [PI low]
var ms = GetStream(c);
var written = ms.ToArray();
written.Length.ShouldBe(4);
written[0].ShouldBe(MqttPacket.PubAck); // 0x40
written[1].ShouldBe((byte)0x02);
written[2].ShouldBe((byte)0x00); // PI high
written[3].ShouldBe((byte)0x05); // PI low
}
[Fact]
public void Parser_PublishQoS2_ShouldSendPubRec()
{
var c = CreateConnectedMqttClient();
// PUBLISH QoS 2: type=0x34, topic="t", PI=10, payload="y"
var data = new List<byte>();
data.Add(0x00); data.Add(0x01); data.Add((byte)'t'); // topic
data.Add(0x00); data.Add(0x0A); // PI = 10
data.Add((byte)'y'); // payload
var buf = new List<byte>();
buf.Add((byte)(MqttPacket.Pub | MqttPubFlag.QoS2)); // 0x34
buf.Add((byte)data.Count);
buf.AddRange(data);
var err = MqttParser.Parse(c, buf.ToArray(), buf.Count);
err.ShouldBeNull();
// Verify PUBREC: [0x50] [0x02] [PI high] [PI low]
var ms = GetStream(c);
var written = ms.ToArray();
written.Length.ShouldBe(4);
written[0].ShouldBe(MqttPacket.PubRec); // 0x50
written[1].ShouldBe((byte)0x02);
written[2].ShouldBe((byte)0x00); // PI high
written[3].ShouldBe((byte)0x0A); // PI low
// Message should be stored in QoS2Pending.
c.Mqtt!.QoS2Pending.ShouldContainKey((ushort)10);
}
[Fact]
public void Parser_QoS2_FullHandshake_PubRecPubRelPubComp()
{
var c = CreateConnectedMqttClient();
// Step 1: PUBLISH QoS 2 → PUBREC
var pubData = new List<byte>();
pubData.Add(0x00); pubData.Add(0x01); pubData.Add((byte)'t');
pubData.Add(0x00); pubData.Add(0x07); // PI = 7
pubData.AddRange(Encoding.UTF8.GetBytes("qos2msg"));
var pubBuf = new List<byte>();
pubBuf.Add((byte)(MqttPacket.Pub | MqttPubFlag.QoS2));
pubBuf.Add((byte)pubData.Count);
pubBuf.AddRange(pubData);
var err = MqttParser.Parse(c, pubBuf.ToArray(), pubBuf.Count);
err.ShouldBeNull();
c.Mqtt!.QoS2Pending.ShouldContainKey((ushort)7);
// Reset stream to capture only PUBCOMP.
var ms = GetStream(c);
ms.SetLength(0);
// Step 2: PUBREL from client → PUBCOMP
// PUBREL: [0x62] [0x02] [PI high] [PI low]
var pubrelBuf = new byte[] { 0x62, 0x02, 0x00, 0x07 };
err = MqttParser.Parse(c, pubrelBuf, pubrelBuf.Length);
err.ShouldBeNull();
// Verify PUBCOMP was sent.
var written = ms.ToArray();
written.Length.ShouldBe(4);
written[0].ShouldBe(MqttPacket.PubComp); // 0x70
written[1].ShouldBe((byte)0x02);
written[2].ShouldBe((byte)0x00);
written[3].ShouldBe((byte)0x07);
// Message should be removed from QoS2Pending.
c.Mqtt.QoS2Pending.ShouldNotContainKey((ushort)7);
}
[Fact]
public void Parser_PubAck_ShouldRemoveFromPending()
{
var c = CreateConnectedMqttClient();
// Pre-populate pending.
c.Mqtt!.Pending[(ushort)3] = null;
c.Mqtt.Pending.ShouldContainKey((ushort)3);
// PUBACK: [0x40] [0x02] [0x00] [0x03]
var buf = new byte[] { 0x40, 0x02, 0x00, 0x03 };
var err = MqttParser.Parse(c, buf, buf.Length);
err.ShouldBeNull();
c.Mqtt.Pending.ShouldNotContainKey((ushort)3);
}
[Fact]
public void Parser_PubRec_ShouldSendPubRel()
{
var c = CreateConnectedMqttClient();
// Pre-populate pending.
c.Mqtt!.Pending[(ushort)9] = null;
// PUBREC: [0x50] [0x02] [0x00] [0x09]
var buf = new byte[] { 0x50, 0x02, 0x00, 0x09 };
var err = MqttParser.Parse(c, buf, buf.Length);
err.ShouldBeNull();
// Should have sent PUBREL: [0x62] [0x02] [0x00] [0x09]
var ms = GetStream(c);
var written = ms.ToArray();
written.Length.ShouldBe(4);
written[0].ShouldBe((byte)0x62); // PUBREL with bit 1 set
written[3].ShouldBe((byte)0x09);
// Pending should be cleared.
c.Mqtt.Pending.ShouldNotContainKey((ushort)9);
}
[Fact]
public void Parser_PubComp_ShouldRemoveFromPending()
{
var c = CreateConnectedMqttClient();
c.Mqtt!.Pending[(ushort)15] = null;
// PUBCOMP: [0x70] [0x02] [0x00] [0x0F]
var buf = new byte[] { 0x70, 0x02, 0x00, 0x0F };
var err = MqttParser.Parse(c, buf, buf.Length);
err.ShouldBeNull();
c.Mqtt.Pending.ShouldNotContainKey((ushort)15);
}
// =========================================================================

View File

@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
Generated: 2026-03-01 21:04:38 UTC
Generated: 2026-03-01 21:08:29 UTC
## Modules (12 total)