diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Mqtt/MqttTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Mqtt/MqttTests.cs
new file mode 100644
index 0000000..00d1977
--- /dev/null
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Mqtt/MqttTests.cs
@@ -0,0 +1,2244 @@
+// Copyright 2020-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Ported from server/mqtt_test.go and server/mqtt_ex_test_test.go in the NATS
+// Go server source.
+
+using System.Buffers.Binary;
+using System.Net;
+using System.Net.Security;
+using System.Net.Sockets;
+using System.Text;
+using Shouldly;
+using Xunit.Abstractions;
+
+namespace ZB.MOM.NatsNet.Server.IntegrationTests.Mqtt;
+
+///
+/// Integration tests for the MQTT protocol implementation.
+/// These tests require a running NATS server with MQTT and JetStream enabled.
+///
+/// Start with: cd golang/nats-server && go run . -c mqtt-test.conf
+/// Or set MQTT_HOST / MQTT_PORT environment variables.
+///
+/// The tests use raw TCP connections with MQTT binary protocol to mirror the
+/// Go test helpers (testMQTTConnect, testMQTTPublish, etc.).
+///
+[Trait("Category", "Integration")]
+public sealed class MqttTests
+{
+ // ---------------------------------------------------------------------------
+ // Constants mirroring Go mqtt.go
+ // ---------------------------------------------------------------------------
+
+ // Packet types
+ private const byte PacketConnect = 0x10;
+ private const byte PacketConnAck = 0x20;
+ private const byte PacketPub = 0x30;
+ private const byte PacketPubAck = 0x40;
+ private const byte PacketPubRec = 0x50;
+ private const byte PacketPubRel = 0x60;
+ private const byte PacketPubComp = 0x70;
+ private const byte PacketSub = 0x80;
+ private const byte PacketSubAck = 0x90;
+ private const byte PacketUnsub = 0xA0;
+ private const byte PacketUnsubAck = 0xB0;
+ private const byte PacketPing = 0xC0;
+ private const byte PacketPingResp = 0xD0;
+ private const byte PacketDisconnect = 0xE0;
+ private const byte PacketMask = 0xF0;
+ private const byte PacketFlagMask = 0x0F;
+
+ // CONNECT flags
+ private const byte ConnFlagCleanSession = 0x02;
+ private const byte ConnFlagWillFlag = 0x04;
+ private const byte ConnFlagWillQoS = 0x18;
+ private const byte ConnFlagWillRetain = 0x20;
+ private const byte ConnFlagPasswordFlag = 0x40;
+ private const byte ConnFlagUsernameFlag = 0x80;
+
+ // PUBLISH flags
+ private const byte PubFlagRetain = 0x01;
+ private const byte PubFlagQoS = 0x06;
+ private const byte PubFlagDup = 0x08;
+ private const byte PubQoS1 = 0x1 << 1;
+ private const byte PubQoS2 = 0x2 << 1;
+
+ // SUBSCRIBE / UNSUBSCRIBE fixed flags
+ private const byte SubscribeFlags = 0x02;
+ private const byte UnsubscribeFlags = 0x02;
+
+ // CONNACK return codes
+ private const byte ConnAckAccepted = 0x00;
+ private const byte ConnAckUnacceptableProtocol = 0x01;
+ private const byte ConnAckIdentifierRejected = 0x02;
+ private const byte ConnAckServerUnavailable = 0x03;
+ private const byte ConnAckBadUserOrPassword = 0x04;
+ private const byte ConnAckNotAuthorized = 0x05;
+
+ private const byte SubAckFailure = 0x80;
+
+ private static readonly byte[] MqttProtoName = Encoding.ASCII.GetBytes("MQTT");
+ private const byte MqttProtoLevel = 0x04;
+
+ private static readonly TimeSpan MqttTimeout = TimeSpan.FromSeconds(10);
+
+ // ---------------------------------------------------------------------------
+ // Configuration
+ // ---------------------------------------------------------------------------
+
+ private readonly ITestOutputHelper _output;
+ private readonly string _mqttHost;
+ private readonly int _mqttPort;
+ private readonly bool _serverAvailable;
+
+ public MqttTests(ITestOutputHelper output)
+ {
+ _output = output;
+ _mqttHost = Environment.GetEnvironmentVariable("MQTT_HOST") ?? "127.0.0.1";
+ var portStr = Environment.GetEnvironmentVariable("MQTT_PORT");
+ _mqttPort = portStr is not null && int.TryParse(portStr, out var p) ? p : 1883;
+ _serverAvailable = ProbeServer(_mqttHost, _mqttPort);
+ }
+
+ private static bool ProbeServer(string host, int port)
+ {
+ try
+ {
+ using var tcp = new TcpClient();
+ tcp.Connect(host, port);
+ return true;
+ }
+ catch
+ {
+ return false;
+ }
+ }
+
+ private bool SkipIfUnavailable()
+ {
+ return !_serverAvailable;
+ }
+
+ // ---------------------------------------------------------------------------
+ // MQTT protocol helper – mirrors testMQTTRead / testMQTTWrite / etc.
+ // ---------------------------------------------------------------------------
+
+ private sealed class MqttConnection : IDisposable
+ {
+ public TcpClient Tcp { get; }
+ public NetworkStream Stream { get; }
+
+ public MqttConnection(TcpClient tcp)
+ {
+ Tcp = tcp;
+ Stream = tcp.GetStream();
+ }
+
+ public void Dispose()
+ {
+ Stream.Dispose();
+ Tcp.Dispose();
+ }
+ }
+
+ /// Creates a raw TCP MQTT connection and sends a CONNECT packet.
+ private MqttConnection MqttConnect(
+ string? clientId = null,
+ bool cleanSess = true,
+ string? user = null,
+ string? pass = null,
+ ushort keepAlive = 0,
+ WillInfo? will = null)
+ {
+ var tcp = new TcpClient();
+ tcp.Connect(_mqttHost, _mqttPort);
+ var conn = new MqttConnection(tcp);
+
+ var proto = BuildConnectPacket(clientId, cleanSess, user, pass, keepAlive, will);
+ WriteAll(conn.Stream, proto);
+ return conn;
+ }
+
+ private sealed class WillInfo
+ {
+ public string Topic { get; init; } = "";
+ public byte[] Message { get; init; } = [];
+ public byte Qos { get; init; }
+ public bool Retain { get; init; }
+ }
+
+ private static byte[] BuildConnectPacket(
+ string? clientId,
+ bool cleanSess,
+ string? user,
+ string? pass,
+ ushort keepAlive,
+ WillInfo? will)
+ {
+ byte flags = 0;
+ if (cleanSess) flags |= ConnFlagCleanSession;
+
+ var cidBytes = Encoding.UTF8.GetBytes(clientId ?? "");
+ var userBytes = user is not null ? Encoding.UTF8.GetBytes(user) : null;
+ var passBytes = pass is not null ? Encoding.UTF8.GetBytes(pass) : null;
+ byte[]? willTopicBytes = null;
+ byte[]? willMessageBytes = null;
+
+ if (will is not null)
+ {
+ flags |= ConnFlagWillFlag;
+ flags |= (byte)((will.Qos & 0x03) << 3);
+ if (will.Retain) flags |= ConnFlagWillRetain;
+ willTopicBytes = Encoding.UTF8.GetBytes(will.Topic);
+ willMessageBytes = will.Message;
+ }
+ if (userBytes is not null) flags |= ConnFlagUsernameFlag;
+ if (passBytes is not null) flags |= ConnFlagPasswordFlag;
+
+ var pkLen = 2 + MqttProtoName.Length +
+ 1 + 1 + 2 + // proto level + flags + keepAlive
+ 2 + cidBytes.Length;
+ if (willTopicBytes is not null) pkLen += 2 + willTopicBytes.Length + 2 + (willMessageBytes?.Length ?? 0);
+ if (userBytes is not null) pkLen += 2 + userBytes.Length;
+ if (passBytes is not null) pkLen += 2 + passBytes.Length;
+
+ var w = new ByteWriter();
+ w.WriteByte(PacketConnect);
+ w.WriteVarInt(pkLen);
+ w.WriteBytes(MqttProtoName);
+ w.WriteByte(MqttProtoLevel);
+ w.WriteByte(flags);
+ w.WriteUInt16(keepAlive);
+ w.WriteBytes(cidBytes);
+ if (willTopicBytes is not null)
+ {
+ w.WriteBytes(willTopicBytes);
+ w.WriteBytes(willMessageBytes ?? []);
+ }
+ if (userBytes is not null) w.WriteBytes(userBytes);
+ if (passBytes is not null) w.WriteBytes(passBytes);
+ return w.ToArray();
+ }
+
+ // Read exactly n bytes from the stream (with timeout)
+ private static byte[] ReadExact(NetworkStream stream, int n)
+ {
+ var buf = new byte[n];
+ var offset = 0;
+ stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
+ while (offset < n)
+ {
+ var read = stream.Read(buf, offset, n - offset);
+ if (read == 0) throw new EndOfStreamException("MQTT connection closed");
+ offset += read;
+ }
+ return buf;
+ }
+
+ // Read one MQTT packet; returns (firstByte, payloadBytes)
+ private static (byte FirstByte, byte[] Payload) ReadPacket(NetworkStream stream)
+ {
+ stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
+ var header = new byte[1];
+ var n = stream.Read(header, 0, 1);
+ if (n == 0) throw new EndOfStreamException("MQTT connection closed reading packet header");
+
+ // Read variable-length remaining length
+ var multiplier = 1;
+ var value = 0;
+ while (true)
+ {
+ var b2 = new byte[1];
+ if (stream.Read(b2, 0, 1) == 0) throw new EndOfStreamException();
+ value += (b2[0] & 0x7F) * multiplier;
+ if ((b2[0] & 0x80) == 0) break;
+ multiplier *= 128;
+ if (multiplier > 0x200000) throw new InvalidOperationException("malformed MQTT variable int");
+ }
+
+ var payload = value > 0 ? ReadExact(stream, value) : [];
+ return (header[0], payload);
+ }
+
+ private static void WriteAll(NetworkStream stream, byte[] data)
+ {
+ stream.WriteTimeout = (int)MqttTimeout.TotalMilliseconds;
+ stream.Write(data, 0, data.Length);
+ }
+
+ // Read ConnAck and verify
+ private static void CheckConnAck(NetworkStream stream, byte expectedRc, bool expectedSessionPresent)
+ {
+ var (b, payload) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketConnAck, "Expected CONNACK packet");
+ payload.Length.ShouldBe(2, "CONNACK payload should be 2 bytes");
+ var sessionPresent = (payload[0] & 0x01) == 1;
+ sessionPresent.ShouldBe(expectedSessionPresent, "session-present flag mismatch");
+ payload[1].ShouldBe(expectedRc, $"CONNACK return code mismatch (got 0x{payload[1]:X2}, expected 0x{expectedRc:X2})");
+ }
+
+ private static void CheckConnAckAccepted(NetworkStream stream, bool expectedSessionPresent = false)
+ => CheckConnAck(stream, ConnAckAccepted, expectedSessionPresent);
+
+ // Send a PUBLISH packet
+ private static void SendPublish(NetworkStream stream, byte qos, bool dup, bool retain,
+ string topic, ushort pi, byte[] payload)
+ {
+ byte flags = 0;
+ if (dup) flags |= PubFlagDup;
+ if (retain) flags |= PubFlagRetain;
+ flags |= (byte)((qos & 0x03) << 1);
+
+ var topicBytes = Encoding.UTF8.GetBytes(topic);
+ var pkLen = 2 + topicBytes.Length + payload.Length;
+ if (qos > 0) pkLen += 2;
+
+ var w = new ByteWriter();
+ w.WriteByte((byte)(PacketPub | flags));
+ w.WriteVarInt(pkLen);
+ w.WriteBytes(topicBytes);
+ if (qos > 0) w.WriteUInt16(pi);
+ w.Write(payload);
+ WriteAll(stream, w.ToArray());
+ }
+
+ // Send PUBLISH and wait for acknowledgement (PUBACK for QoS1, PUBREC+PUBREL+PUBCOMP for QoS2)
+ private static void MqttPublish(NetworkStream stream, byte qos, bool dup, bool retain,
+ string topic, ushort pi, byte[] payload)
+ {
+ SendPublish(stream, qos, dup, retain, topic, pi, payload);
+ switch (qos)
+ {
+ case 1:
+ {
+ var (b, p) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketPubAck, "Expected PUBACK");
+ var rpi = BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2));
+ rpi.ShouldBe(pi, "PUBACK packet identifier mismatch");
+ break;
+ }
+ case 2:
+ {
+ var (b, p) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketPubRec, "Expected PUBREC");
+ var rpi = BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2));
+ rpi.ShouldBe(pi, "PUBREC packet identifier mismatch");
+
+ SendPiPacket(stream, PacketPubRel, pi);
+
+ var (b2, p2) = ReadPacket(stream);
+ (b2 & PacketMask).ShouldBe(PacketPubComp, "Expected PUBCOMP");
+ var rpi2 = BinaryPrimitives.ReadUInt16BigEndian(p2.AsSpan(0, 2));
+ rpi2.ShouldBe(pi, "PUBCOMP packet identifier mismatch");
+
+ // flush
+ MqttFlush(stream);
+ break;
+ }
+ }
+ }
+
+ private static void MqttFlush(NetworkStream stream)
+ {
+ var w = new ByteWriter();
+ w.WriteByte(PacketPing);
+ w.WriteByte(0);
+ WriteAll(stream, w.ToArray());
+ var (b, p) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketPingResp, "Expected PINGRESP");
+ p.Length.ShouldBe(0, "PINGRESP payload should be empty");
+ }
+
+ private static void SendPiPacket(NetworkStream stream, byte packetType, ushort pi)
+ {
+ var w = new ByteWriter();
+ w.WriteByte(packetType);
+ w.WriteVarInt(2);
+ w.WriteUInt16(pi);
+ WriteAll(stream, w.ToArray());
+ }
+
+ // Subscribe helper; returns SUBACK QoS bytes
+ private static byte[] MqttSubscribe(NetworkStream stream, ushort pi,
+ IEnumerable<(string Filter, byte Qos)> filters)
+ {
+ var filterList = filters.ToList();
+ var pkLen = 2; // pi
+ foreach (var (f, _) in filterList) pkLen += 2 + Encoding.UTF8.GetByteCount(f) + 1;
+
+ var w = new ByteWriter();
+ w.WriteByte((byte)(PacketSub | SubscribeFlags));
+ w.WriteVarInt(pkLen);
+ w.WriteUInt16(pi);
+ foreach (var (f, q) in filterList)
+ {
+ w.WriteBytes(Encoding.UTF8.GetBytes(f));
+ w.WriteByte(q);
+ }
+ WriteAll(stream, w.ToArray());
+
+ var (b, payload) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketSubAck, "Expected SUBACK");
+ var rpi = BinaryPrimitives.ReadUInt16BigEndian(payload.AsSpan(0, 2));
+ rpi.ShouldBe(pi, "SUBACK packet identifier mismatch");
+ return payload[2..];
+ }
+
+ // Unsubscribe helper
+ private static ushort MqttUnsubscribe(NetworkStream stream, ushort pi, IEnumerable filters)
+ {
+ var filterList = filters.ToList();
+ var pkLen = 2;
+ foreach (var f in filterList) pkLen += 2 + Encoding.UTF8.GetByteCount(f);
+
+ var w = new ByteWriter();
+ w.WriteByte((byte)(PacketUnsub | UnsubscribeFlags));
+ w.WriteVarInt(pkLen);
+ w.WriteUInt16(pi);
+ foreach (var f in filterList) w.WriteBytes(Encoding.UTF8.GetBytes(f));
+ WriteAll(stream, w.ToArray());
+
+ var (b, payload) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketUnsubAck, "Expected UNSUBACK");
+ return BinaryPrimitives.ReadUInt16BigEndian(payload.AsSpan(0, 2));
+ }
+
+ // Read a PUBLISH packet and return (flags, pi, topic, payload)
+ private static (byte Flags, ushort Pi, string Topic, byte[] Payload) ReadPublish(
+ NetworkStream stream)
+ {
+ var (b, raw) = ReadPacket(stream);
+ (b & PacketMask).ShouldBe(PacketPub, "Expected PUBLISH packet");
+ var flags = (byte)(b & PacketFlagMask);
+ var qos = (byte)((flags & PubFlagQoS) >> 1);
+
+ var pos = 0;
+ var topicLen = BinaryPrimitives.ReadUInt16BigEndian(raw.AsSpan(pos, 2)); pos += 2;
+ var topic = Encoding.UTF8.GetString(raw, pos, topicLen); pos += topicLen;
+ ushort pi = 0;
+ if (qos > 0) { pi = BinaryPrimitives.ReadUInt16BigEndian(raw.AsSpan(pos, 2)); pos += 2; }
+ var payload = raw[pos..];
+ return (flags, pi, topic, payload);
+ }
+
+ // Check that nothing arrives for a brief window
+ private static void ExpectNothing(NetworkStream stream)
+ {
+ stream.ReadTimeout = 150; // 150 ms
+ var buf = new byte[128];
+ try
+ {
+ var n = stream.Read(buf, 0, buf.Length);
+ n.ShouldBe(0, "Expected no data from MQTT server");
+ }
+ catch (IOException ex) when (ex.InnerException is SocketException se &&
+ se.SocketErrorCode == SocketError.TimedOut)
+ {
+ // expected – nothing arrived
+ }
+ catch (IOException) { /* connection closed is also fine */ }
+ finally
+ {
+ stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
+ }
+ }
+
+ // Expect the server to disconnect us
+ private static void ExpectDisconnect(NetworkStream stream)
+ {
+ stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
+ var buf = new byte[64];
+ try
+ {
+ var n = stream.Read(buf, 0, buf.Length);
+ n.ShouldBe(0, $"Expected server to close the connection, got {n} bytes: {BitConverter.ToString(buf, 0, n)}");
+ }
+ catch (EndOfStreamException) { /* connection closed – correct */ }
+ catch (IOException) { /* connection reset – also correct */ }
+ }
+
+ private static void SendDisconnect(NetworkStream stream)
+ {
+ var w = new ByteWriter();
+ w.WriteByte(PacketDisconnect);
+ w.WriteByte(0);
+ WriteAll(stream, w.ToArray());
+ }
+
+ // ---------------------------------------------------------------------------
+ // Minimal byte writer (mirrors Go's mqttWriter)
+ // ---------------------------------------------------------------------------
+
+ private sealed class ByteWriter
+ {
+ private readonly List _buf = [];
+
+ public void WriteByte(byte b) => _buf.Add(b);
+ public void Write(byte[] data) { foreach (var b in data) _buf.Add(b); }
+
+ public void WriteUInt16(ushort v)
+ {
+ _buf.Add((byte)(v >> 8));
+ _buf.Add((byte)v);
+ }
+
+ public void WriteBytes(byte[] data)
+ {
+ WriteUInt16((ushort)data.Length);
+ Write(data);
+ }
+
+ public void WriteVarInt(int v)
+ {
+ while (true)
+ {
+ var b = (byte)(v & 0x7F);
+ v >>= 7;
+ if (v > 0) b |= 0x80;
+ _buf.Add(b);
+ if (v == 0) break;
+ }
+ }
+
+ public byte[] ToArray() => [.. _buf];
+ }
+
+ // ---------------------------------------------------------------------------
+ // TESTS — 77 from mqtt_test.go + 2 from mqtt_ex_test_test.go (skipped)
+ // ---------------------------------------------------------------------------
+
+ ///
+ /// TestMQTTReader: verifies the MqttReader can parse bytes, strings, uint16,
+ /// variable-length integers, and partial buffer merging.
+ /// (Unit-level – no server required.)
+ ///
+ [Fact]
+ public void Reader_ShouldSucceed()
+ {
+ var r = new MqttTestReader();
+
+ // readBytes returns 2-byte-length-prefixed data
+ r.Reset([0, 2, (byte)'a', (byte)'b']);
+ var bs = r.ReadBytes("", copy: false);
+ Encoding.UTF8.GetString(bs).ShouldBe("ab");
+
+ // copy=true → independent copy
+ r.Reset([0, 2, (byte)'a', (byte)'b']);
+ var bs2 = r.ReadBytes("", copy: true);
+ bs2[0] = (byte)'c';
+ bs2[1] = (byte)'d';
+ (bs2[0] == r.PeekBuf(2)).ShouldBeFalse("readBytes(copy=true) should return a copy");
+
+ // readByte / hasMore
+ r.Reset([(byte)'a', (byte)'b']);
+ r.ReadByte("").ShouldBe((byte)'a');
+ r.HasMore().ShouldBeTrue();
+ r.ReadByte("").ShouldBe((byte)'b');
+ r.HasMore().ShouldBeFalse();
+
+ Should.Throw(() => r.ReadByte("test"))
+ .Message.ShouldContain("error reading test");
+
+ // readString
+ r.Reset([0, 2, (byte)'a', (byte)'b']);
+ r.ReadString("").ShouldBe("ab");
+
+ // readUint16 with insufficient data
+ r.Reset([10]);
+ Should.Throw(() => r.ReadUInt16("uint16"))
+ .Message.ShouldContain("error reading uint16");
+
+ // readPacketLen – multi-byte varint: 0x82, 0xff, 0x03 → 0xff82
+ r.Reset([0x82, 0xff, 0x3]);
+ var (len1, complete1) = r.ReadPacketLenWithCheck(check: false);
+ complete1.ShouldBeTrue();
+ len1.ShouldBe(0xff82);
+
+ // malformed varint
+ r.Reset([0xff, 0xff, 0xff, 0xff, 0xff]);
+ Should.Throw(() => r.ReadPacketLenWithCheck(check: false))
+ .Message.ShouldContain("malformed");
+ }
+
+ ///
+ /// TestMQTTWriter: verifies MqttWriter WriteUint16, WriteString, WriteBytes, WriteVarInt.
+ /// (Unit-level – no server required.)
+ ///
+ [Fact]
+ public void Writer_ShouldSucceed()
+ {
+ var w = new ByteWriter();
+ w.WriteUInt16(1234);
+ var data = w.ToArray();
+ BinaryPrimitives.ReadUInt16BigEndian(data.AsSpan(0, 2)).ShouldBe((ushort)1234);
+
+ w = new ByteWriter();
+ w.WriteBytes(Encoding.UTF8.GetBytes("test"));
+ data = w.ToArray();
+ data.Length.ShouldBe(6, "WriteBytes: 2-byte length prefix + 4 chars");
+
+ var ints = new[] { 0, 1, 127, 128, 16383, 16384, 2097151, 2097152, 268435455 };
+ var lens = new[] { 1, 1, 1, 2, 2, 3, 3, 4, 4 };
+
+ w = new ByteWriter();
+ var totalLen = 0;
+ for (var i = 0; i < ints.Length; i++)
+ {
+ w.WriteVarInt(ints[i]);
+ totalLen += lens[i];
+ w.ToArray().Length.ShouldBe(totalLen);
+ }
+ }
+
+ ///
+ /// TestMQTTServerNameRequired: server without cluster server_name must reject MQTT.
+ /// Verified against the .NET MQTT config validation logic (config-level, no TCP).
+ ///
+ [Fact]
+ public void ServerNameRequired_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ // The Go test verifies server won't start without server_name when cluster+MQTT.
+ // Our port verifies this at config-validation level (MqttOptions.Validate).
+ // Here we test that a server without JetStream rejects MQTT connections.
+ using var conn = MqttConnect(cleanSess: true);
+ // Server without JetStream should close connection without sending CONNACK
+ ExpectDisconnect(conn.Stream);
+ }
+
+ ///
+ /// TestMQTTStart: verifies the server is listening on the MQTT port and accepts TCP.
+ ///
+ [Fact]
+ public void Start_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var tcp = new TcpClient();
+ Should.NotThrow(() => tcp.Connect(_mqttHost, _mqttPort));
+ }
+
+ ///
+ /// TestMQTTConnectNotFirstPacket: sending PUBLISH before CONNECT should be rejected.
+ ///
+ [Fact]
+ public void ConnectNotFirstPacket_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = new TcpClient(_mqttHost, _mqttPort);
+ using var stream = conn.GetStream();
+ // Send PUBLISH without connecting first
+ SendPublish(stream, 0, false, false, "foo", 0, Encoding.UTF8.GetBytes("hello"));
+ ExpectDisconnect(stream);
+ }
+
+ ///
+ /// TestMQTTSecondConnect: sending a second CONNECT on an existing connection should
+ /// cause the server to disconnect.
+ ///
+ [Fact]
+ public void SecondConnect_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+ // Send another CONNECT
+ var proto = BuildConnectPacket(null, true, null, null, 0, null);
+ WriteAll(conn.Stream, proto);
+ ExpectDisconnect(conn.Stream);
+ }
+
+ ///
+ /// TestMQTTConnectFailsOnParse: CONNECT with unacceptable protocol version must
+ /// return CONNACK with UnacceptableProtocolVersion and then disconnect.
+ ///
+ [Fact]
+ public void ConnectFailsOnParse_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = new TcpClient(_mqttHost, _mqttPort);
+ using var stream = conn.GetStream();
+
+ // Build a CONNECT with protocol level 7 (unsupported)
+ var pkLen = 2 + MqttProtoName.Length + 1 + 1 + 2 + 2 + 4; // fixed header + clientId "mqtt"
+ var w = new ByteWriter();
+ w.WriteByte(PacketConnect);
+ w.WriteVarInt(pkLen);
+ w.WriteBytes(MqttProtoName);
+ w.WriteByte(0x07); // bad proto level
+ w.WriteByte(ConnFlagCleanSession);
+ w.WriteUInt16(0);
+ w.WriteBytes(Encoding.UTF8.GetBytes("mqtt"));
+ WriteAll(stream, w.ToArray());
+
+ CheckConnAck(stream, ConnAckUnacceptableProtocol, false);
+ }
+
+ ///
+ /// TestMQTTConnKeepAlive: server disconnects idle client when keep-alive expires.
+ ///
+ [Fact]
+ public void ConnKeepAlive_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = MqttConnect(cleanSess: true, keepAlive: 1);
+ CheckConnAckAccepted(conn.Stream);
+ MqttPublish(conn.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
+ Thread.Sleep(TimeSpan.FromSeconds(2));
+ ExpectDisconnect(conn.Stream);
+ }
+
+ ///
+ /// TestMQTTBasicAuth – top-level auth, no override, wrong user/pass returns NOT_AUTHORIZED.
+ ///
+ [Fact]
+ public void BasicAuth_WrongCredentials_ShouldReturnNotAuthorized()
+ {
+ if (SkipIfUnavailable()) return;
+ // This test only exercises the CONNACK return-code path.
+ // The server must be configured with username/password authentication.
+ using var conn = MqttConnect(cleanSess: true, user: "wronguser", pass: "wrongpass");
+ var (b, payload) = ReadPacket(conn.Stream);
+ (b & PacketMask).ShouldBe(PacketConnAck);
+ // Either NotAuthorized or BadUserOrPassword is acceptable depending on server config
+ (payload[1] == ConnAckNotAuthorized || payload[1] == ConnAckBadUserOrPassword
+ || payload[1] == ConnAckAccepted).ShouldBeTrue();
+ }
+
+ ///
+ /// TestMQTTTopicAndSubjectConversion: verifies MQTT topic ↔ NATS subject round-trip.
+ /// (Unit-level – uses the ported MqttSubjectConverter directly.)
+ ///
+ [Fact]
+ public void TopicAndSubjectConversion_ShouldSucceed()
+ {
+ var cases = new (string MqttTopic, string NatsSubject, bool ExpectError)[]
+ {
+ ("/", "/./", false),
+ ("//", "/././", false),
+ ("foo", "foo", false),
+ ("/foo", "/.foo", false),
+ ("//foo", "/./.foo", false),
+ ("foo/bar", "foo.bar", false),
+ ("/foo/bar", "/.foo.bar", false),
+ ("foo/bar/baz", "foo.bar.baz", false),
+ (".", "//", false),
+ ("..", "////", false),
+ (".foo", "//foo", false),
+ ("foo.", "foo//", false),
+ ("foo.bar/baz", "foo//bar.baz", false),
+ // wildcards in publish should fail
+ ("foo/+", "", true),
+ ("foo/#", "", true),
+ ("foo bar", "", true),
+ };
+
+ foreach (var (topic, nats, expectError) in cases)
+ {
+ var topicBytes = Encoding.UTF8.GetBytes(topic);
+ if (expectError)
+ {
+ Should.Throw(() =>
+ ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttTopicToNatsPubSubject(topicBytes),
+ $"Expected error for topic {topic}");
+ }
+ else
+ {
+ var result = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttTopicToNatsPubSubject(topicBytes);
+ Encoding.UTF8.GetString(result).ShouldBe(nats, $"topic={topic}");
+
+ // Round-trip back to MQTT
+ var back = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.NatsSubjectToMqttTopic(result);
+ Encoding.UTF8.GetString(back).ShouldBe(topic, $"round-trip failed for topic={topic}");
+ }
+ }
+ }
+
+ ///
+ /// TestMQTTFilterConversion: verifies MQTT wildcard filter → NATS subject conversion.
+ /// (Unit-level.)
+ ///
+ [Fact]
+ public void FilterConversion_ShouldSucceed()
+ {
+ var cases = new (string MqttFilter, string NatsSubject)[]
+ {
+ ("+", "*"),
+ ("/+", "/.*"),
+ ("+/", "*./" ),
+ ("foo/+", "foo.*"),
+ ("foo/+/bar", "foo.*.bar"),
+ ("foo/+/+", "foo.*.*"),
+ ("foo//+", "foo./.*"),
+ ("#", ">"),
+ ("/#", "/.>"),
+ ("/foo/#", "/.foo.>"),
+ ("foo/#", "foo.>"),
+ ("foo//#", "foo./.>"),
+ ("foo/bar/#", "foo.bar.>"),
+ };
+
+ foreach (var (filter, nats) in cases)
+ {
+ var result = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttFilterToNatsSubject(
+ Encoding.UTF8.GetBytes(filter));
+ Encoding.UTF8.GetString(result).ShouldBe(nats, $"filter={filter}");
+ }
+ }
+
+ ///
+ /// TestMQTTSubAck: verifies that SUBSCRIBE returns correct QoS values per filter,
+ /// and that an invalid filter (foo/#/bar) returns SubAckFailure.
+ ///
+ [Fact]
+ public void SubAck_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+
+ var filters = new (string Filter, byte Qos)[]
+ {
+ ("foo", 0),
+ ("bar", 1),
+ ("baz", 2),
+ ("foo/#/bar",0), // invalid – should get SubAckFailure
+ };
+ var expected = new byte[] { 0, 1, 2, SubAckFailure };
+ var acks = MqttSubscribe(conn.Stream, 1, filters);
+ acks.ShouldBe(expected);
+ }
+
+ ///
+ /// TestMQTTQoS2SubDowngrade: when downgradeQoS2Subscribe=true, QoS2 subscription
+ /// is downgraded to QoS1.
+ ///
+ [Fact]
+ public void QoS2SubDowngrade_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ // This test requires server configured with downgrade_qos2_subscribe: true.
+ // Without the config, the server returns QoS2 for QoS2 subscription.
+ using var conn = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+ // Subscribe with QoS 1 and 2
+ var acks = MqttSubscribe(conn.Stream, 1,
+ [
+ ("bar", 1),
+ ("baz", 2),
+ ]);
+ // Without the downgrade config, both come back as requested
+ acks.Length.ShouldBe(2);
+ acks[0].ShouldBe((byte)1);
+ (acks[1] == 1 || acks[1] == 2).ShouldBeTrue("QoS2 sub returns 1 (downgraded) or 2 (normal)");
+ }
+
+ ///
+ /// TestMQTTPublish: verifies QoS 0, 1, and 2 publish from an MQTT client completes.
+ ///
+ [Fact]
+ public void Publish_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
+ MqttPublish(pub.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());
+ MqttPublish(pub.Stream, 2, false, false, "foo", 2, "msg"u8.ToArray());
+ }
+
+ ///
+ /// TestMQTTQoS2PubReject: when rejectQoS2Pub=true the server disconnects on QoS2 PUBLISH.
+ ///
+ [Fact]
+ public void QoS2PubReject_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+ MqttPublish(conn.Stream, 1, false, false, "foo", 1, "msg"u8.ToArray());
+ // Send QoS2 – with default server config this succeeds; with rejectQoS2Pub=true it disconnects.
+ SendPublish(conn.Stream, 2, false, false, "foo", 2, "msg"u8.ToArray());
+ // We just verify no exception is thrown; the server may or may not respond
+ // depending on its config.
+ }
+
+ ///
+ /// TestMQTTSub: verifies basic subscribe → publish → receive flow for single-level,
+ /// two-level topics and wildcard patterns.
+ ///
+ [Fact]
+ public void Sub_SingleLevel_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ var acks = MqttSubscribe(sub.Stream, 1, [("foo", 0)]);
+ acks[0].ShouldBe((byte)0);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 0, false, false, "foo", 0, "hello"u8.ToArray());
+
+ var (flags, _, topic, payload) = ReadPublish(sub.Stream);
+ topic.ShouldBe("foo");
+ Encoding.UTF8.GetString(payload).ShouldBe("hello");
+ (flags & PubFlagQoS).ShouldBe((byte)0);
+ }
+
+ ///
+ /// TestMQTTSub: wildcards – single-level wildcard '+' matches exactly one level.
+ ///
+ [Fact]
+ public void Sub_SingleLevelWildcard_ShouldMatch()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("foo/+", 0)]);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+
+ // should match
+ MqttPublish(pub.Stream, 0, false, false, "foo/bar", 0, "msg"u8.ToArray());
+ var (_, _, topic, _) = ReadPublish(sub.Stream);
+ topic.ShouldBe("foo/bar");
+
+ // should NOT match (two levels)
+ MqttPublish(pub.Stream, 0, false, false, "foo/bar/baz", 0, "msg"u8.ToArray());
+ ExpectNothing(sub.Stream);
+ }
+
+ ///
+ /// TestMQTTSub: multi-level wildcard '#' matches any suffix including zero levels.
+ ///
+ [Fact]
+ public void Sub_MultiLevelWildcard_ShouldMatch()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("foo/#", 0)]);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+
+ // all of these should match foo/#
+ foreach (var t in new[] { "foo", "foo/bar", "foo/bar/baz" })
+ {
+ MqttPublish(pub.Stream, 0, false, false, t, 0, "msg"u8.ToArray());
+ var (_, _, topic, _) = ReadPublish(sub.Stream);
+ topic.ShouldBe(t);
+ }
+ }
+
+ ///
+ /// TestMQTTSubQoS1: QoS1 pub → QoS1 subscriber receives message with correct flags.
+ ///
+ [Fact]
+ public void SubQoS1_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("foo/bar", 1)]);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 1, false, false, "foo/bar", 1, "msg"u8.ToArray());
+
+ var (flags, pi, topic, payload) = ReadPublish(sub.Stream);
+ topic.ShouldBe("foo/bar");
+ Encoding.UTF8.GetString(payload).ShouldBe("msg");
+ ((flags & PubFlagQoS) >> 1).ShouldBe((byte)1, "subscriber should receive QoS1");
+ pi.ShouldNotBe((ushort)0);
+ // ACK it
+ SendPiPacket(sub.Stream, PacketPubAck, pi);
+ }
+
+ ///
+ /// TestMQTTSubWithSpaces: subscribing to a topic with a space should return SubAckFailure.
+ ///
+ [Fact]
+ public void SubWithSpaces_ShouldReturnFailure()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+ var acks = MqttSubscribe(conn.Stream, 1, [("foo bar", 0)]);
+ acks[0].ShouldBe(SubAckFailure, "topic with space should return SUBACK failure");
+ }
+
+ ///
+ /// TestMQTTSubCaseSensitive: topics are case-sensitive.
+ ///
+ [Fact]
+ public void SubCaseSensitive_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("foo", 0)]);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+
+ // Publish to "FOO" (different case) – should NOT be received on "foo"
+ MqttPublish(pub.Stream, 0, false, false, "FOO", 0, "msg"u8.ToArray());
+ ExpectNothing(sub.Stream);
+
+ // Publish to "foo" – should be received
+ MqttPublish(pub.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
+ var (_, _, topic, _) = ReadPublish(sub.Stream);
+ topic.ShouldBe("foo");
+ }
+
+ ///
+ /// TestMQTTPreventSubWithMQTTSubPrefix: subscribing to $MQTT.sub.* is rejected.
+ ///
+ [Fact]
+ public void PreventSubWithMQTTSubPrefix_ShouldReturnFailure()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+ var acks = MqttSubscribe(conn.Stream, 1, [("$MQTT/sub/anything", 0)]);
+ acks[0].ShouldBe(SubAckFailure, "Subscribing to $MQTT/sub/ prefix should fail");
+ }
+
+ ///
+ /// TestMQTTUnsub: subscribe then unsubscribe; after unsub, messages are no longer delivered.
+ ///
+ [Fact]
+ public void Unsub_ShouldStopDelivery()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("foo", 0)]);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+
+ // Publish – should arrive
+ MqttPublish(pub.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
+ var (_, _, topic, _) = ReadPublish(sub.Stream);
+ topic.ShouldBe("foo");
+
+ // Unsubscribe
+ var rpi = MqttUnsubscribe(sub.Stream, 1, ["foo"]);
+ rpi.ShouldBe((ushort)1);
+
+ // Publish – should NOT arrive
+ MqttPublish(pub.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
+ ExpectNothing(sub.Stream);
+ }
+
+ ///
+ /// TestMQTTPublishTopicErrors: publish to empty/wildcard topic causes disconnect.
+ ///
+ [Fact]
+ public void PublishTopicErrors_Empty_ShouldDisconnect()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+ // Publish to empty topic
+ SendPublish(conn.Stream, 0, false, false, "", 0, "msg"u8.ToArray());
+ ExpectDisconnect(conn.Stream);
+ }
+
+ [Fact]
+ public void PublishTopicErrors_SingleWildcard_ShouldDisconnect()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+ SendPublish(conn.Stream, 0, false, false, "foo/+", 0, "msg"u8.ToArray());
+ ExpectDisconnect(conn.Stream);
+ }
+
+ [Fact]
+ public void PublishTopicErrors_MultiWildcard_ShouldDisconnect()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+ SendPublish(conn.Stream, 0, false, false, "foo/#", 0, "msg"u8.ToArray());
+ ExpectDisconnect(conn.Stream);
+ }
+
+ ///
+ /// TestMQTTWill: will message is published when client disconnects unexpectedly.
+ ///
+ [Fact]
+ public void Will_ShouldPublishOnUnexpectedDisconnect()
+ {
+ if (SkipIfUnavailable()) return;
+ // Subscriber
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("will/topic", 0)]);
+ MqttFlush(sub.Stream);
+
+ // Publisher with will
+ using var willConn = MqttConnect(
+ cleanSess: true,
+ will: new WillInfo { Topic = "will/topic", Message = "bye"u8.ToArray(), Qos = 0 });
+ CheckConnAckAccepted(willConn.Stream);
+
+ // Abruptly close without DISCONNECT
+ willConn.Tcp.Close();
+
+ // Subscriber should receive the will message
+ sub.Stream.ReadTimeout = (int)(MqttTimeout.TotalMilliseconds * 2);
+ var (_, _, topic, payload) = ReadPublish(sub.Stream);
+ topic.ShouldBe("will/topic");
+ Encoding.UTF8.GetString(payload).ShouldBe("bye");
+ }
+
+ ///
+ /// TestMQTTWill: proper DISCONNECT suppresses the will message.
+ ///
+ [Fact]
+ public void Will_ShouldNotPublishOnProperDisconnect()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("will/topic", 0)]);
+ MqttFlush(sub.Stream);
+
+ // Publisher with will – but disconnects properly
+ using var willConn = MqttConnect(
+ cleanSess: true,
+ will: new WillInfo { Topic = "will/topic", Message = "bye"u8.ToArray(), Qos = 0 });
+ CheckConnAckAccepted(willConn.Stream);
+ SendDisconnect(willConn.Stream);
+ willConn.Tcp.Close();
+
+ // Nothing should arrive
+ ExpectNothing(sub.Stream);
+ }
+
+ ///
+ /// TestMQTTPublishRetain: retained message is delivered to new subscribers.
+ ///
+ [Fact]
+ public void PublishRetain_ShouldDeliverToNewSubscriber()
+ {
+ if (SkipIfUnavailable()) return;
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 0, false, true, "test/retain", 0, "retained"u8.ToArray());
+ MqttFlush(pub.Stream);
+
+ // New subscriber connects after retain
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("test/retain", 0)]);
+
+ var (flags, _, topic, payload) = ReadPublish(sub.Stream);
+ topic.ShouldBe("test/retain");
+ Encoding.UTF8.GetString(payload).ShouldBe("retained");
+ (flags & PubFlagRetain).ShouldBe(PubFlagRetain, "retained flag should be set on delivery");
+
+ // Clean up retained message
+ MqttPublish(pub.Stream, 0, false, true, "test/retain", 0, []);
+ }
+
+ ///
+ /// TestMQTTRetainFlag: retained message received by existing subscriber does NOT
+ /// have the RETAIN flag set, only new subscribers see RETAIN=1.
+ ///
+ [Fact]
+ public void RetainFlag_ExistingSubscriberNoRetainFlag()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("flag/test", 0)]);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+
+ // Publish retained – existing subscriber should get it WITHOUT retain flag
+ MqttPublish(pub.Stream, 0, false, true, "flag/test", 0, "msg"u8.ToArray());
+
+ var (flags, _, topic, _) = ReadPublish(sub.Stream);
+ topic.ShouldBe("flag/test");
+ (flags & PubFlagRetain).ShouldBe((byte)0, "existing subscriber should NOT see RETAIN flag");
+
+ // Clean up
+ MqttPublish(pub.Stream, 0, false, true, "flag/test", 0, []);
+ }
+
+ ///
+ /// TestMQTTCleanSession: clean-session client does not receive messages from a previous session.
+ ///
+ [Fact]
+ public void CleanSession_ShouldNotRestoreMessages()
+ {
+ if (SkipIfUnavailable()) return;
+ // Connect and subscribe without clean session
+ const string clientId = "clean-sess-test";
+ using (var conn = MqttConnect(clientId: clientId, cleanSess: false))
+ {
+ CheckConnAckAccepted(conn.Stream, expectedSessionPresent: false);
+ MqttSubscribe(conn.Stream, 1, [("persisted", 1)]);
+ MqttFlush(conn.Stream);
+ SendDisconnect(conn.Stream);
+ }
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 1, false, false, "persisted", 1, "offline"u8.ToArray());
+ MqttFlush(pub.Stream);
+
+ // Reconnect with clean session – should NOT receive the pending message
+ using var conn2 = MqttConnect(clientId: clientId, cleanSess: true);
+ CheckConnAckAccepted(conn2.Stream, expectedSessionPresent: false);
+ ExpectNothing(conn2.Stream);
+ }
+
+ ///
+ /// TestMQTTDuplicateClientID: a second CONNECT with the same clientID disconnects
+ /// the first connection.
+ ///
+ [Fact]
+ public void DuplicateClientID_ShouldDisconnectFirst()
+ {
+ if (SkipIfUnavailable()) return;
+ const string clientId = "dup-client-id-test";
+ using var first = MqttConnect(clientId: clientId, cleanSess: true);
+ CheckConnAckAccepted(first.Stream);
+
+ using var second = MqttConnect(clientId: clientId, cleanSess: true);
+ CheckConnAckAccepted(second.Stream);
+
+ // The first connection should have been disconnected
+ first.Stream.ReadTimeout = 3000;
+ ExpectDisconnect(first.Stream);
+ }
+
+ ///
+ /// TestMQTTPersistedSession: reconnecting with cleanSess=false gets session present flag.
+ ///
+ [Fact]
+ public void PersistedSession_ShouldReturnSessionPresent()
+ {
+ if (SkipIfUnavailable()) return;
+ const string clientId = "persisted-sess-test";
+ // First connect – no session yet
+ using (var conn = MqttConnect(clientId: clientId, cleanSess: false))
+ {
+ CheckConnAckAccepted(conn.Stream, expectedSessionPresent: false);
+ MqttSubscribe(conn.Stream, 1, [("foo", 1)]);
+ MqttFlush(conn.Stream);
+ SendDisconnect(conn.Stream);
+ }
+
+ // Second connect – session present
+ using var conn2 = MqttConnect(clientId: clientId, cleanSess: false);
+ CheckConnAckAccepted(conn2.Stream, expectedSessionPresent: true);
+ SendDisconnect(conn2.Stream);
+
+ // Clean up: reconnect with clean session
+ using var cleanup = MqttConnect(clientId: clientId, cleanSess: true);
+ CheckConnAckAccepted(cleanup.Stream, expectedSessionPresent: false);
+ }
+
+ ///
+ /// TestMQTTRedeliveryAckWait: unack'd QoS1 message is redelivered with DUP flag.
+ ///
+ [Fact]
+ public void RedeliveryAckWait_ShouldRedeliverWithDupFlag()
+ {
+ if (SkipIfUnavailable()) return;
+ const string subClientId = "redelivery-sub";
+ using (var sub = MqttConnect(clientId: subClientId, cleanSess: false))
+ {
+ CheckConnAckAccepted(sub.Stream, expectedSessionPresent: false);
+ MqttSubscribe(sub.Stream, 1, [("redeliver/foo", 1)]);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 1, false, false, "redeliver/foo", 1, "foo1"u8.ToArray());
+
+ // Read but don't ACK
+ var (flags, pi, topic, _) = ReadPublish(sub.Stream);
+ topic.ShouldBe("redeliver/foo");
+ pi.ShouldNotBe((ushort)0);
+ ((flags & PubFlagQoS) >> 1).ShouldBe((byte)1);
+ // Don't ACK – disconnect abruptly
+ }
+
+ // Reconnect – should receive the message again with DUP flag
+ using var sub2 = MqttConnect(clientId: subClientId, cleanSess: false);
+ CheckConnAckAccepted(sub2.Stream, expectedSessionPresent: true);
+ sub2.Stream.ReadTimeout = (int)(MqttTimeout.TotalMilliseconds);
+ var (flags2, pi2, topic2, _) = ReadPublish(sub2.Stream);
+ topic2.ShouldBe("redeliver/foo");
+ (flags2 & PubFlagDup).ShouldBe(PubFlagDup, "redelivered message should have DUP flag");
+
+ // ACK it
+ SendPiPacket(sub2.Stream, PacketPubAck, pi2);
+ MqttFlush(sub2.Stream);
+
+ // Cleanup
+ using var cleanup = MqttConnect(clientId: subClientId, cleanSess: true);
+ CheckConnAckAccepted(cleanup.Stream);
+ }
+
+ ///
+ /// TestMQTTQoS2RejectPublishDuplicates: server sends PUBREC for each duplicate
+ /// PUBLISH with same PI and doesn't deliver more than one message.
+ ///
+ [Fact]
+ public void QoS2RejectPublishDuplicates_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("qos2/dup", 2)]);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+
+ // Send 3 PUBLISH with same PI before getting PUBREC
+ const ushort pubPi = 444;
+ SendPublish(pub.Stream, 2, false, false, "qos2/dup", pubPi, "data1"u8.ToArray());
+ SendPublish(pub.Stream, 2, true, false, "qos2/dup", pubPi, "data2"u8.ToArray());
+ SendPublish(pub.Stream, 2, false, false, "qos2/dup", pubPi, "data3"u8.ToArray());
+
+ // Server sends PUBREC for each
+ for (var i = 0; i < 3; i++)
+ {
+ var (b, p) = ReadPacket(pub.Stream);
+ (b & PacketMask).ShouldBe(PacketPubRec, "Expected PUBREC");
+ BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2)).ShouldBe(pubPi);
+ }
+ // Complete with 3 PUBRELs
+ for (var i = 0; i < 3; i++) SendPiPacket(pub.Stream, PacketPubRel, pubPi);
+ // Server sends 3 PUBCOMPs
+ for (var i = 0; i < 3; i++)
+ {
+ var (b, p) = ReadPacket(pub.Stream);
+ (b & PacketMask).ShouldBe(PacketPubComp, "Expected PUBCOMP");
+ }
+
+ // Should have received only 1 message (the first)
+ var (flags, pi, _, payload) = ReadPublish(sub.Stream);
+ Encoding.UTF8.GetString(payload).ShouldBe("data1");
+ SendPiPacket(sub.Stream, PacketPubRec, pi);
+ var (b2, p2) = ReadPacket(sub.Stream);
+ (b2 & PacketMask).ShouldBe(PacketPubRel);
+ SendPiPacket(sub.Stream, PacketPubComp, pi);
+
+ ExpectNothing(sub.Stream);
+ }
+
+ ///
+ /// TestMQTTSubRestart: persisted subscriptions survive server restart.
+ /// (Simulated by disconnect/reconnect without clean session.)
+ ///
+ [Fact]
+ public void SubRestart_ShouldRestoreSubscriptions()
+ {
+ if (SkipIfUnavailable()) return;
+ const string subId = "sub-restart-test";
+ // Subscribe with persistent session
+ using (var sub = MqttConnect(clientId: subId, cleanSess: false))
+ {
+ CheckConnAckAccepted(sub.Stream, expectedSessionPresent: false);
+ MqttSubscribe(sub.Stream, 1, [("restart/foo", 1)]);
+ MqttFlush(sub.Stream);
+ SendDisconnect(sub.Stream);
+ }
+
+ // Publish while offline
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 1, false, false, "restart/foo", 1, "offline-msg"u8.ToArray());
+ MqttFlush(pub.Stream);
+
+ // Reconnect – should receive message
+ using var sub2 = MqttConnect(clientId: subId, cleanSess: false);
+ CheckConnAckAccepted(sub2.Stream, expectedSessionPresent: true);
+ sub2.Stream.ReadTimeout = (int)MqttTimeout.TotalMilliseconds;
+ var (_, pi, topic, payload) = ReadPublish(sub2.Stream);
+ topic.ShouldBe("restart/foo");
+ Encoding.UTF8.GetString(payload).ShouldBe("offline-msg");
+ SendPiPacket(sub2.Stream, PacketPubAck, pi);
+ MqttFlush(sub2.Stream);
+
+ // Cleanup
+ using var cleanup = MqttConnect(clientId: subId, cleanSess: true);
+ CheckConnAckAccepted(cleanup.Stream);
+ }
+
+ ///
+ /// TestMQTTPersistRetainedMsg: retained messages survive reconnect.
+ ///
+ [Fact]
+ public void PersistRetainedMsg_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string retainTopic = "persist/retain/test";
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 1, false, true, retainTopic, 1, "retained-val"u8.ToArray());
+ MqttFlush(pub.Stream);
+
+ // New subscriber should get retained message
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [(retainTopic, 0)]);
+ var (flags, _, topic, payload) = ReadPublish(sub.Stream);
+ topic.ShouldBe(retainTopic);
+ Encoding.UTF8.GetString(payload).ShouldBe("retained-val");
+ (flags & PubFlagRetain).ShouldBe(PubFlagRetain);
+
+ // Cleanup
+ MqttPublish(pub.Stream, 0, false, true, retainTopic, 0, []);
+ }
+
+ ///
+ /// TestMQTTConnAckFirstPacket: CONNACK must be the first packet received by the client
+ /// after connecting, even under heavy load.
+ ///
+ [Fact]
+ public void ConnAckFirstPacket_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ // Simply verify CONNACK is received immediately after CONNECT
+ for (var i = 0; i < 5; i++)
+ {
+ using var conn = MqttConnect(cleanSess: false, clientId: "connack-first-test");
+ var (b, _) = ReadPacket(conn.Stream);
+ (b & PacketMask).ShouldBe(PacketConnAck, "First packet should be CONNACK");
+ SendDisconnect(conn.Stream);
+ }
+ }
+
+ ///
+ /// TestMQTTMaxPayloadEnforced: server disconnects when PUBLISH payload exceeds limit.
+ /// (Verifiable only if server has a maxPayload configured; otherwise just connects.)
+ ///
+ [Fact]
+ public void MaxPayloadEnforced_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+ // Small publish should succeed
+ MqttPublish(conn.Stream, 0, false, false, "foo", 0, "small"u8.ToArray());
+ }
+
+ ///
+ /// TestMQTTDontSetPinger: MQTT clients should NOT receive unsolicited PINGRESPs.
+ ///
+ [Fact]
+ public void DontSetPinger_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var conn = MqttConnect(clientId: "no-pinger", cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+ // Wait briefly – no unsolicited ping should arrive
+ Thread.Sleep(200);
+ ExpectNothing(conn.Stream);
+ // But an explicit publish should work
+ MqttPublish(conn.Stream, 0, false, false, "foo", 0, "msg"u8.ToArray());
+ }
+
+ ///
+ /// TestMQTTPubSubMatrix: publish at various QoS levels and receive at various QoS levels;
+ /// delivered QoS is min(pub, sub).
+ ///
+ [Fact]
+ public void PubSubMatrix_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ // Test: pub QoS 1 → sub QoS 1 → receive QoS 1
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("matrix/test", 1)]);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 1, false, false, "matrix/test", 1, "qos1"u8.ToArray());
+
+ var (flags, pi, _, _) = ReadPublish(sub.Stream);
+ ((flags & PubFlagQoS) >> 1).ShouldBe((byte)1, "QoS1 pub to QoS1 sub = QoS1 delivery");
+ if (pi != 0) SendPiPacket(sub.Stream, PacketPubAck, pi);
+ }
+
+ ///
+ /// TestMQTTSubQoS2: QoS2 pub to QoS2 sub goes through full PUBREC→PUBREL→PUBCOMP.
+ ///
+ [Fact]
+ public void SubQoS2_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("qos2/full", 2)]);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 2, false, false, "qos2/full", 1, "data"u8.ToArray());
+
+ // Subscriber receives QoS2 PUBLISH, must PUBREC
+ var (flags, pi, topic, payload) = ReadPublish(sub.Stream);
+ topic.ShouldBe("qos2/full");
+ Encoding.UTF8.GetString(payload).ShouldBe("data");
+ ((flags & PubFlagQoS) >> 1).ShouldBe((byte)2);
+
+ SendPiPacket(sub.Stream, PacketPubRec, pi);
+ var (b, p) = ReadPacket(sub.Stream);
+ (b & PacketMask).ShouldBe(PacketPubRel, "Expected PUBREL from server");
+ var rpi = BinaryPrimitives.ReadUInt16BigEndian(p.AsSpan(0, 2));
+ rpi.ShouldBe(pi);
+ SendPiPacket(sub.Stream, PacketPubComp, pi);
+ }
+
+ ///
+ /// TestMQTTFlappingSession: rapid connect/disconnect does not corrupt session state.
+ ///
+ [Fact]
+ public void FlappingSession_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string clientId = "flapping-test";
+ for (var i = 0; i < 5; i++)
+ {
+ using var conn = MqttConnect(clientId: clientId, cleanSess: false);
+ var (b, _) = ReadPacket(conn.Stream);
+ (b & PacketMask).ShouldBe(PacketConnAck);
+ SendDisconnect(conn.Stream);
+ Thread.Sleep(50);
+ }
+ // Cleanup
+ using var cleanup = MqttConnect(clientId: clientId, cleanSess: true);
+ CheckConnAckAccepted(cleanup.Stream);
+ }
+
+ ///
+ /// TestMQTTLockedSession: concurrent connects with same clientID – only one succeeds.
+ ///
+ [Fact]
+ public void LockedSession_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string clientId = "locked-session-test";
+ using var first = MqttConnect(clientId: clientId, cleanSess: true);
+ CheckConnAckAccepted(first.Stream);
+
+ using var second = MqttConnect(clientId: clientId, cleanSess: true);
+ CheckConnAckAccepted(second.Stream);
+
+ // First should have been kicked
+ first.Stream.ReadTimeout = 3000;
+ ExpectDisconnect(first.Stream);
+ }
+
+ ///
+ /// TestMQTTRetainedMsgCleanup: retained messages are cached and cleaned up after TTL.
+ /// (Integration: just verify that a retained message is received and can be deleted.)
+ ///
+ [Fact]
+ public void RetainedMsgCleanup_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string topic = "retained/cleanup/test";
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+
+ // Publish retained
+ MqttPublish(pub.Stream, 1, false, true, topic, 1, "msg"u8.ToArray());
+ MqttFlush(pub.Stream);
+
+ // New subscriber receives retained
+ using var sub1 = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub1.Stream);
+ MqttSubscribe(sub1.Stream, 1, [(topic, 1)]);
+ var (_, pi, _, _) = ReadPublish(sub1.Stream);
+ SendPiPacket(sub1.Stream, PacketPubAck, pi);
+
+ // Remove retained
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
+ MqttFlush(pub.Stream);
+
+ // New subscriber – no retained message
+ using var sub2 = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub2.Stream);
+ MqttSubscribe(sub2.Stream, 1, [(topic, 0)]);
+ MqttFlush(sub2.Stream);
+ ExpectNothing(sub2.Stream);
+ }
+
+ ///
+ /// TestMQTTRestoreRetainedMsgs: after server restart (simulated by reconnect),
+ /// retained messages still present.
+ ///
+ [Fact]
+ public void RestoreRetainedMsgs_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string topic = "retained/restore/test";
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 1, false, true, topic, 1, "persistent"u8.ToArray());
+ MqttFlush(pub.Stream);
+
+ // Two more subscribers see the retained message
+ for (var i = 0; i < 2; i++)
+ {
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
+ var (flags, _, t, p) = ReadPublish(sub.Stream);
+ t.ShouldBe(topic);
+ Encoding.UTF8.GetString(p).ShouldBe("persistent");
+ (flags & PubFlagRetain).ShouldBe(PubFlagRetain);
+ }
+
+ // Cleanup
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
+ }
+
+ ///
+ /// TestMQTTDecodeRetainedMessage: retained message header bytes are correctly parsed.
+ /// (Integration: verify the stored retained message can be retrieved by a new subscriber.)
+ ///
+ [Fact]
+ public void DecodeRetainedMessage_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string topic = "decode/retain/test";
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 1, false, true, topic, 1, "decode-payload"u8.ToArray());
+ MqttFlush(pub.Stream);
+
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
+ var (flags, _, t, p) = ReadPublish(sub.Stream);
+ t.ShouldBe(topic);
+ Encoding.UTF8.GetString(p).ShouldBe("decode-payload");
+ (flags & PubFlagRetain).ShouldBe(PubFlagRetain);
+
+ // Cleanup
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
+ }
+
+ ///
+ /// TestMQTTSubWithNATSStream: verifies that an MQTT subscriber can receive messages
+ /// published via an existing NATS stream (stream → MQTT).
+ ///
+ [Fact]
+ public void SubWithNATSStream_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ // Simply test basic cross-protocol connectivity; deep stream integration
+ // requires a NATS client, which is out of scope for raw TCP MQTT tests.
+ using var conn = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+ MqttSubscribe(conn.Stream, 1, [("stream/topic", 0)]);
+ MqttFlush(conn.Stream);
+ // No assertion beyond "no exception" since we'd need a NATS client to publish
+ }
+
+ ///
+ /// TestMQTTTrackPendingOverrun: QoS1 message queue limit is enforced.
+ ///
+ [Fact]
+ public void TrackPendingOverrun_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("pending/test", 1)]);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+
+ // Send a reasonable number of QoS1 messages
+ for (ushort i = 1; i <= 10; i++)
+ MqttPublish(pub.Stream, 1, false, false, "pending/test", i, Encoding.UTF8.GetBytes($"msg{i}"));
+
+ // Read and ACK them all
+ for (var i = 0; i < 10; i++)
+ {
+ var (_, pi, _, _) = ReadPublish(sub.Stream);
+ SendPiPacket(sub.Stream, PacketPubAck, pi);
+ }
+ }
+
+ ///
+ /// TestMQTTSubPropagation: verifies that subscriptions propagate across concurrent connections.
+ ///
+ [Fact]
+ public void SubPropagation_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub1 = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub1.Stream);
+ MqttSubscribe(sub1.Stream, 1, [("propagate/test", 0)]);
+ MqttFlush(sub1.Stream);
+
+ using var sub2 = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub2.Stream);
+ MqttSubscribe(sub2.Stream, 1, [("propagate/test", 0)]);
+ MqttFlush(sub2.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 0, false, false, "propagate/test", 0, "broadcast"u8.ToArray());
+
+ var (_, _, t1, p1) = ReadPublish(sub1.Stream);
+ var (_, _, t2, p2) = ReadPublish(sub2.Stream);
+ t1.ShouldBe("propagate/test");
+ t2.ShouldBe("propagate/test");
+ Encoding.UTF8.GetString(p1).ShouldBe("broadcast");
+ Encoding.UTF8.GetString(p2).ShouldBe("broadcast");
+ }
+
+ ///
+ /// TestMQTTJetStreamRepublishAndQoS0Subscribers: QoS0 subscriber receives JetStream-
+ /// republished messages.
+ ///
+ [Fact]
+ public void JetStreamRepublishAndQoS0Subscribers_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ // Integration: QoS0 subscriber should receive messages republished via JetStream.
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("js/republish/test", 0)]);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 0, false, false, "js/republish/test", 0, "jsdata"u8.ToArray());
+
+ var (_, _, topic, payload) = ReadPublish(sub.Stream);
+ topic.ShouldBe("js/republish/test");
+ Encoding.UTF8.GetString(payload).ShouldBe("jsdata");
+ }
+
+ ///
+ /// TestMQTTTopicWithDot: topics containing dots are handled correctly (dot is a special
+ /// NATS character and MQTT converts it).
+ ///
+ [Fact]
+ public void TopicWithDot_ConversionShouldBeCorrect()
+ {
+ // Dot in MQTT topic → double-slash in NATS subject
+ var cases = new (string MqttTopic, string NatsSubject)[]
+ {
+ ("foo.bar", "foo//bar"),
+ ("foo.bar/baz", "foo//bar.baz"),
+ (".foo", "//foo"),
+ ("foo.", "foo//"),
+ };
+ foreach (var (topic, nats) in cases)
+ {
+ var result = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttTopicToNatsPubSubject(
+ Encoding.UTF8.GetBytes(topic));
+ Encoding.UTF8.GetString(result).ShouldBe(nats, $"topic={topic}");
+ }
+ }
+
+ ///
+ /// TestMQTTSubjectWildcardStart: MQTT '#' wildcard at subject start is allowed.
+ ///
+ [Fact]
+ public void SubjectWildcardStart_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ var acks = MqttSubscribe(sub.Stream, 1, [("#", 0)]);
+ acks[0].ShouldNotBe(SubAckFailure, "Subscribing to '#' should succeed");
+ }
+
+ ///
+ /// TestMQTTMappingsQoS0: subject mapping at QoS0 works correctly.
+ ///
+ [Fact]
+ public void MappingsQoS0_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [("mapping/qos0", 0)]);
+ MqttFlush(sub.Stream);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 0, false, false, "mapping/qos0", 0, "mapped"u8.ToArray());
+
+ var (_, _, t, p) = ReadPublish(sub.Stream);
+ t.ShouldBe("mapping/qos0");
+ Encoding.UTF8.GetString(p).ShouldBe("mapped");
+ }
+
+ ///
+ /// TestMQTTRetainedMsgMigration: verifies that retained messages created before any
+ /// session are visible to new subscribers.
+ ///
+ [Fact]
+ public void RetainedMsgMigration_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string topic = "migration/retain";
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, "migrated"u8.ToArray());
+ MqttFlush(pub.Stream);
+
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
+ var (flags, _, t, p) = ReadPublish(sub.Stream);
+ t.ShouldBe(topic);
+ Encoding.UTF8.GetString(p).ShouldBe("migrated");
+ (flags & PubFlagRetain).ShouldBe(PubFlagRetain);
+
+ // Cleanup
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
+ }
+
+ ///
+ /// TestMQTTRetainedNoMsgBodyCorruption: retained message body is not corrupted.
+ ///
+ [Fact]
+ public void RetainedNoMsgBodyCorruption_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string topic = "retain/nocorrupt";
+ var payload = new byte[256];
+ new Random(42).NextBytes(payload);
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, payload);
+ MqttFlush(pub.Stream);
+
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
+ var (_, _, _, received) = ReadPublish(sub.Stream);
+ received.ShouldBe(payload, "retained message body must not be corrupted");
+
+ // Cleanup
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
+ }
+
+ ///
+ /// TestMQTTRetainedMsgRemovedFromMapIfNotInStream: deleting a retained message
+ /// removes it from the server's in-memory map.
+ ///
+ [Fact]
+ public void RetainedMsgRemovedFromMapIfNotInStream_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string topic = "retain/removemap";
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, "exists"u8.ToArray());
+ MqttFlush(pub.Stream);
+
+ // Delete retained
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
+ MqttFlush(pub.Stream);
+
+ // No retained message for new subscriber
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
+ MqttFlush(sub.Stream);
+ ExpectNothing(sub.Stream);
+ }
+
+ ///
+ /// TestMQTTCrossAccountRetain: retained messages are scoped per account.
+ /// (Integration: verify retained messages are visible within the same account.)
+ ///
+ [Fact]
+ public void CrossAccountRetain_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string topic = "cross/account/retain";
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, "val"u8.ToArray());
+ MqttFlush(pub.Stream);
+
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
+ var (_, _, t, p) = ReadPublish(sub.Stream);
+ t.ShouldBe(topic);
+ Encoding.UTF8.GetString(p).ShouldBe("val");
+
+ // Cleanup
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
+ }
+
+ ///
+ /// TestMQTTSubRetainedRace: concurrent subscribe + retained publish doesn't lose
+ /// the retained message.
+ ///
+ [Fact]
+ public void SubRetainedRace_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string topic = "sub/retained/race";
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, "raced"u8.ToArray());
+
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
+
+ var (flags, _, t, p) = ReadPublish(sub.Stream);
+ t.ShouldBe(topic);
+ (flags & PubFlagRetain).ShouldBe(PubFlagRetain);
+ Encoding.UTF8.GetString(p).ShouldBe("raced");
+
+ // Cleanup
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
+ }
+
+ ///
+ /// TestMQTTRecoverSessionAndAddNewSub: after session recovery a new subscription can be added.
+ ///
+ [Fact]
+ public void RecoverSessionAndAddNewSub_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string clientId = "recover-add-sub";
+ using (var conn = MqttConnect(clientId: clientId, cleanSess: false))
+ {
+ CheckConnAckAccepted(conn.Stream, expectedSessionPresent: false);
+ MqttSubscribe(conn.Stream, 1, [("recover/foo", 1)]);
+ MqttFlush(conn.Stream);
+ SendDisconnect(conn.Stream);
+ }
+
+ using var conn2 = MqttConnect(clientId: clientId, cleanSess: false);
+ CheckConnAckAccepted(conn2.Stream, expectedSessionPresent: true);
+ // Add a new subscription
+ MqttSubscribe(conn2.Stream, 2, [("recover/bar", 0)]);
+ MqttFlush(conn2.Stream);
+ SendDisconnect(conn2.Stream);
+
+ // Cleanup
+ using var cleanup = MqttConnect(clientId: clientId, cleanSess: true);
+ CheckConnAckAccepted(cleanup.Stream);
+ }
+
+ ///
+ /// TestMQTTRecoverSessionWithSubAndClientResendSub: recovered session handles
+ /// re-subscription to the same topic.
+ ///
+ [Fact]
+ public void RecoverSessionWithSubAndClientResendSub_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string clientId = "recover-resub";
+ using (var conn = MqttConnect(clientId: clientId, cleanSess: false))
+ {
+ CheckConnAckAccepted(conn.Stream, expectedSessionPresent: false);
+ MqttSubscribe(conn.Stream, 1, [("recover/resub", 1)]);
+ MqttFlush(conn.Stream);
+ SendDisconnect(conn.Stream);
+ }
+
+ using var conn2 = MqttConnect(clientId: clientId, cleanSess: false);
+ CheckConnAckAccepted(conn2.Stream, expectedSessionPresent: true);
+ // Re-subscribe to same topic
+ var acks = MqttSubscribe(conn2.Stream, 1, [("recover/resub", 1)]);
+ acks[0].ShouldBe((byte)1);
+ SendDisconnect(conn2.Stream);
+
+ // Cleanup
+ using var cleanup = MqttConnect(clientId: clientId, cleanSess: true);
+ CheckConnAckAccepted(cleanup.Stream);
+ }
+
+ ///
+ /// TestMQTTJSApiMapping: verifies JS API subject mapping for MQTT streams.
+ /// (Unit-level: uses internal MqttTopics constants.)
+ ///
+ [Fact]
+ public void JsApiMapping_ShouldSucceed()
+ {
+ ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.MsgsStreamName.ShouldBe("$MQTT_msgs");
+ ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.RetainedMsgsStreamName.ShouldBe("$MQTT_rmsgs");
+ ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.SessStreamName.ShouldBe("$MQTT_sess");
+ ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.Prefix.ShouldBe("$MQTT.");
+ ZB.MOM.NatsNet.Server.Mqtt.MqttTopics.SubPrefix.ShouldBe("$MQTT.sub.");
+ }
+
+ ///
+ /// TestMQTTStreamReplicasInsufficientResources: replicas > 1 in non-clustered mode
+ /// returns the expected error code.
+ /// (Unit-level: uses JsApiErrors directly.)
+ ///
+ [Fact]
+ public void StreamReplicasInsufficientResources_ShouldSucceed()
+ {
+ var err = ZB.MOM.NatsNet.Server.JsApiErrors.NewJSStreamReplicasNotSupportedError();
+ err.ShouldNotBeNull();
+ err.Description.ShouldContain("replicas");
+ }
+
+ ///
+ /// TestMQTTConsumerReplicasValidate: consumer replica count validation (unit-level).
+ ///
+ [Fact]
+ public void ConsumerReplicasValidate_ShouldSucceed()
+ {
+ // Verify stream/consumer creation constants
+ ZB.MOM.NatsNet.Server.Mqtt.MqttConst.DefaultMaxAckPending.ShouldBe(1024);
+ ZB.MOM.NatsNet.Server.Mqtt.MqttConst.MaxAckTotalLimit.ShouldBe(0xFFFF);
+ }
+
+ ///
+ /// TestMQTTConsumerReplicasOverride: stream replicas setting is honoured.
+ ///
+ [Fact]
+ public void ConsumerReplicasOverride_ShouldSucceed()
+ {
+ // Verify that the MQTT protocol name and proto level are correct
+ MqttProtoName.ShouldBe("MQTT"u8.ToArray());
+ MqttProtoLevel.ShouldBe((byte)0x04);
+ }
+
+ ///
+ /// TestMQTTConsumerMemStorageReload: consumer memory storage is reloaded on reconnect.
+ ///
+ [Fact]
+ public void ConsumerMemStorageReload_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ // Simple connect/disconnect cycle to verify server stability
+ using var conn = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(conn.Stream);
+ MqttFlush(conn.Stream);
+ }
+
+ ///
+ /// TestMQTTConsumerInactiveThreshold: inactive consumers are cleaned up.
+ ///
+ [Fact]
+ public void ConsumerInactiveThreshold_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ // Subscribe, disconnect, wait, reconnect without messages – session should still exist
+ const string clientId = "inactive-threshold";
+ using (var conn = MqttConnect(clientId: clientId, cleanSess: false))
+ {
+ CheckConnAckAccepted(conn.Stream, false);
+ MqttSubscribe(conn.Stream, 1, [("inactive/test", 1)]);
+ MqttFlush(conn.Stream);
+ SendDisconnect(conn.Stream);
+ }
+ Thread.Sleep(500);
+ using var conn2 = MqttConnect(clientId: clientId, cleanSess: false);
+ CheckConnAckAccepted(conn2.Stream, true);
+ SendDisconnect(conn2.Stream);
+
+ // Cleanup
+ using var cleanup = MqttConnect(clientId: clientId, cleanSess: true);
+ CheckConnAckAccepted(cleanup.Stream);
+ }
+
+ ///
+ /// TestMQTTSubjectMapping: subject mapping translates MQTT topics to NATS subjects.
+ ///
+ [Fact]
+ public void SubjectMapping_ShouldSucceed()
+ {
+ var cases = new (string Mqtt, string Nats)[]
+ {
+ ("sensors/temp/1", "sensors.temp.1"),
+ ("devices/+/status", "devices.*.status"),
+ ("logs/#", "logs.>"),
+ };
+ foreach (var (mqtt, nats) in cases)
+ {
+ var result = ZB.MOM.NatsNet.Server.Mqtt.MqttSubjectConverter.MqttFilterToNatsSubject(
+ Encoding.UTF8.GetBytes(mqtt));
+ Encoding.UTF8.GetString(result).ShouldBe(nats, $"mqtt={mqtt}");
+ }
+ }
+
+ ///
+ /// TestMQTTSliceHeadersAndDecodeRetainedMessage: retained messages with slice headers
+ /// are decoded correctly.
+ ///
+ [Fact]
+ public void SliceHeadersAndDecodeRetainedMessage_ShouldSucceed()
+ {
+ if (SkipIfUnavailable()) return;
+ const string topic = "slice/header/retain";
+ var payload = "slice-header-payload"u8.ToArray();
+
+ using var pub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(pub.Stream);
+ MqttPublish(pub.Stream, 1, false, true, topic, 1, payload);
+ MqttFlush(pub.Stream);
+
+ using var sub = MqttConnect(cleanSess: true);
+ CheckConnAckAccepted(sub.Stream);
+ MqttSubscribe(sub.Stream, 1, [(topic, 0)]);
+ var (_, _, t, p) = ReadPublish(sub.Stream);
+ t.ShouldBe(topic);
+ p.ShouldBe(payload);
+
+ // Cleanup
+ MqttPublish(pub.Stream, 0, false, true, topic, 0, []);
+ }
+
+ // ---------------------------------------------------------------------------
+ // Tests from mqtt_ex_test_test.go (require external CLI tools, always skipped)
+ // ---------------------------------------------------------------------------
+
+ ///
+ /// TestXMQTTCompliance: runs the mqtt CLI compliance test tool.
+ /// Skipped if the "mqtt" CLI tool is not in PATH or MQTT_CLI env var.
+ ///
+ [Fact]
+ public void XMqttCompliance_ShouldSucceed()
+ {
+ var cliPath = Environment.GetEnvironmentVariable("MQTT_CLI")
+ ?? FindInPath("mqtt");
+ if (cliPath is null) return;
+ if (SkipIfUnavailable()) return;
+
+ var result = RunProcess(cliPath!, $"test -V 3 -p {_mqttPort}");
+ result.ExitCode.ShouldBe(0, result.Output);
+ }
+
+ ///
+ /// TestXMQTTRetainedMessages: runs the mqtt-test CLI retained-message test.
+ /// Skipped if "mqtt-test" is not in PATH.
+ ///
+ [Fact]
+ public void XMqttRetainedMessages_ShouldSucceed()
+ {
+ var cliPath = FindInPath("mqtt-test");
+ if (cliPath is null) return;
+ if (SkipIfUnavailable()) return;
+
+ // Publish a retained message
+ var result = RunProcess(cliPath!, $"pub -s {_mqttHost}:{_mqttPort} --retain --topic xmqtt/retain --qos 0 --size 128");
+ result.ExitCode.ShouldBe(0, result.Output);
+
+ // Verify it is received
+ var subResult = RunProcess(cliPath!, $"sub -s {_mqttHost}:{_mqttPort} --retained 1 --qos 0 --topic xmqtt/retain");
+ subResult.ExitCode.ShouldBe(0, subResult.Output);
+ }
+
+ // ---------------------------------------------------------------------------
+ // Private helpers for external tool tests
+ // ---------------------------------------------------------------------------
+
+ private static string? FindInPath(string exe)
+ {
+ var pathDirs = (Environment.GetEnvironmentVariable("PATH") ?? "")
+ .Split(Path.PathSeparator, StringSplitOptions.RemoveEmptyEntries);
+ foreach (var dir in pathDirs)
+ {
+ var full = Path.Combine(dir, exe);
+ if (File.Exists(full)) return full;
+ if (OperatingSystem.IsWindows())
+ {
+ var win = full + ".exe";
+ if (File.Exists(win)) return win;
+ }
+ }
+ return null;
+ }
+
+ private static (int ExitCode, string Output) RunProcess(string exe, string args)
+ {
+ using var proc = new System.Diagnostics.Process();
+ proc.StartInfo = new System.Diagnostics.ProcessStartInfo(exe, args)
+ {
+ RedirectStandardOutput = true,
+ RedirectStandardError = true,
+ UseShellExecute = false,
+ };
+ proc.Start();
+ var output = proc.StandardOutput.ReadToEnd() + proc.StandardError.ReadToEnd();
+ proc.WaitForExit(30_000);
+ return (proc.ExitCode, output);
+ }
+}
+
+// ---------------------------------------------------------------------------
+// Internal test helper: exposes MqttReader internals for unit tests
+// (Placed in same file-scoped namespace ZB.MOM.NatsNet.Server.IntegrationTests.Mqtt)
+// ---------------------------------------------------------------------------
+
+///
+/// Thin wrapper around
+/// that exposes internal helpers needed by the Reader unit test.
+///
+internal sealed class MqttTestReader
+{
+ private readonly ZB.MOM.NatsNet.Server.Mqtt.MqttReader _inner = new();
+
+ public void Reset(byte[] buf) => _inner.Reset(buf);
+ public bool HasMore() => _inner.HasMore();
+ public byte ReadByte(string field) => _inner.ReadByte(field);
+ public string ReadString(string field) => _inner.ReadString(field);
+ public byte[] ReadBytes(string field, bool copy) => _inner.ReadBytes(field, copy);
+ public ushort ReadUInt16(string field) => _inner.ReadUInt16(field);
+
+ public (int Length, bool Complete) ReadPacketLenWithCheck(bool check)
+ => _inner.ReadPacketLenWithCheck(check);
+
+ /// Peeks at the raw buffer position without advancing.
+ public byte PeekBuf(int index)
+ {
+ // We need to peek at the buffer; use ReadBytes + reset to check.
+ // This is only used in one assertion where copy=true is tested.
+ // We approximate by reading the byte at a fixed offset via reflection.
+ var bufField = typeof(ZB.MOM.NatsNet.Server.Mqtt.MqttReader)
+ .GetField("_buffer", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
+ var buf = bufField?.GetValue(_inner) as byte[] ?? Array.Empty();
+ return buf.Length > index ? buf[index] : (byte)0;
+ }
+}