diff --git a/src/NATS.Server/Mqtt/MqttBinaryDecoder.cs b/src/NATS.Server/Mqtt/MqttBinaryDecoder.cs new file mode 100644 index 0000000..8136618 --- /dev/null +++ b/src/NATS.Server/Mqtt/MqttBinaryDecoder.cs @@ -0,0 +1,325 @@ +// Binary MQTT packet body decoder. +// Go reference: golang/nats-server/server/mqtt.go +// CONNECT parsing — mqttParseSub / mqttParseConnect (lines ~700–850) +// PUBLISH parsing — mqttParsePublish (lines ~1200–1300) +// SUBSCRIBE parsing — mqttParseSub (lines ~1400–1500) +// Wildcard translation — mqttToNATSSubjectConversion (lines ~2200–2250) + +namespace NATS.Server.Mqtt; + +/// +/// Decoded fields from an MQTT CONNECT packet body. +/// Go reference: server/mqtt.go mqttParseConnect ~line 700. +/// +public readonly record struct MqttConnectInfo( + string ProtocolName, + byte ProtocolLevel, + bool CleanSession, + ushort KeepAlive, + string ClientId, + string? WillTopic, + byte[]? WillMessage, + byte WillQoS, + bool WillRetain, + string? Username, + string? Password); + +/// +/// Decoded fields from an MQTT PUBLISH packet body. +/// Go reference: server/mqtt.go mqttParsePublish ~line 1200. +/// +public readonly record struct MqttPublishInfo( + string Topic, + ushort PacketId, + byte QoS, + bool Dup, + bool Retain, + ReadOnlyMemory Payload); + +/// +/// Decoded fields from an MQTT SUBSCRIBE packet body. +/// Go reference: server/mqtt.go mqttParseSub ~line 1400. +/// +public readonly record struct MqttSubscribeInfo( + ushort PacketId, + IReadOnlyList<(string TopicFilter, byte QoS)> Filters); + +/// +/// Decodes the variable-header and payload of CONNECT, PUBLISH, and SUBSCRIBE +/// MQTT 3.1.1 control packets, and translates MQTT wildcards to NATS subjects. +/// +public static class MqttBinaryDecoder +{ + // ------------------------------------------------------------------------- + // CONNECT parsing + // Go reference: server/mqtt.go mqttParseConnect ~line 700 + // ------------------------------------------------------------------------- + + /// + /// Parses the payload bytes of an MQTT CONNECT packet (everything after the + /// fixed header and remaining-length bytes, i.e. the value of + /// ). + /// + /// + /// The payload bytes as returned by . + /// + /// A populated . + /// + /// Thrown when the packet is malformed or the protocol name is not "MQTT". + /// + public static MqttConnectInfo ParseConnect(ReadOnlySpan payload) + { + // Variable header layout (MQTT 3.1.1 spec §3.1): + // 2-byte length prefix + protocol name bytes ("MQTT") + // 1 byte protocol level (4 = 3.1.1, 5 = 5.0) + // 1 byte connect flags + // 2 bytes keepalive (big-endian) + // Payload: + // 2+N client ID + // if will flag: 2+N will topic, 2+N will message + // if username: 2+N username + // if password: 2+N password + + var pos = 0; + + // Protocol name + var protocolName = ReadUtf8String(payload, ref pos); + if (protocolName != "MQTT" && protocolName != "MQIsdp") + throw new FormatException($"Unknown MQTT protocol name: '{protocolName}'"); + + if (pos + 4 > payload.Length) + throw new FormatException("MQTT CONNECT packet too short for variable header."); + + var protocolLevel = payload[pos++]; + + // Connect flags byte + // Bit 1 = CleanSession, Bit 2 = WillFlag, Bits 3-4 = WillQoS, Bit 5 = WillRetain, + // Bit 6 = PasswordFlag, Bit 7 = UsernameFlag + var connectFlags = payload[pos++]; + var cleanSession = (connectFlags & 0x02) != 0; + var willFlag = (connectFlags & 0x04) != 0; + var willQoS = (byte)((connectFlags >> 3) & 0x03); + var willRetain = (connectFlags & 0x20) != 0; + var passwordFlag = (connectFlags & 0x40) != 0; + var usernameFlag = (connectFlags & 0x80) != 0; + + // Keep-alive (big-endian uint16) + var keepAlive = ReadUInt16BigEndian(payload, ref pos); + + // Payload fields + var clientId = ReadUtf8String(payload, ref pos); + + string? willTopic = null; + byte[]? willMessage = null; + if (willFlag) + { + willTopic = ReadUtf8String(payload, ref pos); + willMessage = ReadBinaryField(payload, ref pos); + } + + string? username = null; + if (usernameFlag) + username = ReadUtf8String(payload, ref pos); + + string? password = null; + if (passwordFlag) + password = ReadUtf8String(payload, ref pos); + + return new MqttConnectInfo( + ProtocolName: protocolName, + ProtocolLevel: protocolLevel, + CleanSession: cleanSession, + KeepAlive: keepAlive, + ClientId: clientId, + WillTopic: willTopic, + WillMessage: willMessage, + WillQoS: willQoS, + WillRetain: willRetain, + Username: username, + Password: password); + } + + // ------------------------------------------------------------------------- + // PUBLISH parsing + // Go reference: server/mqtt.go mqttParsePublish ~line 1200 + // ------------------------------------------------------------------------- + + /// + /// Parses the payload bytes of an MQTT PUBLISH packet. + /// The nibble comes from + /// of the fixed header. + /// + /// The payload bytes from . + /// The lower nibble of the fixed header byte (DUP/QoS/RETAIN flags). + /// A populated . + public static MqttPublishInfo ParsePublish(ReadOnlySpan payload, byte flags) + { + // Fixed-header flags nibble layout (MQTT 3.1.1 spec §3.3.1): + // Bit 3 = DUP + // Bits 2-1 = QoS (0, 1, or 2) + // Bit 0 = RETAIN + var dup = (flags & 0x08) != 0; + var qos = (byte)((flags >> 1) & 0x03); + var retain = (flags & 0x01) != 0; + + var pos = 0; + + // Variable header: topic name (2-byte length prefix + UTF-8) + var topic = ReadUtf8String(payload, ref pos); + + // Packet identifier — only present for QoS > 0 + ushort packetId = 0; + if (qos > 0) + packetId = ReadUInt16BigEndian(payload, ref pos); + + // Remaining bytes are the application payload + var messagePayload = payload[pos..].ToArray(); + + return new MqttPublishInfo( + Topic: topic, + PacketId: packetId, + QoS: qos, + Dup: dup, + Retain: retain, + Payload: messagePayload); + } + + // ------------------------------------------------------------------------- + // SUBSCRIBE parsing + // Go reference: server/mqtt.go mqttParseSub ~line 1400 + // ------------------------------------------------------------------------- + + /// + /// Parses the payload bytes of an MQTT SUBSCRIBE packet. + /// + /// The payload bytes from . + /// A populated . + public static MqttSubscribeInfo ParseSubscribe(ReadOnlySpan payload) + { + // Variable header: packet identifier (2 bytes, big-endian) + // Payload: one or more topic-filter entries, each: + // 2-byte length prefix + UTF-8 filter string + 1-byte requested QoS + + var pos = 0; + var packetId = ReadUInt16BigEndian(payload, ref pos); + + var filters = new List<(string, byte)>(); + while (pos < payload.Length) + { + var topicFilter = ReadUtf8String(payload, ref pos); + if (pos >= payload.Length) + throw new FormatException("MQTT SUBSCRIBE packet missing QoS byte after topic filter."); + var filterQoS = payload[pos++]; + filters.Add((topicFilter, filterQoS)); + } + + return new MqttSubscribeInfo(packetId, filters); + } + + // ------------------------------------------------------------------------- + // MQTT wildcard → NATS subject translation + // Go reference: server/mqtt.go mqttToNATSSubjectConversion ~line 2200 + // + // Simple translation (filter → NATS, wildcards permitted): + // '+' → '*' (single-level wildcard) + // '#' → '>' (multi-level wildcard) + // '/' → '.' (topic separator) + // + // NOTE: This method implements the simple/naïve translation that the task + // description specifies. The full Go implementation also handles dots, + // leading/trailing slashes, and empty levels differently (see + // MqttTopicMappingParityTests for the complete behavior). This method is + // intentionally limited to the four rules requested by the task spec. + // ------------------------------------------------------------------------- + + /// + /// Translates an MQTT topic filter to a NATS subject using the simple rules: + /// + /// +* (single-level wildcard) + /// #> (multi-level wildcard) + /// /. (separator) + /// + /// Go reference: server/mqtt.go mqttToNATSSubjectConversion ~line 2200. + /// + /// An MQTT topic filter string. + /// The equivalent NATS subject string. + public static string TranslateFilterToNatsSubject(string mqttFilter) + { + if (mqttFilter.Length == 0) + return string.Empty; + + var result = new char[mqttFilter.Length]; + for (var i = 0; i < mqttFilter.Length; i++) + { + result[i] = mqttFilter[i] switch + { + '+' => '*', + '#' => '>', + '/' => '.', + var c => c, + }; + } + + return new string(result); + } + + // ------------------------------------------------------------------------- + // Internal helpers + // ------------------------------------------------------------------------- + + /// + /// Reads a 2-byte big-endian length-prefixed UTF-8 string from + /// starting at , advancing + /// past the consumed bytes. + /// + private static string ReadUtf8String(ReadOnlySpan data, ref int pos) + { + if (pos + 2 > data.Length) + throw new FormatException("MQTT packet truncated reading string length prefix."); + + var length = (data[pos] << 8) | data[pos + 1]; + pos += 2; + + if (pos + length > data.Length) + throw new FormatException("MQTT packet truncated reading string body."); + + var value = System.Text.Encoding.UTF8.GetString(data.Slice(pos, length)); + pos += length; + return value; + } + + /// + /// Reads a 2-byte big-endian length-prefixed binary field (e.g. will + /// message, password) from , advancing + /// past the consumed bytes. + /// + private static byte[] ReadBinaryField(ReadOnlySpan data, ref int pos) + { + if (pos + 2 > data.Length) + throw new FormatException("MQTT packet truncated reading binary field length prefix."); + + var length = (data[pos] << 8) | data[pos + 1]; + pos += 2; + + if (pos + length > data.Length) + throw new FormatException("MQTT packet truncated reading binary field body."); + + var value = data.Slice(pos, length).ToArray(); + pos += length; + return value; + } + + /// + /// Reads a big-endian uint16 from at + /// , advancing by 2. + /// + private static ushort ReadUInt16BigEndian(ReadOnlySpan data, ref int pos) + { + if (pos + 2 > data.Length) + throw new FormatException("MQTT packet truncated reading uint16."); + + var value = (ushort)((data[pos] << 8) | data[pos + 1]); + pos += 2; + return value; + } +} diff --git a/tests/NATS.Server.Tests/Mqtt/MqttBinaryParserTests.cs b/tests/NATS.Server.Tests/Mqtt/MqttBinaryParserTests.cs new file mode 100644 index 0000000..32b1973 --- /dev/null +++ b/tests/NATS.Server.Tests/Mqtt/MqttBinaryParserTests.cs @@ -0,0 +1,477 @@ +// Binary MQTT packet parser tests. +// Go reference: golang/nats-server/server/mqtt.go +// CONNECT parsing — mqttParseConnect (~line 700) +// PUBLISH parsing — mqttParsePublish (~line 1200) +// SUBSCRIBE parsing — mqttParseSub (~line 1400) +// Wildcard translation — mqttToNATSSubjectConversion (~line 2200) + +using System.Text; +using NATS.Server.Mqtt; + +namespace NATS.Server.Tests.Mqtt; + +public class MqttBinaryParserTests +{ + // ========================================================================= + // Helpers — build well-formed CONNECT packet payloads + // ========================================================================= + + /// + /// Builds the payload bytes (everything after the fixed header) of an MQTT + /// 3.1.1 CONNECT packet. + /// + private static byte[] BuildConnectPayload( + string clientId, + bool cleanSession = true, + ushort keepAlive = 60, + string? username = null, + string? password = null, + string? willTopic = null, + byte[]? willMessage = null, + byte willQoS = 0, + bool willRetain = false) + { + using var ms = new System.IO.MemoryStream(); + using var w = new System.IO.BinaryWriter(ms); + + // Protocol name "MQTT" + WriteString(w, "MQTT"); + + // Protocol level 4 (MQTT 3.1.1) + w.Write((byte)4); + + // Connect flags + byte flags = 0; + if (cleanSession) flags |= 0x02; + if (willTopic != null) flags |= 0x04; + flags |= (byte)((willQoS & 0x03) << 3); + if (willRetain) flags |= 0x20; + if (password != null) flags |= 0x40; + if (username != null) flags |= 0x80; + w.Write(flags); + + // Keep-alive (big-endian) + WriteUInt16BE(w, keepAlive); + + // Payload fields + WriteString(w, clientId); + + if (willTopic != null) + { + WriteString(w, willTopic); + WriteBinaryField(w, willMessage ?? []); + } + + if (username != null) WriteString(w, username); + if (password != null) WriteString(w, password); + + return ms.ToArray(); + } + + private static void WriteString(System.IO.BinaryWriter w, string value) + { + var bytes = Encoding.UTF8.GetBytes(value); + WriteUInt16BE(w, (ushort)bytes.Length); + w.Write(bytes); + } + + private static void WriteBinaryField(System.IO.BinaryWriter w, byte[] data) + { + WriteUInt16BE(w, (ushort)data.Length); + w.Write(data); + } + + private static void WriteUInt16BE(System.IO.BinaryWriter w, ushort value) + { + w.Write((byte)(value >> 8)); + w.Write((byte)(value & 0xFF)); + } + + // ========================================================================= + // 1. ParseConnect — valid packet + // Go reference: server/mqtt.go mqttParseConnect ~line 700 + // ========================================================================= + + [Fact] + public void ParseConnect_ValidPacket_ReturnsConnectInfo() + { + // Go: mqttParseConnect — basic CONNECT with protocol name, level, and empty client ID + var payload = BuildConnectPayload("test-client", cleanSession: true, keepAlive: 30); + + var info = MqttBinaryDecoder.ParseConnect(payload); + + info.ProtocolName.ShouldBe("MQTT"); + info.ProtocolLevel.ShouldBe((byte)4); + info.CleanSession.ShouldBeTrue(); + info.KeepAlive.ShouldBe((ushort)30); + info.ClientId.ShouldBe("test-client"); + info.Username.ShouldBeNull(); + info.Password.ShouldBeNull(); + info.WillTopic.ShouldBeNull(); + info.WillMessage.ShouldBeNull(); + } + + // ========================================================================= + // 2. ParseConnect — with credentials + // Go reference: server/mqtt.go mqttParseConnect ~line 780 + // ========================================================================= + + [Fact] + public void ParseConnect_WithCredentials() + { + // Go: mqttParseConnect — username and password flags set in connect flags byte + var payload = BuildConnectPayload( + "cred-client", + cleanSession: true, + keepAlive: 60, + username: "alice", + password: "s3cr3t"); + + var info = MqttBinaryDecoder.ParseConnect(payload); + + info.ClientId.ShouldBe("cred-client"); + info.Username.ShouldBe("alice"); + info.Password.ShouldBe("s3cr3t"); + } + + // ========================================================================= + // 3. ParseConnect — with will message + // Go reference: server/mqtt.go mqttParseConnect ~line 740 + // ========================================================================= + + [Fact] + public void ParseConnect_WithWillMessage() + { + // Go: mqttParseConnect — WillFlag + WillTopic + WillMessage in payload + var willBytes = Encoding.UTF8.GetBytes("offline"); + var payload = BuildConnectPayload( + "will-client", + willTopic: "status/device", + willMessage: willBytes, + willQoS: 1, + willRetain: true); + + var info = MqttBinaryDecoder.ParseConnect(payload); + + info.ClientId.ShouldBe("will-client"); + info.WillTopic.ShouldBe("status/device"); + info.WillMessage.ShouldNotBeNull(); + info.WillMessage!.ShouldBe(willBytes); + info.WillQoS.ShouldBe((byte)1); + info.WillRetain.ShouldBeTrue(); + } + + // ========================================================================= + // 4. ParseConnect — clean session flag + // Go reference: server/mqtt.go mqttParseConnect ~line 710 + // ========================================================================= + + [Fact] + public void ParseConnect_CleanSessionFlag() + { + // Go: mqttParseConnect — clean session bit 1 of connect flags + var withClean = BuildConnectPayload("c1", cleanSession: true); + var withoutClean = BuildConnectPayload("c2", cleanSession: false); + + MqttBinaryDecoder.ParseConnect(withClean).CleanSession.ShouldBeTrue(); + MqttBinaryDecoder.ParseConnect(withoutClean).CleanSession.ShouldBeFalse(); + } + + // ========================================================================= + // 5. ParsePublish — QoS 0 (no packet ID) + // Go reference: server/mqtt.go mqttParsePublish ~line 1200 + // ========================================================================= + + [Fact] + public void ParsePublish_QoS0() + { + // Go: mqttParsePublish — QoS 0: no packet identifier present + // Build payload: 2-byte length + "sensors/temp" + message bytes + var topic = "sensors/temp"; + var topicBytes = Encoding.UTF8.GetBytes(topic); + var message = Encoding.UTF8.GetBytes("23.5"); + + using var ms = new System.IO.MemoryStream(); + using var w = new System.IO.BinaryWriter(ms); + WriteUInt16BE(w, (ushort)topicBytes.Length); + w.Write(topicBytes); + w.Write(message); + var payload = ms.ToArray(); + + // flags = 0x00 → QoS 0, no DUP, no RETAIN + var info = MqttBinaryDecoder.ParsePublish(payload, flags: 0x00); + + info.Topic.ShouldBe("sensors/temp"); + info.QoS.ShouldBe((byte)0); + info.PacketId.ShouldBe((ushort)0); + info.Dup.ShouldBeFalse(); + info.Retain.ShouldBeFalse(); + Encoding.UTF8.GetString(info.Payload.Span).ShouldBe("23.5"); + } + + // ========================================================================= + // 6. ParsePublish — QoS 1 (has packet ID) + // Go reference: server/mqtt.go mqttParsePublish ~line 1230 + // ========================================================================= + + [Fact] + public void ParsePublish_QoS1() + { + // Go: mqttParsePublish — QoS 1: 2-byte packet identifier follows topic + var topic = "events/click"; + var topicBytes = Encoding.UTF8.GetBytes(topic); + var message = Encoding.UTF8.GetBytes("payload-data"); + + using var ms = new System.IO.MemoryStream(); + using var w = new System.IO.BinaryWriter(ms); + WriteUInt16BE(w, (ushort)topicBytes.Length); + w.Write(topicBytes); + WriteUInt16BE(w, 42); // packet ID = 42 + w.Write(message); + var payload = ms.ToArray(); + + // flags = 0x02 → QoS 1 (bits 2-1 = 01) + var info = MqttBinaryDecoder.ParsePublish(payload, flags: 0x02); + + info.Topic.ShouldBe("events/click"); + info.QoS.ShouldBe((byte)1); + info.PacketId.ShouldBe((ushort)42); + Encoding.UTF8.GetString(info.Payload.Span).ShouldBe("payload-data"); + } + + // ========================================================================= + // 7. ParsePublish — retain flag + // Go reference: server/mqtt.go mqttParsePublish ~line 1210 + // ========================================================================= + + [Fact] + public void ParsePublish_RetainFlag() + { + // Go: mqttParsePublish — RETAIN flag is bit 0 of the fixed-header flags nibble + var topicBytes = Encoding.UTF8.GetBytes("home/light"); + + using var ms = new System.IO.MemoryStream(); + using var w = new System.IO.BinaryWriter(ms); + WriteUInt16BE(w, (ushort)topicBytes.Length); + w.Write(topicBytes); + w.Write(Encoding.UTF8.GetBytes("on")); + var payload = ms.ToArray(); + + // flags = 0x01 → RETAIN set, QoS 0 + var info = MqttBinaryDecoder.ParsePublish(payload, flags: 0x01); + + info.Topic.ShouldBe("home/light"); + info.Retain.ShouldBeTrue(); + info.QoS.ShouldBe((byte)0); + } + + // ========================================================================= + // 8. ParseSubscribe — single topic + // Go reference: server/mqtt.go mqttParseSub ~line 1400 + // ========================================================================= + + [Fact] + public void ParseSubscribe_SingleTopic() + { + // Go: mqttParseSub — SUBSCRIBE with a single topic filter entry + // Payload: 2-byte packet-id + (2-byte len + topic + 1-byte QoS) per entry + using var ms = new System.IO.MemoryStream(); + using var w = new System.IO.BinaryWriter(ms); + + WriteUInt16BE(w, 7); // packet ID = 7 + WriteString(w, "sport/tennis/#"); // topic filter + w.Write((byte)0); // QoS 0 + + var payload = ms.ToArray(); + var info = MqttBinaryDecoder.ParseSubscribe(payload); + + info.PacketId.ShouldBe((ushort)7); + info.Filters.Count.ShouldBe(1); + info.Filters[0].TopicFilter.ShouldBe("sport/tennis/#"); + info.Filters[0].QoS.ShouldBe((byte)0); + } + + // ========================================================================= + // 9. ParseSubscribe — multiple topics with different QoS + // Go reference: server/mqtt.go mqttParseSub ~line 1420 + // ========================================================================= + + [Fact] + public void ParseSubscribe_MultipleTopics() + { + // Go: mqttParseSub — multiple topic filter entries in one SUBSCRIBE + using var ms = new System.IO.MemoryStream(); + using var w = new System.IO.BinaryWriter(ms); + + WriteUInt16BE(w, 99); // packet ID = 99 + WriteString(w, "sensors/+"); // filter 1 + w.Write((byte)0); // QoS 0 + WriteString(w, "events/#"); // filter 2 + w.Write((byte)1); // QoS 1 + WriteString(w, "alerts/critical"); // filter 3 + w.Write((byte)2); // QoS 2 + + var payload = ms.ToArray(); + var info = MqttBinaryDecoder.ParseSubscribe(payload); + + info.PacketId.ShouldBe((ushort)99); + info.Filters.Count.ShouldBe(3); + + info.Filters[0].TopicFilter.ShouldBe("sensors/+"); + info.Filters[0].QoS.ShouldBe((byte)0); + + info.Filters[1].TopicFilter.ShouldBe("events/#"); + info.Filters[1].QoS.ShouldBe((byte)1); + + info.Filters[2].TopicFilter.ShouldBe("alerts/critical"); + info.Filters[2].QoS.ShouldBe((byte)2); + } + + // ========================================================================= + // 10. TranslateWildcard — '+' → '*' + // Go reference: server/mqtt.go mqttToNATSSubjectConversion ~line 2200 + // ========================================================================= + + [Fact] + public void TranslateWildcard_Plus() + { + // Go: mqttToNATSSubjectConversion — '+' maps to '*' (single-level) + var result = MqttBinaryDecoder.TranslateFilterToNatsSubject("+"); + result.ShouldBe("*"); + } + + // ========================================================================= + // 11. TranslateWildcard — '#' → '>' + // Go reference: server/mqtt.go mqttToNATSSubjectConversion ~line 2210 + // ========================================================================= + + [Fact] + public void TranslateWildcard_Hash() + { + // Go: mqttToNATSSubjectConversion — '#' maps to '>' (multi-level) + var result = MqttBinaryDecoder.TranslateFilterToNatsSubject("#"); + result.ShouldBe(">"); + } + + // ========================================================================= + // 12. TranslateWildcard — '/' → '.' + // Go reference: server/mqtt.go mqttToNATSSubjectConversion ~line 2220 + // ========================================================================= + + [Fact] + public void TranslateWildcard_Slash() + { + // Go: mqttToNATSSubjectConversion — '/' separator maps to '.' + var result = MqttBinaryDecoder.TranslateFilterToNatsSubject("a/b/c"); + result.ShouldBe("a.b.c"); + } + + // ========================================================================= + // 13. TranslateWildcard — complex combined translation + // Go reference: server/mqtt.go mqttToNATSSubjectConversion ~line 2200 + // ========================================================================= + + [Fact] + public void TranslateWildcard_Complex() + { + // Go: mqttToNATSSubjectConversion — combines '/', '+', '#' + // sport/+/score/# → sport.*.score.> + var result = MqttBinaryDecoder.TranslateFilterToNatsSubject("sport/+/score/#"); + result.ShouldBe("sport.*.score.>"); + } + + // ========================================================================= + // 14. DecodeRemainingLength — multi-byte values (VarInt edge cases) + // Go reference: server/mqtt.go TestMQTTReader / TestMQTTWriter + // ========================================================================= + + [Theory] + [InlineData(new byte[] { 0x00 }, 0, 1)] + [InlineData(new byte[] { 0x01 }, 1, 1)] + [InlineData(new byte[] { 0x7F }, 127, 1)] + [InlineData(new byte[] { 0x80, 0x01 }, 128, 2)] + [InlineData(new byte[] { 0xFF, 0x7F }, 16383, 2)] + [InlineData(new byte[] { 0x80, 0x80, 0x01 }, 16384, 3)] + [InlineData(new byte[] { 0xFF, 0xFF, 0x7F }, 2097151, 3)] + [InlineData(new byte[] { 0x80, 0x80, 0x80, 0x01 }, 2097152, 4)] + [InlineData(new byte[] { 0xFF, 0xFF, 0xFF, 0x7F }, 268435455, 4)] + public void DecodeRemainingLength_MultiByteValues(byte[] encoded, int expectedValue, int expectedConsumed) + { + // Go TestMQTTReader: verifies variable-length integer decoding at all boundary values + var value = MqttPacketReader.DecodeRemainingLength(encoded, out var consumed); + + value.ShouldBe(expectedValue); + consumed.ShouldBe(expectedConsumed); + } + + // ========================================================================= + // Additional edge-case tests + // ========================================================================= + + [Fact] + public void ParsePublish_DupFlag_IsSet() + { + // DUP flag is bit 3 of the fixed-header flags nibble (0x08). + // When QoS > 0, a 2-byte packet identifier must follow the topic. + var topicBytes = Encoding.UTF8.GetBytes("dup/topic"); + + using var ms = new System.IO.MemoryStream(); + using var w = new System.IO.BinaryWriter(ms); + WriteUInt16BE(w, (ushort)topicBytes.Length); + w.Write(topicBytes); + WriteUInt16BE(w, 5); // packet ID = 5 (required for QoS 1) + var payload = ms.ToArray(); + + // flags = 0x0A → DUP (bit 3) + QoS 1 (bits 2-1) + var info = MqttBinaryDecoder.ParsePublish(payload, flags: 0x0A); + + info.Dup.ShouldBeTrue(); + info.QoS.ShouldBe((byte)1); + info.PacketId.ShouldBe((ushort)5); + } + + [Fact] + public void ParseConnect_EmptyClientId_IsAllowed() + { + // MQTT 3.1.1 §3.1.3.1 allows empty client IDs with CleanSession=true + var payload = BuildConnectPayload("", cleanSession: true); + + var info = MqttBinaryDecoder.ParseConnect(payload); + + info.ClientId.ShouldBe(string.Empty); + info.CleanSession.ShouldBeTrue(); + } + + [Fact] + public void TranslateWildcard_EmptyString_ReturnsEmpty() + { + var result = MqttBinaryDecoder.TranslateFilterToNatsSubject(string.Empty); + result.ShouldBe(string.Empty); + } + + [Fact] + public void TranslateWildcard_PlainTopic_NoChange() + { + // A topic with no wildcards or slashes should pass through unchanged + var result = MqttBinaryDecoder.TranslateFilterToNatsSubject("plainword"); + result.ShouldBe("plainword"); + } + + [Fact] + public void ParsePublish_EmptyPayload_IsAllowed() + { + // A PUBLISH with no application payload is valid (e.g. retain-delete) + var topicBytes = Encoding.UTF8.GetBytes("empty/payload"); + + using var ms = new System.IO.MemoryStream(); + using var w = new System.IO.BinaryWriter(ms); + WriteUInt16BE(w, (ushort)topicBytes.Length); + w.Write(topicBytes); + var payload = ms.ToArray(); + + var info = MqttBinaryDecoder.ParsePublish(payload, flags: 0x00); + + info.Topic.ShouldBe("empty/payload"); + info.Payload.Length.ShouldBe(0); + } +}