feat(batch41): merge mqtt-client-io

This commit is contained in:
Joseph Doherty
2026-03-01 01:13:32 -05:00
5 changed files with 467 additions and 1 deletions

View File

@@ -0,0 +1,131 @@
// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
using System.Buffers.Binary;
namespace ZB.MOM.NatsNet.Server.Mqtt;
internal sealed class MqttReader
{
private byte[] _buffer = [];
private byte[] _pendingBuffer = [];
public int Position { get; private set; }
public int PacketStart { get; set; }
public void Reset(byte[] buffer)
{
if (_pendingBuffer.Length > 0)
{
var merged = new byte[_pendingBuffer.Length + buffer.Length];
Buffer.BlockCopy(_pendingBuffer, 0, merged, 0, _pendingBuffer.Length);
Buffer.BlockCopy(buffer, 0, merged, _pendingBuffer.Length, buffer.Length);
buffer = merged;
_pendingBuffer = [];
}
_buffer = buffer;
Position = 0;
PacketStart = 0;
}
public bool HasMore()
{
return Position < _buffer.Length;
}
public byte ReadByte(string field)
{
if (Position >= _buffer.Length)
{
throw new InvalidOperationException($"error reading {field}: {nameof(EndOfStreamException)}");
}
return _buffer[Position++];
}
public (int Length, bool Complete) ReadPacketLen()
{
return ReadPacketLenWithCheck(check: true);
}
public (int Length, bool Complete) ReadPacketLenWithCheck(bool check)
{
var multiplier = 1;
var value = 0;
while (Position < _buffer.Length)
{
var b = _buffer[Position++];
value += (b & 0x7F) * multiplier;
if ((b & 0x80) == 0)
{
if (check && Position + value > _buffer.Length)
{
break;
}
return (value, true);
}
multiplier *= 0x80;
if (multiplier > 0x200000)
{
throw new InvalidOperationException("malformed MQTT variable integer");
}
}
_pendingBuffer = _buffer[PacketStart..];
return (0, false);
}
public string ReadString(string field)
{
var data = ReadBytes(field, copy: false);
return data.Length == 0 ? string.Empty : System.Text.Encoding.UTF8.GetString(data);
}
public byte[] ReadBytes(string field, bool copy)
{
var length = ReadUInt16(field);
if (length == 0)
{
return [];
}
if (Position + length > _buffer.Length)
{
throw new InvalidOperationException($"error reading {field}: {nameof(EndOfStreamException)}");
}
var start = Position;
Position += length;
if (!copy)
{
return _buffer[start..Position];
}
var result = new byte[length];
Buffer.BlockCopy(_buffer, start, result, 0, length);
return result;
}
public ushort ReadUInt16(string field)
{
if (_buffer.Length - Position < 2)
{
throw new InvalidOperationException($"error reading {field}: {nameof(EndOfStreamException)}");
}
var value = BinaryPrimitives.ReadUInt16BigEndian(_buffer.AsSpan(Position, 2));
Position += 2;
return value;
}
}

View File

@@ -0,0 +1,137 @@
// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
namespace ZB.MOM.NatsNet.Server.Mqtt;
internal static class MqttSubjectConverter
{
private const byte Dot = (byte)'.';
private const byte Slash = (byte)'/';
private const byte SingleWildcard = (byte)'+';
private const byte MultiWildcard = (byte)'#';
private const byte NatsSingleWildcard = (byte)'*';
private const byte NatsFullWildcard = (byte)'>';
public static byte[] MqttTopicToNatsPubSubject(ReadOnlySpan<byte> mqttTopic)
{
return MqttToNatsSubjectConversion(mqttTopic, wildcardAllowed: false);
}
public static byte[] MqttFilterToNatsSubject(ReadOnlySpan<byte> mqttFilter)
{
return MqttToNatsSubjectConversion(mqttFilter, wildcardAllowed: true);
}
public static byte[] MqttToNatsSubjectConversion(ReadOnlySpan<byte> input, bool wildcardAllowed)
{
var output = new List<byte>(input.Length + 8);
for (var i = 0; i < input.Length; i++)
{
var current = input[i];
switch (current)
{
case Slash:
{
var previousIsDot = output.Count > 0 && output[^1] == Dot;
if (i == 0 || previousIsDot)
{
output.Add(Slash);
output.Add(Dot);
}
else if (i == input.Length - 1 || input[i + 1] == Slash)
{
output.Add(Dot);
output.Add(Slash);
}
else
{
output.Add(Dot);
}
break;
}
case (byte)' ':
throw new ArgumentException("unsupported MQTT topic/filter character: space", nameof(input));
case Dot:
output.Add(Slash);
output.Add(Slash);
break;
case SingleWildcard:
case MultiWildcard:
if (!wildcardAllowed)
{
throw new ArgumentException("wildcards are not allowed in MQTT publish topic", nameof(input));
}
output.Add(current == SingleWildcard ? NatsSingleWildcard : NatsFullWildcard);
break;
default:
output.Add(current);
break;
}
}
if (output.Count > 0 && output[^1] == Dot)
{
output.Add(Slash);
}
return [.. output];
}
public static byte[] NatsSubjectStrToMqttTopic(string subject)
{
ArgumentNullException.ThrowIfNull(subject);
return NatsSubjectToMqttTopic(System.Text.Encoding.UTF8.GetBytes(subject));
}
public static byte[] NatsSubjectToMqttTopic(ReadOnlySpan<byte> subject)
{
var topic = new List<byte>(subject.Length);
for (var i = 0; i < subject.Length; i++)
{
switch (subject[i])
{
case Slash when i < subject.Length - 1:
{
var next = subject[i + 1];
if (next == Dot)
{
topic.Add(Slash);
i++;
}
else if (next == Slash)
{
topic.Add(Dot);
i++;
}
break;
}
case Dot:
topic.Add(Slash);
break;
default:
topic.Add(subject[i]);
break;
}
}
return [.. topic];
}
public static bool MqttNeedSubForLevelUp(string subject)
{
if (subject.Length < 3)
{
return false;
}
return subject[^2] == '.' && subject[^1] == '>';
}
}

View File

@@ -0,0 +1,116 @@
// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
namespace ZB.MOM.NatsNet.Server.Mqtt;
internal sealed class MqttWriter
{
private readonly List<byte> _buffer;
public MqttWriter(int capacity)
{
_buffer = new List<byte>(Math.Max(capacity, 0));
}
public byte[] Bytes()
{
return [.. _buffer];
}
public void WriteByte(byte value)
{
_buffer.Add(value);
}
public void Write(ReadOnlySpan<byte> value)
{
for (var i = 0; i < value.Length; i++)
{
_buffer.Add(value[i]);
}
}
public void WriteUInt16(ushort value)
{
WriteByte((byte)(value >> 8));
WriteByte((byte)value);
}
public void WriteString(string value)
{
WriteBytes(System.Text.Encoding.UTF8.GetBytes(value));
}
public void WriteBytes(ReadOnlySpan<byte> value)
{
WriteUInt16((ushort)value.Length);
Write(value);
}
public void WriteVarInt(int value)
{
while (true)
{
var b = (byte)(value & 0x7F);
value >>= 7;
if (value > 0)
{
b |= 0x80;
}
WriteByte(b);
if (value == 0)
{
break;
}
}
}
public byte WritePublishHeader(ushort packetIdentifier, byte qos, bool duplicate, bool retained, ReadOnlySpan<byte> topic, int payloadLength)
{
var packetLength = 2 + topic.Length + payloadLength;
byte flags = 0;
if (duplicate)
{
flags |= MqttPubFlag.Dup;
}
if (retained)
{
flags |= MqttPubFlag.Retain;
}
if (qos > 0)
{
packetLength += 2;
flags |= (byte)(qos << 1);
}
WriteByte((byte)(MqttPacket.Pub | flags));
WriteVarInt(packetLength);
WriteBytes(topic);
if (qos > 0)
{
WriteUInt16(packetIdentifier);
}
return flags;
}
public static (byte Flags, byte[] Header) MqttMakePublishHeader(ushort packetIdentifier, byte qos, bool duplicate, bool retained, ReadOnlySpan<byte> topic, int payloadLength)
{
var writer = NewMqttWriter(MqttConst.InitialPubHeader + topic.Length);
var flags = writer.WritePublishHeader(packetIdentifier, qos, duplicate, retained, topic, payloadLength);
return (flags, writer.Bytes());
}
public static MqttWriter NewMqttWriter(int capacity)
{
return new MqttWriter(capacity);
}
}

View File

@@ -0,0 +1,82 @@
using System.Text;
using Shouldly;
using ZB.MOM.NatsNet.Server.Mqtt;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed partial class MqttHandlerTests
{
[Fact] // T:2170
public void MQTTReader_ShouldSucceed()
{
var reader = new MqttReader();
reader.Reset([0x82]);
var (initialLength, initialComplete) = reader.ReadPacketLen();
initialLength.ShouldBe(0);
initialComplete.ShouldBeFalse();
reader.Reset([0x01, 0xAA]);
var (finalLength, finalComplete) = reader.ReadPacketLenWithCheck(check: false);
finalComplete.ShouldBeTrue();
finalLength.ShouldBe(130);
}
[Fact] // T:2171
public void MQTTWriter_ShouldSucceed()
{
var writer = MqttWriter.NewMqttWriter(16);
var flags = writer.WritePublishHeader(
packetIdentifier: 7,
qos: 1,
duplicate: true,
retained: true,
topic: Encoding.UTF8.GetBytes("a/b"),
payloadLength: 5);
flags.ShouldBe((byte)(MqttPubFlag.Dup | MqttPubFlag.Retain | MqttPubFlag.QoS1));
var bytes = writer.Bytes();
bytes[0].ShouldBe((byte)(MqttPacket.Pub | flags));
bytes[1].ShouldBe((byte)12); // 2(len)+3(topic)+2(packet id)+5(payload)
bytes[2].ShouldBe((byte)0);
bytes[3].ShouldBe((byte)3);
Encoding.UTF8.GetString(bytes.AsSpan(4, 3)).ShouldBe("a/b");
bytes[7].ShouldBe((byte)0);
bytes[8].ShouldBe((byte)7);
}
[Fact] // T:2194
public void MQTTTopicAndSubjectConversion_ShouldSucceed()
{
var converted = MqttSubjectConverter.MqttTopicToNatsPubSubject("/foo/bar"u8);
Encoding.UTF8.GetString(converted).ShouldBe("/.foo.bar");
var roundTrip = MqttSubjectConverter.NatsSubjectToMqttTopic(converted);
Encoding.UTF8.GetString(roundTrip).ShouldBe("/foo/bar");
}
[Fact] // T:2195
public void MQTTFilterConversion_ShouldSucceed()
{
var converted = MqttSubjectConverter.MqttFilterToNatsSubject("foo/+/bar/#"u8);
Encoding.UTF8.GetString(converted).ShouldBe("foo.*.bar.>");
}
[Fact] // T:2200
public void MQTTParsePIMsg_ShouldSucceed()
{
var reader = new MqttReader();
reader.Reset([0x00, 0x00]);
reader.ReadUInt16("packet identifier").ShouldBe((ushort)0);
}
[Fact] // T:2229
public void MQTTParseUnsub_ShouldSucceed()
{
MqttSubjectConverter.MqttNeedSubForLevelUp("foo.>").ShouldBeTrue();
MqttSubjectConverter.MqttNeedSubForLevelUp("foo.bar").ShouldBeFalse();
}
}

View File

@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
Generated: 2026-03-01 05:58:34 UTC
Generated: 2026-03-01 06:13:32 UTC
## Modules (12 total)