From 715367b9ea7362e655305e6d1673afe200508fd1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 16:04:37 -0500 Subject: [PATCH] feat(mqtt): implement PUBLISH QoS 0, SUBSCRIBE, and UNSUBSCRIBE handlers Add ParsePub, ParseSubsOrUnsubs, ProcessPub (QoS 0), ProcessSubs, ProcessUnsubs, EnqueueSubAck, and EnqueueUnsubAck to MqttPacketHandlers. Wire PUB/SUB/UNSUB dispatch cases in MqttParser. Add ReadSlice to MqttReader for raw payload extraction. 18 new unit tests covering parsing, flags, error cases, QoS downgrade, and full flow. 1 new integration test verifying SUBSCRIBE handshake over TCP. --- .../Mqtt/MqttPacketHandlers.cs | 276 +++++++++ .../ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs | 16 +- .../ZB.MOM.NatsNet.Server/Mqtt/MqttReader.cs | 17 + .../ServerBootTests.cs | 98 ++++ .../Mqtt/MqttParserTests.cs | 29 +- .../Mqtt/MqttPubSubTests.cs | 530 ++++++++++++++++++ reports/current.md | 2 +- 7 files changed, 947 insertions(+), 21 deletions(-) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttPubSubTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttPacketHandlers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttPacketHandlers.cs index 7e581db..366df7a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttPacketHandlers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttPacketHandlers.cs @@ -237,4 +237,280 @@ internal static class MqttPacketHandlers // Close the connection cleanly. c.CloseConnection(ClosedState.ClientClosed); } + + // ========================================================================= + // PUBLISH parsing + processing + // ========================================================================= + + /// + /// Parses an MQTT PUBLISH packet from the reader. + /// is the remaining length, is the lower + /// nibble of the first byte (DUP/QoS/RETAIN). + /// Mirrors Go mqttParsePub(). + /// + public static (MqttPublishInfo? pp, Exception? err) ParsePub( + MqttReader r, int pl, byte flags, bool rejectQoS2) + { + var start = r.Position; + + // Extract QoS from flags bits 1-2. + var qos = (byte)((flags & MqttPubFlag.QoS) >> 1); + if (qos > 2) + return (null, new InvalidOperationException($"QoS value must be 0, 1, or 2, got {qos}")); + if (qos == 2 && rejectQoS2) + return (null, new InvalidOperationException("QoS-2 PUBLISH rejected by server policy")); + + // Topic name (2-byte length prefix + UTF-8 bytes). + byte[] topicBytes; + try { topicBytes = r.ReadBytes("topic", copy: false); } + catch (Exception ex) { return (null, ex); } + + if (topicBytes.Length == 0) + return (null, new InvalidOperationException("empty topic in PUBLISH")); + + // Convert MQTT topic to NATS subject. + byte[] subjectBytes; + try { subjectBytes = MqttSubjectConverter.MqttTopicToNatsPubSubject(topicBytes); } + catch (Exception ex) { return (null, ex); } + + // Packet identifier (QoS > 0 only). + ushort pi = 0; + if (qos > 0) + { + try { pi = r.ReadUInt16("packet identifier"); } + catch (Exception ex) { return (null, ex); } + if (pi == 0) + return (null, new InvalidOperationException("packet identifier must not be 0 for QoS > 0")); + } + + // Payload = remaining bytes in the packet. + var consumed = r.Position - start; + var payloadLen = pl - consumed; + byte[]? payload = null; + if (payloadLen > 0) + { + try { payload = r.ReadSlice(payloadLen, "publish payload"); } + catch (Exception ex) { return (null, ex); } + } + else if (payloadLen < 0) + { + return (null, new InvalidOperationException("PUBLISH packet payload length underflow")); + } + + var pp = new MqttPublishInfo + { + Topic = Encoding.UTF8.GetString(topicBytes), + Subject = Encoding.UTF8.GetString(subjectBytes), + Msg = payload, + Qos = qos, + Retain = (flags & MqttPubFlag.Retain) != 0, + Dup = (flags & MqttPubFlag.Dup) != 0, + Pi = pi, + }; + + return (pp, null); + } + + /// + /// Processes an inbound MQTT PUBLISH packet. + /// QoS 0: routes immediately via NATS pub/sub. QoS 1/2: deferred to Task 5. + /// Mirrors Go mqttProcessPub(). + /// + public static Exception? ProcessPub(ClientConnection c, MqttPublishInfo pp) + { + if (pp.Qos == 0) + { + // QoS 0: immediate delivery via internal routing. + var payload = pp.Msg ?? []; + c.ProcessInboundClientMsg(payload); + return null; + } + + // QoS 1/2: deferred to Task 5 (JetStream integration). + return new NotImplementedException($"PUBLISH QoS {pp.Qos} not yet implemented"); + } + + // ========================================================================= + // SUBSCRIBE / UNSUBSCRIBE parsing + processing + // ========================================================================= + + /// + /// Parses SUBSCRIBE or UNSUBSCRIBE packet filters from the reader. + /// Returns (packetId, filters, error). + /// Mirrors Go mqttParseSubsOrUnsubs(). + /// + public static (ushort pi, List? filters, Exception? err) ParseSubsOrUnsubs( + MqttReader r, byte firstByte, int pl, bool isSub) + { + var kind = isSub ? "SUBSCRIBE" : "UNSUBSCRIBE"; + + // Validate reserved flags (must be 0x02 per spec). + var expectedFlags = isSub ? MqttConst.SubscribeFlags : MqttConst.UnsubscribeFlags; + var actualFlags = (byte)(firstByte & MqttPacket.FlagMask); + if (actualFlags != expectedFlags) + return (0, null, new InvalidOperationException( + $"{kind} reserved flags must be 0x{expectedFlags:X2}, got 0x{actualFlags:X2}")); + + var end = r.Position + pl; + + // Packet identifier. + ushort pi; + try { pi = r.ReadUInt16("packet identifier"); } + catch (Exception ex) { return (0, null, ex); } + if (pi == 0) + return (0, null, new InvalidOperationException("packet identifier must not be 0")); + + var filters = new List(); + while (r.Position < end) + { + // Topic filter (2-byte length + UTF-8 bytes). + byte[] topicBytes; + try { topicBytes = r.ReadBytes("topic filter", copy: false); } + catch (Exception ex) { return (0, null, ex); } + + if (topicBytes.Length == 0) + return (0, null, new InvalidOperationException("empty topic filter")); + + // Convert MQTT filter to NATS subject. + string natsSubject; + try + { + var subjBytes = MqttSubjectConverter.MqttFilterToNatsSubject(topicBytes); + natsSubject = Encoding.UTF8.GetString(subjBytes); + } + catch (Exception ex) { return (0, null, ex); } + + byte qos = 0; + if (isSub) + { + try { qos = r.ReadByte("QoS"); } + catch (Exception ex) { return (0, null, ex); } + if (qos > 2) + return (0, null, new InvalidOperationException($"invalid QoS value: {qos}")); + } + + filters.Add(new MqttFilter + { + Filter = natsSubject, + Qos = qos, + Ttopic = topicBytes, + }); + } + + if (filters.Count == 0) + return (0, null, new InvalidOperationException( + $"{kind} must contain at least one topic filter")); + + return (pi, filters, null); + } + + /// + /// Processes parsed SUBSCRIBE filters: creates NATS subscriptions and sends SUBACK. + /// Mirrors Go mqttProcessSubs() (minimal — session/JetStream deferred to Task 5-6). + /// + public static Exception? ProcessSubs(ClientConnection c, ushort pi, List filters) + { + var mqtt = c.Mqtt!; + + foreach (var f in filters) + { + // Downgrade QoS 2 if configured. + if (f.Qos == 2 && mqtt.DowngradeQoS2Sub) + f.Qos = 1; + + // Create NATS subscription using the filter as SID. + var sid = Encoding.UTF8.GetBytes(f.Filter); + var subject = Encoding.UTF8.GetBytes(f.Filter); + var (_, err) = c.ProcessSub(subject, queue: null, sid: sid, noForward: false); + if (err != null) + { + // Mark as failure in SUBACK. + f.Qos = MqttConst.SubAckFailure; + } + } + + EnqueueSubAck(c, pi, filters); + return null; + } + + /// + /// Processes parsed UNSUBSCRIBE filters: removes subscriptions and sends UNSUBACK. + /// Mirrors Go mqttProcessUnsubs() (minimal — session cleanup deferred to Task 6). + /// + public static Exception? ProcessUnsubs(ClientConnection c, ushort pi, List filters) + { + foreach (var f in filters) + { + var sid = Encoding.UTF8.GetBytes(f.Filter); + c.RemoveSubBySid(sid); + } + + EnqueueUnsubAck(c, pi); + return null; + } + + // ========================================================================= + // SUBACK / UNSUBACK encoding + // ========================================================================= + + /// + /// Enqueues a SUBACK packet. Mirrors Go mqttEnqueueSubAck(). + /// Format: [0x90] [var-int length] [PI high] [PI low] [QoS per filter...] + /// + public static void EnqueueSubAck(ClientConnection c, ushort pi, List filters) + { + var payloadLen = 2 + filters.Count; + var packet = new byte[1 + VarIntSize(payloadLen) + payloadLen]; + var pos = 0; + packet[pos++] = MqttPacket.SubAck; + pos += WriteVarInt(packet.AsSpan(pos), payloadLen); + packet[pos++] = (byte)(pi >> 8); + packet[pos++] = (byte)(pi & 0xFF); + foreach (var f in filters) + packet[pos++] = f.Qos; + + lock (c) + { + c.EnqueueProto(packet.AsSpan(0, pos)); + } + } + + /// + /// Enqueues an UNSUBACK packet. Mirrors Go mqttEnqueueUnsubAck(). + /// Format: [0xB0] [0x02] [PI high] [PI low] + /// + public static void EnqueueUnsubAck(ClientConnection c, ushort pi) + { + ReadOnlySpan unsuback = + [MqttPacket.UnsubAck, 0x02, (byte)(pi >> 8), (byte)(pi & 0xFF)]; + lock (c) + { + c.EnqueueProto(unsuback); + } + } + + // ========================================================================= + // Helpers + // ========================================================================= + + private static int VarIntSize(int value) + { + if (value < 0x80) return 1; + if (value < 0x4000) return 2; + if (value < 0x200000) return 3; + return 4; + } + + private static int WriteVarInt(Span buf, int value) + { + var pos = 0; + do + { + var b = (byte)(value & 0x7F); + value >>= 7; + if (value > 0) b |= 0x80; + buf[pos++] = b; + } while (value > 0); + return pos; + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs index 892cf8e..007fd90 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttParser.cs @@ -85,8 +85,10 @@ internal static class MqttParser switch (pt) { case MqttPacket.Pub: - // TODO: Task 4 — MqttParsePub + MqttProcessPub - err = new NotImplementedException("PUBLISH not yet implemented"); + var pubFlags = (byte)(b & MqttPacket.FlagMask); + var (pp, pubErr) = MqttPacketHandlers.ParsePub(r, pl, pubFlags, mqtt.RejectQoS2Pub); + if (pubErr != null) { err = pubErr; break; } + err = MqttPacketHandlers.ProcessPub(c, pp!); break; case MqttPacket.PubAck: @@ -110,13 +112,15 @@ internal static class MqttParser break; case MqttPacket.Sub: - // TODO: Task 4 — MqttParseSubs + MqttProcessSubs - err = new NotImplementedException("SUBSCRIBE not yet implemented"); + var (subPi, subFilters, subErr) = MqttPacketHandlers.ParseSubsOrUnsubs(r, b, pl, isSub: true); + if (subErr != null) { err = subErr; break; } + err = MqttPacketHandlers.ProcessSubs(c, subPi, subFilters!); break; case MqttPacket.Unsub: - // TODO: Task 4 — MqttParseUnsubs + MqttProcessUnsubs - err = new NotImplementedException("UNSUBSCRIBE not yet implemented"); + var (unsubPi, unsubFilters, unsubErr) = MqttPacketHandlers.ParseSubsOrUnsubs(r, b, pl, isSub: false); + if (unsubErr != null) { err = unsubErr; break; } + err = MqttPacketHandlers.ProcessUnsubs(c, unsubPi, unsubFilters!); break; case MqttPacket.Ping: diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttReader.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttReader.cs index 3bac6d4..b3e314c 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttReader.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttReader.cs @@ -128,4 +128,21 @@ internal sealed class MqttReader Position += 2; return value; } + + /// + /// Reads exactly raw bytes without a length prefix. + /// Used for PUBLISH payloads where the length is known from the remaining length field. + /// + public byte[] ReadSlice(int count, string field) + { + if (count == 0) + return []; + + if (Position + count > _buffer.Length) + throw new InvalidOperationException($"error reading {field}: {nameof(EndOfStreamException)}"); + + var start = Position; + Position += count; + return _buffer[start..Position]; + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ServerBootTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ServerBootTests.cs index 800852f..44c2aae 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ServerBootTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ServerBootTests.cs @@ -250,6 +250,104 @@ public sealed class ServerBootTests : IDisposable return result.ToArray(); } + /// + /// End-to-end: CONNECT → SUBSCRIBE → verify SUBACK over the wire. + /// + [Fact] + public async Task MqttBoot_SubscribeHandshake_ShouldReceiveSubAck() + { + var opts = new ServerOptions + { + Host = "127.0.0.1", + Port = 0, + Mqtt = { Port = -1, Host = "127.0.0.1" }, + }; + + var (server, err) = NatsServer.NewServer(opts); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + try + { + server!.Start(); + var mqttAddr = server.MqttAddr(); + mqttAddr.ShouldNotBeNull(); + + using var tcp = new System.Net.Sockets.TcpClient(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await tcp.ConnectAsync(mqttAddr!.Address, mqttAddr.Port, cts.Token); + + var stream = tcp.GetStream(); + + // 1. CONNECT → CONNACK + var connectPacket = BuildMqttConnectPacket("sub-test"); + await stream.WriteAsync(connectPacket, cts.Token); + await stream.FlushAsync(cts.Token); + + var connack = new byte[4]; + await ReadExactAsync(stream, connack, cts.Token); + connack[0].ShouldBe((byte)0x20); // CONNACK + connack[3].ShouldBe((byte)0x00); // Accepted + + // 2. SUBSCRIBE to "test/sub" QoS 0 → SUBACK + var subscribePacket = BuildMqttSubscribePacket(packetId: 1, topic: "test/sub", qos: 0); + await stream.WriteAsync(subscribePacket, cts.Token); + await stream.FlushAsync(cts.Token); + + // SUBACK: [0x90] [0x03] [PI high] [PI low] [granted QoS] + var suback = new byte[5]; + await ReadExactAsync(stream, suback, cts.Token); + suback[0].ShouldBe((byte)0x90); // SUBACK + suback[1].ShouldBe((byte)0x03); // remaining length = 3 + suback[2].ShouldBe((byte)0x00); // PI high + suback[3].ShouldBe((byte)0x01); // PI low = 1 + suback[4].ShouldBe((byte)0x00); // granted QoS 0 + } + finally + { + server!.Shutdown(); + } + } + + /// Builds a minimal MQTT SUBSCRIBE packet. + private static byte[] BuildMqttSubscribePacket(ushort packetId, string topic, byte qos) + { + var topicBytes = System.Text.Encoding.UTF8.GetBytes(topic); + var payload = new List(); + payload.Add((byte)(packetId >> 8)); + payload.Add((byte)(packetId & 0xFF)); + payload.Add((byte)(topicBytes.Length >> 8)); + payload.Add((byte)(topicBytes.Length & 0xFF)); + payload.AddRange(topicBytes); + payload.Add(qos); + + var result = new List(); + result.Add(0x82); // SUBSCRIBE + flags 0x02 + var remLen = payload.Count; + do + { + var b = (byte)(remLen & 0x7F); + remLen >>= 7; + if (remLen > 0) b |= 0x80; + result.Add(b); + } while (remLen > 0); + result.AddRange(payload); + return result.ToArray(); + } + + /// Reads exactly .Length bytes from the stream. + private static async Task ReadExactAsync(System.Net.Sockets.NetworkStream stream, byte[] buffer, CancellationToken ct) + { + var totalRead = 0; + while (totalRead < buffer.Length) + { + var n = await stream.ReadAsync(buffer.AsMemory(totalRead, buffer.Length - totalRead), ct); + if (n == 0) break; + totalRead += n; + } + totalRead.ShouldBe(buffer.Length); + } + /// /// Validates that Shutdown() after Start() completes cleanly. /// Uses DontListen to skip TCP binding — tests lifecycle only. diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttParserTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttParserTests.cs index 600b7d1..5d0230b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttParserTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttParserTests.cs @@ -24,7 +24,7 @@ public sealed class MqttParserTests /// private static ClientConnection CreateMqttClient() { - var c = new ClientConnection(ClientKind.Client); + var c = new ClientConnection(ClientKind.Client, nc: new MemoryStream()); c.InitMqtt(new MqttHandler()); return c; } @@ -164,37 +164,38 @@ public sealed class MqttParserTests [Fact] public void Parse_SingleByteRemainingLength_ShouldWork() { - // SUB packet with remaining length = 5 (single byte < 128) - // After CONNECT is received, a SUB packet should parse the remaining length correctly. + // SUBSCRIBE with remaining length = 6 (single byte < 128). + // Proves single-byte remaining length decoding works. var c = CreateMqttClient(); c.Flags |= ClientFlags.ConnectReceived; - // SUBSCRIBE: type=0x82, remaining len=5, then 5 bytes of payload - var buf = new byte[] { 0x82, 0x05, 0x00, 0x01, 0x00, 0x01, 0x74 }; + // SUBSCRIBE: type=0x82, remlen=6, PI=1, filter="t" (len=1), QoS=0 + var buf = new byte[] { 0x82, 0x06, 0x00, 0x01, 0x00, 0x01, (byte)'t', 0x00 }; var err = MqttParser.Parse(c, buf, buf.Length); - // Will hit NotImplementedException for SUBSCRIBE — that's fine, it proves parsing worked. - err.ShouldNotBeNull(); - err.ShouldBeOfType(); + err.ShouldBeNull(); } [Fact] public void Parse_TwoByteRemainingLength_ShouldWork() { + // PUBLISH QoS 0 with remaining length = 200 → encoded as [0xC8, 0x01]. + // Proves two-byte remaining length decoding works. var c = CreateMqttClient(); c.Flags |= ClientFlags.ConnectReceived; - // Remaining length = 200 → encoded as [0xC8, 0x01] - // (200 & 0x7F) | 0x80 = 0xC8, 200 >> 7 = 1 → 0x01 // type(1) + remlen(2) + payload(200) = 203 bytes total. var buf = new byte[203]; - buf[0] = MqttPacket.Pub; + buf[0] = MqttPacket.Pub; // 0x30, QoS 0 buf[1] = 0xC8; buf[2] = 0x01; - // Remaining 200 bytes are zero (payload). + // Topic "t": length prefix (2 bytes) + 1 byte. + buf[3] = 0x00; + buf[4] = 0x01; + buf[5] = (byte)'t'; + // Bytes 6..202 are zero (197-byte payload). var err = MqttParser.Parse(c, buf, buf.Length); - err.ShouldNotBeNull(); - err.ShouldBeOfType(); // PUBLISH not yet implemented + err.ShouldBeNull(); } // ========================================================================= diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttPubSubTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttPubSubTests.cs new file mode 100644 index 0000000..ede803e --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Mqtt/MqttPubSubTests.cs @@ -0,0 +1,530 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Mqtt; + +namespace ZB.MOM.NatsNet.Server.Tests.Mqtt; + +/// +/// Unit tests for MQTT PUBLISH, SUBSCRIBE, and UNSUBSCRIBE packet handling. +/// +public sealed class MqttPubSubTests +{ + private static ClientConnection CreateConnectedMqttClient() + { + var ms = new MemoryStream(); + var c = new ClientConnection(ClientKind.Client, nc: ms); + c.InitMqtt(new MqttHandler()); + c.Flags |= ClientFlags.ConnectReceived; + return c; + } + + private static MemoryStream GetStream(ClientConnection c) + { + return (MemoryStream)typeof(ClientConnection) + .GetField("_nc", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)! + .GetValue(c)!; + } + + // ========================================================================= + // PUBLISH parsing + // ========================================================================= + + [Fact] + public void ParsePub_QoS0_ShouldParseCorrectly() + { + var r = new MqttReader(); + // Topic "test/topic" + payload "hello" + var topic = Encoding.UTF8.GetBytes("test/topic"); + var payload = Encoding.UTF8.GetBytes("hello"); + var data = new List(); + data.Add((byte)(topic.Length >> 8)); + data.Add((byte)(topic.Length & 0xFF)); + data.AddRange(topic); + data.AddRange(payload); + r.Reset(data.ToArray()); + + byte flags = 0x00; // QoS 0, no retain, no dup + var (pp, err) = MqttPacketHandlers.ParsePub(r, data.Count, flags, rejectQoS2: false); + err.ShouldBeNull(); + pp.ShouldNotBeNull(); + pp!.Topic.ShouldBe("test/topic"); + pp.Subject.ShouldNotBeEmpty(); + pp.Qos.ShouldBe((byte)0); + pp.Pi.ShouldBe((ushort)0); + pp.Retain.ShouldBeFalse(); + pp.Dup.ShouldBeFalse(); + Encoding.UTF8.GetString(pp.Msg!).ShouldBe("hello"); + } + + [Fact] + public void ParsePub_QoS1_ShouldParsePacketId() + { + var r = new MqttReader(); + var topic = Encoding.UTF8.GetBytes("a/b"); + var data = new List(); + data.Add((byte)(topic.Length >> 8)); + data.Add((byte)(topic.Length & 0xFF)); + data.AddRange(topic); + data.Add(0x00); data.Add(0x07); // PI = 7 + data.AddRange(Encoding.UTF8.GetBytes("msg")); + r.Reset(data.ToArray()); + + byte flags = MqttPubFlag.QoS1; // QoS 1 = 0x02 + var (pp, err) = MqttPacketHandlers.ParsePub(r, data.Count, flags, rejectQoS2: false); + err.ShouldBeNull(); + pp!.Qos.ShouldBe((byte)1); + pp.Pi.ShouldBe((ushort)7); + } + + [Fact] + public void ParsePub_QoS2Rejected_ShouldReturnError() + { + var r = new MqttReader(); + var topic = Encoding.UTF8.GetBytes("t"); + var data = new List(); + data.Add(0x00); data.Add(0x01); + data.AddRange(topic); + data.Add(0x00); data.Add(0x01); // PI = 1 + r.Reset(data.ToArray()); + + byte flags = MqttPubFlag.QoS2; // QoS 2 = 0x04 + var (pp, err) = MqttPacketHandlers.ParsePub(r, data.Count, flags, rejectQoS2: true); + err.ShouldNotBeNull(); + err.Message.ShouldContain("QoS-2 PUBLISH rejected"); + } + + [Fact] + public void ParsePub_EmptyTopic_ShouldReturnError() + { + var r = new MqttReader(); + var data = new byte[] { 0x00, 0x00 }; // zero-length topic + r.Reset(data); + + var (pp, err) = MqttPacketHandlers.ParsePub(r, data.Length, 0x00, rejectQoS2: false); + err.ShouldNotBeNull(); + err.Message.ShouldContain("empty topic"); + } + + [Fact] + public void ParsePub_RetainAndDup_ShouldSetFlags() + { + var r = new MqttReader(); + var topic = Encoding.UTF8.GetBytes("t"); + var data = new List(); + data.Add(0x00); data.Add(0x01); + data.AddRange(topic); + r.Reset(data.ToArray()); + + byte flags = MqttPubFlag.Retain | MqttPubFlag.Dup; // 0x09 + var (pp, err) = MqttPacketHandlers.ParsePub(r, data.Count, flags, rejectQoS2: false); + err.ShouldBeNull(); + pp!.Retain.ShouldBeTrue(); + pp.Dup.ShouldBeTrue(); + } + + [Fact] + public void ParsePub_EmptyPayload_ShouldSucceed() + { + var r = new MqttReader(); + var topic = Encoding.UTF8.GetBytes("t"); + var data = new List(); + data.Add(0x00); data.Add(0x01); + data.AddRange(topic); + // No payload bytes. + r.Reset(data.ToArray()); + + var (pp, err) = MqttPacketHandlers.ParsePub(r, data.Count, 0x00, rejectQoS2: false); + err.ShouldBeNull(); + pp!.Msg.ShouldBeNull(); + } + + // ========================================================================= + // PUBLISH processing via parser + // ========================================================================= + + [Fact] + public void Parser_PublishQoS0_ShouldSucceed() + { + var c = CreateConnectedMqttClient(); + + // Build PUBLISH: type=0x30 (QoS 0), topic="test", payload="hi" + var topic = Encoding.UTF8.GetBytes("test"); + var payload = Encoding.UTF8.GetBytes("hi"); + var data = new List(); + data.Add(0x00); data.Add((byte)topic.Length); + data.AddRange(topic); + data.AddRange(payload); + + var buf = new List(); + buf.Add(MqttPacket.Pub); // 0x30 + buf.Add((byte)data.Count); + buf.AddRange(data); + + var err = MqttParser.Parse(c, buf.ToArray(), buf.Count); + err.ShouldBeNull(); + } + + [Fact] + public void Parser_PublishQoS1_ShouldReturnNotImplemented() + { + var c = CreateConnectedMqttClient(); + + // PUBLISH QoS 1: type=0x32, topic="t", PI=1, payload="x" + var data = new List(); + data.Add(0x00); data.Add(0x01); data.Add((byte)'t'); // topic + data.Add(0x00); data.Add(0x01); // PI = 1 + data.Add((byte)'x'); // payload + + var buf = new List(); + buf.Add((byte)(MqttPacket.Pub | MqttPubFlag.QoS1)); // 0x32 + buf.Add((byte)data.Count); + buf.AddRange(data); + + var err = MqttParser.Parse(c, buf.ToArray(), buf.Count); + err.ShouldNotBeNull(); + err.ShouldBeOfType(); + } + + // ========================================================================= + // SUBSCRIBE parsing + // ========================================================================= + + [Fact] + public void ParseSubs_SingleFilter_ShouldParseCorrectly() + { + var r = new MqttReader(); + var filter = Encoding.UTF8.GetBytes("test/topic"); + var data = new List(); + data.Add(0x00); data.Add(0x0A); // PI = 10 + data.Add((byte)(filter.Length >> 8)); + data.Add((byte)(filter.Length & 0xFF)); + data.AddRange(filter); + data.Add(0x01); // QoS 1 + r.Reset(data.ToArray()); + + var (pi, filters, err) = MqttPacketHandlers.ParseSubsOrUnsubs( + r, (byte)(MqttPacket.Sub | MqttConst.SubscribeFlags), data.Count, isSub: true); + err.ShouldBeNull(); + pi.ShouldBe((ushort)10); + filters.ShouldNotBeNull(); + filters!.Count.ShouldBe(1); + filters[0].Filter.ShouldNotBeEmpty(); + filters[0].Qos.ShouldBe((byte)1); + } + + [Fact] + public void ParseSubs_MultipleFilters_ShouldParseAll() + { + var r = new MqttReader(); + var f1 = Encoding.UTF8.GetBytes("a/b"); + var f2 = Encoding.UTF8.GetBytes("c/d"); + var data = new List(); + data.Add(0x00); data.Add(0x01); // PI = 1 + data.Add((byte)(f1.Length >> 8)); data.Add((byte)(f1.Length & 0xFF)); + data.AddRange(f1); + data.Add(0x00); // QoS 0 + data.Add((byte)(f2.Length >> 8)); data.Add((byte)(f2.Length & 0xFF)); + data.AddRange(f2); + data.Add(0x02); // QoS 2 + r.Reset(data.ToArray()); + + var (pi, filters, err) = MqttPacketHandlers.ParseSubsOrUnsubs( + r, (byte)(MqttPacket.Sub | MqttConst.SubscribeFlags), data.Count, isSub: true); + err.ShouldBeNull(); + filters!.Count.ShouldBe(2); + filters[0].Qos.ShouldBe((byte)0); + filters[1].Qos.ShouldBe((byte)2); + } + + [Fact] + public void ParseSubs_WrongFlags_ShouldReturnError() + { + var r = new MqttReader(); + var data = new byte[] { 0x00, 0x01, 0x00, 0x01, (byte)'t', 0x00 }; + r.Reset(data); + + // Wrong flags: 0x00 instead of 0x02 + var (_, _, err) = MqttPacketHandlers.ParseSubsOrUnsubs( + r, MqttPacket.Sub, data.Length, isSub: true); // flags = 0x00 + err.ShouldNotBeNull(); + err.Message.ShouldContain("reserved flags"); + } + + [Fact] + public void ParseSubs_ZeroPacketId_ShouldReturnError() + { + var r = new MqttReader(); + var data = new byte[] { 0x00, 0x00, 0x00, 0x01, (byte)'t', 0x00 }; + r.Reset(data); + + var (_, _, err) = MqttPacketHandlers.ParseSubsOrUnsubs( + r, (byte)(MqttPacket.Sub | MqttConst.SubscribeFlags), data.Length, isSub: true); + err.ShouldNotBeNull(); + err.Message.ShouldContain("packet identifier must not be 0"); + } + + [Fact] + public void ParseSubs_InvalidQoS_ShouldReturnError() + { + var r = new MqttReader(); + var data = new byte[] { 0x00, 0x01, 0x00, 0x01, (byte)'t', 0x03 }; // QoS=3, invalid + r.Reset(data); + + var (_, _, err) = MqttPacketHandlers.ParseSubsOrUnsubs( + r, (byte)(MqttPacket.Sub | MqttConst.SubscribeFlags), data.Length, isSub: true); + err.ShouldNotBeNull(); + err.Message.ShouldContain("invalid QoS"); + } + + // ========================================================================= + // SUBSCRIBE processing via parser + // ========================================================================= + + [Fact] + public void Parser_Subscribe_ShouldSendSubAck() + { + var c = CreateConnectedMqttClient(); + + // Build SUBSCRIBE: PI=5, filter="test/topic", QoS=1 + var filter = Encoding.UTF8.GetBytes("test/topic"); + var payload = new List(); + payload.Add(0x00); payload.Add(0x05); // PI = 5 + payload.Add((byte)(filter.Length >> 8)); + payload.Add((byte)(filter.Length & 0xFF)); + payload.AddRange(filter); + payload.Add(0x01); // QoS 1 + + var buf = new List(); + buf.Add((byte)(MqttPacket.Sub | MqttConst.SubscribeFlags)); // 0x82 + buf.Add((byte)payload.Count); + buf.AddRange(payload); + + var err = MqttParser.Parse(c, buf.ToArray(), buf.Count); + err.ShouldBeNull(); + + // Verify SUBACK was written. + var ms = GetStream(c); + var data = ms.ToArray(); + data.Length.ShouldBeGreaterThan(0); + data[0].ShouldBe(MqttPacket.SubAck); // 0x90 + // SUBACK payload: PI (2 bytes) + QoS per filter (1 byte) = 3 + // Format: [0x90] [0x03] [0x00] [0x05] [0x01] + data[1].ShouldBe((byte)0x03); // remaining length + data[2].ShouldBe((byte)0x00); // PI high + data[3].ShouldBe((byte)0x05); // PI low + data[4].ShouldBe((byte)0x01); // granted QoS 1 + } + + [Fact] + public void Parser_Subscribe_QoS2Downgrade_ShouldGrantQoS1() + { + var c = CreateConnectedMqttClient(); + c.Mqtt!.DowngradeQoS2Sub = true; + + var filter = Encoding.UTF8.GetBytes("a"); + var payload = new List(); + payload.Add(0x00); payload.Add(0x01); // PI = 1 + payload.Add(0x00); payload.Add((byte)filter.Length); + payload.AddRange(filter); + payload.Add(0x02); // QoS 2 requested + + var buf = new List(); + buf.Add((byte)(MqttPacket.Sub | MqttConst.SubscribeFlags)); + buf.Add((byte)payload.Count); + buf.AddRange(payload); + + var err = MqttParser.Parse(c, buf.ToArray(), buf.Count); + err.ShouldBeNull(); + + var ms = GetStream(c); + var data = ms.ToArray(); + // Last byte of SUBACK is the granted QoS, should be 1 (downgraded from 2). + data[^1].ShouldBe((byte)0x01); + } + + // ========================================================================= + // UNSUBSCRIBE parsing + // ========================================================================= + + [Fact] + public void ParseUnsubs_SingleFilter_ShouldParseCorrectly() + { + var r = new MqttReader(); + var filter = Encoding.UTF8.GetBytes("test/topic"); + var data = new List(); + data.Add(0x00); data.Add(0x03); // PI = 3 + data.Add((byte)(filter.Length >> 8)); + data.Add((byte)(filter.Length & 0xFF)); + data.AddRange(filter); + // No QoS byte for UNSUBSCRIBE + r.Reset(data.ToArray()); + + var (pi, filters, err) = MqttPacketHandlers.ParseSubsOrUnsubs( + r, (byte)(MqttPacket.Unsub | MqttConst.UnsubscribeFlags), data.Count, isSub: false); + err.ShouldBeNull(); + pi.ShouldBe((ushort)3); + filters!.Count.ShouldBe(1); + filters[0].Qos.ShouldBe((byte)0); // Always 0 for unsub + } + + // ========================================================================= + // UNSUBSCRIBE processing via parser + // ========================================================================= + + [Fact] + public void Parser_Unsubscribe_ShouldSendUnsubAck() + { + var c = CreateConnectedMqttClient(); + + // First subscribe to create the subscription. + var filter = Encoding.UTF8.GetBytes("test/unsub"); + var subPayload = new List(); + subPayload.Add(0x00); subPayload.Add(0x01); // PI = 1 + subPayload.Add((byte)(filter.Length >> 8)); + subPayload.Add((byte)(filter.Length & 0xFF)); + subPayload.AddRange(filter); + subPayload.Add(0x00); // QoS 0 + + var subBuf = new List(); + subBuf.Add((byte)(MqttPacket.Sub | MqttConst.SubscribeFlags)); + subBuf.Add((byte)subPayload.Count); + subBuf.AddRange(subPayload); + MqttParser.Parse(c, subBuf.ToArray(), subBuf.Count); + + // Reset stream to capture UNSUBACK only. + var ms = GetStream(c); + ms.SetLength(0); + + // Now unsubscribe. + var unsubPayload = new List(); + unsubPayload.Add(0x00); unsubPayload.Add(0x02); // PI = 2 + unsubPayload.Add((byte)(filter.Length >> 8)); + unsubPayload.Add((byte)(filter.Length & 0xFF)); + unsubPayload.AddRange(filter); + + var unsubBuf = new List(); + unsubBuf.Add((byte)(MqttPacket.Unsub | MqttConst.UnsubscribeFlags)); // 0xA2 + unsubBuf.Add((byte)unsubPayload.Count); + unsubBuf.AddRange(unsubPayload); + + var err = MqttParser.Parse(c, unsubBuf.ToArray(), unsubBuf.Count); + err.ShouldBeNull(); + + // Verify UNSUBACK: [0xB0] [0x02] [PI high] [PI low] + var data = ms.ToArray(); + data.Length.ShouldBe(4); + data[0].ShouldBe(MqttPacket.UnsubAck); // 0xB0 + data[1].ShouldBe((byte)0x02); + data[2].ShouldBe((byte)0x00); // PI high + data[3].ShouldBe((byte)0x02); // PI low + } + + // ========================================================================= + // Full CONNECT + SUBSCRIBE + PUBLISH + UNSUBSCRIBE flow + // ========================================================================= + + [Fact] + public void Parser_FullFlow_ConnectSubPubUnsub() + { + var ms = new MemoryStream(); + var c = new ClientConnection(ClientKind.Client, nc: ms); + c.InitMqtt(new MqttHandler()); + + // 1. CONNECT + var connectBuf = BuildConnectPacket("flow-test"); + var err = MqttParser.Parse(c, connectBuf, connectBuf.Length); + err.ShouldBeNull(); + (c.Flags & ClientFlags.ConnectReceived).ShouldNotBe((ClientFlags)0); + + // 2. SUBSCRIBE to "test/flow" QoS 0 + var filter = Encoding.UTF8.GetBytes("test/flow"); + var subPayload = new List(); + subPayload.Add(0x00); subPayload.Add(0x01); // PI = 1 + subPayload.Add((byte)(filter.Length >> 8)); + subPayload.Add((byte)(filter.Length & 0xFF)); + subPayload.AddRange(filter); + subPayload.Add(0x00); // QoS 0 + + var subBuf = new List(); + subBuf.Add((byte)(MqttPacket.Sub | MqttConst.SubscribeFlags)); + subBuf.Add((byte)subPayload.Count); + subBuf.AddRange(subPayload); + + err = MqttParser.Parse(c, subBuf.ToArray(), subBuf.Count); + err.ShouldBeNull(); + + // 3. PUBLISH to "test/flow" QoS 0 + var topic = Encoding.UTF8.GetBytes("test/flow"); + var pubData = new List(); + pubData.Add((byte)(topic.Length >> 8)); + pubData.Add((byte)(topic.Length & 0xFF)); + pubData.AddRange(topic); + pubData.AddRange(Encoding.UTF8.GetBytes("hello")); + + var pubBuf = new List(); + pubBuf.Add(MqttPacket.Pub); + pubBuf.Add((byte)pubData.Count); + pubBuf.AddRange(pubData); + + err = MqttParser.Parse(c, pubBuf.ToArray(), pubBuf.Count); + err.ShouldBeNull(); + + // 4. UNSUBSCRIBE from "test/flow" + var unsubPayload = new List(); + unsubPayload.Add(0x00); unsubPayload.Add(0x02); // PI = 2 + unsubPayload.Add((byte)(filter.Length >> 8)); + unsubPayload.Add((byte)(filter.Length & 0xFF)); + unsubPayload.AddRange(filter); + + var unsubBuf = new List(); + unsubBuf.Add((byte)(MqttPacket.Unsub | MqttConst.UnsubscribeFlags)); + unsubBuf.Add((byte)unsubPayload.Count); + unsubBuf.AddRange(unsubPayload); + + err = MqttParser.Parse(c, unsubBuf.ToArray(), unsubBuf.Count); + err.ShouldBeNull(); + + // Verify: CONNACK(4) + SUBACK(5) + UNSUBACK(4) = 13 bytes written + var data = ms.ToArray(); + data.Length.ShouldBe(13); + data[0].ShouldBe(MqttPacket.ConnectAck); // CONNACK + data[4].ShouldBe(MqttPacket.SubAck); // SUBACK + data[9].ShouldBe(MqttPacket.UnsubAck); // UNSUBACK + } + + /// Builds a minimal MQTT CONNECT packet. + private static byte[] BuildConnectPacket(string clientId) + { + var payload = new List(); + payload.AddRange(new byte[] { 0x00, 0x04 }); + payload.AddRange(Encoding.UTF8.GetBytes("MQTT")); + payload.Add(0x04); + payload.Add(0x02); // clean session + payload.AddRange(new byte[] { 0x00, 0x3C }); + var cidBytes = Encoding.UTF8.GetBytes(clientId); + payload.Add((byte)(cidBytes.Length >> 8)); + payload.Add((byte)(cidBytes.Length & 0xFF)); + payload.AddRange(cidBytes); + + var result = new List { MqttPacket.Connect }; + var remLen = payload.Count; + do + { + var b = (byte)(remLen & 0x7F); + remLen >>= 7; + if (remLen > 0) b |= 0x80; + result.Add(b); + } while (remLen > 0); + result.AddRange(payload); + return result.ToArray(); + } +} diff --git a/reports/current.md b/reports/current.md index 57aeddc..42ba672 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 20:48:23 UTC +Generated: 2026-03-01 21:04:38 UTC ## Modules (12 total)