feat: implement mqtt packet-level parser and writer

This commit is contained in:
Joseph Doherty
2026-02-23 14:41:23 -05:00
parent 958c4aa8ed
commit 7faf42c588
5 changed files with 153 additions and 0 deletions

View File

@@ -0,0 +1,63 @@
namespace NATS.Server.Mqtt;
public enum MqttControlPacketType : byte
{
Reserved = 0,
Connect = 1,
ConnAck = 2,
Publish = 3,
PubAck = 4,
Subscribe = 8,
SubAck = 9,
PingReq = 12,
PingResp = 13,
Disconnect = 14,
}
public sealed record MqttControlPacket(
MqttControlPacketType Type,
byte Flags,
int RemainingLength,
ReadOnlyMemory<byte> Payload);
public static class MqttPacketReader
{
public static MqttControlPacket Read(ReadOnlySpan<byte> buffer)
{
if (buffer.Length < 2)
throw new FormatException("MQTT packet is shorter than fixed header.");
var first = buffer[0];
var type = (MqttControlPacketType)(first >> 4);
var flags = (byte)(first & 0x0F);
var remainingLength = DecodeRemainingLength(buffer[1..], out var consumed);
var payloadStart = 1 + consumed;
var totalLength = payloadStart + remainingLength;
if (remainingLength < 0 || totalLength > buffer.Length)
throw new FormatException("MQTT packet remaining length exceeds available bytes.");
var payload = buffer[payloadStart..totalLength].ToArray();
return new MqttControlPacket(type, flags, remainingLength, payload);
}
internal static int DecodeRemainingLength(ReadOnlySpan<byte> encoded, out int consumed)
{
var multiplier = 1;
var value = 0;
consumed = 0;
for (var i = 0; i < encoded.Length && i < 4; i++)
{
var digit = encoded[i];
consumed++;
value += (digit & 0x7F) * multiplier;
if ((digit & 0x80) == 0)
return value;
multiplier *= 128;
}
throw new FormatException("Invalid MQTT remaining length encoding.");
}
}

View File

@@ -0,0 +1,38 @@
namespace NATS.Server.Mqtt;
public static class MqttPacketWriter
{
public static byte[] Write(MqttControlPacketType type, ReadOnlySpan<byte> payload, byte flags = 0)
{
if (type == MqttControlPacketType.Reserved)
throw new ArgumentOutOfRangeException(nameof(type), "MQTT control packet type must be non-zero.");
var remainingLength = payload.Length;
var encodedRemainingLength = EncodeRemainingLength(remainingLength);
var buffer = new byte[1 + encodedRemainingLength.Length + remainingLength];
buffer[0] = (byte)(((byte)type << 4) | (flags & 0x0F));
encodedRemainingLength.CopyTo(buffer.AsSpan(1));
payload.CopyTo(buffer.AsSpan(1 + encodedRemainingLength.Length));
return buffer;
}
internal static byte[] EncodeRemainingLength(int value)
{
if (value < 0 || value > 268_435_455)
throw new ArgumentOutOfRangeException(nameof(value), "MQTT remaining length must be between 0 and 268435455.");
Span<byte> scratch = stackalloc byte[4];
var index = 0;
do
{
var digit = (byte)(value % 128);
value /= 128;
if (value > 0)
digit |= 0x80;
scratch[index++] = digit;
} while (value > 0);
return scratch[..index].ToArray();
}
}

View File

@@ -12,6 +12,12 @@ public sealed record MqttPacket(MqttPacketType Type, string Topic, string Payloa
public sealed class MqttProtocolParser
{
public MqttControlPacket ParsePacket(ReadOnlySpan<byte> packet)
=> MqttPacketReader.Read(packet);
public byte[] WritePacket(MqttControlPacketType type, ReadOnlySpan<byte> payload, byte flags = 0)
=> MqttPacketWriter.Write(type, payload, flags);
public MqttPacket ParseLine(string line)
{
var trimmed = line.Trim();

View File

@@ -0,0 +1,26 @@
using NATS.Server.Mqtt;
namespace NATS.Server.Tests.Mqtt;
public class MqttPacketParserTests
{
[Fact]
public void Connect_packet_fixed_header_and_remaining_length_parse_correctly()
{
var packet = MqttPacketReader.Read(ConnectPacketBytes.Sample);
packet.Type.ShouldBe(MqttControlPacketType.Connect);
packet.RemainingLength.ShouldBe(12);
packet.Payload.Length.ShouldBe(12);
}
private static class ConnectPacketBytes
{
public static readonly byte[] Sample =
[
0x10, 0x0C, // CONNECT + remaining length
0x00, 0x04, (byte)'M', (byte)'Q', (byte)'T', (byte)'T',
0x04, 0x02, 0x00, 0x3C, // protocol level/flags/keepalive
0x00, 0x00, // empty client id
];
}
}

View File

@@ -0,0 +1,20 @@
using NATS.Server.Mqtt;
namespace NATS.Server.Tests.Mqtt;
public class MqttPacketWriterTests
{
[Fact]
public void Writer_emits_fixed_header_and_round_trips_with_reader()
{
byte[] payload = Enumerable.Repeat((byte)0xAB, 130).ToArray();
var encoded = MqttPacketWriter.Write(MqttControlPacketType.Publish, payload);
encoded[0].ShouldBe((byte)0x30); // PUBLISH type with default flags
var decoded = MqttPacketReader.Read(encoded);
decoded.Type.ShouldBe(MqttControlPacketType.Publish);
decoded.RemainingLength.ShouldBe(payload.Length);
decoded.Payload.ToArray().ShouldBe(payload);
}
}