feat(mqtt): add MqttBinaryDecoder with CONNECT/PUBLISH/SUBSCRIBE parsing and 27 tests
Port binary MQTT 3.1.1 packet body decoding from Go mqtt.go (~lines 700–1500, 2200). MqttBinaryDecoder parses CONNECT (protocol name, connect flags, keepalive, client ID, will topic/message, username, password), PUBLISH (topic, packet ID for QoS>0, DUP/ RETAIN flags, application payload), and SUBSCRIBE (packet ID, topic-filter + QoS list). Adds TranslateFilterToNatsSubject for simple MQTT wildcard conversion (+ → *, # → >, / → .). All 27 tests in MqttBinaryParserTests pass.
This commit is contained in:
325
src/NATS.Server/Mqtt/MqttBinaryDecoder.cs
Normal file
325
src/NATS.Server/Mqtt/MqttBinaryDecoder.cs
Normal file
@@ -0,0 +1,325 @@
|
||||
// Binary MQTT packet body decoder.
|
||||
// Go reference: golang/nats-server/server/mqtt.go
|
||||
// CONNECT parsing — mqttParseSub / mqttParseConnect (lines ~700–850)
|
||||
// PUBLISH parsing — mqttParsePublish (lines ~1200–1300)
|
||||
// SUBSCRIBE parsing — mqttParseSub (lines ~1400–1500)
|
||||
// Wildcard translation — mqttToNATSSubjectConversion (lines ~2200–2250)
|
||||
|
||||
namespace NATS.Server.Mqtt;
|
||||
|
||||
/// <summary>
|
||||
/// Decoded fields from an MQTT CONNECT packet body.
|
||||
/// Go reference: server/mqtt.go mqttParseConnect ~line 700.
|
||||
/// </summary>
|
||||
public readonly record struct MqttConnectInfo(
|
||||
string ProtocolName,
|
||||
byte ProtocolLevel,
|
||||
bool CleanSession,
|
||||
ushort KeepAlive,
|
||||
string ClientId,
|
||||
string? WillTopic,
|
||||
byte[]? WillMessage,
|
||||
byte WillQoS,
|
||||
bool WillRetain,
|
||||
string? Username,
|
||||
string? Password);
|
||||
|
||||
/// <summary>
|
||||
/// Decoded fields from an MQTT PUBLISH packet body.
|
||||
/// Go reference: server/mqtt.go mqttParsePublish ~line 1200.
|
||||
/// </summary>
|
||||
public readonly record struct MqttPublishInfo(
|
||||
string Topic,
|
||||
ushort PacketId,
|
||||
byte QoS,
|
||||
bool Dup,
|
||||
bool Retain,
|
||||
ReadOnlyMemory<byte> Payload);
|
||||
|
||||
/// <summary>
|
||||
/// Decoded fields from an MQTT SUBSCRIBE packet body.
|
||||
/// Go reference: server/mqtt.go mqttParseSub ~line 1400.
|
||||
/// </summary>
|
||||
public readonly record struct MqttSubscribeInfo(
|
||||
ushort PacketId,
|
||||
IReadOnlyList<(string TopicFilter, byte QoS)> Filters);
|
||||
|
||||
/// <summary>
|
||||
/// Decodes the variable-header and payload of CONNECT, PUBLISH, and SUBSCRIBE
|
||||
/// MQTT 3.1.1 control packets, and translates MQTT wildcards to NATS subjects.
|
||||
/// </summary>
|
||||
public static class MqttBinaryDecoder
|
||||
{
|
||||
// -------------------------------------------------------------------------
|
||||
// CONNECT parsing
|
||||
// Go reference: server/mqtt.go mqttParseConnect ~line 700
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Parses the payload bytes of an MQTT CONNECT packet (everything after the
|
||||
/// fixed header and remaining-length bytes, i.e. the value of
|
||||
/// <see cref="MqttControlPacket.Payload"/>).
|
||||
/// </summary>
|
||||
/// <param name="payload">
|
||||
/// The payload bytes as returned by <see cref="MqttPacketReader.Read"/>.
|
||||
/// </param>
|
||||
/// <returns>A populated <see cref="MqttConnectInfo"/>.</returns>
|
||||
/// <exception cref="FormatException">
|
||||
/// Thrown when the packet is malformed or the protocol name is not "MQTT".
|
||||
/// </exception>
|
||||
public static MqttConnectInfo ParseConnect(ReadOnlySpan<byte> payload)
|
||||
{
|
||||
// Variable header layout (MQTT 3.1.1 spec §3.1):
|
||||
// 2-byte length prefix + protocol name bytes ("MQTT")
|
||||
// 1 byte protocol level (4 = 3.1.1, 5 = 5.0)
|
||||
// 1 byte connect flags
|
||||
// 2 bytes keepalive (big-endian)
|
||||
// Payload:
|
||||
// 2+N client ID
|
||||
// if will flag: 2+N will topic, 2+N will message
|
||||
// if username: 2+N username
|
||||
// if password: 2+N password
|
||||
|
||||
var pos = 0;
|
||||
|
||||
// Protocol name
|
||||
var protocolName = ReadUtf8String(payload, ref pos);
|
||||
if (protocolName != "MQTT" && protocolName != "MQIsdp")
|
||||
throw new FormatException($"Unknown MQTT protocol name: '{protocolName}'");
|
||||
|
||||
if (pos + 4 > payload.Length)
|
||||
throw new FormatException("MQTT CONNECT packet too short for variable header.");
|
||||
|
||||
var protocolLevel = payload[pos++];
|
||||
|
||||
// Connect flags byte
|
||||
// Bit 1 = CleanSession, Bit 2 = WillFlag, Bits 3-4 = WillQoS, Bit 5 = WillRetain,
|
||||
// Bit 6 = PasswordFlag, Bit 7 = UsernameFlag
|
||||
var connectFlags = payload[pos++];
|
||||
var cleanSession = (connectFlags & 0x02) != 0;
|
||||
var willFlag = (connectFlags & 0x04) != 0;
|
||||
var willQoS = (byte)((connectFlags >> 3) & 0x03);
|
||||
var willRetain = (connectFlags & 0x20) != 0;
|
||||
var passwordFlag = (connectFlags & 0x40) != 0;
|
||||
var usernameFlag = (connectFlags & 0x80) != 0;
|
||||
|
||||
// Keep-alive (big-endian uint16)
|
||||
var keepAlive = ReadUInt16BigEndian(payload, ref pos);
|
||||
|
||||
// Payload fields
|
||||
var clientId = ReadUtf8String(payload, ref pos);
|
||||
|
||||
string? willTopic = null;
|
||||
byte[]? willMessage = null;
|
||||
if (willFlag)
|
||||
{
|
||||
willTopic = ReadUtf8String(payload, ref pos);
|
||||
willMessage = ReadBinaryField(payload, ref pos);
|
||||
}
|
||||
|
||||
string? username = null;
|
||||
if (usernameFlag)
|
||||
username = ReadUtf8String(payload, ref pos);
|
||||
|
||||
string? password = null;
|
||||
if (passwordFlag)
|
||||
password = ReadUtf8String(payload, ref pos);
|
||||
|
||||
return new MqttConnectInfo(
|
||||
ProtocolName: protocolName,
|
||||
ProtocolLevel: protocolLevel,
|
||||
CleanSession: cleanSession,
|
||||
KeepAlive: keepAlive,
|
||||
ClientId: clientId,
|
||||
WillTopic: willTopic,
|
||||
WillMessage: willMessage,
|
||||
WillQoS: willQoS,
|
||||
WillRetain: willRetain,
|
||||
Username: username,
|
||||
Password: password);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// PUBLISH parsing
|
||||
// Go reference: server/mqtt.go mqttParsePublish ~line 1200
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Parses the payload bytes of an MQTT PUBLISH packet.
|
||||
/// The <paramref name="flags"/> nibble comes from
|
||||
/// <see cref="MqttControlPacket.Flags"/> of the fixed header.
|
||||
/// </summary>
|
||||
/// <param name="payload">The payload bytes from <see cref="MqttControlPacket.Payload"/>.</param>
|
||||
/// <param name="flags">The lower nibble of the fixed header byte (DUP/QoS/RETAIN flags).</param>
|
||||
/// <returns>A populated <see cref="MqttPublishInfo"/>.</returns>
|
||||
public static MqttPublishInfo ParsePublish(ReadOnlySpan<byte> payload, byte flags)
|
||||
{
|
||||
// Fixed-header flags nibble layout (MQTT 3.1.1 spec §3.3.1):
|
||||
// Bit 3 = DUP
|
||||
// Bits 2-1 = QoS (0, 1, or 2)
|
||||
// Bit 0 = RETAIN
|
||||
var dup = (flags & 0x08) != 0;
|
||||
var qos = (byte)((flags >> 1) & 0x03);
|
||||
var retain = (flags & 0x01) != 0;
|
||||
|
||||
var pos = 0;
|
||||
|
||||
// Variable header: topic name (2-byte length prefix + UTF-8)
|
||||
var topic = ReadUtf8String(payload, ref pos);
|
||||
|
||||
// Packet identifier — only present for QoS > 0
|
||||
ushort packetId = 0;
|
||||
if (qos > 0)
|
||||
packetId = ReadUInt16BigEndian(payload, ref pos);
|
||||
|
||||
// Remaining bytes are the application payload
|
||||
var messagePayload = payload[pos..].ToArray();
|
||||
|
||||
return new MqttPublishInfo(
|
||||
Topic: topic,
|
||||
PacketId: packetId,
|
||||
QoS: qos,
|
||||
Dup: dup,
|
||||
Retain: retain,
|
||||
Payload: messagePayload);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// SUBSCRIBE parsing
|
||||
// Go reference: server/mqtt.go mqttParseSub ~line 1400
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Parses the payload bytes of an MQTT SUBSCRIBE packet.
|
||||
/// </summary>
|
||||
/// <param name="payload">The payload bytes from <see cref="MqttControlPacket.Payload"/>.</param>
|
||||
/// <returns>A populated <see cref="MqttSubscribeInfo"/>.</returns>
|
||||
public static MqttSubscribeInfo ParseSubscribe(ReadOnlySpan<byte> payload)
|
||||
{
|
||||
// Variable header: packet identifier (2 bytes, big-endian)
|
||||
// Payload: one or more topic-filter entries, each:
|
||||
// 2-byte length prefix + UTF-8 filter string + 1-byte requested QoS
|
||||
|
||||
var pos = 0;
|
||||
var packetId = ReadUInt16BigEndian(payload, ref pos);
|
||||
|
||||
var filters = new List<(string, byte)>();
|
||||
while (pos < payload.Length)
|
||||
{
|
||||
var topicFilter = ReadUtf8String(payload, ref pos);
|
||||
if (pos >= payload.Length)
|
||||
throw new FormatException("MQTT SUBSCRIBE packet missing QoS byte after topic filter.");
|
||||
var filterQoS = payload[pos++];
|
||||
filters.Add((topicFilter, filterQoS));
|
||||
}
|
||||
|
||||
return new MqttSubscribeInfo(packetId, filters);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// MQTT wildcard → NATS subject translation
|
||||
// Go reference: server/mqtt.go mqttToNATSSubjectConversion ~line 2200
|
||||
//
|
||||
// Simple translation (filter → NATS, wildcards permitted):
|
||||
// '+' → '*' (single-level wildcard)
|
||||
// '#' → '>' (multi-level wildcard)
|
||||
// '/' → '.' (topic separator)
|
||||
//
|
||||
// NOTE: This method implements the simple/naïve translation that the task
|
||||
// description specifies. The full Go implementation also handles dots,
|
||||
// leading/trailing slashes, and empty levels differently (see
|
||||
// MqttTopicMappingParityTests for the complete behavior). This method is
|
||||
// intentionally limited to the four rules requested by the task spec.
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Translates an MQTT topic filter to a NATS subject using the simple rules:
|
||||
/// <list type="bullet">
|
||||
/// <item><c>+</c> → <c>*</c> (single-level wildcard)</item>
|
||||
/// <item><c>#</c> → <c>></c> (multi-level wildcard)</item>
|
||||
/// <item><c>/</c> → <c>.</c> (separator)</item>
|
||||
/// </list>
|
||||
/// Go reference: server/mqtt.go mqttToNATSSubjectConversion ~line 2200.
|
||||
/// </summary>
|
||||
/// <param name="mqttFilter">An MQTT topic filter string.</param>
|
||||
/// <returns>The equivalent NATS subject string.</returns>
|
||||
public static string TranslateFilterToNatsSubject(string mqttFilter)
|
||||
{
|
||||
if (mqttFilter.Length == 0)
|
||||
return string.Empty;
|
||||
|
||||
var result = new char[mqttFilter.Length];
|
||||
for (var i = 0; i < mqttFilter.Length; i++)
|
||||
{
|
||||
result[i] = mqttFilter[i] switch
|
||||
{
|
||||
'+' => '*',
|
||||
'#' => '>',
|
||||
'/' => '.',
|
||||
var c => c,
|
||||
};
|
||||
}
|
||||
|
||||
return new string(result);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Internal helpers
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Reads a 2-byte big-endian length-prefixed UTF-8 string from
|
||||
/// <paramref name="data"/> starting at <paramref name="pos"/>, advancing
|
||||
/// <paramref name="pos"/> past the consumed bytes.
|
||||
/// </summary>
|
||||
private static string ReadUtf8String(ReadOnlySpan<byte> data, ref int pos)
|
||||
{
|
||||
if (pos + 2 > data.Length)
|
||||
throw new FormatException("MQTT packet truncated reading string length prefix.");
|
||||
|
||||
var length = (data[pos] << 8) | data[pos + 1];
|
||||
pos += 2;
|
||||
|
||||
if (pos + length > data.Length)
|
||||
throw new FormatException("MQTT packet truncated reading string body.");
|
||||
|
||||
var value = System.Text.Encoding.UTF8.GetString(data.Slice(pos, length));
|
||||
pos += length;
|
||||
return value;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reads a 2-byte big-endian length-prefixed binary field (e.g. will
|
||||
/// message, password) from <paramref name="data"/>, advancing
|
||||
/// <paramref name="pos"/> past the consumed bytes.
|
||||
/// </summary>
|
||||
private static byte[] ReadBinaryField(ReadOnlySpan<byte> data, ref int pos)
|
||||
{
|
||||
if (pos + 2 > data.Length)
|
||||
throw new FormatException("MQTT packet truncated reading binary field length prefix.");
|
||||
|
||||
var length = (data[pos] << 8) | data[pos + 1];
|
||||
pos += 2;
|
||||
|
||||
if (pos + length > data.Length)
|
||||
throw new FormatException("MQTT packet truncated reading binary field body.");
|
||||
|
||||
var value = data.Slice(pos, length).ToArray();
|
||||
pos += length;
|
||||
return value;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reads a big-endian uint16 from <paramref name="data"/> at
|
||||
/// <paramref name="pos"/>, advancing <paramref name="pos"/> by 2.
|
||||
/// </summary>
|
||||
private static ushort ReadUInt16BigEndian(ReadOnlySpan<byte> data, ref int pos)
|
||||
{
|
||||
if (pos + 2 > data.Length)
|
||||
throw new FormatException("MQTT packet truncated reading uint16.");
|
||||
|
||||
var value = (ushort)((data[pos] << 8) | data[pos + 1]);
|
||||
pos += 2;
|
||||
return value;
|
||||
}
|
||||
}
|
||||
477
tests/NATS.Server.Tests/Mqtt/MqttBinaryParserTests.cs
Normal file
477
tests/NATS.Server.Tests/Mqtt/MqttBinaryParserTests.cs
Normal file
@@ -0,0 +1,477 @@
|
||||
// Binary MQTT packet parser tests.
|
||||
// Go reference: golang/nats-server/server/mqtt.go
|
||||
// CONNECT parsing — mqttParseConnect (~line 700)
|
||||
// PUBLISH parsing — mqttParsePublish (~line 1200)
|
||||
// SUBSCRIBE parsing — mqttParseSub (~line 1400)
|
||||
// Wildcard translation — mqttToNATSSubjectConversion (~line 2200)
|
||||
|
||||
using System.Text;
|
||||
using NATS.Server.Mqtt;
|
||||
|
||||
namespace NATS.Server.Tests.Mqtt;
|
||||
|
||||
public class MqttBinaryParserTests
|
||||
{
|
||||
// =========================================================================
|
||||
// Helpers — build well-formed CONNECT packet payloads
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Builds the payload bytes (everything after the fixed header) of an MQTT
|
||||
/// 3.1.1 CONNECT packet.
|
||||
/// </summary>
|
||||
private static byte[] BuildConnectPayload(
|
||||
string clientId,
|
||||
bool cleanSession = true,
|
||||
ushort keepAlive = 60,
|
||||
string? username = null,
|
||||
string? password = null,
|
||||
string? willTopic = null,
|
||||
byte[]? willMessage = null,
|
||||
byte willQoS = 0,
|
||||
bool willRetain = false)
|
||||
{
|
||||
using var ms = new System.IO.MemoryStream();
|
||||
using var w = new System.IO.BinaryWriter(ms);
|
||||
|
||||
// Protocol name "MQTT"
|
||||
WriteString(w, "MQTT");
|
||||
|
||||
// Protocol level 4 (MQTT 3.1.1)
|
||||
w.Write((byte)4);
|
||||
|
||||
// Connect flags
|
||||
byte flags = 0;
|
||||
if (cleanSession) flags |= 0x02;
|
||||
if (willTopic != null) flags |= 0x04;
|
||||
flags |= (byte)((willQoS & 0x03) << 3);
|
||||
if (willRetain) flags |= 0x20;
|
||||
if (password != null) flags |= 0x40;
|
||||
if (username != null) flags |= 0x80;
|
||||
w.Write(flags);
|
||||
|
||||
// Keep-alive (big-endian)
|
||||
WriteUInt16BE(w, keepAlive);
|
||||
|
||||
// Payload fields
|
||||
WriteString(w, clientId);
|
||||
|
||||
if (willTopic != null)
|
||||
{
|
||||
WriteString(w, willTopic);
|
||||
WriteBinaryField(w, willMessage ?? []);
|
||||
}
|
||||
|
||||
if (username != null) WriteString(w, username);
|
||||
if (password != null) WriteString(w, password);
|
||||
|
||||
return ms.ToArray();
|
||||
}
|
||||
|
||||
private static void WriteString(System.IO.BinaryWriter w, string value)
|
||||
{
|
||||
var bytes = Encoding.UTF8.GetBytes(value);
|
||||
WriteUInt16BE(w, (ushort)bytes.Length);
|
||||
w.Write(bytes);
|
||||
}
|
||||
|
||||
private static void WriteBinaryField(System.IO.BinaryWriter w, byte[] data)
|
||||
{
|
||||
WriteUInt16BE(w, (ushort)data.Length);
|
||||
w.Write(data);
|
||||
}
|
||||
|
||||
private static void WriteUInt16BE(System.IO.BinaryWriter w, ushort value)
|
||||
{
|
||||
w.Write((byte)(value >> 8));
|
||||
w.Write((byte)(value & 0xFF));
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 1. ParseConnect — valid packet
|
||||
// Go reference: server/mqtt.go mqttParseConnect ~line 700
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void ParseConnect_ValidPacket_ReturnsConnectInfo()
|
||||
{
|
||||
// Go: mqttParseConnect — basic CONNECT with protocol name, level, and empty client ID
|
||||
var payload = BuildConnectPayload("test-client", cleanSession: true, keepAlive: 30);
|
||||
|
||||
var info = MqttBinaryDecoder.ParseConnect(payload);
|
||||
|
||||
info.ProtocolName.ShouldBe("MQTT");
|
||||
info.ProtocolLevel.ShouldBe((byte)4);
|
||||
info.CleanSession.ShouldBeTrue();
|
||||
info.KeepAlive.ShouldBe((ushort)30);
|
||||
info.ClientId.ShouldBe("test-client");
|
||||
info.Username.ShouldBeNull();
|
||||
info.Password.ShouldBeNull();
|
||||
info.WillTopic.ShouldBeNull();
|
||||
info.WillMessage.ShouldBeNull();
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 2. ParseConnect — with credentials
|
||||
// Go reference: server/mqtt.go mqttParseConnect ~line 780
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void ParseConnect_WithCredentials()
|
||||
{
|
||||
// Go: mqttParseConnect — username and password flags set in connect flags byte
|
||||
var payload = BuildConnectPayload(
|
||||
"cred-client",
|
||||
cleanSession: true,
|
||||
keepAlive: 60,
|
||||
username: "alice",
|
||||
password: "s3cr3t");
|
||||
|
||||
var info = MqttBinaryDecoder.ParseConnect(payload);
|
||||
|
||||
info.ClientId.ShouldBe("cred-client");
|
||||
info.Username.ShouldBe("alice");
|
||||
info.Password.ShouldBe("s3cr3t");
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 3. ParseConnect — with will message
|
||||
// Go reference: server/mqtt.go mqttParseConnect ~line 740
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void ParseConnect_WithWillMessage()
|
||||
{
|
||||
// Go: mqttParseConnect — WillFlag + WillTopic + WillMessage in payload
|
||||
var willBytes = Encoding.UTF8.GetBytes("offline");
|
||||
var payload = BuildConnectPayload(
|
||||
"will-client",
|
||||
willTopic: "status/device",
|
||||
willMessage: willBytes,
|
||||
willQoS: 1,
|
||||
willRetain: true);
|
||||
|
||||
var info = MqttBinaryDecoder.ParseConnect(payload);
|
||||
|
||||
info.ClientId.ShouldBe("will-client");
|
||||
info.WillTopic.ShouldBe("status/device");
|
||||
info.WillMessage.ShouldNotBeNull();
|
||||
info.WillMessage!.ShouldBe(willBytes);
|
||||
info.WillQoS.ShouldBe((byte)1);
|
||||
info.WillRetain.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 4. ParseConnect — clean session flag
|
||||
// Go reference: server/mqtt.go mqttParseConnect ~line 710
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void ParseConnect_CleanSessionFlag()
|
||||
{
|
||||
// Go: mqttParseConnect — clean session bit 1 of connect flags
|
||||
var withClean = BuildConnectPayload("c1", cleanSession: true);
|
||||
var withoutClean = BuildConnectPayload("c2", cleanSession: false);
|
||||
|
||||
MqttBinaryDecoder.ParseConnect(withClean).CleanSession.ShouldBeTrue();
|
||||
MqttBinaryDecoder.ParseConnect(withoutClean).CleanSession.ShouldBeFalse();
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 5. ParsePublish — QoS 0 (no packet ID)
|
||||
// Go reference: server/mqtt.go mqttParsePublish ~line 1200
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void ParsePublish_QoS0()
|
||||
{
|
||||
// Go: mqttParsePublish — QoS 0: no packet identifier present
|
||||
// Build payload: 2-byte length + "sensors/temp" + message bytes
|
||||
var topic = "sensors/temp";
|
||||
var topicBytes = Encoding.UTF8.GetBytes(topic);
|
||||
var message = Encoding.UTF8.GetBytes("23.5");
|
||||
|
||||
using var ms = new System.IO.MemoryStream();
|
||||
using var w = new System.IO.BinaryWriter(ms);
|
||||
WriteUInt16BE(w, (ushort)topicBytes.Length);
|
||||
w.Write(topicBytes);
|
||||
w.Write(message);
|
||||
var payload = ms.ToArray();
|
||||
|
||||
// flags = 0x00 → QoS 0, no DUP, no RETAIN
|
||||
var info = MqttBinaryDecoder.ParsePublish(payload, flags: 0x00);
|
||||
|
||||
info.Topic.ShouldBe("sensors/temp");
|
||||
info.QoS.ShouldBe((byte)0);
|
||||
info.PacketId.ShouldBe((ushort)0);
|
||||
info.Dup.ShouldBeFalse();
|
||||
info.Retain.ShouldBeFalse();
|
||||
Encoding.UTF8.GetString(info.Payload.Span).ShouldBe("23.5");
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 6. ParsePublish — QoS 1 (has packet ID)
|
||||
// Go reference: server/mqtt.go mqttParsePublish ~line 1230
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void ParsePublish_QoS1()
|
||||
{
|
||||
// Go: mqttParsePublish — QoS 1: 2-byte packet identifier follows topic
|
||||
var topic = "events/click";
|
||||
var topicBytes = Encoding.UTF8.GetBytes(topic);
|
||||
var message = Encoding.UTF8.GetBytes("payload-data");
|
||||
|
||||
using var ms = new System.IO.MemoryStream();
|
||||
using var w = new System.IO.BinaryWriter(ms);
|
||||
WriteUInt16BE(w, (ushort)topicBytes.Length);
|
||||
w.Write(topicBytes);
|
||||
WriteUInt16BE(w, 42); // packet ID = 42
|
||||
w.Write(message);
|
||||
var payload = ms.ToArray();
|
||||
|
||||
// flags = 0x02 → QoS 1 (bits 2-1 = 01)
|
||||
var info = MqttBinaryDecoder.ParsePublish(payload, flags: 0x02);
|
||||
|
||||
info.Topic.ShouldBe("events/click");
|
||||
info.QoS.ShouldBe((byte)1);
|
||||
info.PacketId.ShouldBe((ushort)42);
|
||||
Encoding.UTF8.GetString(info.Payload.Span).ShouldBe("payload-data");
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 7. ParsePublish — retain flag
|
||||
// Go reference: server/mqtt.go mqttParsePublish ~line 1210
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void ParsePublish_RetainFlag()
|
||||
{
|
||||
// Go: mqttParsePublish — RETAIN flag is bit 0 of the fixed-header flags nibble
|
||||
var topicBytes = Encoding.UTF8.GetBytes("home/light");
|
||||
|
||||
using var ms = new System.IO.MemoryStream();
|
||||
using var w = new System.IO.BinaryWriter(ms);
|
||||
WriteUInt16BE(w, (ushort)topicBytes.Length);
|
||||
w.Write(topicBytes);
|
||||
w.Write(Encoding.UTF8.GetBytes("on"));
|
||||
var payload = ms.ToArray();
|
||||
|
||||
// flags = 0x01 → RETAIN set, QoS 0
|
||||
var info = MqttBinaryDecoder.ParsePublish(payload, flags: 0x01);
|
||||
|
||||
info.Topic.ShouldBe("home/light");
|
||||
info.Retain.ShouldBeTrue();
|
||||
info.QoS.ShouldBe((byte)0);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 8. ParseSubscribe — single topic
|
||||
// Go reference: server/mqtt.go mqttParseSub ~line 1400
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void ParseSubscribe_SingleTopic()
|
||||
{
|
||||
// Go: mqttParseSub — SUBSCRIBE with a single topic filter entry
|
||||
// Payload: 2-byte packet-id + (2-byte len + topic + 1-byte QoS) per entry
|
||||
using var ms = new System.IO.MemoryStream();
|
||||
using var w = new System.IO.BinaryWriter(ms);
|
||||
|
||||
WriteUInt16BE(w, 7); // packet ID = 7
|
||||
WriteString(w, "sport/tennis/#"); // topic filter
|
||||
w.Write((byte)0); // QoS 0
|
||||
|
||||
var payload = ms.ToArray();
|
||||
var info = MqttBinaryDecoder.ParseSubscribe(payload);
|
||||
|
||||
info.PacketId.ShouldBe((ushort)7);
|
||||
info.Filters.Count.ShouldBe(1);
|
||||
info.Filters[0].TopicFilter.ShouldBe("sport/tennis/#");
|
||||
info.Filters[0].QoS.ShouldBe((byte)0);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 9. ParseSubscribe — multiple topics with different QoS
|
||||
// Go reference: server/mqtt.go mqttParseSub ~line 1420
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void ParseSubscribe_MultipleTopics()
|
||||
{
|
||||
// Go: mqttParseSub — multiple topic filter entries in one SUBSCRIBE
|
||||
using var ms = new System.IO.MemoryStream();
|
||||
using var w = new System.IO.BinaryWriter(ms);
|
||||
|
||||
WriteUInt16BE(w, 99); // packet ID = 99
|
||||
WriteString(w, "sensors/+"); // filter 1
|
||||
w.Write((byte)0); // QoS 0
|
||||
WriteString(w, "events/#"); // filter 2
|
||||
w.Write((byte)1); // QoS 1
|
||||
WriteString(w, "alerts/critical"); // filter 3
|
||||
w.Write((byte)2); // QoS 2
|
||||
|
||||
var payload = ms.ToArray();
|
||||
var info = MqttBinaryDecoder.ParseSubscribe(payload);
|
||||
|
||||
info.PacketId.ShouldBe((ushort)99);
|
||||
info.Filters.Count.ShouldBe(3);
|
||||
|
||||
info.Filters[0].TopicFilter.ShouldBe("sensors/+");
|
||||
info.Filters[0].QoS.ShouldBe((byte)0);
|
||||
|
||||
info.Filters[1].TopicFilter.ShouldBe("events/#");
|
||||
info.Filters[1].QoS.ShouldBe((byte)1);
|
||||
|
||||
info.Filters[2].TopicFilter.ShouldBe("alerts/critical");
|
||||
info.Filters[2].QoS.ShouldBe((byte)2);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 10. TranslateWildcard — '+' → '*'
|
||||
// Go reference: server/mqtt.go mqttToNATSSubjectConversion ~line 2200
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void TranslateWildcard_Plus()
|
||||
{
|
||||
// Go: mqttToNATSSubjectConversion — '+' maps to '*' (single-level)
|
||||
var result = MqttBinaryDecoder.TranslateFilterToNatsSubject("+");
|
||||
result.ShouldBe("*");
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 11. TranslateWildcard — '#' → '>'
|
||||
// Go reference: server/mqtt.go mqttToNATSSubjectConversion ~line 2210
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void TranslateWildcard_Hash()
|
||||
{
|
||||
// Go: mqttToNATSSubjectConversion — '#' maps to '>' (multi-level)
|
||||
var result = MqttBinaryDecoder.TranslateFilterToNatsSubject("#");
|
||||
result.ShouldBe(">");
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 12. TranslateWildcard — '/' → '.'
|
||||
// Go reference: server/mqtt.go mqttToNATSSubjectConversion ~line 2220
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void TranslateWildcard_Slash()
|
||||
{
|
||||
// Go: mqttToNATSSubjectConversion — '/' separator maps to '.'
|
||||
var result = MqttBinaryDecoder.TranslateFilterToNatsSubject("a/b/c");
|
||||
result.ShouldBe("a.b.c");
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 13. TranslateWildcard — complex combined translation
|
||||
// Go reference: server/mqtt.go mqttToNATSSubjectConversion ~line 2200
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void TranslateWildcard_Complex()
|
||||
{
|
||||
// Go: mqttToNATSSubjectConversion — combines '/', '+', '#'
|
||||
// sport/+/score/# → sport.*.score.>
|
||||
var result = MqttBinaryDecoder.TranslateFilterToNatsSubject("sport/+/score/#");
|
||||
result.ShouldBe("sport.*.score.>");
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// 14. DecodeRemainingLength — multi-byte values (VarInt edge cases)
|
||||
// Go reference: server/mqtt.go TestMQTTReader / TestMQTTWriter
|
||||
// =========================================================================
|
||||
|
||||
[Theory]
|
||||
[InlineData(new byte[] { 0x00 }, 0, 1)]
|
||||
[InlineData(new byte[] { 0x01 }, 1, 1)]
|
||||
[InlineData(new byte[] { 0x7F }, 127, 1)]
|
||||
[InlineData(new byte[] { 0x80, 0x01 }, 128, 2)]
|
||||
[InlineData(new byte[] { 0xFF, 0x7F }, 16383, 2)]
|
||||
[InlineData(new byte[] { 0x80, 0x80, 0x01 }, 16384, 3)]
|
||||
[InlineData(new byte[] { 0xFF, 0xFF, 0x7F }, 2097151, 3)]
|
||||
[InlineData(new byte[] { 0x80, 0x80, 0x80, 0x01 }, 2097152, 4)]
|
||||
[InlineData(new byte[] { 0xFF, 0xFF, 0xFF, 0x7F }, 268435455, 4)]
|
||||
public void DecodeRemainingLength_MultiByteValues(byte[] encoded, int expectedValue, int expectedConsumed)
|
||||
{
|
||||
// Go TestMQTTReader: verifies variable-length integer decoding at all boundary values
|
||||
var value = MqttPacketReader.DecodeRemainingLength(encoded, out var consumed);
|
||||
|
||||
value.ShouldBe(expectedValue);
|
||||
consumed.ShouldBe(expectedConsumed);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Additional edge-case tests
|
||||
// =========================================================================
|
||||
|
||||
[Fact]
|
||||
public void ParsePublish_DupFlag_IsSet()
|
||||
{
|
||||
// DUP flag is bit 3 of the fixed-header flags nibble (0x08).
|
||||
// When QoS > 0, a 2-byte packet identifier must follow the topic.
|
||||
var topicBytes = Encoding.UTF8.GetBytes("dup/topic");
|
||||
|
||||
using var ms = new System.IO.MemoryStream();
|
||||
using var w = new System.IO.BinaryWriter(ms);
|
||||
WriteUInt16BE(w, (ushort)topicBytes.Length);
|
||||
w.Write(topicBytes);
|
||||
WriteUInt16BE(w, 5); // packet ID = 5 (required for QoS 1)
|
||||
var payload = ms.ToArray();
|
||||
|
||||
// flags = 0x0A → DUP (bit 3) + QoS 1 (bits 2-1)
|
||||
var info = MqttBinaryDecoder.ParsePublish(payload, flags: 0x0A);
|
||||
|
||||
info.Dup.ShouldBeTrue();
|
||||
info.QoS.ShouldBe((byte)1);
|
||||
info.PacketId.ShouldBe((ushort)5);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ParseConnect_EmptyClientId_IsAllowed()
|
||||
{
|
||||
// MQTT 3.1.1 §3.1.3.1 allows empty client IDs with CleanSession=true
|
||||
var payload = BuildConnectPayload("", cleanSession: true);
|
||||
|
||||
var info = MqttBinaryDecoder.ParseConnect(payload);
|
||||
|
||||
info.ClientId.ShouldBe(string.Empty);
|
||||
info.CleanSession.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void TranslateWildcard_EmptyString_ReturnsEmpty()
|
||||
{
|
||||
var result = MqttBinaryDecoder.TranslateFilterToNatsSubject(string.Empty);
|
||||
result.ShouldBe(string.Empty);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void TranslateWildcard_PlainTopic_NoChange()
|
||||
{
|
||||
// A topic with no wildcards or slashes should pass through unchanged
|
||||
var result = MqttBinaryDecoder.TranslateFilterToNatsSubject("plainword");
|
||||
result.ShouldBe("plainword");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ParsePublish_EmptyPayload_IsAllowed()
|
||||
{
|
||||
// A PUBLISH with no application payload is valid (e.g. retain-delete)
|
||||
var topicBytes = Encoding.UTF8.GetBytes("empty/payload");
|
||||
|
||||
using var ms = new System.IO.MemoryStream();
|
||||
using var w = new System.IO.BinaryWriter(ms);
|
||||
WriteUInt16BE(w, (ushort)topicBytes.Length);
|
||||
w.Write(topicBytes);
|
||||
var payload = ms.ToArray();
|
||||
|
||||
var info = MqttBinaryDecoder.ParsePublish(payload, flags: 0x00);
|
||||
|
||||
info.Topic.ShouldBe("empty/payload");
|
||||
info.Payload.Length.ShouldBe(0);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user