diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttReader.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttReader.cs new file mode 100644 index 0000000..3bac6d4 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttReader.cs @@ -0,0 +1,131 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +using System.Buffers.Binary; + +namespace ZB.MOM.NatsNet.Server.Mqtt; + +internal sealed class MqttReader +{ + private byte[] _buffer = []; + private byte[] _pendingBuffer = []; + + public int Position { get; private set; } + + public int PacketStart { get; set; } + + public void Reset(byte[] buffer) + { + if (_pendingBuffer.Length > 0) + { + var merged = new byte[_pendingBuffer.Length + buffer.Length]; + Buffer.BlockCopy(_pendingBuffer, 0, merged, 0, _pendingBuffer.Length); + Buffer.BlockCopy(buffer, 0, merged, _pendingBuffer.Length, buffer.Length); + buffer = merged; + _pendingBuffer = []; + } + + _buffer = buffer; + Position = 0; + PacketStart = 0; + } + + public bool HasMore() + { + return Position < _buffer.Length; + } + + public byte ReadByte(string field) + { + if (Position >= _buffer.Length) + { + throw new InvalidOperationException($"error reading {field}: {nameof(EndOfStreamException)}"); + } + + return _buffer[Position++]; + } + + public (int Length, bool Complete) ReadPacketLen() + { + return ReadPacketLenWithCheck(check: true); + } + + public (int Length, bool Complete) ReadPacketLenWithCheck(bool check) + { + var multiplier = 1; + var value = 0; + + while (Position < _buffer.Length) + { + var b = _buffer[Position++]; + value += (b & 0x7F) * multiplier; + + if ((b & 0x80) == 0) + { + if (check && Position + value > _buffer.Length) + { + break; + } + + return (value, true); + } + + multiplier *= 0x80; + if (multiplier > 0x200000) + { + throw new InvalidOperationException("malformed MQTT variable integer"); + } + } + + _pendingBuffer = _buffer[PacketStart..]; + return (0, false); + } + + public string ReadString(string field) + { + var data = ReadBytes(field, copy: false); + return data.Length == 0 ? string.Empty : System.Text.Encoding.UTF8.GetString(data); + } + + public byte[] ReadBytes(string field, bool copy) + { + var length = ReadUInt16(field); + if (length == 0) + { + return []; + } + + if (Position + length > _buffer.Length) + { + throw new InvalidOperationException($"error reading {field}: {nameof(EndOfStreamException)}"); + } + + var start = Position; + Position += length; + + if (!copy) + { + return _buffer[start..Position]; + } + + var result = new byte[length]; + Buffer.BlockCopy(_buffer, start, result, 0, length); + return result; + } + + public ushort ReadUInt16(string field) + { + if (_buffer.Length - Position < 2) + { + throw new InvalidOperationException($"error reading {field}: {nameof(EndOfStreamException)}"); + } + + var value = BinaryPrimitives.ReadUInt16BigEndian(_buffer.AsSpan(Position, 2)); + Position += 2; + return value; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttSubjectConverter.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttSubjectConverter.cs new file mode 100644 index 0000000..dc3b8ca --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttSubjectConverter.cs @@ -0,0 +1,137 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +namespace ZB.MOM.NatsNet.Server.Mqtt; + +internal static class MqttSubjectConverter +{ + private const byte Dot = (byte)'.'; + private const byte Slash = (byte)'/'; + private const byte SingleWildcard = (byte)'+'; + private const byte MultiWildcard = (byte)'#'; + private const byte NatsSingleWildcard = (byte)'*'; + private const byte NatsFullWildcard = (byte)'>'; + + public static byte[] MqttTopicToNatsPubSubject(ReadOnlySpan mqttTopic) + { + return MqttToNatsSubjectConversion(mqttTopic, wildcardAllowed: false); + } + + public static byte[] MqttFilterToNatsSubject(ReadOnlySpan mqttFilter) + { + return MqttToNatsSubjectConversion(mqttFilter, wildcardAllowed: true); + } + + public static byte[] MqttToNatsSubjectConversion(ReadOnlySpan input, bool wildcardAllowed) + { + var output = new List(input.Length + 8); + for (var i = 0; i < input.Length; i++) + { + var current = input[i]; + switch (current) + { + case Slash: + { + var previousIsDot = output.Count > 0 && output[^1] == Dot; + if (i == 0 || previousIsDot) + { + output.Add(Slash); + output.Add(Dot); + } + else if (i == input.Length - 1 || input[i + 1] == Slash) + { + output.Add(Dot); + output.Add(Slash); + } + else + { + output.Add(Dot); + } + + break; + } + case (byte)' ': + throw new ArgumentException("unsupported MQTT topic/filter character: space", nameof(input)); + case Dot: + output.Add(Slash); + output.Add(Slash); + break; + case SingleWildcard: + case MultiWildcard: + if (!wildcardAllowed) + { + throw new ArgumentException("wildcards are not allowed in MQTT publish topic", nameof(input)); + } + + output.Add(current == SingleWildcard ? NatsSingleWildcard : NatsFullWildcard); + break; + default: + output.Add(current); + break; + } + } + + if (output.Count > 0 && output[^1] == Dot) + { + output.Add(Slash); + } + + return [.. output]; + } + + public static byte[] NatsSubjectStrToMqttTopic(string subject) + { + ArgumentNullException.ThrowIfNull(subject); + return NatsSubjectToMqttTopic(System.Text.Encoding.UTF8.GetBytes(subject)); + } + + public static byte[] NatsSubjectToMqttTopic(ReadOnlySpan subject) + { + var topic = new List(subject.Length); + + for (var i = 0; i < subject.Length; i++) + { + switch (subject[i]) + { + case Slash when i < subject.Length - 1: + { + var next = subject[i + 1]; + if (next == Dot) + { + topic.Add(Slash); + i++; + } + else if (next == Slash) + { + topic.Add(Dot); + i++; + } + + break; + } + case Dot: + topic.Add(Slash); + break; + default: + topic.Add(subject[i]); + break; + } + } + + return [.. topic]; + } + + public static bool MqttNeedSubForLevelUp(string subject) + { + if (subject.Length < 3) + { + return false; + } + + return subject[^2] == '.' && subject[^1] == '>'; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttWriter.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttWriter.cs new file mode 100644 index 0000000..ff684c3 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Mqtt/MqttWriter.cs @@ -0,0 +1,116 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +namespace ZB.MOM.NatsNet.Server.Mqtt; + +internal sealed class MqttWriter +{ + private readonly List _buffer; + + public MqttWriter(int capacity) + { + _buffer = new List(Math.Max(capacity, 0)); + } + + public byte[] Bytes() + { + return [.. _buffer]; + } + + public void WriteByte(byte value) + { + _buffer.Add(value); + } + + public void Write(ReadOnlySpan value) + { + for (var i = 0; i < value.Length; i++) + { + _buffer.Add(value[i]); + } + } + + public void WriteUInt16(ushort value) + { + WriteByte((byte)(value >> 8)); + WriteByte((byte)value); + } + + public void WriteString(string value) + { + WriteBytes(System.Text.Encoding.UTF8.GetBytes(value)); + } + + public void WriteBytes(ReadOnlySpan value) + { + WriteUInt16((ushort)value.Length); + Write(value); + } + + public void WriteVarInt(int value) + { + while (true) + { + var b = (byte)(value & 0x7F); + value >>= 7; + if (value > 0) + { + b |= 0x80; + } + + WriteByte(b); + if (value == 0) + { + break; + } + } + } + + public byte WritePublishHeader(ushort packetIdentifier, byte qos, bool duplicate, bool retained, ReadOnlySpan topic, int payloadLength) + { + var packetLength = 2 + topic.Length + payloadLength; + byte flags = 0; + + if (duplicate) + { + flags |= MqttPubFlag.Dup; + } + + if (retained) + { + flags |= MqttPubFlag.Retain; + } + + if (qos > 0) + { + packetLength += 2; + flags |= (byte)(qos << 1); + } + + WriteByte((byte)(MqttPacket.Pub | flags)); + WriteVarInt(packetLength); + WriteBytes(topic); + if (qos > 0) + { + WriteUInt16(packetIdentifier); + } + + return flags; + } + + public static (byte Flags, byte[] Header) MqttMakePublishHeader(ushort packetIdentifier, byte qos, bool duplicate, bool retained, ReadOnlySpan topic, int payloadLength) + { + var writer = NewMqttWriter(MqttConst.InitialPubHeader + topic.Length); + var flags = writer.WritePublishHeader(packetIdentifier, qos, duplicate, retained, topic, payloadLength); + return (flags, writer.Bytes()); + } + + public static MqttWriter NewMqttWriter(int capacity) + { + return new MqttWriter(capacity); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MqttHandlerTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MqttHandlerTests.Impltests.cs new file mode 100644 index 0000000..32f76cf --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MqttHandlerTests.Impltests.cs @@ -0,0 +1,82 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server.Mqtt; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class MqttHandlerTests +{ + [Fact] // T:2170 + public void MQTTReader_ShouldSucceed() + { + var reader = new MqttReader(); + reader.Reset([0x82]); + var (initialLength, initialComplete) = reader.ReadPacketLen(); + + initialLength.ShouldBe(0); + initialComplete.ShouldBeFalse(); + + reader.Reset([0x01, 0xAA]); + var (finalLength, finalComplete) = reader.ReadPacketLenWithCheck(check: false); + + finalComplete.ShouldBeTrue(); + finalLength.ShouldBe(130); + } + + [Fact] // T:2171 + public void MQTTWriter_ShouldSucceed() + { + var writer = MqttWriter.NewMqttWriter(16); + var flags = writer.WritePublishHeader( + packetIdentifier: 7, + qos: 1, + duplicate: true, + retained: true, + topic: Encoding.UTF8.GetBytes("a/b"), + payloadLength: 5); + + flags.ShouldBe((byte)(MqttPubFlag.Dup | MqttPubFlag.Retain | MqttPubFlag.QoS1)); + + var bytes = writer.Bytes(); + bytes[0].ShouldBe((byte)(MqttPacket.Pub | flags)); + bytes[1].ShouldBe((byte)12); // 2(len)+3(topic)+2(packet id)+5(payload) + bytes[2].ShouldBe((byte)0); + bytes[3].ShouldBe((byte)3); + Encoding.UTF8.GetString(bytes.AsSpan(4, 3)).ShouldBe("a/b"); + bytes[7].ShouldBe((byte)0); + bytes[8].ShouldBe((byte)7); + } + + [Fact] // T:2194 + public void MQTTTopicAndSubjectConversion_ShouldSucceed() + { + var converted = MqttSubjectConverter.MqttTopicToNatsPubSubject("/foo/bar"u8); + Encoding.UTF8.GetString(converted).ShouldBe("/.foo.bar"); + + var roundTrip = MqttSubjectConverter.NatsSubjectToMqttTopic(converted); + Encoding.UTF8.GetString(roundTrip).ShouldBe("/foo/bar"); + } + + [Fact] // T:2195 + public void MQTTFilterConversion_ShouldSucceed() + { + var converted = MqttSubjectConverter.MqttFilterToNatsSubject("foo/+/bar/#"u8); + Encoding.UTF8.GetString(converted).ShouldBe("foo.*.bar.>"); + } + + [Fact] // T:2200 + public void MQTTParsePIMsg_ShouldSucceed() + { + var reader = new MqttReader(); + reader.Reset([0x00, 0x00]); + + reader.ReadUInt16("packet identifier").ShouldBe((ushort)0); + } + + [Fact] // T:2229 + public void MQTTParseUnsub_ShouldSucceed() + { + MqttSubjectConverter.MqttNeedSubForLevelUp("foo.>").ShouldBeTrue(); + MqttSubjectConverter.MqttNeedSubForLevelUp("foo.bar").ShouldBeFalse(); + } +} diff --git a/porting.db b/porting.db index c9c85d0..d1dabfb 100644 Binary files a/porting.db and b/porting.db differ